From be722d5474e6e6a287cda71d3f74b1e9b21e1f9e Mon Sep 17 00:00:00 2001 From: adria0 Date: Sat, 11 Apr 2020 15:39:43 +0200 Subject: [PATCH] 0.1.5 --- Cargo.toml | 2 +- examples/ssb-cli.rs | 33 +-- src/api/dto/blobs.rs | 35 ++++ src/api/{msgs.rs => dto/content.rs} | 0 src/api/dto/error.rs | 8 + src/api/dto/history_stream.rs | 62 ++++++ src/api/dto/latest.rs | 6 + src/api/dto/mod.rs | 14 ++ src/api/dto/stream.rs | 126 ++++++++++++ src/api/dto/whoami.rs | 5 + src/api/helper.rs | 306 +++++++--------------------- src/api/mod.rs | 6 +- src/rpc/error.rs | 2 +- src/rpc/stream.rs | 25 ++- 14 files changed, 368 insertions(+), 262 deletions(-) create mode 100644 src/api/dto/blobs.rs rename src/api/{msgs.rs => dto/content.rs} (100%) create mode 100644 src/api/dto/error.rs create mode 100644 src/api/dto/history_stream.rs create mode 100644 src/api/dto/latest.rs create mode 100644 src/api/dto/mod.rs create mode 100644 src/api/dto/stream.rs create mode 100644 src/api/dto/whoami.rs diff --git a/Cargo.toml b/Cargo.toml index 3f42e48..118505a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kuska-ssb" -version = "0.1.4" +version = "0.1.5" authors = ["Dhole ", "Adria Massanet "] edition = "2018" diff --git a/examples/ssb-cli.rs b/examples/ssb-cli.rs index 28025a8..7c4df0a 100644 --- a/examples/ssb-cli.rs +++ b/examples/ssb-cli.rs @@ -13,7 +13,8 @@ use async_std::net::{TcpStream, UdpSocket}; use kuska_handshake::async_std::{handshake_client, BoxStream}; use kuska_ssb::api::{ - ApiHelper, CreateHistoryStreamArgs, CreateStreamArgs, LatestUserMessage, WhoAmI, + dto::{CreateHistoryStreamIn, CreateStreamIn, LatestOut, WhoAmIOut}, + ApiHelper, }; use kuska_ssb::discovery::ssb_net_id; use kuska_ssb::feed::{is_privatebox, privatebox_decipher, Feed, Message}; @@ -25,7 +26,7 @@ use regex::Regex; use sodiumoxide::crypto::sign::ed25519; use structopt::StructOpt; -type AnyResult = std::result::Result>; +type SolarResult = std::result::Result>; #[derive(Debug, StructOpt)] #[structopt(name = "example", about = "An example of StructOpt usage.")] @@ -36,16 +37,16 @@ struct Opt { connect: Option, } -pub fn whoami_res_parse(body: &[u8]) -> AnyResult { +pub fn whoami_res_parse(body: &[u8]) -> SolarResult { Ok(serde_json::from_slice(body)?) } -pub fn message_res_parse(body: &[u8]) -> AnyResult { +pub fn message_res_parse(body: &[u8]) -> SolarResult { Ok(Message::from_slice(body)?) } -pub fn feed_res_parse(body: &[u8]) -> AnyResult { +pub fn feed_res_parse(body: &[u8]) -> SolarResult { Ok(Feed::from_slice(&body)?) } -pub fn latest_res_parse(body: &[u8]) -> AnyResult { +pub fn latest_res_parse(body: &[u8]) -> SolarResult { Ok(serde_json::from_slice(body)?) } @@ -73,11 +74,11 @@ async fn get_async<'a, R, W, T, F>( client: &mut ApiHelper, req_no: RequestNo, f: F, -) -> AnyResult +) -> SolarResult where R: Read + Unpin, W: Write + Unpin, - F: Fn(&[u8]) -> AnyResult, + F: Fn(&[u8]) -> SolarResult, T: Debug, { loop { @@ -102,11 +103,11 @@ async fn print_source_until_eof<'a, R, W, T, F>( client: &mut ApiHelper, req_no: RequestNo, f: F, -) -> AnyResult<()> +) -> SolarResult<()> where R: Read + Unpin, W: Write + Unpin, - F: Fn(&[u8]) -> AnyResult, + F: Fn(&[u8]) -> SolarResult, T: Debug + serde::Deserialize<'a>, { loop { @@ -131,7 +132,7 @@ where } #[async_std::main] -async fn main() -> AnyResult<()> { +async fn main() -> SolarResult<()> { env_logger::init(); log::set_max_level(log::LevelFilter::max()); @@ -226,17 +227,17 @@ async fn main() -> AnyResult<()> { ("user", 2) => { let user_id = if args[1] == "me" { &whoami } else { &args[1] }; - let args = CreateHistoryStreamArgs::new(user_id.clone()); + let args = CreateHistoryStreamIn::new(user_id.clone()); let req_id = client.create_history_stream_req_send(&args).await?; print_source_until_eof(&mut client, req_id, feed_res_parse).await?; } ("feed", 1) => { - let args = CreateStreamArgs::default(); - let req_id = client.send_create_feed_stream(&args).await?; + let args = CreateStreamIn::default(); + let req_id = client.create_feed_stream_req_send(&args).await?; print_source_until_eof(&mut client, req_id, feed_res_parse).await?; } ("latest", 1) => { - let req_id = client.send_latest().await?; + let req_id = client.latest_req_send().await?; print_source_until_eof(&mut client, req_id, latest_res_parse).await?; } ("private", 2) => { @@ -253,7 +254,7 @@ async fn main() -> AnyResult<()> { return Ok("".to_string()); }; - let args = CreateHistoryStreamArgs::new(user_id.clone()); + let args = CreateHistoryStreamIn::new(user_id.clone()); let req_id = client.create_history_stream_req_send(&args).await?; print_source_until_eof(&mut client, req_id, show_private).await?; diff --git a/src/api/dto/blobs.rs b/src/api/dto/blobs.rs new file mode 100644 index 0000000..fdb17fa --- /dev/null +++ b/src/api/dto/blobs.rs @@ -0,0 +1,35 @@ +#[derive(Debug, Serialize, Deserialize)] +pub struct BlobsGetIn { + // key : ID of the blob. Required. + pub key: String, + + // size : Expected size of the blob in bytes. + // If the blob is not exactly this size then reject the request. Optional. + pub size: Option, + + // max Maximum size of the blob in bytes. If the blob is larger then reject + // the request. Only makes sense to specify max if you don’t already know size. Optional. + pub max: Option, +} + +impl BlobsGetIn { + pub fn new(key: String) -> Self { + Self { + key, + size: None, + max: None, + } + } + pub fn size(self: Self, size: u64) -> Self { + Self { + size: Some(size), + ..self + } + } + pub fn max(self: Self, max: u64) -> Self { + Self { + max: Some(max), + ..self + } + } +} diff --git a/src/api/msgs.rs b/src/api/dto/content.rs similarity index 100% rename from src/api/msgs.rs rename to src/api/dto/content.rs diff --git a/src/api/dto/error.rs b/src/api/dto/error.rs new file mode 100644 index 0000000..a148d57 --- /dev/null +++ b/src/api/dto/error.rs @@ -0,0 +1,8 @@ +/// data transfer objects + +#[derive(Debug, Serialize, Deserialize)] +pub struct ErrorOut { + pub name: String, + pub message: String, + pub stack: String, +} diff --git a/src/api/dto/history_stream.rs b/src/api/dto/history_stream.rs new file mode 100644 index 0000000..eb429d0 --- /dev/null +++ b/src/api/dto/history_stream.rs @@ -0,0 +1,62 @@ +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateHistoryStreamIn { + // id (FeedID, required): The id of the feed to fetch. + pub id: String, + + /// (number, default: 0): If seq > 0, then only stream messages with sequence numbers greater than seq. + #[serde(skip_serializing_if = "Option::is_none")] + pub seq: Option, + + /// live (boolean, default: false): Keep the stream open and emit new messages as they are received + #[serde(skip_serializing_if = "Option::is_none")] + pub live: Option, + /// keys (boolean, default: true): whether the data event should contain keys. If set to true and values set to false then data events will simply be keys, rather than objects with a key property. + #[serde(skip_serializing_if = "Option::is_none")] + pub keys: Option, + + /// values (boolean, default: true): whether the data event should contain values. If set to true and keys set to false then data events will simply be values, rather than objects with a value property. + #[serde(skip_serializing_if = "Option::is_none")] + pub values: Option, + + /// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys. + #[serde(skip_serializing_if = "Option::is_none")] + pub limit: Option, +} + +impl CreateHistoryStreamIn { + pub fn new(id: String) -> Self { + Self { + id, + seq: None, + live: None, + keys: None, + values: None, + limit: None, + } + } + pub fn starting_seq(self: Self, seq: u64) -> Self { + Self { + seq: Some(seq), + ..self + } + } + pub fn live(self: Self, live: bool) -> Self { + Self { + live: Some(live), + ..self + } + } + pub fn keys_values(self: Self, keys: bool, values: bool) -> Self { + Self { + keys: Some(keys), + values: Some(values), + ..self + } + } + pub fn limit(self: Self, limit: i64) -> Self { + Self { + limit: Some(limit), + ..self + } + } +} diff --git a/src/api/dto/latest.rs b/src/api/dto/latest.rs new file mode 100644 index 0000000..f465400 --- /dev/null +++ b/src/api/dto/latest.rs @@ -0,0 +1,6 @@ +#[derive(Debug, Serialize, Deserialize)] +pub struct LatestOut { + pub id: String, + pub sequence: u64, + pub ts: f64, +} diff --git a/src/api/dto/mod.rs b/src/api/dto/mod.rs new file mode 100644 index 0000000..e2d7711 --- /dev/null +++ b/src/api/dto/mod.rs @@ -0,0 +1,14 @@ +mod blobs; +pub mod content; +mod error; +mod history_stream; +mod latest; +mod stream; +mod whoami; + +pub use blobs::*; +pub use error::*; +pub use history_stream::*; +pub use latest::*; +pub use stream::*; +pub use whoami::*; diff --git a/src/api/dto/stream.rs b/src/api/dto/stream.rs new file mode 100644 index 0000000..b1a8d05 --- /dev/null +++ b/src/api/dto/stream.rs @@ -0,0 +1,126 @@ +// https://github.com/ssbc/ssb-db/blob/master/api.md +#[derive(Debug, Serialize)] +pub struct CreateStreamIn { + /// live (boolean, default: false): Keep the stream open and emit new messages as they are received + #[serde(skip_serializing_if = "Option::is_none")] + pub live: Option, + + /// gt (greater than), gte (greater than or equal) define the lower bound of the range to be streamed + #[serde(skip_serializing_if = "Option::is_none")] + pub gt: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub gte: Option, + + /// lt (less than), lte (less than or equal) define the higher bound of the range to be streamed. Only key/value pairs where the key is less than (or equal to) this option will be included in the range. When reverse=true the order will be reversed, but the records streamed will be the same. + #[serde(skip_serializing_if = "Option::is_none")] + pub lt: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub lte: Option, + + /// reverse (boolean, default: false): a boolean, set true and the stream output will be reversed. Beware that due to the way LevelDB works, a reverse seek will be slower than a forward seek. + #[serde(skip_serializing_if = "Option::is_none")] + pub reverse: Option, + + /// keys (boolean, default: true): whether the data event should contain keys. If set to true and values set to false then data events will simply be keys, rather than objects with a key property. + #[serde(skip_serializing_if = "Option::is_none")] + pub keys: Option, + + /// values (boolean, default: true): whether the data event should contain values. If set to true and keys set to false then data events will simply be values, rather than objects with a value property. + #[serde(skip_serializing_if = "Option::is_none")] + pub values: Option, + + /// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys. + #[serde(skip_serializing_if = "Option::is_none")] + pub limit: Option, + + /// fillCache (boolean, default: false): wheather LevelDB's LRU-cache should be filled with data read. + #[serde(rename = "fillCache")] + #[serde(skip_serializing_if = "Option::is_none")] + pub fill_cache: Option, + /// keyEncoding / valueEncoding (string): the encoding applied to each read piece of data. + #[serde(rename = "keyEncoding")] + pub key_encoding: Option, + + #[serde(rename = "valueEncoding")] + pub value_encoding: Option, +} + +impl Default for CreateStreamIn { + fn default() -> Self { + Self { + live: None, + gt: None, + gte: None, + lt: None, + lte: None, + reverse: None, + keys: None, + values: None, + limit: None, + fill_cache: None, + key_encoding: None, + value_encoding: None, + } + } +} + +impl CreateStreamIn { + pub fn live(self: Self, live: bool) -> Self { + Self { + live: Some(live), + ..self + } + } + pub fn gt(self: Self, v: K) -> Self { + Self { + gt: Some(v), + ..self + } + } + pub fn gte(self: Self, v: K) -> Self { + Self { + gte: Some(v), + ..self + } + } + pub fn lt(self: Self, v: K) -> Self { + Self { + lt: Some(v), + ..self + } + } + pub fn lte(self: Self, v: K) -> Self { + Self { + lte: Some(v), + ..self + } + } + pub fn reverse(self: Self, reversed: bool) -> Self { + Self { + reverse: Some(reversed), + ..self + } + } + pub fn keys_values(self: Self, keys: bool, values: bool) -> Self { + Self { + keys: Some(keys), + values: Some(values), + ..self + } + } + pub fn encoding(self: Self, keys: String, values: String) -> Self { + Self { + key_encoding: Some(keys), + value_encoding: Some(values), + ..self + } + } + pub fn limit(self: Self, limit: i64) -> Self { + Self { + limit: Some(limit), + ..self + } + } +} diff --git a/src/api/dto/whoami.rs b/src/api/dto/whoami.rs new file mode 100644 index 0000000..a56125f --- /dev/null +++ b/src/api/dto/whoami.rs @@ -0,0 +1,5 @@ +#[derive(Debug, Serialize, Deserialize)] + +pub struct WhoAmIOut { + pub id: String, +} diff --git a/src/api/helper.rs b/src/api/helper.rs index df52059..2e00e4b 100644 --- a/src/api/helper.rs +++ b/src/api/helper.rs @@ -3,216 +3,10 @@ use crate::rpc::{Body, BodyType, RequestNo, RpcStream, RpcType}; use async_std::io::{Read, Write}; use serde_json; +use super::dto; use super::error::Result; -#[derive(Debug, Serialize, Deserialize)] -pub struct ErrorRes { - pub name: String, - pub message: String, - pub stack: String, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct WhoAmI { - pub id: String, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct LatestUserMessage { - pub id: String, - pub sequence: u64, - pub ts: f64, -} - -// https://github.com/ssbc/ssb-db/blob/master/api.md -#[derive(Debug, Serialize)] -pub struct CreateStreamArgs { - /// live (boolean, default: false): Keep the stream open and emit new messages as they are received - #[serde(skip_serializing_if = "Option::is_none")] - pub live: Option, - - /// gt (greater than), gte (greater than or equal) define the lower bound of the range to be streamed - #[serde(skip_serializing_if = "Option::is_none")] - pub gt: Option, - - #[serde(skip_serializing_if = "Option::is_none")] - pub gte: Option, - - /// lt (less than), lte (less than or equal) define the higher bound of the range to be streamed. Only key/value pairs where the key is less than (or equal to) this option will be included in the range. When reverse=true the order will be reversed, but the records streamed will be the same. - #[serde(skip_serializing_if = "Option::is_none")] - pub lt: Option, - - #[serde(skip_serializing_if = "Option::is_none")] - pub lte: Option, - - /// reverse (boolean, default: false): a boolean, set true and the stream output will be reversed. Beware that due to the way LevelDB works, a reverse seek will be slower than a forward seek. - #[serde(skip_serializing_if = "Option::is_none")] - pub reverse: Option, - - /// keys (boolean, default: true): whether the data event should contain keys. If set to true and values set to false then data events will simply be keys, rather than objects with a key property. - #[serde(skip_serializing_if = "Option::is_none")] - pub keys: Option, - - /// values (boolean, default: true): whether the data event should contain values. If set to true and keys set to false then data events will simply be values, rather than objects with a value property. - #[serde(skip_serializing_if = "Option::is_none")] - pub values: Option, - - /// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys. - #[serde(skip_serializing_if = "Option::is_none")] - pub limit: Option, - - /// fillCache (boolean, default: false): wheather LevelDB's LRU-cache should be filled with data read. - #[serde(rename = "fillCache")] - #[serde(skip_serializing_if = "Option::is_none")] - pub fill_cache: Option, - /// keyEncoding / valueEncoding (string): the encoding applied to each read piece of data. - #[serde(rename = "keyEncoding")] - pub key_encoding: Option, - - #[serde(rename = "valueEncoding")] - pub value_encoding: Option, -} - -impl Default for CreateStreamArgs { - fn default() -> Self { - Self { - live: None, - gt: None, - gte: None, - lt: None, - lte: None, - reverse: None, - keys: None, - values: None, - limit: None, - fill_cache: None, - key_encoding: None, - value_encoding: None, - } - } -} - -impl CreateStreamArgs { - pub fn live(self: Self, live: bool) -> Self { - Self { - live: Some(live), - ..self - } - } - pub fn gt(self: Self, v: K) -> Self { - Self { - gt: Some(v), - ..self - } - } - pub fn gte(self: Self, v: K) -> Self { - Self { - gte: Some(v), - ..self - } - } - pub fn lt(self: Self, v: K) -> Self { - Self { - lt: Some(v), - ..self - } - } - pub fn lte(self: Self, v: K) -> Self { - Self { - lte: Some(v), - ..self - } - } - pub fn reverse(self: Self, reversed: bool) -> Self { - Self { - reverse: Some(reversed), - ..self - } - } - pub fn keys_values(self: Self, keys: bool, values: bool) -> Self { - Self { - keys: Some(keys), - values: Some(values), - ..self - } - } - pub fn encoding(self: Self, keys: String, values: String) -> Self { - Self { - key_encoding: Some(keys), - value_encoding: Some(values), - ..self - } - } - pub fn limit(self: Self, limit: i64) -> Self { - Self { - limit: Some(limit), - ..self - } - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct CreateHistoryStreamArgs { - // id (FeedID, required): The id of the feed to fetch. - pub id: String, - - /// (number, default: 0): If seq > 0, then only stream messages with sequence numbers greater than seq. - #[serde(skip_serializing_if = "Option::is_none")] - pub seq: Option, - - /// live (boolean, default: false): Keep the stream open and emit new messages as they are received - #[serde(skip_serializing_if = "Option::is_none")] - pub live: Option, - /// keys (boolean, default: true): whether the data event should contain keys. If set to true and values set to false then data events will simply be keys, rather than objects with a key property. - #[serde(skip_serializing_if = "Option::is_none")] - pub keys: Option, - - /// values (boolean, default: true): whether the data event should contain values. If set to true and keys set to false then data events will simply be values, rather than objects with a value property. - #[serde(skip_serializing_if = "Option::is_none")] - pub values: Option, - - /// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys. - #[serde(skip_serializing_if = "Option::is_none")] - pub limit: Option, -} - -impl CreateHistoryStreamArgs { - pub fn new(id: String) -> Self { - Self { - id, - seq: None, - live: None, - keys: None, - values: None, - limit: None, - } - } - pub fn starting_seq(self: Self, seq: u64) -> Self { - Self { - seq: Some(seq), - ..self - } - } - pub fn live(self: Self, live: bool) -> Self { - Self { - live: Some(live), - ..self - } - } - pub fn keys_values(self: Self, keys: bool, values: bool) -> Self { - Self { - keys: Some(keys), - values: Some(values), - ..self - } - } - pub fn limit(self: Self, limit: i64) -> Self { - Self { - limit: Some(limit), - ..self - } - } -} +const MAX_RPC_BODY_LEN: usize = 65536; #[derive(Debug)] pub enum ApiMethod { @@ -221,6 +15,8 @@ pub enum ApiMethod { CreateHistoryStream, CreateFeedStream, Latest, + BlobsGet, + BlobsCreateWants, } impl ApiMethod { @@ -232,6 +28,8 @@ impl ApiMethod { CreateHistoryStream => &["createHistoryStream"], CreateFeedStream => &["createFeedStream"], Latest => &["latest"], + BlobsGet => &["blobs", "get"], + BlobsCreateWants => &["blobs", "createWants"], } } pub fn from_selector(s: &[&str]) -> Option { @@ -242,6 +40,8 @@ impl ApiMethod { ["createHistoryStream"] => Some(CreateHistoryStream), ["createFeedStream"] => Some(CreateFeedStream), ["latest"] => Some(Latest), + ["blobs", "get"] => Some(BlobsGet), + ["blobs", "createWants"] => Some(BlobsCreateWants), _ => None, } } @@ -264,8 +64,7 @@ impl ApiHelper { &mut self.rpc } - // whoami: sync - // Get information about the current ssb-server user. + /// Send ["whoami"] request. pub async fn whoami_req_send(&mut self) -> Result { let args: [&str; 0] = []; let req_no = self @@ -274,16 +73,17 @@ impl ApiHelper { .await?; Ok(req_no) } + + /// Send ["whoami"] response. pub async fn whoami_res_send(&mut self, req_no: RequestNo, id: String) -> Result<()> { - let body = serde_json::to_string(&WhoAmI { id })?; + let body = serde_json::to_string(&dto::WhoAmIOut { id })?; Ok(self .rpc .send_response(req_no, RpcType::Async, BodyType::JSON, body.as_bytes()) .await?) } - // get: async - // Get a message by its hash-id. (sould start with %) + /// Send ["get"] request. pub async fn get_req_send(&mut self, msg_id: &str) -> Result { let req_no = self .rpc @@ -291,6 +91,8 @@ impl ApiHelper { .await?; Ok(req_no) } + + /// Send ["get"] response. pub async fn get_res_send(&mut self, req_no: RequestNo, msg: &Message) -> Result<()> { self.rpc .send_response( @@ -303,11 +105,10 @@ impl ApiHelper { Ok(()) } - // createHistoryStream: source - // (hist) Fetch messages from a specific user, ordered by sequence numbers. + /// Send ["createHistoryStream"] request. pub async fn create_history_stream_req_send( &mut self, - args: &CreateHistoryStreamArgs, + args: &dto::CreateHistoryStreamIn, ) -> Result { let req_no = self .rpc @@ -319,18 +120,11 @@ impl ApiHelper { .await?; Ok(req_no) } - pub async fn feed_res_send(&mut self, req_no: RequestNo, feed: &str) -> Result<()> { - self.rpc - .send_response(req_no, RpcType::Source, BodyType::JSON, feed.as_bytes()) - .await?; - Ok(()) - } - // createFeedStream: source - // (feed) Fetch messages ordered by their claimed timestamps. - pub async fn send_create_feed_stream<'a>( + /// Send ["createFeedStream"] request. + pub async fn create_feed_stream_req_send<'a>( &mut self, - args: &CreateStreamArgs, + args: &dto::CreateStreamIn, ) -> Result { let req_no = self .rpc @@ -343,9 +137,8 @@ impl ApiHelper { Ok(req_no) } - // latest: source - // Get the seq numbers of the latest messages of all users in the database. - pub async fn send_latest(&mut self) -> Result { + /// Send ["latest"] request. + pub async fn latest_req_send(&mut self) -> Result { let args: [&str; 0] = []; let req_no = self .rpc @@ -353,4 +146,59 @@ impl ApiHelper { .await?; Ok(req_no) } + + /// Send ["blobs","get"] request. + pub async fn blobs_get_req_send(&mut self, args: &dto::BlobsGetIn) -> Result { + let req_no = self + .rpc + .send_request(ApiMethod::BlobsGet.selector(), RpcType::Source, &args) + .await?; + Ok(req_no) + } + + /// Send feed response + pub async fn feed_res_send(&mut self, req_no: RequestNo, feed: &str) -> Result<()> { + self.rpc + .send_response(req_no, RpcType::Source, BodyType::JSON, feed.as_bytes()) + .await?; + Ok(()) + } + + /// Send blob create wants + pub async fn blob_create_wants_req_send(&mut self) -> Result { + let args: [&str; 0] = []; + let req_no = self + .rpc + .send_request( + ApiMethod::BlobsCreateWants.selector(), + RpcType::Source, + &args, + ) + .await?; + Ok(req_no) + } + + /// Send blob response + pub async fn blobs_get_res_send>( + &mut self, + req_no: RequestNo, + data: D, + ) -> Result<()> { + let mut offset = 0; + let data = data.as_ref(); + while offset < data.len() { + let limit = std::cmp::min(data.len(), offset + MAX_RPC_BODY_LEN); + self.rpc + .send_response( + req_no, + RpcType::Source, + BodyType::Binary, + &data[offset..limit], + ) + .await?; + offset += MAX_RPC_BODY_LEN; + } + self.rpc.send_stream_eof(req_no).await?; + Ok(()) + } } diff --git a/src/api/mod.rs b/src/api/mod.rs index fbb83dc..3bc1600 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,8 +1,6 @@ +pub mod dto; mod error; mod helper; -pub mod msgs; pub use error::{Error, Result}; -pub use helper::{ - ApiHelper, ApiMethod, CreateHistoryStreamArgs, CreateStreamArgs, LatestUserMessage, WhoAmI, -}; +pub use helper::{ApiHelper, ApiMethod}; diff --git a/src/rpc/error.rs b/src/rpc/error.rs index c3f17ea..9148f1e 100644 --- a/src/rpc/error.rs +++ b/src/rpc/error.rs @@ -1,7 +1,7 @@ #[derive(Debug)] pub enum Error { HeaderSizeTooSmall, - InvalidBodyType, + InvalidBodyType(u8), Io(async_std::io::Error), Json(serde_json::Error), } diff --git a/src/rpc/stream.rs b/src/rpc/stream.rs index 54d1aa7..731b61a 100644 --- a/src/rpc/stream.rs +++ b/src/rpc/stream.rs @@ -2,7 +2,7 @@ use super::error::{Error, Result}; use async_std::io; use async_std::prelude::*; -use log::debug; +use log::{trace, warn}; use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite}; @@ -72,11 +72,15 @@ impl Header { let is_stream = (bytes[0] & RPC_HEADER_STREAM_FLAG) == RPC_HEADER_STREAM_FLAG; let is_end_or_error = (bytes[0] & RPC_HEADER_END_OR_ERROR_FLAG) == RPC_HEADER_END_OR_ERROR_FLAG; - let body_type = match bytes[0] & RPC_HEADER_BODY_TYPE_MASK { + let body_type_value = bytes[0] & RPC_HEADER_BODY_TYPE_MASK; + let body_type = match body_type_value { 0 => BodyType::Binary, 1 => BodyType::UTF8, 2 => BodyType::JSON, - _ => return Err(Error::InvalidBodyType), + _ => { + warn!("rare message: {}", String::from_utf8_lossy(bytes)); + return Err(Error::InvalidBodyType(body_type_value)); + } }; let mut body_len_buff = [0u8; 4]; @@ -153,8 +157,7 @@ impl RpcStream { let mut body_raw: Vec = vec![0; rpc_header.body_len as usize]; self.box_reader.read_exact(&mut body_raw[..]).await?; - debug!( - "rpc-recv {:?} '{}'", + trace!(target: "ssb-rpc", "recv {:?} '{}'", rpc_header, String::from_utf8_lossy(&body_raw[..]) ); @@ -207,7 +210,7 @@ impl RpcStream { body_len: body_str.as_bytes().len() as u32, }; - debug!("rpc-send {:?} '{}'", rpc_header, body_str); + trace!(target: "ssb-rpc", "send {:?} '{}'", rpc_header, body_str); self.box_writer .write_all(&rpc_header.to_array()[..]) @@ -233,8 +236,8 @@ impl RpcStream { body_len: body.len() as u32, }; - debug!( - "rpc-send {:?} '{}'", + trace!(target: "ssb-rpc", + "send {:?} '{}'", rpc_header, String::from_utf8_lossy(body) ); @@ -273,7 +276,7 @@ impl RpcStream { body_len: body_bytes.as_bytes().len() as u32, }; - debug!("rpc-send {:?} '{}'", rpc_header, body_bytes); + trace!(target: "ssb-rpc", "send {:?} '{}'", rpc_header, body_bytes); self.box_writer .write_all(&rpc_header.to_array()[..]) @@ -295,8 +298,8 @@ impl RpcStream { body_len: body_bytes.len() as u32, }; - debug!( - "rpc-send {:?} '{}'", + trace!(target: "ssb-rpc", + "send {:?} '{}'", rpc_header, String::from_utf8_lossy(body_bytes) );