This commit is contained in:
adria0 2020-04-11 15:39:43 +02:00
parent 2dd8c213f9
commit be722d5474
14 changed files with 368 additions and 262 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "kuska-ssb"
version = "0.1.4"
version = "0.1.5"
authors = ["Dhole <dhole@riseup.net>", "Adria Massanet <adria@codecontext.io>"]
edition = "2018"

View File

@ -13,7 +13,8 @@ use async_std::net::{TcpStream, UdpSocket};
use kuska_handshake::async_std::{handshake_client, BoxStream};
use kuska_ssb::api::{
ApiHelper, CreateHistoryStreamArgs, CreateStreamArgs, LatestUserMessage, WhoAmI,
dto::{CreateHistoryStreamIn, CreateStreamIn, LatestOut, WhoAmIOut},
ApiHelper,
};
use kuska_ssb::discovery::ssb_net_id;
use kuska_ssb::feed::{is_privatebox, privatebox_decipher, Feed, Message};
@ -25,7 +26,7 @@ use regex::Regex;
use sodiumoxide::crypto::sign::ed25519;
use structopt::StructOpt;
type AnyResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
type SolarResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
#[derive(Debug, StructOpt)]
#[structopt(name = "example", about = "An example of StructOpt usage.")]
@ -36,16 +37,16 @@ struct Opt {
connect: Option<String>,
}
pub fn whoami_res_parse(body: &[u8]) -> AnyResult<WhoAmI> {
pub fn whoami_res_parse(body: &[u8]) -> SolarResult<WhoAmIOut> {
Ok(serde_json::from_slice(body)?)
}
pub fn message_res_parse(body: &[u8]) -> AnyResult<Message> {
pub fn message_res_parse(body: &[u8]) -> SolarResult<Message> {
Ok(Message::from_slice(body)?)
}
pub fn feed_res_parse(body: &[u8]) -> AnyResult<Feed> {
pub fn feed_res_parse(body: &[u8]) -> SolarResult<Feed> {
Ok(Feed::from_slice(&body)?)
}
pub fn latest_res_parse(body: &[u8]) -> AnyResult<LatestUserMessage> {
pub fn latest_res_parse(body: &[u8]) -> SolarResult<LatestOut> {
Ok(serde_json::from_slice(body)?)
}
@ -73,11 +74,11 @@ async fn get_async<'a, R, W, T, F>(
client: &mut ApiHelper<R, W>,
req_no: RequestNo,
f: F,
) -> AnyResult<T>
) -> SolarResult<T>
where
R: Read + Unpin,
W: Write + Unpin,
F: Fn(&[u8]) -> AnyResult<T>,
F: Fn(&[u8]) -> SolarResult<T>,
T: Debug,
{
loop {
@ -102,11 +103,11 @@ async fn print_source_until_eof<'a, R, W, T, F>(
client: &mut ApiHelper<R, W>,
req_no: RequestNo,
f: F,
) -> AnyResult<()>
) -> SolarResult<()>
where
R: Read + Unpin,
W: Write + Unpin,
F: Fn(&[u8]) -> AnyResult<T>,
F: Fn(&[u8]) -> SolarResult<T>,
T: Debug + serde::Deserialize<'a>,
{
loop {
@ -131,7 +132,7 @@ where
}
#[async_std::main]
async fn main() -> AnyResult<()> {
async fn main() -> SolarResult<()> {
env_logger::init();
log::set_max_level(log::LevelFilter::max());
@ -226,17 +227,17 @@ async fn main() -> AnyResult<()> {
("user", 2) => {
let user_id = if args[1] == "me" { &whoami } else { &args[1] };
let args = CreateHistoryStreamArgs::new(user_id.clone());
let args = CreateHistoryStreamIn::new(user_id.clone());
let req_id = client.create_history_stream_req_send(&args).await?;
print_source_until_eof(&mut client, req_id, feed_res_parse).await?;
}
("feed", 1) => {
let args = CreateStreamArgs::default();
let req_id = client.send_create_feed_stream(&args).await?;
let args = CreateStreamIn::default();
let req_id = client.create_feed_stream_req_send(&args).await?;
print_source_until_eof(&mut client, req_id, feed_res_parse).await?;
}
("latest", 1) => {
let req_id = client.send_latest().await?;
let req_id = client.latest_req_send().await?;
print_source_until_eof(&mut client, req_id, latest_res_parse).await?;
}
("private", 2) => {
@ -253,7 +254,7 @@ async fn main() -> AnyResult<()> {
return Ok("".to_string());
};
let args = CreateHistoryStreamArgs::new(user_id.clone());
let args = CreateHistoryStreamIn::new(user_id.clone());
let req_id = client.create_history_stream_req_send(&args).await?;
print_source_until_eof(&mut client, req_id, show_private).await?;

35
src/api/dto/blobs.rs Normal file
View File

@ -0,0 +1,35 @@
/// Arguments for the `["blobs", "get"]` muxrpc request.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobsGetIn {
    /// key: ID of the blob. Required.
    pub key: String,
    /// size: Expected size of the blob in bytes.
    /// If the blob is not exactly this size then reject the request. Optional.
    pub size: Option<u64>,
    /// max: Maximum size of the blob in bytes. If the blob is larger then reject
    /// the request. Only makes sense to specify max if you don't already know size. Optional.
    pub max: Option<u64>,
}

impl BlobsGetIn {
    /// Create a request for blob `key` with no size constraints.
    pub fn new(key: String) -> Self {
        Self {
            key,
            size: None,
            max: None,
        }
    }

    /// Builder: require the blob to be exactly `size` bytes.
    pub fn size(self, size: u64) -> Self {
        Self {
            size: Some(size),
            ..self
        }
    }

    /// Builder: reject blobs larger than `max` bytes.
    pub fn max(self, max: u64) -> Self {
        Self {
            max: Some(max),
            ..self
        }
    }
}

8
src/api/dto/error.rs Normal file
View File

@ -0,0 +1,8 @@
/// Error payload a peer sends back in response to a failed muxrpc request.
/// The fields mirror a JavaScript `Error` object — presumably produced by the
/// Node.js ssb-server implementation; verify against the wire format.
// NOTE(review): the original `/// data transfer objects` doc line described
// the `dto` module as a whole, not this struct; module docs belong in
// `dto/mod.rs` as `//!` comments.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorOut {
    // Error class name (e.g. "Error") as reported by the remote peer.
    pub name: String,
    // Human-readable error message.
    pub message: String,
    // Stack trace string from the remote peer.
    pub stack: String,
}

View File

@ -0,0 +1,62 @@
/// Arguments for the `["createHistoryStream"]` muxrpc request: fetch messages
/// from a specific user, ordered by sequence number.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateHistoryStreamIn {
    /// id (FeedID, required): The id of the feed to fetch.
    pub id: String,
    /// seq (number, default: 0): If seq > 0, then only stream messages with sequence numbers greater than seq.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seq: Option<u64>,
    /// live (boolean, default: false): Keep the stream open and emit new messages as they are received
    #[serde(skip_serializing_if = "Option::is_none")]
    pub live: Option<bool>,
    /// keys (boolean, default: true): whether the data event should contain keys. If set to true and values set to false then data events will simply be keys, rather than objects with a key property.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub keys: Option<bool>,
    /// values (boolean, default: true): whether the data event should contain values. If set to true and keys set to false then data events will simply be values, rather than objects with a value property.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub values: Option<bool>,
    /// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<i64>,
}

impl CreateHistoryStreamIn {
    /// Request the history of feed `id` with every option left at the
    /// server-side default (serde skips `None` fields when serializing).
    pub fn new(id: String) -> Self {
        Self {
            id,
            seq: None,
            live: None,
            keys: None,
            values: None,
            limit: None,
        }
    }

    /// Builder: only stream messages with sequence numbers greater than `seq`.
    pub fn starting_seq(self, seq: u64) -> Self {
        Self {
            seq: Some(seq),
            ..self
        }
    }

    /// Builder: keep the stream open and emit new messages as they arrive.
    pub fn live(self, live: bool) -> Self {
        Self {
            live: Some(live),
            ..self
        }
    }

    /// Builder: control whether data events carry keys and/or values.
    pub fn keys_values(self, keys: bool, values: bool) -> Self {
        Self {
            keys: Some(keys),
            values: Some(values),
            ..self
        }
    }

    /// Builder: cap the number of results; -1 means no limit.
    pub fn limit(self, limit: i64) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }
}

6
src/api/dto/latest.rs Normal file
View File

@ -0,0 +1,6 @@
/// One entry of the `["latest"]` muxrpc response: the sequence number of the
/// latest known message of a single feed.
#[derive(Debug, Serialize, Deserialize)]
pub struct LatestOut {
    // Feed (user) id this entry refers to.
    pub id: String,
    // Sequence number of that feed's latest message.
    pub sequence: u64,
    // Timestamp of the latest message; units as reported by the JS
    // server — presumably milliseconds since epoch, TODO confirm.
    pub ts: f64,
}

14
src/api/dto/mod.rs Normal file
View File

@ -0,0 +1,14 @@
//! Data transfer objects for the ssb-server muxrpc API. By convention here,
//! `*In` types are request arguments and `*Out` types are response payloads.

mod blobs;
pub mod content;
mod error;
mod history_stream;
mod latest;
mod stream;
mod whoami;

// Flatten the DTO submodules into `api::dto::*`; `content` stays a named
// public module instead of being glob re-exported.
pub use blobs::*;
pub use error::*;
pub use history_stream::*;
pub use latest::*;
pub use stream::*;
pub use whoami::*;

126
src/api/dto/stream.rs Normal file
View File

@ -0,0 +1,126 @@
/// Arguments for the `["createFeedStream"]` muxrpc request.
///
/// See <https://github.com/ssbc/ssb-db/blob/master/api.md>
#[derive(Debug, Serialize)]
pub struct CreateStreamIn<K> {
    /// live (boolean, default: false): Keep the stream open and emit new messages as they are received
    #[serde(skip_serializing_if = "Option::is_none")]
    pub live: Option<bool>,
    /// gt (greater than), gte (greater than or equal) define the lower bound of the range to be streamed
    #[serde(skip_serializing_if = "Option::is_none")]
    pub gt: Option<K>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub gte: Option<K>,
    /// lt (less than), lte (less than or equal) define the higher bound of the range to be streamed. Only key/value pairs where the key is less than (or equal to) this option will be included in the range. When reverse=true the order will be reversed, but the records streamed will be the same.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lt: Option<K>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lte: Option<K>,
    /// reverse (boolean, default: false): a boolean, set true and the stream output will be reversed. Beware that due to the way LevelDB works, a reverse seek will be slower than a forward seek.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reverse: Option<bool>,
    /// keys (boolean, default: true): whether the data event should contain keys. If set to true and values set to false then data events will simply be keys, rather than objects with a key property.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub keys: Option<bool>,
    /// values (boolean, default: true): whether the data event should contain values. If set to true and keys set to false then data events will simply be values, rather than objects with a value property.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub values: Option<bool>,
    /// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<i64>,
    /// fillCache (boolean, default: false): whether LevelDB's LRU-cache should be filled with data read.
    #[serde(rename = "fillCache")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fill_cache: Option<bool>,
    /// keyEncoding / valueEncoding (string): the encoding applied to each read piece of data.
    // Fix: these two fields previously lacked `skip_serializing_if`, so every
    // request serialized `"keyEncoding":null,"valueEncoding":null`; they are
    // now omitted when unset, consistent with the other optional fields.
    #[serde(rename = "keyEncoding")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub key_encoding: Option<String>,
    #[serde(rename = "valueEncoding")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value_encoding: Option<String>,
}

impl<K> Default for CreateStreamIn<K> {
    /// All options unset: the server applies its own defaults and serde omits
    /// every field from the serialized request.
    fn default() -> Self {
        Self {
            live: None,
            gt: None,
            gte: None,
            lt: None,
            lte: None,
            reverse: None,
            keys: None,
            values: None,
            limit: None,
            fill_cache: None,
            key_encoding: None,
            value_encoding: None,
        }
    }
}

impl<K> CreateStreamIn<K> {
    /// Builder: keep the stream open and emit new messages as they arrive.
    pub fn live(self, live: bool) -> Self {
        Self {
            live: Some(live),
            ..self
        }
    }

    /// Builder: lower bound (exclusive) of the key range.
    pub fn gt(self, v: K) -> Self {
        Self {
            gt: Some(v),
            ..self
        }
    }

    /// Builder: lower bound (inclusive) of the key range.
    pub fn gte(self, v: K) -> Self {
        Self {
            gte: Some(v),
            ..self
        }
    }

    /// Builder: upper bound (exclusive) of the key range.
    pub fn lt(self, v: K) -> Self {
        Self {
            lt: Some(v),
            ..self
        }
    }

    /// Builder: upper bound (inclusive) of the key range.
    pub fn lte(self, v: K) -> Self {
        Self {
            lte: Some(v),
            ..self
        }
    }

    /// Builder: stream the output in reverse order.
    pub fn reverse(self, reversed: bool) -> Self {
        Self {
            reverse: Some(reversed),
            ..self
        }
    }

    /// Builder: control whether data events carry keys and/or values.
    pub fn keys_values(self, keys: bool, values: bool) -> Self {
        Self {
            keys: Some(keys),
            values: Some(values),
            ..self
        }
    }

    /// Builder: set the key and value encodings applied to each read piece of data.
    pub fn encoding(self, keys: String, values: String) -> Self {
        Self {
            key_encoding: Some(keys),
            value_encoding: Some(values),
            ..self
        }
    }

    /// Builder: cap the number of results; -1 means no limit.
    pub fn limit(self, limit: i64) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }
}

5
src/api/dto/whoami.rs Normal file
View File

@ -0,0 +1,5 @@
/// Response payload of the `["whoami"]` muxrpc request: information about the
/// current ssb-server user.
#[derive(Debug, Serialize, Deserialize)]
pub struct WhoAmIOut {
    // Public identity (feed id) of the current ssb-server user.
    pub id: String,
}

View File

@ -3,216 +3,10 @@ use crate::rpc::{Body, BodyType, RequestNo, RpcStream, RpcType};
use async_std::io::{Read, Write};
use serde_json;
use super::dto;
use super::error::Result;
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorRes {
pub name: String,
pub message: String,
pub stack: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct WhoAmI {
pub id: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct LatestUserMessage {
pub id: String,
pub sequence: u64,
pub ts: f64,
}
// https://github.com/ssbc/ssb-db/blob/master/api.md
#[derive(Debug, Serialize)]
pub struct CreateStreamArgs<K> {
/// live (boolean, default: false): Keep the stream open and emit new messages as they are received
#[serde(skip_serializing_if = "Option::is_none")]
pub live: Option<bool>,
/// gt (greater than), gte (greater than or equal) define the lower bound of the range to be streamed
#[serde(skip_serializing_if = "Option::is_none")]
pub gt: Option<K>,
#[serde(skip_serializing_if = "Option::is_none")]
pub gte: Option<K>,
/// lt (less than), lte (less than or equal) define the higher bound of the range to be streamed. Only key/value pairs where the key is less than (or equal to) this option will be included in the range. When reverse=true the order will be reversed, but the records streamed will be the same.
#[serde(skip_serializing_if = "Option::is_none")]
pub lt: Option<K>,
#[serde(skip_serializing_if = "Option::is_none")]
pub lte: Option<K>,
/// reverse (boolean, default: false): a boolean, set true and the stream output will be reversed. Beware that due to the way LevelDB works, a reverse seek will be slower than a forward seek.
#[serde(skip_serializing_if = "Option::is_none")]
pub reverse: Option<bool>,
/// keys (boolean, default: true): whether the data event should contain keys. If set to true and values set to false then data events will simply be keys, rather than objects with a key property.
#[serde(skip_serializing_if = "Option::is_none")]
pub keys: Option<bool>,
/// values (boolean, default: true): whether the data event should contain values. If set to true and keys set to false then data events will simply be values, rather than objects with a value property.
#[serde(skip_serializing_if = "Option::is_none")]
pub values: Option<bool>,
/// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys.
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<i64>,
/// fillCache (boolean, default: false): wheather LevelDB's LRU-cache should be filled with data read.
#[serde(rename = "fillCache")]
#[serde(skip_serializing_if = "Option::is_none")]
pub fill_cache: Option<bool>,
/// keyEncoding / valueEncoding (string): the encoding applied to each read piece of data.
#[serde(rename = "keyEncoding")]
pub key_encoding: Option<String>,
#[serde(rename = "valueEncoding")]
pub value_encoding: Option<String>,
}
impl<K> Default for CreateStreamArgs<K> {
fn default() -> Self {
Self {
live: None,
gt: None,
gte: None,
lt: None,
lte: None,
reverse: None,
keys: None,
values: None,
limit: None,
fill_cache: None,
key_encoding: None,
value_encoding: None,
}
}
}
impl<K> CreateStreamArgs<K> {
pub fn live(self: Self, live: bool) -> Self {
Self {
live: Some(live),
..self
}
}
pub fn gt(self: Self, v: K) -> Self {
Self {
gt: Some(v),
..self
}
}
pub fn gte(self: Self, v: K) -> Self {
Self {
gte: Some(v),
..self
}
}
pub fn lt(self: Self, v: K) -> Self {
Self {
lt: Some(v),
..self
}
}
pub fn lte(self: Self, v: K) -> Self {
Self {
lte: Some(v),
..self
}
}
pub fn reverse(self: Self, reversed: bool) -> Self {
Self {
reverse: Some(reversed),
..self
}
}
pub fn keys_values(self: Self, keys: bool, values: bool) -> Self {
Self {
keys: Some(keys),
values: Some(values),
..self
}
}
pub fn encoding(self: Self, keys: String, values: String) -> Self {
Self {
key_encoding: Some(keys),
value_encoding: Some(values),
..self
}
}
pub fn limit(self: Self, limit: i64) -> Self {
Self {
limit: Some(limit),
..self
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateHistoryStreamArgs {
// id (FeedID, required): The id of the feed to fetch.
pub id: String,
/// (number, default: 0): If seq > 0, then only stream messages with sequence numbers greater than seq.
#[serde(skip_serializing_if = "Option::is_none")]
pub seq: Option<u64>,
/// live (boolean, default: false): Keep the stream open and emit new messages as they are received
#[serde(skip_serializing_if = "Option::is_none")]
pub live: Option<bool>,
/// keys (boolean, default: true): whether the data event should contain keys. If set to true and values set to false then data events will simply be keys, rather than objects with a key property.
#[serde(skip_serializing_if = "Option::is_none")]
pub keys: Option<bool>,
/// values (boolean, default: true): whether the data event should contain values. If set to true and keys set to false then data events will simply be values, rather than objects with a value property.
#[serde(skip_serializing_if = "Option::is_none")]
pub values: Option<bool>,
/// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys.
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<i64>,
}
impl CreateHistoryStreamArgs {
pub fn new(id: String) -> Self {
Self {
id,
seq: None,
live: None,
keys: None,
values: None,
limit: None,
}
}
pub fn starting_seq(self: Self, seq: u64) -> Self {
Self {
seq: Some(seq),
..self
}
}
pub fn live(self: Self, live: bool) -> Self {
Self {
live: Some(live),
..self
}
}
pub fn keys_values(self: Self, keys: bool, values: bool) -> Self {
Self {
keys: Some(keys),
values: Some(values),
..self
}
}
pub fn limit(self: Self, limit: i64) -> Self {
Self {
limit: Some(limit),
..self
}
}
}
const MAX_RPC_BODY_LEN: usize = 65536;
#[derive(Debug)]
pub enum ApiMethod {
@ -221,6 +15,8 @@ pub enum ApiMethod {
CreateHistoryStream,
CreateFeedStream,
Latest,
BlobsGet,
BlobsCreateWants,
}
impl ApiMethod {
@ -232,6 +28,8 @@ impl ApiMethod {
CreateHistoryStream => &["createHistoryStream"],
CreateFeedStream => &["createFeedStream"],
Latest => &["latest"],
BlobsGet => &["blobs", "get"],
BlobsCreateWants => &["blobs", "createWants"],
}
}
pub fn from_selector(s: &[&str]) -> Option<Self> {
@ -242,6 +40,8 @@ impl ApiMethod {
["createHistoryStream"] => Some(CreateHistoryStream),
["createFeedStream"] => Some(CreateFeedStream),
["latest"] => Some(Latest),
["blobs", "get"] => Some(BlobsGet),
["blobs", "createWants"] => Some(BlobsCreateWants),
_ => None,
}
}
@ -264,8 +64,7 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiHelper<R, W> {
&mut self.rpc
}
// whoami: sync
// Get information about the current ssb-server user.
/// Send ["whoami"] request.
pub async fn whoami_req_send(&mut self) -> Result<RequestNo> {
let args: [&str; 0] = [];
let req_no = self
@ -274,16 +73,17 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiHelper<R, W> {
.await?;
Ok(req_no)
}
/// Send ["whoami"] response.
pub async fn whoami_res_send(&mut self, req_no: RequestNo, id: String) -> Result<()> {
let body = serde_json::to_string(&WhoAmI { id })?;
let body = serde_json::to_string(&dto::WhoAmIOut { id })?;
Ok(self
.rpc
.send_response(req_no, RpcType::Async, BodyType::JSON, body.as_bytes())
.await?)
}
// get: async
// Get a message by its hash-id. (sould start with %)
/// Send ["get"] request.
pub async fn get_req_send(&mut self, msg_id: &str) -> Result<RequestNo> {
let req_no = self
.rpc
@ -291,6 +91,8 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiHelper<R, W> {
.await?;
Ok(req_no)
}
/// Send ["get"] response.
pub async fn get_res_send(&mut self, req_no: RequestNo, msg: &Message) -> Result<()> {
self.rpc
.send_response(
@ -303,11 +105,10 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiHelper<R, W> {
Ok(())
}
// createHistoryStream: source
// (hist) Fetch messages from a specific user, ordered by sequence numbers.
/// Send ["createHistoryStream"] request.
pub async fn create_history_stream_req_send(
&mut self,
args: &CreateHistoryStreamArgs,
args: &dto::CreateHistoryStreamIn,
) -> Result<RequestNo> {
let req_no = self
.rpc
@ -319,18 +120,11 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiHelper<R, W> {
.await?;
Ok(req_no)
}
pub async fn feed_res_send(&mut self, req_no: RequestNo, feed: &str) -> Result<()> {
self.rpc
.send_response(req_no, RpcType::Source, BodyType::JSON, feed.as_bytes())
.await?;
Ok(())
}
// createFeedStream: source
// (feed) Fetch messages ordered by their claimed timestamps.
pub async fn send_create_feed_stream<'a>(
/// Send ["createFeedStream"] request.
pub async fn create_feed_stream_req_send<'a>(
&mut self,
args: &CreateStreamArgs<u64>,
args: &dto::CreateStreamIn<u64>,
) -> Result<RequestNo> {
let req_no = self
.rpc
@ -343,9 +137,8 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiHelper<R, W> {
Ok(req_no)
}
// latest: source
// Get the seq numbers of the latest messages of all users in the database.
pub async fn send_latest(&mut self) -> Result<RequestNo> {
/// Send ["latest"] request.
pub async fn latest_req_send(&mut self) -> Result<RequestNo> {
let args: [&str; 0] = [];
let req_no = self
.rpc
@ -353,4 +146,59 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiHelper<R, W> {
.await?;
Ok(req_no)
}
/// Send ["blobs","get"] request.
pub async fn blobs_get_req_send(&mut self, args: &dto::BlobsGetIn) -> Result<RequestNo> {
let req_no = self
.rpc
.send_request(ApiMethod::BlobsGet.selector(), RpcType::Source, &args)
.await?;
Ok(req_no)
}
/// Send feed response
pub async fn feed_res_send(&mut self, req_no: RequestNo, feed: &str) -> Result<()> {
self.rpc
.send_response(req_no, RpcType::Source, BodyType::JSON, feed.as_bytes())
.await?;
Ok(())
}
/// Send blob create wants
pub async fn blob_create_wants_req_send(&mut self) -> Result<RequestNo> {
let args: [&str; 0] = [];
let req_no = self
.rpc
.send_request(
ApiMethod::BlobsCreateWants.selector(),
RpcType::Source,
&args,
)
.await?;
Ok(req_no)
}
/// Send blob response
pub async fn blobs_get_res_send<D: AsRef<[u8]>>(
&mut self,
req_no: RequestNo,
data: D,
) -> Result<()> {
let mut offset = 0;
let data = data.as_ref();
while offset < data.len() {
let limit = std::cmp::min(data.len(), offset + MAX_RPC_BODY_LEN);
self.rpc
.send_response(
req_no,
RpcType::Source,
BodyType::Binary,
&data[offset..limit],
)
.await?;
offset += MAX_RPC_BODY_LEN;
}
self.rpc.send_stream_eof(req_no).await?;
Ok(())
}
}

View File

@ -1,8 +1,6 @@
pub mod dto;
mod error;
mod helper;
pub mod msgs;
pub use error::{Error, Result};
pub use helper::{
ApiHelper, ApiMethod, CreateHistoryStreamArgs, CreateStreamArgs, LatestUserMessage, WhoAmI,
};
pub use helper::{ApiHelper, ApiMethod};

View File

@ -1,7 +1,7 @@
#[derive(Debug)]
pub enum Error {
HeaderSizeTooSmall,
InvalidBodyType,
InvalidBodyType(u8),
Io(async_std::io::Error),
Json(serde_json::Error),
}

View File

@ -2,7 +2,7 @@ use super::error::{Error, Result};
use async_std::io;
use async_std::prelude::*;
use log::debug;
use log::{trace, warn};
use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite};
@ -72,11 +72,15 @@ impl Header {
let is_stream = (bytes[0] & RPC_HEADER_STREAM_FLAG) == RPC_HEADER_STREAM_FLAG;
let is_end_or_error =
(bytes[0] & RPC_HEADER_END_OR_ERROR_FLAG) == RPC_HEADER_END_OR_ERROR_FLAG;
let body_type = match bytes[0] & RPC_HEADER_BODY_TYPE_MASK {
let body_type_value = bytes[0] & RPC_HEADER_BODY_TYPE_MASK;
let body_type = match body_type_value {
0 => BodyType::Binary,
1 => BodyType::UTF8,
2 => BodyType::JSON,
_ => return Err(Error::InvalidBodyType),
_ => {
warn!("rare message: {}", String::from_utf8_lossy(bytes));
return Err(Error::InvalidBodyType(body_type_value));
}
};
let mut body_len_buff = [0u8; 4];
@ -153,8 +157,7 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
let mut body_raw: Vec<u8> = vec![0; rpc_header.body_len as usize];
self.box_reader.read_exact(&mut body_raw[..]).await?;
debug!(
"rpc-recv {:?} '{}'",
trace!(target: "ssb-rpc", "recv {:?} '{}'",
rpc_header,
String::from_utf8_lossy(&body_raw[..])
);
@ -207,7 +210,7 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
body_len: body_str.as_bytes().len() as u32,
};
debug!("rpc-send {:?} '{}'", rpc_header, body_str);
trace!(target: "ssb-rpc", "send {:?} '{}'", rpc_header, body_str);
self.box_writer
.write_all(&rpc_header.to_array()[..])
@ -233,8 +236,8 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
body_len: body.len() as u32,
};
debug!(
"rpc-send {:?} '{}'",
trace!(target: "ssb-rpc",
"send {:?} '{}'",
rpc_header,
String::from_utf8_lossy(body)
);
@ -273,7 +276,7 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
body_len: body_bytes.as_bytes().len() as u32,
};
debug!("rpc-send {:?} '{}'", rpc_header, body_bytes);
trace!(target: "ssb-rpc", "send {:?} '{}'", rpc_header, body_bytes);
self.box_writer
.write_all(&rpc_header.to_array()[..])
@ -295,8 +298,8 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
body_len: body_bytes.len() as u32,
};
debug!(
"rpc-send {:?} '{}'",
trace!(target: "ssb-rpc",
"send {:?} '{}'",
rpc_header,
String::from_utf8_lossy(body_bytes)
);