diff --git a/Cargo.toml b/Cargo.toml index 118505a..c3668f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,11 +8,11 @@ edition = "2018" name = "kuska_ssb" [dependencies] -kuska-handshake = { git = "https://github.com/Kuska-ssb/kuska-handshake", branch = "master" , features=["sync","async_std"] } +kuska-handshake = { path = "../kuska-handshake", branch = "master" , features=["sync","async_std"] } sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" } base64 = "0.11.0" hex = "0.4.0" -async-std = { version = "1.4.0", features=["unstable","attributes"] } +async-std = { version = "1.5.0", features=["unstable","attributes"] } crossbeam = "0.7.3" log = "0.4.8" env_logger = "0.7.1" diff --git a/examples/ssb-cli.rs b/examples/ssb-cli.rs index 7c4df0a..ed9dcbd 100644 --- a/examples/ssb-cli.rs +++ b/examples/ssb-cli.rs @@ -20,7 +20,7 @@ use kuska_ssb::discovery::ssb_net_id; use kuska_ssb::feed::{is_privatebox, privatebox_decipher, Feed, Message}; use kuska_ssb::keystore::from_patchwork_local; use kuska_ssb::keystore::OwnedIdentity; -use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcStream}; +use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcStreamReader, RpcStreamWriter}; use regex::Regex; use sodiumoxide::crypto::sign::ed25519; @@ -71,7 +71,8 @@ impl std::fmt::Display for AppError { } async fn get_async<'a, R, W, T, F>( - client: &mut ApiHelper, + rpc_reader : &mut RpcStreamReader, + client: &mut ApiHelper, req_no: RequestNo, f: F, ) -> SolarResult @@ -82,7 +83,7 @@ where T: Debug, { loop { - let (id, msg) = client.rpc().recv().await?; + let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { @@ -100,7 +101,8 @@ where } async fn print_source_until_eof<'a, R, W, T, F>( - client: &mut ApiHelper, + rpc_reader : &mut RpcStreamReader, + client: &mut ApiHelper, req_no: RequestNo, f: F, ) -> SolarResult<()> @@ -111,7 +113,7 @@ where T: Debug + serde::Deserialize<'a>, { loop { - let (id, msg) = client.rpc().recv().await?; + let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { @@ -189,10 +191,11 @@ async fn main() -> SolarResult<()> { let (box_stream_read, box_stream_write) = BoxStream::from_handshake(&socket, &socket, handshake, 0x8000).split_read_write(); - let mut client = ApiHelper::new(RpcStream::new(box_stream_read, box_stream_write)); + let mut rpc_reader = RpcStreamReader::new(box_stream_read); + let mut client = ApiHelper::new(RpcStreamWriter::new(box_stream_write)); let req_id = client.whoami_req_send().await?; - let whoami = get_async(&mut client, req_id, whoami_res_parse).await?.id; + let whoami = get_async(&mut rpc_reader, &mut client, req_id, whoami_res_parse).await?.id; println!("😊 server says hello to {}", whoami); @@ -211,7 +214,7 @@ async fn main() -> SolarResult<()> { } ("whoami", 1) => { let req_id = client.whoami_req_send().await?; - let whoami = get_async(&mut client, req_id, whoami_res_parse).await?.id; + let whoami = get_async(&mut rpc_reader, &mut client, req_id, whoami_res_parse).await?.id; println!("{}", whoami); } ("get", 2) => { @@ -221,7 +224,7 @@ async fn main() -> SolarResult<()> { args[1].clone() }; let req_id = client.get_req_send(&msg_id).await?; - let msg = get_async(&mut client, req_id, message_res_parse).await?; + let msg = get_async(&mut rpc_reader, &mut client, req_id, message_res_parse).await?; println!("{:?}", msg); } ("user", 2) => { @@ -229,16 +232,16 @@ async fn main() -> SolarResult<()> { let args = CreateHistoryStreamIn::new(user_id.clone()); let req_id = client.create_history_stream_req_send(&args).await?; - print_source_until_eof(&mut client, req_id, feed_res_parse).await?; + print_source_until_eof(&mut rpc_reader, &mut client, req_id, feed_res_parse).await?; } ("feed", 1) => { let args = CreateStreamIn::default(); let req_id = client.create_feed_stream_req_send(&args).await?; - print_source_until_eof(&mut client, req_id, feed_res_parse).await?; + print_source_until_eof(&mut rpc_reader, &mut client, req_id, feed_res_parse).await?; } ("latest", 1) => { let req_id = client.latest_req_send().await?; - print_source_until_eof(&mut client, req_id, latest_res_parse).await?; + print_source_until_eof(&mut rpc_reader, &mut client, req_id, latest_res_parse).await?; } ("private", 2) => { let user_id = if args[1] == "me" { &whoami } else { &args[1] }; @@ -257,7 +260,7 @@ async fn main() -> SolarResult<()> { let args = CreateHistoryStreamIn::new(user_id.clone()); let req_id = client.create_history_stream_req_send(&args).await?; - print_source_until_eof(&mut client, req_id, show_private).await?; + print_source_until_eof(&mut rpc_reader, &mut client, req_id, show_private).await?; } _ => println!("unknown command {}", line_buffer), } diff --git a/src/api/dto/content.rs b/src/api/dto/content.rs index a15ef9c..88146b7 100644 --- a/src/api/dto/content.rs +++ b/src/api/dto/content.rs @@ -96,7 +96,10 @@ pub enum TypedMessage { #[serde(rename = "pub")] Pub { address: Option }, #[serde(rename = "post")] - Post { post: Post }, + Post { + text: String, + mentions: Option>, + }, #[serde(rename = "contact")] Contact { contact: Option, diff --git a/src/api/dto/history_stream.rs b/src/api/dto/history_stream.rs index eb429d0..d64bba3 100644 --- a/src/api/dto/history_stream.rs +++ b/src/api/dto/history_stream.rs @@ -34,7 +34,7 @@ impl CreateHistoryStreamIn { limit: None, } } - pub fn starting_seq(self: Self, seq: u64) -> Self { + pub fn after_seq(self: Self, seq: u64) -> Self { Self { seq: Some(seq), ..self diff --git a/src/api/helper.rs b/src/api/helper.rs index 2e00e4b..34aa757 100644 --- a/src/api/helper.rs +++ b/src/api/helper.rs @@ -1,7 +1,6 @@ use crate::feed::Message; -use crate::rpc::{Body, BodyType, RequestNo, RpcStream, RpcType}; -use async_std::io::{Read, Write}; -use serde_json; +use crate::rpc::{Body, BodyType, RequestNo, RpcStreamWriter, RpcType}; +use async_std::io::Write; use super::dto; use super::error::Result; @@ -51,16 +50,16 @@ impl ApiMethod { } } -pub struct ApiHelper { - rpc: RpcStream, +pub struct ApiHelper { + rpc: RpcStreamWriter, } -impl ApiHelper { - pub fn new(rpc: RpcStream) -> Self { +impl ApiHelper { + pub fn new(rpc: RpcStreamWriter) -> Self { Self { rpc } } - pub fn rpc(&mut self) -> &mut RpcStream { + pub fn rpc(&mut self) -> &mut RpcStreamWriter { &mut self.rpc } diff --git a/src/crypto/sodium.rs b/src/crypto/sodium.rs index d7d712b..ef007cb 100644 --- a/src/crypto/sodium.rs +++ b/src/crypto/sodium.rs @@ -1,4 +1,3 @@ -use base64; use sodiumoxide::crypto::hash::sha256; use sodiumoxide::crypto::sign::ed25519; @@ -33,6 +32,12 @@ impl<'a> ToSsbId for ed25519::SecretKey { } } +impl<'a> ToSsbId for sha256::Digest { + fn to_ssb_id(&self) -> String { + format!("{}{}", base64::encode(self), SHA256_SUFFIX) + } +} + impl ToSodiumObject for str { fn to_ed25519_pk(self: &str) -> Result { if !self.ends_with(CURVE_ED25519_SUFFIX) { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 8119efe..0129768 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -2,4 +2,4 @@ mod error; mod stream; pub use error::{Error, Result}; -pub use stream::{Body, BodyType, RecvMsg, RequestNo, RpcStream, RpcType}; +pub use stream::{Body, BodyType, RecvMsg, RequestNo, RpcStreamReader,RpcStreamWriter, RpcType}; diff --git a/src/rpc/stream.rs b/src/rpc/stream.rs index 533bcbd..92c1f04 100644 --- a/src/rpc/stream.rs +++ b/src/rpc/stream.rs @@ -3,6 +3,8 @@ use super::error::{Error, Result}; use async_std::io; use async_std::prelude::*; use log::{trace, warn}; +use std::pin::Pin; +use std::task::{Context,Poll}; use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite}; @@ -125,8 +127,11 @@ impl Header { } } -pub struct RpcStream { +pub struct RpcStreamReader { box_reader: BoxStreamRead, +} + +pub struct RpcStreamWriter { box_writer: BoxStreamWrite, req_no: RequestNo, } @@ -140,12 +145,10 @@ pub enum RecvMsg { CancelStreamRespose(), } -impl RpcStream { - pub fn new(box_reader: BoxStreamRead, box_writer: BoxStreamWrite) -> RpcStream { - RpcStream { - box_reader, - box_writer, - req_no: 0, +impl RpcStreamReader { + pub fn new(box_reader: BoxStreamRead) -> RpcStreamReader { + RpcStreamReader { + box_reader } } @@ -187,6 +190,22 @@ impl RpcStream { )) } } +} +/* +impl Stream for RpcStreamReader { + type Item = (RequestNo, RecvMsg); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + futures::ready!(self.recv()) + } +} +*/ +impl RpcStreamWriter { + pub fn new(box_writer: BoxStreamWrite) -> RpcStreamWriter { + RpcStreamWriter { + box_writer, + req_no: 0, + } + } pub async fn send_request( &mut self,