From ff731fc21cad9a44e12421c76e8ae6aa19bf0416 Mon Sep 17 00:00:00 2001 From: adria0 Date: Sun, 9 Feb 2020 20:26:41 +0100 Subject: [PATCH] wip --- examples/iot-solar-wip.rs | 24 ++++--- examples/ssb-cli.rs | 62 ++++++++++------- src/feed/message.rs | 2 +- src/lib.rs | 1 + src/patchwork/api.rs | 93 ++++++++------------------ src/rpc/client.rs | 137 +++++++++++++++++++++++++++----------- src/rpc/mod.rs | 2 +- 7 files changed, 179 insertions(+), 142 deletions(-) diff --git a/examples/iot-solar-wip.rs b/examples/iot-solar-wip.rs index a2d36cc..efc67dd 100644 --- a/examples/iot-solar-wip.rs +++ b/examples/iot-solar-wip.rs @@ -17,7 +17,7 @@ use actix_web::{get, web, App, HttpServer, Responder}; use async_std::io::{Read,Write}; use kuska_handshake::async_std::{BoxStream,handshake_client,TokioCompatExt,TokioCompatExtRead,TokioCompatExtWrite}; -use kuska_ssb::rpc::{Header,RequestNo,RpcClient}; +use kuska_ssb::rpc::{RequestNo,RecvMsg,RpcStream}; use kuska_ssb::patchwork::*; use tokio::net::TcpStream; @@ -28,20 +28,28 @@ async fn get_async<'a,R,W,T,F> (client: &mut ApiClient, req_no : RequestNo, where R: Read+Unpin, W: Write+Unpin, - F: Fn(&Header,&[u8])->Result, + F: Fn(&[u8])->Result, T: Debug { loop { - let (header,body) = client.rpc().recv().await?; - if header.req_no == req_no { - return f(&header,&body).map_err(|err| err.into()); + let (id,msg) = client.rpc().recv().await?; + if id == req_no { + match msg { + RecvMsg::BodyResponse(body) => { + return f(&body).map_err(|err| err.into()); + } + RecvMsg::ErrorResponse(message) => { + println!(" 😢 Failed {:}",message); + } + _ => unreachable!() + } } } } async fn run_task(api : &mut ApiClient, _command: &str) -> AnyResult { let req_id = api.send_whoami().await?; - let whoami = get_async(api,-req_id,parse_whoami).await?.id; + let whoami = get_async(api,req_id,parse_whoami).await?.id; println!("{}",whoami); @@ -89,7 +97,7 @@ async fn sync_loop(command_receiver: Receiver, stop_receiver : Receiver< BoxStream::from_handshake(read, write, handshake, 0x8000) .split_read_write(); - let rpc = RpcClient::new(box_stream_read, box_stream_write); + let rpc = RpcStream::new(box_stream_read, box_stream_write); let mut api = ApiClient::new(rpc); let mut commands_queue : Vec = Vec::new(); @@ -143,7 +151,7 @@ async fn command_loop(command_receiver: Receiver, stop_receiver : Receiv BoxStream::from_handshake(read, write, handshake, 0x8000) .split_read_write(); - let rpc = RpcClient::new(box_stream_read, box_stream_write); + let rpc = RpcStream::new(box_stream_read, box_stream_write); let mut api = ApiClient::new(rpc); let mut commands_queue : Vec = Vec::new(); diff --git a/examples/ssb-cli.rs b/examples/ssb-cli.rs index 32c2657..511f504 100644 --- a/examples/ssb-cli.rs +++ b/examples/ssb-cli.rs @@ -10,9 +10,10 @@ use async_std::io::{Read,Write}; use async_std::net::TcpStream; use kuska_handshake::async_std::{handshake_client,BoxStream}; -use kuska_ssb::rpc::{Header,RequestNo,RpcClient}; +use kuska_ssb::rpc::{RecvMsg,RequestNo,RpcStream}; use kuska_ssb::patchwork::*; use kuska_ssb::feed::{is_privatebox,privatebox_decipher}; +use kuska_ssb::patchwork::{parse_feed,parse_latest,parse_message,parse_whoami}; type AnyResult = std::result::Result>; @@ -20,13 +21,21 @@ async fn get_async<'a,R,W,T,F> (client: &mut ApiClient, req_no : RequestNo, where R: Read+Unpin, W: Write+Unpin, - F: Fn(&Header,&[u8])->Result, + F: Fn(&[u8])->Result, T: Debug { loop { - let (header,body) = client.rpc().recv().await?; - if header.req_no == req_no { - return f(&header,&body).map_err(|err| err.into()); + let (id,msg) = client.rpc().recv().await?; + if id == req_no { + match msg { + RecvMsg::BodyResponse(body) => { + return f(&body).map_err(|err| err.into()); + } + RecvMsg::ErrorResponse(message) => { + println!(" 😢 Failed {:}",message); + } + _ => unreachable!() + } } } } @@ -35,23 +44,26 @@ async fn print_source_until_eof<'a,R,W,T,F> (client: &mut ApiClient, req_no where R: Read+Unpin, W: Write+Unpin, - F: Fn(&Header,&[u8])->Result, + F: Fn(&[u8])->Result, T: Debug+serde::Deserialize<'a> { loop { - let (header,body) = client.rpc().recv().await?; - if header.req_no == req_no { - if !header.is_end_or_error { - match f(&header,&body) { - Ok(res) => { println!("{:?}",res) }, - Err(err) => println!(" 😢 Failed :( {:?} {}",err,String::from_utf8_lossy(&body)), + let (id,msg) = client.rpc().recv().await?; + if id == req_no { + match msg { + RecvMsg::BodyResponse(body) => { + let display = f(&body)?; + println!("{:?}",display); } - } else { - println!("STREAM FINISHED"); - return Ok(()) - } + RecvMsg::ErrorResponse(message) => { + println!(" 😢 Failed {:}",message); + } + RecvMsg::CancelStreamRespose() => break, + _ => unreachable!() + } } } + Ok(()) } #[async_std::main] @@ -72,10 +84,10 @@ async fn main() -> AnyResult<()> { BoxStream::from_handshake(&socket,&socket,handshake, 0x8000) .split_read_write(); - let mut client = ApiClient::new(RpcClient::new(box_stream_read, box_stream_write)); + let mut client = ApiClient::new(RpcStream::new(box_stream_read, box_stream_write)); let req_id = client.send_whoami().await?; - let whoami = get_async(&mut client,-req_id,parse_whoami).await?.id; + let whoami = get_async(&mut client,req_id,parse_whoami).await?.id; println!("😊 server says hello to {}",whoami); @@ -100,7 +112,7 @@ async fn main() -> AnyResult<()> { args[1].clone() }; let req_id = client.send_get(&msg_id).await?; - let msg = get_async(&mut client,-req_id,parse_message).await?; + let msg = get_async(&mut client,req_id,parse_message).await?; println!("{:?}",msg); } ("user",2) => { @@ -112,16 +124,16 @@ async fn main() -> AnyResult<()> { let args = CreateHistoryStreamArgs::new(&user_id); let req_id = client.send_create_history_stream(&args).await?; - print_source_until_eof(&mut client, -req_id, parse_feed).await?; + print_source_until_eof(&mut client, req_id, parse_feed).await?; } ("feed",1) => { let args = CreateStreamArgs::default(); let req_id = client.send_create_feed_stream(&args).await?; - print_source_until_eof(&mut client, -req_id, parse_feed).await?; + print_source_until_eof(&mut client, req_id, parse_feed).await?; } ("latest",1) => { let req_id = client.send_latest().await?; - print_source_until_eof(&mut client, -req_id, parse_latest).await?; + print_source_until_eof(&mut client, req_id, parse_latest).await?; } ("private",2) => { let user_id = if args[1] == "me" { @@ -130,8 +142,8 @@ async fn main() -> AnyResult<()> { &args[1] }; - let show_private = |header: &Header, body: &[u8]| { - let msg = parse_feed(header,body)?.into_message()?; + let show_private = |body: &[u8]| { + let msg = parse_feed(body)?.into_message()?; if let serde_json::Value::String(content) = msg.content() { if is_privatebox(&content) { let ret = privatebox_decipher(&content, &sk)? @@ -145,7 +157,7 @@ async fn main() -> AnyResult<()> { let args = CreateHistoryStreamArgs::new(&user_id); let req_id = client.send_create_history_stream(&args).await?; - print_source_until_eof(&mut client, -req_id, show_private).await?; + print_source_until_eof(&mut client, req_id, show_private).await?; } _ => println!("unknown command {}",line_buffer), } diff --git a/src/feed/message.rs b/src/feed/message.rs index 88bc249..052c967 100644 --- a/src/feed/message.rs +++ b/src/feed/message.rs @@ -38,7 +38,7 @@ macro_rules! cast_opt { } -#[derive(Debug)] +#[derive(Debug,Deserialize)] pub struct Message { value: serde_json::Value, } diff --git a/src/lib.rs b/src/lib.rs index e177173..df1124f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ extern crate kuska_handshake; + #[macro_use] extern crate serde; extern crate serde_json; diff --git a/src/patchwork/api.rs b/src/patchwork/api.rs index f9639d1..af41ceb 100644 --- a/src/patchwork/api.rs +++ b/src/patchwork/api.rs @@ -2,11 +2,11 @@ use async_std::io::{Read, Write}; use serde_json; use std::str::FromStr; -use crate::rpc::{RpcClient, Header, RequestNo, RpcType, Body}; +use crate::rpc::{RpcStream, RequestNo, RpcType}; use crate::feed::Message; use crate::feed::Feed; -use super::error::{Error,Result}; +use super::error::Result; #[derive(Debug, Deserialize)] pub struct ErrorRes { @@ -217,80 +217,47 @@ impl<'a> CreateHistoryStreamArgs<'a> { } } - -fn parse_json<'a, T: serde::Deserialize<'a>>( - header: &'a Header, - body: &'a [u8], -) -> Result { - if header.is_end_or_error { - let error: ErrorRes = serde_json::from_slice(&body[..])?; - Err(Error::ServerMessage(format!("{:?}", error))) - } else { - let res: T = serde_json::from_slice(&body[..])?; - Ok(res) - } +pub fn parse_whoami(body: &[u8]) -> Result { + Ok(serde_json::from_slice(body)?) } -pub fn parse_whoami(header: &Header, body: &[u8]) -> Result { - parse_json::(&header, body) +pub fn parse_message(body: &[u8]) -> Result { + Ok(serde_json::from_slice(body)?) } -pub fn parse_message(header: &Header, body: &[u8]) -> Result { - if header.is_end_or_error { - let error: ErrorRes = serde_json::from_slice(&body[..])?; - Err(Error::ServerMessage(format!("{:?}", error))) - } else { - Ok(Message::from_slice(body)?) - } +pub fn parse_feed(body: &[u8]) -> Result { + Ok(Feed::from_str(&String::from_utf8_lossy(body))?) } -pub fn parse_feed(header: &Header, body: &[u8]) -> Result { - if header.is_end_or_error { - let error: ErrorRes = serde_json::from_slice(&body[..])?; - Err(Error::ServerMessage(format!("{:?}", error))) - } else { - Ok(Feed::from_str(&String::from_utf8_lossy(&body))?) - } -} - -pub fn parse_latest(header: &Header, body: &[u8]) -> Result { - parse_json::(&header, body) +pub fn parse_latest(body: &[u8]) -> Result { + Ok(serde_json::from_slice(body)?) } pub struct ApiClient { - rpc: RpcClient, + rpc: RpcStream, } impl ApiClient { - pub fn new(rpc: RpcClient) -> Self { + pub fn new(rpc: RpcStream) -> Self { Self { rpc } } - pub fn rpc(&mut self) -> &mut RpcClient { + pub fn rpc(&mut self) -> &mut RpcStream { &mut self.rpc } // whoami: sync // Get information about the current ssb-server user. pub async fn send_whoami(&mut self) -> Result { - let body = Body { - name : vec!["whoami".to_string()], - rpc_type : RpcType::Async, - args : Vec::::new() - }; - let req_no = self.rpc.send(&body).await?; + let args: [&str; 0] = []; + let req_no = self.rpc.send_request(&["whoami"], RpcType::Async, &args).await?; Ok(req_no) } // get: async // Get a message by its hash-id. (sould start with %) pub async fn send_get(&mut self, msg_id: &str) -> Result { - let body = Body { - name : vec!["get".to_string()], - rpc_type : RpcType::Async, - args : vec![msg_id.to_string()] - }; - let req_no = self.rpc.send(&body).await?; + let req_no = self.rpc.send_request(&["get"], RpcType::Async, &msg_id).await?; Ok(req_no) } // createHistoryStream: source @@ -299,12 +266,10 @@ impl ApiClient { &mut self, args: &'a CreateHistoryStreamArgs<'a>, ) -> Result { - let body = Body { - name : vec!["createHistoryStream".to_string()], - rpc_type : RpcType::Source, - args : args - }; - let req_no = self.rpc.send(&body).await?; + let req_no = self + .rpc + .send_request(&["createHistoryStream"], RpcType::Source, &args) + .await?; Ok(req_no) } @@ -314,24 +279,18 @@ impl ApiClient { &mut self, args: &'a CreateStreamArgs, ) -> Result { - let body = Body { - name : vec!["createFeedStream".to_string()], - rpc_type : RpcType::Source, - args - }; - let req_no = self.rpc.send(&body).await?; + let req_no = self + .rpc + .send_request(&["createFeedStream"], RpcType::Source, &args) + .await?; Ok(req_no) } // latest: source // Get the seq numbers of the latest messages of all users in the database. pub async fn send_latest(&mut self) -> Result { - let body = Body { - name : vec!["latest".to_string()], - rpc_type : RpcType::Source, - args : Vec::::new(), - }; - let req_no = self.rpc.send(&body).await?; + let args: [&str; 0] = []; + let req_no = self.rpc.send_request(&["latest"], RpcType::Source, &args).await?; Ok(req_no) } } diff --git a/src/rpc/client.rs b/src/rpc/client.rs index ff827c3..fc41acc 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -17,38 +17,36 @@ const RPC_HEADER_STREAM_FLAG : u8 = 1 << 3; const RPC_HEADER_END_OR_ERROR_FLAG : u8 = 1 << 2; const RPC_HEADER_BODY_TYPE_MASK : u8 = 0b11; -#[derive(Debug,PartialEq)] +#[derive(Copy, Clone, Debug,PartialEq)] pub enum BodyType { Binary, UTF8, JSON, } -/* - let mut body = String::from("{\"name\":"); - body.push_str(&serde_json::to_string(&name)?); - body.push_str(",\"type\":\""); - body.push_str(rpc_type.rpc_id()); - body.push_str("\",\"args\":["); - body.push_str(&serde_json::to_string(&args)?); - body.push_str("]}"); -*/ - -#[derive(Serialize)] -pub struct Body { +#[derive(Deserialize)] +pub struct Body { pub name : Vec, #[serde(rename="type")] pub rpc_type : RpcType, - pub args : T, + pub args : serde_json::Value, } -impl Body { - pub fn new(name: Vec, rpc_type : RpcType, args:T) -> Self { - Body { name, rpc_type, args } +#[derive(Serialize)] +pub struct BodyRef<'a,T:serde::Serialize> { + pub name : &'a [&'a str], + #[serde(rename="type")] + pub rpc_type : RpcType, + pub args : &'a T, +} + +impl<'a,T:serde::Serialize> BodyRef<'a,T> { + pub fn new(name:&'a [&'a str], rpc_type : RpcType, args:&'a T) -> Self { + BodyRef { name, rpc_type, args } } } -#[derive(Serialize,Debug,PartialEq)] +#[derive(Clone,Copy,Serialize,Deserialize,Debug,PartialEq)] pub enum RpcType { #[serde(rename="async")] Async, @@ -56,14 +54,6 @@ pub enum RpcType { Source, } -impl RpcType { - pub fn rpc_id(&self) -> &'static str { - match self { - RpcType::Async => "async", - RpcType::Source => "source", - } - } -} #[derive(Debug,PartialEq)] pub struct Header { pub req_no : RequestNo, @@ -73,6 +63,12 @@ pub struct Header { pub body_len : u32, } +#[derive(Serialize,Deserialize)] +struct ErrorMessage<'a> { + name : &'a str, + message : &'a str, +} + impl Header { pub fn from_slice(bytes: &[u8]) -> Result
{ if bytes.len() < HEADER_SIZE { @@ -126,45 +122,68 @@ impl Header { } } -pub struct RpcClient { +pub struct RpcStream { box_reader : BoxStreamRead, box_writer : BoxStreamWrite, req_no : RequestNo, } -impl RpcClient { +pub enum RecvMsg { + Request(Body), + ErrorResponse(String), + CancelStreamRespose(), + BodyResponse(Vec), +} - pub fn new(box_reader :BoxStreamRead, box_writer :BoxStreamWrite) -> RpcClient { - RpcClient { box_reader, box_writer, req_no : 0 } +impl RpcStream { + + pub fn new(box_reader :BoxStreamRead, box_writer :BoxStreamWrite) -> RpcStream { + RpcStream { box_reader, box_writer, req_no : 0 } } - pub async fn recv(&mut self) -> Result<(Header,Vec)> { + pub async fn recv(&mut self) -> Result<(RequestNo,RecvMsg)> { let mut rpc_header_raw = [0u8;9]; self.box_reader.read_exact(&mut rpc_header_raw[..]).await?; let rpc_header = Header::from_slice(&rpc_header_raw[..])?; - let mut rpc_body : Vec = vec![0;rpc_header.body_len as usize]; - self.box_reader.read_exact(&mut rpc_body[..]).await?; + let mut body_raw : Vec = vec![0;rpc_header.body_len as usize]; + self.box_reader.read_exact(&mut body_raw[..]).await?; - Ok((rpc_header,rpc_body)) + if rpc_header.req_no > 0 { + let rpc_body = serde_json::from_slice(&body_raw)?; + Ok((rpc_header.req_no,RecvMsg::Request(rpc_body))) + } else { + if rpc_header.is_end_or_error { + if rpc_header.is_stream { + Ok((-rpc_header.req_no,RecvMsg::CancelStreamRespose())) + } else { + let err : ErrorMessage = serde_json::from_slice(&body_raw)?; + Ok((-rpc_header.req_no,RecvMsg::ErrorResponse(err.message.to_string()))) + } + } else { + Ok((-rpc_header.req_no,RecvMsg::BodyResponse(body_raw))) + } + } } - pub async fn send(&mut self, body : &Body) -> Result{ + pub async fn send_request(&mut self, name: &[&str], rpc_type: RpcType, args : &T) -> Result{ self.req_no+=1; - let body_str = serde_json::to_string(body)?; + let body_str = serde_json::to_string(&BodyRef { + name, + rpc_type, + args: &[&args], + })?; let rpc_header = Header { req_no : self.req_no, - is_stream : body.rpc_type == RpcType::Source, + is_stream : rpc_type == RpcType::Source, is_end_or_error : false, body_type : BodyType::JSON, body_len : body_str.as_bytes().len() as u32, }.to_array(); - println!("\n{}\n",body_str); - self.box_writer.write_all(&rpc_header[..]).await?; self.box_writer.write_all(body_str.as_bytes()).await?; self.box_writer.flush().await?; @@ -172,7 +191,45 @@ impl RpcClient { Ok(self.req_no) } - pub async fn send_cancel_stream(&mut self, req_no: RequestNo) -> Result<()> { + pub async fn send_response(&mut self, req_no : RequestNo, rpc_type: RpcType, body_type : BodyType, body: &[u8] ) -> Result<()>{ + self.req_no+=1; + + let rpc_header = Header { + req_no, + is_stream : rpc_type == RpcType::Source, + is_end_or_error : false, + body_type : body_type, + body_len : body.len() as u32, + }.to_array(); + + self.box_writer.write_all(&rpc_header[..]).await?; + self.box_writer.write_all(body).await?; + self.box_writer.flush().await?; + + Ok(()) + } + + pub async fn send_error(&mut self, req_no: RequestNo, message: &str) -> Result<()> { + let body_bytes = serde_json::to_string(&ErrorMessage { + name : "Error", + message + })?; + + let rpc_header = Header { + req_no, + is_stream : false, + is_end_or_error : true, + body_type : BodyType::UTF8, + body_len : body_bytes.as_bytes().len() as u32, + }.to_array(); + + self.box_writer.write_all(&rpc_header[..]).await?; + self.box_writer.write_all(body_bytes.as_bytes()).await?; + + Ok(()) + } + + pub async fn send_cancel(&mut self, req_no: RequestNo) -> Result<()> { let body_bytes = b"true"; let rpc_header = Header { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index c7d98ee..5a9fd99 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,5 +1,5 @@ mod client; mod error; -pub use client::{RpcClient,Header, RequestNo, RpcType, Body}; +pub use client::{RpcStream,RecvMsg, RequestNo, RpcType, Body, BodyRef}; pub use error::{Error,Result};