patchwork interop

This commit is contained in:
adria0 2020-03-31 00:30:56 +02:00 committed by adria0.eth
parent 0fe919d00c
commit 4994495781
4 changed files with 101 additions and 42 deletions

View File

@ -3,13 +3,13 @@ extern crate kuska_ssb;
extern crate base64; extern crate base64;
extern crate crossbeam; extern crate crossbeam;
extern crate structopt;
extern crate regex; extern crate regex;
extern crate structopt;
use std::fmt::Debug; use std::fmt::Debug;
use async_std::io::{Read, Write}; use async_std::io::{Read, Write};
use async_std::net::{UdpSocket,TcpStream}; use async_std::net::{TcpStream, UdpSocket};
use kuska_handshake::async_std::{handshake_client, BoxStream}; use kuska_handshake::async_std::{handshake_client, BoxStream};
use kuska_ssb::api::{ use kuska_ssb::api::{
@ -21,8 +21,8 @@ use kuska_ssb::keystore::from_patchwork_local;
use kuska_ssb::keystore::OwnedIdentity; use kuska_ssb::keystore::OwnedIdentity;
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcStream}; use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcStream};
use sodiumoxide::crypto::sign::ed25519;
use regex::Regex; use regex::Regex;
use sodiumoxide::crypto::sign::ed25519;
use structopt::StructOpt; use structopt::StructOpt;
type AnyResult<T> = std::result::Result<T, Box<dyn std::error::Error>>; type AnyResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
@ -84,7 +84,7 @@ where
let (id, msg) = client.rpc().recv().await?; let (id, msg) = client.rpc().recv().await?;
if id == req_no { if id == req_no {
match msg { match msg {
RecvMsg::BodyResponse(body) => { RecvMsg::RpcResponse(_type, body) => {
return f(&body).map_err(|err| err.into()); return f(&body).map_err(|err| err.into());
} }
RecvMsg::ErrorResponse(message) => { RecvMsg::ErrorResponse(message) => {
@ -113,7 +113,7 @@ where
let (id, msg) = client.rpc().recv().await?; let (id, msg) = client.rpc().recv().await?;
if id == req_no { if id == req_no {
match msg { match msg {
RecvMsg::BodyResponse(body) => { RecvMsg::RpcResponse(_type, body) => {
let display = f(&body)?; let display = f(&body)?;
println!("{:?}", display); println!("{:?}", display);
} }
@ -137,34 +137,47 @@ async fn main() -> AnyResult<()> {
let OwnedIdentity { pk, sk, id } = from_patchwork_local().await.expect("read local secret"); let OwnedIdentity { pk, sk, id } = from_patchwork_local().await.expect("read local secret");
println!("connecting with identity {}", id); println!("connecting with identity {}", id);
let opt = Opt::from_args(); let opt = Opt::from_args();
let (ip,port,server_pk) = if let Some(connect) = opt.connect { let (ip, port, server_pk) = if let Some(connect) = opt.connect {
let connect: Vec<_> = connect.split(":").collect(); let connect: Vec<_> = connect.split(":").collect();
if connect.len() != 3 { if connect.len() != 3 {
panic!("connection string should be server:port:id"); panic!("connection string should be server:port:id");
} }
(connect[0].to_string(),connect[1].to_string(),connect[2].to_string()) (
connect[0].to_string(),
connect[1].to_string(),
connect[2].to_string(),
)
} else { } else {
println!("Waiting server broadcast..."); println!("Waiting server broadcast...");
let socket = UdpSocket::bind("0.0.0.0:8008").await?; let socket = UdpSocket::bind("0.0.0.0:8008").await?;
socket.set_broadcast(true)?; socket.set_broadcast(true)?;
let mut buf = [0; 128]; let mut buf = [0; 128];
let (amt, _) = socket.recv_from(&mut buf).await.unwrap(); let (amt, _) = socket.recv_from(&mut buf).await.unwrap();
let msg = String::from_utf8(buf[..amt].to_vec())?; let msg = String::from_utf8(buf[..amt].to_vec())?;
println!("got broadcasted {}",msg); println!("got broadcasted {}", msg);
let broadcast_regexp = r"net:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):([0-9]+)~shs:([0-9a-zA-Z=/]+)"; let broadcast_regexp =
let captures = Regex::new(broadcast_regexp).unwrap().captures(&msg).unwrap(); r"net:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):([0-9]+)~shs:([0-9a-zA-Z=/]+)";
(captures[1].to_string(),captures[2].to_string(),captures[3].to_string()) let captures = Regex::new(broadcast_regexp)
.unwrap()
.captures(&msg)
.unwrap();
(
captures[1].to_string(),
captures[2].to_string(),
captures[3].to_string(),
)
}; };
let server_pk = ed25519::PublicKey::from_slice(&base64::decode(&server_pk)?).expect("bad public key"); let server_pk =
ed25519::PublicKey::from_slice(&base64::decode(&server_pk)?).expect("bad public key");
let server_ipport = format!("{}:{}", ip, port); let server_ipport = format!("{}:{}", ip, port);
println!("server_ip_port={}",server_ipport); println!("server_ip_port={}", server_ipport);
let mut socket = TcpStream::connect(server_ipport).await?; let mut socket = TcpStream::connect(server_ipport).await?;

View File

@ -59,7 +59,7 @@ pub struct CreateStreamArgs<K> {
/// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys. /// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys.
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<u64>, pub limit: Option<i64>,
/// fillCache (boolean, default: false): wheather LevelDB's LRU-cache should be filled with data read. /// fillCache (boolean, default: false): wheather LevelDB's LRU-cache should be filled with data read.
#[serde(rename = "fillCache")] #[serde(rename = "fillCache")]
@ -143,7 +143,7 @@ impl<K> CreateStreamArgs<K> {
..self ..self
} }
} }
pub fn limit(self: Self, limit: u64) -> Self { pub fn limit(self: Self, limit: i64) -> Self {
Self { Self {
limit: Some(limit), limit: Some(limit),
..self ..self
@ -173,7 +173,7 @@ pub struct CreateHistoryStreamArgs {
/// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys. /// limit (number, default: -1): limit the number of results collected by this stream. This number represents a maximum number of results and may not be reached if you get to the end of the data first. A value of -1 means there is no limit. When reverse=true the highest keys will be returned instead of the lowest keys.
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<u64>, pub limit: Option<i64>,
} }
impl CreateHistoryStreamArgs { impl CreateHistoryStreamArgs {
@ -206,7 +206,7 @@ impl CreateHistoryStreamArgs {
..self ..self
} }
} }
pub fn limit(self: Self, limit: u64) -> Self { pub fn limit(self: Self, limit: i64) -> Self {
Self { Self {
limit: Some(limit), limit: Some(limit),
..self ..self
@ -317,7 +317,7 @@ impl<R: Read + Unpin, W: Write + Unpin> ApiHelper<R, W> {
} }
pub async fn feed_res_send(&mut self, req_no: RequestNo, feed: &str) -> Result<()> { pub async fn feed_res_send(&mut self, req_no: RequestNo, feed: &str) -> Result<()> {
self.rpc self.rpc
.send_response(req_no, RpcType::Async, BodyType::JSON, feed.as_bytes()) .send_response(req_no, RpcType::Source, BodyType::JSON, feed.as_bytes())
.await?; .await?;
Ok(()) Ok(())
} }

View File

@ -70,6 +70,7 @@ impl Message {
Value::Number(serde_json::Number::from(prev.sequence() + 1)), Value::Number(serde_json::Number::from(prev.sequence() + 1)),
); );
} else { } else {
value.insert(MSG_PREVIOUS.to_string(), Value::Null);
value.insert( value.insert(
MSG_SEQUENCE.to_string(), MSG_SEQUENCE.to_string(),
Value::Number(serde_json::Number::from(1)), Value::Number(serde_json::Number::from(1)),

View File

@ -2,7 +2,7 @@ use super::error::{Error, Result};
use async_std::io; use async_std::io;
use async_std::prelude::*; use async_std::prelude::*;
use log::trace; use log::debug;
use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite}; use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite};
@ -59,6 +59,7 @@ pub struct Header {
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct ErrorMessage<'a> { struct ErrorMessage<'a> {
name: &'a str, name: &'a str,
stack: &'a str,
message: &'a str, message: &'a str,
} }
@ -127,10 +128,11 @@ pub struct RpcStream<R: io::Read + Unpin, W: io::Write + Unpin> {
} }
pub enum RecvMsg { pub enum RecvMsg {
Request(Body), RpcRequest(Body),
RpcResponse(BodyType, Vec<u8>),
OtherRequest(BodyType, Vec<u8>),
ErrorResponse(String), ErrorResponse(String),
CancelStreamRespose(), CancelStreamRespose(),
BodyResponse(Vec<u8>),
} }
impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> { impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
@ -150,11 +152,20 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
let mut body_raw: Vec<u8> = vec![0; rpc_header.body_len as usize]; let mut body_raw: Vec<u8> = vec![0; rpc_header.body_len as usize];
self.box_reader.read_exact(&mut body_raw[..]).await?; self.box_reader.read_exact(&mut body_raw[..]).await?;
trace!("got {}",String::from_utf8_lossy(&body_raw[..])); debug!(
"rpc-recv {:?} '{}'",
rpc_header,
String::from_utf8_lossy(&body_raw[..])
);
if rpc_header.req_no > 0 { if rpc_header.req_no > 0 {
let rpc_body = serde_json::from_slice(&body_raw)?; match serde_json::from_slice(&body_raw) {
Ok((rpc_header.req_no, RecvMsg::Request(rpc_body))) Ok(rpc_body) => Ok((rpc_header.req_no, RecvMsg::RpcRequest(rpc_body))),
Err(_) => Ok((
rpc_header.req_no,
RecvMsg::OtherRequest(rpc_header.body_type, body_raw),
)),
}
} else if rpc_header.is_end_or_error { } else if rpc_header.is_end_or_error {
if rpc_header.is_stream { if rpc_header.is_stream {
Ok((-rpc_header.req_no, RecvMsg::CancelStreamRespose())) Ok((-rpc_header.req_no, RecvMsg::CancelStreamRespose()))
@ -166,7 +177,10 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
)) ))
} }
} else { } else {
Ok((-rpc_header.req_no, RecvMsg::BodyResponse(body_raw))) Ok((
-rpc_header.req_no,
RecvMsg::RpcResponse(rpc_header.body_type, body_raw),
))
} }
} }
@ -190,10 +204,13 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
is_end_or_error: false, is_end_or_error: false,
body_type: BodyType::JSON, body_type: BodyType::JSON,
body_len: body_str.as_bytes().len() as u32, body_len: body_str.as_bytes().len() as u32,
} };
.to_array();
self.box_writer.write_all(&rpc_header[..]).await?; debug!("rpc-send {:?} '{}'", rpc_header, body_str);
self.box_writer
.write_all(&rpc_header.to_array()[..])
.await?;
self.box_writer.write_all(body_str.as_bytes()).await?; self.box_writer.write_all(body_str.as_bytes()).await?;
self.box_writer.flush().await?; self.box_writer.flush().await?;
@ -213,32 +230,53 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
is_end_or_error: false, is_end_or_error: false,
body_type, body_type,
body_len: body.len() as u32, body_len: body.len() as u32,
} };
.to_array();
self.box_writer.write_all(&rpc_header[..]).await?; debug!(
"rpc-send {:?} '{}'",
rpc_header,
String::from_utf8_lossy(body)
);
self.box_writer
.write_all(&rpc_header.to_array()[..])
.await?;
self.box_writer.write_all(body).await?; self.box_writer.write_all(body).await?;
self.box_writer.flush().await?; self.box_writer.flush().await?;
Ok(()) Ok(())
} }
pub async fn send_error(&mut self, req_no: RequestNo, message: &str) -> Result<()> { pub async fn send_error(
&mut self,
req_no: RequestNo,
rpc_type: RpcType,
message: &str,
) -> Result<()> {
let body_bytes = serde_json::to_string(&ErrorMessage { let body_bytes = serde_json::to_string(&ErrorMessage {
name: "Error", name: "Error",
stack: "",
message, message,
})?; })?;
let is_stream = match rpc_type {
RpcType::Async => false,
_ => true,
};
let rpc_header = Header { let rpc_header = Header {
req_no: -req_no, req_no: -req_no,
is_stream: false, is_stream,
is_end_or_error: true, is_end_or_error: true,
body_type: BodyType::UTF8, body_type: BodyType::UTF8,
body_len: body_bytes.as_bytes().len() as u32, body_len: body_bytes.as_bytes().len() as u32,
} };
.to_array();
self.box_writer.write_all(&rpc_header[..]).await?; debug!("rpc-send {:?} '{}'", rpc_header, body_bytes);
self.box_writer
.write_all(&rpc_header.to_array()[..])
.await?;
self.box_writer.write_all(body_bytes.as_bytes()).await?; self.box_writer.write_all(body_bytes.as_bytes()).await?;
self.box_writer.flush().await?; self.box_writer.flush().await?;
@ -254,10 +292,17 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
is_end_or_error: true, is_end_or_error: true,
body_type: BodyType::JSON, body_type: BodyType::JSON,
body_len: body_bytes.len() as u32, body_len: body_bytes.len() as u32,
} };
.to_array();
self.box_writer.write_all(&rpc_header[..]).await?; debug!(
"rpc-send {:?} '{}'",
rpc_header,
String::from_utf8_lossy(body_bytes)
);
self.box_writer
.write_all(&rpc_header.to_array()[..])
.await?;
self.box_writer.write_all(&body_bytes[..]).await?; self.box_writer.write_all(&body_bytes[..]).await?;
self.box_writer.flush().await?; self.box_writer.flush().await?;
Ok(()) Ok(())