This commit is contained in:
adria0 2020-02-09 20:26:41 +01:00
parent 073e0df456
commit ff731fc21c
7 changed files with 179 additions and 142 deletions

View File

@ -17,7 +17,7 @@ use actix_web::{get, web, App, HttpServer, Responder};
use async_std::io::{Read,Write};
use kuska_handshake::async_std::{BoxStream,handshake_client,TokioCompatExt,TokioCompatExtRead,TokioCompatExtWrite};
use kuska_ssb::rpc::{Header,RequestNo,RpcClient};
use kuska_ssb::rpc::{RequestNo,RecvMsg,RpcStream};
use kuska_ssb::patchwork::*;
use tokio::net::TcpStream;
@ -28,20 +28,28 @@ async fn get_async<'a,R,W,T,F> (client: &mut ApiClient<R,W>, req_no : RequestNo,
where
R: Read+Unpin,
W: Write+Unpin,
F: Fn(&Header,&[u8])->Result<T>,
F: Fn(&[u8])->Result<T>,
T: Debug
{
loop {
let (header,body) = client.rpc().recv().await?;
if header.req_no == req_no {
return f(&header,&body).map_err(|err| err.into());
let (id,msg) = client.rpc().recv().await?;
if id == req_no {
match msg {
RecvMsg::BodyResponse(body) => {
return f(&body).map_err(|err| err.into());
}
RecvMsg::ErrorResponse(message) => {
println!(" 😢 Failed {:}",message);
}
_ => unreachable!()
}
}
}
}
async fn run_task<R:Read+Unpin,W:Write+Unpin>(api : &mut ApiClient<R,W>, _command: &str) -> AnyResult<bool> {
let req_id = api.send_whoami().await?;
let whoami = get_async(api,-req_id,parse_whoami).await?.id;
let whoami = get_async(api,req_id,parse_whoami).await?.id;
println!("{}",whoami);
@ -89,7 +97,7 @@ async fn sync_loop(command_receiver: Receiver<String>, stop_receiver : Receiver<
BoxStream::from_handshake(read, write, handshake, 0x8000)
.split_read_write();
let rpc = RpcClient::new(box_stream_read, box_stream_write);
let rpc = RpcStream::new(box_stream_read, box_stream_write);
let mut api = ApiClient::new(rpc);
let mut commands_queue : Vec<String> = Vec::new();
@ -143,7 +151,7 @@ async fn command_loop(command_receiver: Receiver<String>, stop_receiver : Receiv
BoxStream::from_handshake(read, write, handshake, 0x8000)
.split_read_write();
let rpc = RpcClient::new(box_stream_read, box_stream_write);
let rpc = RpcStream::new(box_stream_read, box_stream_write);
let mut api = ApiClient::new(rpc);
let mut commands_queue : Vec<String> = Vec::new();

View File

@ -10,9 +10,10 @@ use async_std::io::{Read,Write};
use async_std::net::TcpStream;
use kuska_handshake::async_std::{handshake_client,BoxStream};
use kuska_ssb::rpc::{Header,RequestNo,RpcClient};
use kuska_ssb::rpc::{RecvMsg,RequestNo,RpcStream};
use kuska_ssb::patchwork::*;
use kuska_ssb::feed::{is_privatebox,privatebox_decipher};
use kuska_ssb::patchwork::{parse_feed,parse_latest,parse_message,parse_whoami};
type AnyResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
@ -20,13 +21,21 @@ async fn get_async<'a,R,W,T,F> (client: &mut ApiClient<R,W>, req_no : RequestNo,
where
R: Read+Unpin,
W: Write+Unpin,
F: Fn(&Header,&[u8])->Result<T>,
F: Fn(&[u8])->Result<T>,
T: Debug
{
loop {
let (header,body) = client.rpc().recv().await?;
if header.req_no == req_no {
return f(&header,&body).map_err(|err| err.into());
let (id,msg) = client.rpc().recv().await?;
if id == req_no {
match msg {
RecvMsg::BodyResponse(body) => {
return f(&body).map_err(|err| err.into());
}
RecvMsg::ErrorResponse(message) => {
println!(" 😢 Failed {:}",message);
}
_ => unreachable!()
}
}
}
}
@ -35,23 +44,26 @@ async fn print_source_until_eof<'a,R,W,T,F> (client: &mut ApiClient<R,W>, req_no
where
R: Read+Unpin,
W: Write+Unpin,
F: Fn(&Header,&[u8])->Result<T>,
F: Fn(&[u8])->Result<T>,
T: Debug+serde::Deserialize<'a>
{
loop {
let (header,body) = client.rpc().recv().await?;
if header.req_no == req_no {
if !header.is_end_or_error {
match f(&header,&body) {
Ok(res) => { println!("{:?}",res) },
Err(err) => println!(" 😢 Failed :( {:?} {}",err,String::from_utf8_lossy(&body)),
let (id,msg) = client.rpc().recv().await?;
if id == req_no {
match msg {
RecvMsg::BodyResponse(body) => {
let display = f(&body)?;
println!("{:?}",display);
}
} else {
println!("STREAM FINISHED");
return Ok(())
}
RecvMsg::ErrorResponse(message) => {
println!(" 😢 Failed {:}",message);
}
RecvMsg::CancelStreamRespose() => break,
_ => unreachable!()
}
}
}
Ok(())
}
#[async_std::main]
@ -72,10 +84,10 @@ async fn main() -> AnyResult<()> {
BoxStream::from_handshake(&socket,&socket,handshake, 0x8000)
.split_read_write();
let mut client = ApiClient::new(RpcClient::new(box_stream_read, box_stream_write));
let mut client = ApiClient::new(RpcStream::new(box_stream_read, box_stream_write));
let req_id = client.send_whoami().await?;
let whoami = get_async(&mut client,-req_id,parse_whoami).await?.id;
let whoami = get_async(&mut client,req_id,parse_whoami).await?.id;
println!("😊 server says hello to {}",whoami);
@ -100,7 +112,7 @@ async fn main() -> AnyResult<()> {
args[1].clone()
};
let req_id = client.send_get(&msg_id).await?;
let msg = get_async(&mut client,-req_id,parse_message).await?;
let msg = get_async(&mut client,req_id,parse_message).await?;
println!("{:?}",msg);
}
("user",2) => {
@ -112,16 +124,16 @@ async fn main() -> AnyResult<()> {
let args = CreateHistoryStreamArgs::new(&user_id);
let req_id = client.send_create_history_stream(&args).await?;
print_source_until_eof(&mut client, -req_id, parse_feed).await?;
print_source_until_eof(&mut client, req_id, parse_feed).await?;
}
("feed",1) => {
let args = CreateStreamArgs::default();
let req_id = client.send_create_feed_stream(&args).await?;
print_source_until_eof(&mut client, -req_id, parse_feed).await?;
print_source_until_eof(&mut client, req_id, parse_feed).await?;
}
("latest",1) => {
let req_id = client.send_latest().await?;
print_source_until_eof(&mut client, -req_id, parse_latest).await?;
print_source_until_eof(&mut client, req_id, parse_latest).await?;
}
("private",2) => {
let user_id = if args[1] == "me" {
@ -130,8 +142,8 @@ async fn main() -> AnyResult<()> {
&args[1]
};
let show_private = |header: &Header, body: &[u8]| {
let msg = parse_feed(header,body)?.into_message()?;
let show_private = |body: &[u8]| {
let msg = parse_feed(body)?.into_message()?;
if let serde_json::Value::String(content) = msg.content() {
if is_privatebox(&content) {
let ret = privatebox_decipher(&content, &sk)?
@ -145,7 +157,7 @@ async fn main() -> AnyResult<()> {
let args = CreateHistoryStreamArgs::new(&user_id);
let req_id = client.send_create_history_stream(&args).await?;
print_source_until_eof(&mut client, -req_id, show_private).await?;
print_source_until_eof(&mut client, req_id, show_private).await?;
}
_ => println!("unknown command {}",line_buffer),
}

View File

@ -38,7 +38,7 @@ macro_rules! cast_opt {
}
#[derive(Debug)]
#[derive(Debug,Deserialize)]
pub struct Message {
value: serde_json::Value,
}

View File

@ -1,4 +1,5 @@
extern crate kuska_handshake;
#[macro_use]
extern crate serde;
extern crate serde_json;

View File

@ -2,11 +2,11 @@ use async_std::io::{Read, Write};
use serde_json;
use std::str::FromStr;
use crate::rpc::{RpcClient, Header, RequestNo, RpcType, Body};
use crate::rpc::{RpcStream, RequestNo, RpcType};
use crate::feed::Message;
use crate::feed::Feed;
use super::error::{Error,Result};
use super::error::Result;
#[derive(Debug, Deserialize)]
pub struct ErrorRes {
@ -217,80 +217,47 @@ impl<'a> CreateHistoryStreamArgs<'a> {
}
}
fn parse_json<'a, T: serde::Deserialize<'a>>(
header: &'a Header,
body: &'a [u8],
) -> Result<T> {
if header.is_end_or_error {
let error: ErrorRes = serde_json::from_slice(&body[..])?;
Err(Error::ServerMessage(format!("{:?}", error)))
} else {
let res: T = serde_json::from_slice(&body[..])?;
Ok(res)
}
pub fn parse_whoami(body: &[u8]) -> Result<WhoAmI> {
Ok(serde_json::from_slice(body)?)
}
pub fn parse_whoami(header: &Header, body: &[u8]) -> Result<WhoAmI> {
parse_json::<WhoAmI>(&header, body)
pub fn parse_message(body: &[u8]) -> Result<Message> {
Ok(serde_json::from_slice(body)?)
}
pub fn parse_message(header: &Header, body: &[u8]) -> Result<Message> {
if header.is_end_or_error {
let error: ErrorRes = serde_json::from_slice(&body[..])?;
Err(Error::ServerMessage(format!("{:?}", error)))
} else {
Ok(Message::from_slice(body)?)
}
pub fn parse_feed(body: &[u8]) -> Result<Feed> {
Ok(Feed::from_str(&String::from_utf8_lossy(body))?)
}
pub fn parse_feed(header: &Header, body: &[u8]) -> Result<Feed> {
if header.is_end_or_error {
let error: ErrorRes = serde_json::from_slice(&body[..])?;
Err(Error::ServerMessage(format!("{:?}", error)))
} else {
Ok(Feed::from_str(&String::from_utf8_lossy(&body))?)
}
}
pub fn parse_latest(header: &Header, body: &[u8]) -> Result<LatestUserMessage> {
parse_json::<LatestUserMessage>(&header, body)
pub fn parse_latest(body: &[u8]) -> Result<LatestUserMessage> {
Ok(serde_json::from_slice(body)?)
}
pub struct ApiClient<R: Read + Unpin, W: Write + Unpin> {
rpc: RpcClient<R, W>,
rpc: RpcStream<R, W>,
}
impl<R: Read + Unpin, W: Write + Unpin> ApiClient<R, W> {
pub fn new(rpc: RpcClient<R, W>) -> Self {
pub fn new(rpc: RpcStream<R, W>) -> Self {
Self { rpc }
}
pub fn rpc(&mut self) -> &mut RpcClient<R, W> {
pub fn rpc(&mut self) -> &mut RpcStream<R, W> {
&mut self.rpc
}
// whoami: sync
// Get information about the current ssb-server user.
pub async fn send_whoami(&mut self) -> Result<RequestNo> {
let body = Body {
name : vec!["whoami".to_string()],
rpc_type : RpcType::Async,
args : Vec::<String>::new()
};
let req_no = self.rpc.send(&body).await?;
let args: [&str; 0] = [];
let req_no = self.rpc.send_request(&["whoami"], RpcType::Async, &args).await?;
Ok(req_no)
}
// get: async
// Get a message by its hash-id. (sould start with %)
pub async fn send_get(&mut self, msg_id: &str) -> Result<RequestNo> {
let body = Body {
name : vec!["get".to_string()],
rpc_type : RpcType::Async,
args : vec![msg_id.to_string()]
};
let req_no = self.rpc.send(&body).await?;
let req_no = self.rpc.send_request(&["get"], RpcType::Async, &msg_id).await?;
Ok(req_no)
}
// createHistoryStream: source
@ -299,12 +266,10 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiClient<R, W> {
&mut self,
args: &'a CreateHistoryStreamArgs<'a>,
) -> Result<RequestNo> {
let body = Body {
name : vec!["createHistoryStream".to_string()],
rpc_type : RpcType::Source,
args : args
};
let req_no = self.rpc.send(&body).await?;
let req_no = self
.rpc
.send_request(&["createHistoryStream"], RpcType::Source, &args)
.await?;
Ok(req_no)
}
@ -314,24 +279,18 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiClient<R, W> {
&mut self,
args: &'a CreateStreamArgs<u64>,
) -> Result<RequestNo> {
let body = Body {
name : vec!["createFeedStream".to_string()],
rpc_type : RpcType::Source,
args
};
let req_no = self.rpc.send(&body).await?;
let req_no = self
.rpc
.send_request(&["createFeedStream"], RpcType::Source, &args)
.await?;
Ok(req_no)
}
// latest: source
// Get the seq numbers of the latest messages of all users in the database.
pub async fn send_latest(&mut self) -> Result<RequestNo> {
let body = Body {
name : vec!["latest".to_string()],
rpc_type : RpcType::Source,
args : Vec::<String>::new(),
};
let req_no = self.rpc.send(&body).await?;
let args: [&str; 0] = [];
let req_no = self.rpc.send_request(&["latest"], RpcType::Source, &args).await?;
Ok(req_no)
}
}

View File

@ -17,38 +17,36 @@ const RPC_HEADER_STREAM_FLAG : u8 = 1 << 3;
const RPC_HEADER_END_OR_ERROR_FLAG : u8 = 1 << 2;
const RPC_HEADER_BODY_TYPE_MASK : u8 = 0b11;
#[derive(Debug,PartialEq)]
#[derive(Copy, Clone, Debug,PartialEq)]
pub enum BodyType {
Binary,
UTF8,
JSON,
}
/*
let mut body = String::from("{\"name\":");
body.push_str(&serde_json::to_string(&name)?);
body.push_str(",\"type\":\"");
body.push_str(rpc_type.rpc_id());
body.push_str("\",\"args\":[");
body.push_str(&serde_json::to_string(&args)?);
body.push_str("]}");
*/
#[derive(Serialize)]
pub struct Body<T:serde::Serialize> {
#[derive(Deserialize)]
pub struct Body {
pub name : Vec<String>,
#[serde(rename="type")]
pub rpc_type : RpcType,
pub args : T,
pub args : serde_json::Value,
}
impl<T:serde::Serialize> Body<T> {
pub fn new(name: Vec<String>, rpc_type : RpcType, args:T) -> Self {
Body { name, rpc_type, args }
#[derive(Serialize)]
pub struct BodyRef<'a,T:serde::Serialize> {
pub name : &'a [&'a str],
#[serde(rename="type")]
pub rpc_type : RpcType,
pub args : &'a T,
}
impl<'a,T:serde::Serialize> BodyRef<'a,T> {
pub fn new(name:&'a [&'a str], rpc_type : RpcType, args:&'a T) -> Self {
BodyRef { name, rpc_type, args }
}
}
#[derive(Serialize,Debug,PartialEq)]
#[derive(Clone,Copy,Serialize,Deserialize,Debug,PartialEq)]
pub enum RpcType {
#[serde(rename="async")]
Async,
@ -56,14 +54,6 @@ pub enum RpcType {
Source,
}
impl RpcType {
pub fn rpc_id(&self) -> &'static str {
match self {
RpcType::Async => "async",
RpcType::Source => "source",
}
}
}
#[derive(Debug,PartialEq)]
pub struct Header {
pub req_no : RequestNo,
@ -73,6 +63,12 @@ pub struct Header {
pub body_len : u32,
}
#[derive(Serialize,Deserialize)]
struct ErrorMessage<'a> {
name : &'a str,
message : &'a str,
}
impl Header {
pub fn from_slice(bytes: &[u8]) -> Result<Header> {
if bytes.len() < HEADER_SIZE {
@ -126,45 +122,68 @@ impl Header {
}
}
pub struct RpcClient<R : io::Read + Unpin, W : io::Write + Unpin> {
pub struct RpcStream<R : io::Read + Unpin, W : io::Write + Unpin> {
box_reader : BoxStreamRead<R>,
box_writer : BoxStreamWrite<W>,
req_no : RequestNo,
}
impl<R:io::Read+Unpin , W:io::Write+Unpin> RpcClient<R,W> {
pub enum RecvMsg {
Request(Body),
ErrorResponse(String),
CancelStreamRespose(),
BodyResponse(Vec<u8>),
}
pub fn new(box_reader :BoxStreamRead<R>, box_writer :BoxStreamWrite<W>) -> RpcClient<R,W> {
RpcClient { box_reader, box_writer, req_no : 0 }
impl<R:io::Read+Unpin , W:io::Write+Unpin> RpcStream<R,W> {
pub fn new(box_reader :BoxStreamRead<R>, box_writer :BoxStreamWrite<W>) -> RpcStream<R,W> {
RpcStream { box_reader, box_writer, req_no : 0 }
}
pub async fn recv(&mut self) -> Result<(Header,Vec<u8>)> {
pub async fn recv(&mut self) -> Result<(RequestNo,RecvMsg)> {
let mut rpc_header_raw = [0u8;9];
self.box_reader.read_exact(&mut rpc_header_raw[..]).await?;
let rpc_header = Header::from_slice(&rpc_header_raw[..])?;
let mut rpc_body : Vec<u8> = vec![0;rpc_header.body_len as usize];
self.box_reader.read_exact(&mut rpc_body[..]).await?;
let mut body_raw : Vec<u8> = vec![0;rpc_header.body_len as usize];
self.box_reader.read_exact(&mut body_raw[..]).await?;
Ok((rpc_header,rpc_body))
if rpc_header.req_no > 0 {
let rpc_body = serde_json::from_slice(&body_raw)?;
Ok((rpc_header.req_no,RecvMsg::Request(rpc_body)))
} else {
if rpc_header.is_end_or_error {
if rpc_header.is_stream {
Ok((-rpc_header.req_no,RecvMsg::CancelStreamRespose()))
} else {
let err : ErrorMessage = serde_json::from_slice(&body_raw)?;
Ok((-rpc_header.req_no,RecvMsg::ErrorResponse(err.message.to_string())))
}
} else {
Ok((-rpc_header.req_no,RecvMsg::BodyResponse(body_raw)))
}
}
}
pub async fn send<T:serde::Serialize>(&mut self, body : &Body<T>) -> Result<RequestNo>{
pub async fn send_request<T:serde::Serialize>(&mut self, name: &[&str], rpc_type: RpcType, args : &T) -> Result<RequestNo>{
self.req_no+=1;
let body_str = serde_json::to_string(body)?;
let body_str = serde_json::to_string(&BodyRef {
name,
rpc_type,
args: &[&args],
})?;
let rpc_header = Header {
req_no : self.req_no,
is_stream : body.rpc_type == RpcType::Source,
is_stream : rpc_type == RpcType::Source,
is_end_or_error : false,
body_type : BodyType::JSON,
body_len : body_str.as_bytes().len() as u32,
}.to_array();
println!("\n{}\n",body_str);
self.box_writer.write_all(&rpc_header[..]).await?;
self.box_writer.write_all(body_str.as_bytes()).await?;
self.box_writer.flush().await?;
@ -172,7 +191,45 @@ impl<R:io::Read+Unpin , W:io::Write+Unpin> RpcClient<R,W> {
Ok(self.req_no)
}
pub async fn send_cancel_stream(&mut self, req_no: RequestNo) -> Result<()> {
pub async fn send_response(&mut self, req_no : RequestNo, rpc_type: RpcType, body_type : BodyType, body: &[u8] ) -> Result<()>{
self.req_no+=1;
let rpc_header = Header {
req_no,
is_stream : rpc_type == RpcType::Source,
is_end_or_error : false,
body_type : body_type,
body_len : body.len() as u32,
}.to_array();
self.box_writer.write_all(&rpc_header[..]).await?;
self.box_writer.write_all(body).await?;
self.box_writer.flush().await?;
Ok(())
}
pub async fn send_error(&mut self, req_no: RequestNo, message: &str) -> Result<()> {
let body_bytes = serde_json::to_string(&ErrorMessage {
name : "Error",
message
})?;
let rpc_header = Header {
req_no,
is_stream : false,
is_end_or_error : true,
body_type : BodyType::UTF8,
body_len : body_bytes.as_bytes().len() as u32,
}.to_array();
self.box_writer.write_all(&rpc_header[..]).await?;
self.box_writer.write_all(body_bytes.as_bytes()).await?;
Ok(())
}
pub async fn send_cancel(&mut self, req_no: RequestNo) -> Result<()> {
let body_bytes = b"true";
let rpc_header = Header {

View File

@ -1,5 +1,5 @@
mod client;
mod error;
pub use client::{RpcClient,Header, RequestNo, RpcType, Body};
pub use client::{RpcStream,RecvMsg, RequestNo, RpcType, Body, BodyRef};
pub use error::{Error,Result};