diff --git a/Cargo.toml b/Cargo.toml index ad48def..87c0389 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kuska-ssb" -version = "0.1.0" +version = "0.1.2" authors = ["Dhole ", "Adria Massanet "] edition = "2018" @@ -8,27 +8,18 @@ edition = "2018" name = "kuska_ssb" [dependencies] -kuska-handshake = { git = "https://github.com/Kuska-ssb/kuska-handshake", branch = "feature/moderror" , features=["sync","async_std"] } +kuska-handshake = { git = "https://github.com/Kuska-ssb/kuska-handshake", branch = "master" , features=["sync","async_std"] } sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" } base64 = "0.11.0" hex = "0.4.0" -async-std = { version = "1.1.0", features=["unstable","attributes"] } +async-std = { version = "1.4.0", features=["unstable","attributes"] } crossbeam = "0.7.3" log = "0.4.8" env_logger = "0.7.1" -serde = "1.0.104" -serde_json = { version = "1.0.41", features=["preserve_order","arbitrary_precision"] } -serde_derive = "1.0.104" +serde = { version = "1.0.104", features = ["derive"] } +serde_json = { version = "1.0.48", features=["preserve_order","arbitrary_precision"] } dirs = "2.0" -futures = "0.3.1" -snap = "0.2.5" -signal-hook = "0.1.12" +futures = "0.3.4" lazy_static = "1.4.0" -rand = "0.7.2" - -[dev-dependencies] -kuska-handshake = { git = "https://github.com/Kuska-ssb/kuska-handshake", branch = "feature/moderror", features=["sync","async_std","tokio_compat"] } -tokio = { version = "0.2.6", features=["full"] } -actix-rt = "1.0.0" -actix-web = "2.0.0" +rand = "0.7.3" \ No newline at end of file diff --git a/examples/iot-solar-wip.rs b/examples/iot-solar-wip.rs deleted file mode 100644 index 3d5e634..0000000 --- a/examples/iot-solar-wip.rs +++ /dev/null @@ -1,224 +0,0 @@ -#![allow(dead_code)] - -#[macro_use] -extern crate lazy_static; - -use async_std::sync::{Receiver,Sender,channel}; -use async_std::task; -use std::fmt::Debug; -use std::cell::{RefCell}; -use std::sync::{Arc,Mutex}; - -use std::time::Duration; -use signal_hook::{iterator::Signals, SIGTERM, SIGINT, SIGHUP, SIGQUIT}; - -use actix_web::{get, web, App, HttpServer, Responder}; - -use async_std::io::{Read,Write}; - -use kuska_handshake::async_std::{BoxStream,handshake_client,TokioCompatExt,TokioCompatExtRead,TokioCompatExtWrite}; -use kuska_ssb::rpc::{Header,RequestNo,RpcClient}; -use kuska_ssb::patchwork::*; - -use tokio::net::TcpStream; - -type AnyResult = std::result::Result>; - -async fn get_async<'a,R,W,T,F> (client: &mut ApiClient, req_no : RequestNo, f : F) -> AnyResult -where - R: Read+Unpin, - W: Write+Unpin, - F: Fn(&Header,&[u8])->Result, - T: Debug -{ - loop { - let (header,body) = client.rpc().recv().await?; - if header.req_no == req_no { - return f(&header,&body).map_err(|err| err.into()); - } - } -} - -async fn run_task(api : &mut ApiClient, _command: &str) -> AnyResult { - let req_id = api.send_whoami().await?; - let whoami = get_async(api,-req_id,parse_whoami).await?.id; - - println!("{}",whoami); - - Ok(false) -} - -async fn sync_loop(command_receiver: Receiver, stop_receiver : Receiver) -> AnyResult<()>{ - - /* - sync loop functionality - friend_list = [] - discovered_peers = [] - connected_peers = [] - loop { - foreach peer in discoverd_peer not in conected_peers { - peer.handshake.createboxstream - peer.feed_callback = { |msg| - process_feed_message(msg) - } - for each friend in friend_list { - peer.createUserStream(friend) - } - conected_peers.push(peer) - } - } - */ - - - let IdentitySecret{pk,sk,..} = IdentitySecret::from_local_config() - .expect("read local secret"); - - let tokio_socket : TcpStream = TcpStream::connect("127.0.0.1:8008").await?; - let asyncstd_socket = TokioCompatExt::wrap(tokio_socket); - - let (asyncstd_socket,handshake) = handshake_client(asyncstd_socket, ssb_net_id(), pk, sk.clone(), pk).await?; - - - let mut tokio_socket = asyncstd_socket.into_inner(); - let (read,write) = tokio_socket.split(); - - let read = TokioCompatExtRead::wrap(read); - let write = TokioCompatExtWrite::wrap(write); - - let (box_stream_read, box_stream_write) = - BoxStream::from_handhake(read, write, handshake, 0x8000) - .split_read_write(); - - let rpc = RpcClient::new(box_stream_read, box_stream_write); - let mut api = ApiClient::new(rpc); - - let mut commands_queue : Vec = Vec::new(); - - loop { - - if !stop_receiver.is_empty() { - stop_receiver.recv().await; - println!("finished loop"); - return Ok(()); - } - - // read all pending requests - while !command_receiver.is_empty() { - if let Some(msg) = command_receiver.recv().await { - commands_queue.push(msg); - } - } - - if let Some(command) = commands_queue.pop() { - run_task(&mut api,&command).await?; - } else { - task::sleep(Duration::from_secs(1)).await; - println!("waiting!"); - } - - } - -} - - -async fn command_loop(command_receiver: Receiver, stop_receiver : Receiver) -> AnyResult<()>{ - - let IdentitySecret{pk,sk,..} = IdentitySecret::from_local_config() - .expect("read local secret"); - - let tokio_socket : TcpStream = TcpStream::connect("127.0.0.1:8008").await?; - let asyncstd_socket = TokioCompatExt::wrap(tokio_socket); - - let (asyncstd_socket,handshake) = handshake_client(asyncstd_socket, ssb_net_id(), pk, sk.clone(), pk).await?; - - println!("💃 handshake complete"); - - let mut tokio_socket = asyncstd_socket.into_inner(); - let (read,write) = tokio_socket.split(); - - let read = TokioCompatExtRead::wrap(read); - let write = TokioCompatExtWrite::wrap(write); - - let (box_stream_read, box_stream_write) = - BoxStream::from_handhake(read, write, handshake, 0x8000) - .split_read_write(); - - let rpc = RpcClient::new(box_stream_read, box_stream_write); - let mut api = ApiClient::new(rpc); - - let mut commands_queue : Vec = Vec::new(); - - loop { - - if !stop_receiver.is_empty() { - stop_receiver.recv().await; - println!("finished loop"); - return Ok(()); - } - - // read all pending requests - while !command_receiver.is_empty() { - if let Some(msg) = command_receiver.recv().await { - commands_queue.push(msg); - } - } - - if let Some(command) = commands_queue.pop() { - run_task(&mut api,&command).await?; - } else { - task::sleep(Duration::from_secs(1)).await; - println!("waiting!"); - } - - } - -} - -lazy_static! { - static ref COMMAND_SENDER : Arc>>>> = Arc::new(Mutex::new(RefCell::new(None))); -} - - -#[get("/{id}/{name}/index.html")] -async fn index(info: web::Path<(u32, String)>) -> impl Responder { - COMMAND_SENDER.lock().unwrap().borrow().as_ref().unwrap().send("hola".to_owned()).await; - format!("Hello {}! id:{}", info.1, info.0) -} - - -async fn web_handler() -> std::io::Result<()> { - HttpServer::new(|| App::new().service(index)) - .bind("127.0.0.1:8080")? - .run() - .await -} - -async fn sigterm_handler(stop_sender : Sender, count : usize, ) { - let signals = Signals::new(&[SIGTERM, SIGHUP, SIGINT, SIGQUIT]).expect("cannot capture SIGTERM"); - loop { - if signals.pending().next().is_some() { - for _ in 0..count { - stop_sender.send(true).await; - } - return; - } - task::sleep(Duration::from_secs(1)).await; - } -} - -#[actix_rt::main] -async fn main() { - println!("started"); - - let (stop_sender, stop_receiver) = channel::(1); - let (command_sender, command_receiver) = channel::(1); - COMMAND_SENDER.lock().unwrap().replace(Some(command_sender)); - - let future_sigterm = sigterm_handler(stop_sender,1); - let future_loop = command_loop(command_receiver,stop_receiver.clone()); - let future_web = web_handler(); - - let (_,loop_res,_) = futures::join!(future_sigterm,future_loop,future_web); - loop_res.expect("main loop failed"); -} - diff --git a/examples/ssb-cli.rs b/examples/ssb-cli.rs index 259f139..6df35f2 100644 --- a/examples/ssb-cli.rs +++ b/examples/ssb-cli.rs @@ -6,52 +6,113 @@ extern crate crossbeam; use std::fmt::Debug; -use async_std::io::{Read,Write}; +use async_std::io::{Read, Write}; use async_std::net::TcpStream; -use kuska_handshake::async_std::{handshake_client,BoxStream}; -use kuska_ssb::rpc::{Header,RequestNo,RpcClient}; -use kuska_ssb::patchwork::*; -use kuska_ssb::feed::{is_privatebox,privatebox_decipher}; +use kuska_handshake::async_std::{handshake_client, BoxStream}; +use kuska_ssb::api::{ + ApiHelper, CreateHistoryStreamArgs, CreateStreamArgs, LatestUserMessage, WhoAmI, +}; +use kuska_ssb::discovery::ssb_net_id; +use kuska_ssb::feed::{is_privatebox, privatebox_decipher, Feed, Message}; +use kuska_ssb::keystore::from_patchwork_local; +use kuska_ssb::keystore::OwnedIdentity; +use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcStream}; type AnyResult = std::result::Result>; -async fn get_async<'a,R,W,T,F> (client: &mut ApiClient, req_no : RequestNo, f : F) -> AnyResult +pub fn whoami_res_parse(body: &[u8]) -> AnyResult { + Ok(serde_json::from_slice(body)?) +} +pub fn message_res_parse(body: &[u8]) -> AnyResult { + Ok(Message::from_slice(body)?) +} +pub fn feed_res_parse(body: &[u8]) -> AnyResult { + Ok(Feed::from_slice(&body)?) +} +pub fn latest_res_parse(body: &[u8]) -> AnyResult { + Ok(serde_json::from_slice(body)?) +} + +#[derive(Debug)] +struct AppError { + message: String, +} +impl AppError { + pub fn new(message: String) -> Self { + AppError { message } + } +} +impl std::error::Error for AppError { + fn description(&self) -> &str { + &self.message + } +} +impl std::fmt::Display for AppError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} + +async fn get_async<'a, R, W, T, F>( + client: &mut ApiHelper, + req_no: RequestNo, + f: F, +) -> AnyResult where - R: Read+Unpin, - W: Write+Unpin, - F: Fn(&Header,&[u8])->Result, - T: Debug + R: Read + Unpin, + W: Write + Unpin, + F: Fn(&[u8]) -> AnyResult, + T: Debug, { loop { - let (header,body) = client.rpc().recv().await?; - if header.req_no == req_no { - return f(&header,&body).map_err(|err| err.into()); + let (id, msg) = client.rpc().recv().await?; + if id == req_no { + match msg { + RecvMsg::BodyResponse(body) => { + return f(&body).map_err(|err| err.into()); + } + RecvMsg::ErrorResponse(message) => { + return std::result::Result::Err(Box::new(AppError::new(message))); + } + _ => unreachable!(), + } + } else { + println!("discarded message {}", id); } } } -async fn print_source_until_eof<'a,R,W,T,F> (client: &mut ApiClient, req_no : RequestNo, f : F) -> AnyResult<()> +async fn print_source_until_eof<'a, R, W, T, F>( + client: &mut ApiHelper, + req_no: RequestNo, + f: F, +) -> AnyResult<()> where - R: Read+Unpin, - W: Write+Unpin, - F: Fn(&Header,&[u8])->Result, - T: Debug+serde::Deserialize<'a> + R: Read + Unpin, + W: Write + Unpin, + F: Fn(&[u8]) -> AnyResult, + T: Debug + serde::Deserialize<'a>, { loop { - let (header,body) = client.rpc().recv().await?; - if header.req_no == req_no { - if !header.is_end_or_error { - match f(&header,&body) { - Ok(res) => { println!("{:?}",res) }, - Err(err) => println!(" 😢 Failed :( {:?} {}",err,String::from_utf8_lossy(&body)), + let (id, msg) = client.rpc().recv().await?; + if id == req_no { + match msg { + RecvMsg::BodyResponse(body) => { + let display = f(&body)?; + println!("{:?}", display); } - } else { - println!("STREAM FINISHED"); - return Ok(()) + RecvMsg::ErrorResponse(message) => { + return std::result::Result::Err(Box::new(AppError::new(message))); + } + RecvMsg::CancelStreamRespose() => break, + _ => unreachable!(), } + } else { + println!("discarded message {}", id); } } + Ok(()) } #[async_std::main] @@ -59,97 +120,89 @@ async fn main() -> AnyResult<()> { env_logger::init(); log::set_max_level(log::LevelFilter::max()); - let IdentitySecret{pk,sk,..} = IdentitySecret::from_local_config() - .expect("read local secret"); + let OwnedIdentity { pk, sk, .. } = from_patchwork_local().expect("read local secret"); - let mut socket = TcpStream::connect("127.0.0.1:8008").await?; - - let (_,handshake) = handshake_client(&mut socket, ssb_net_id(), pk, sk.clone(), pk).await?; + let mut socket = TcpStream::connect("127.0.0.1:8080").await?; + let handshake = handshake_client(&mut socket, ssb_net_id(), pk, sk.clone(), pk).await?; println!("💃 handshake complete"); let (box_stream_read, box_stream_write) = - BoxStream::from_handhake(&socket, &socket, handshake, 0x8000) - .split_read_write(); + BoxStream::from_handshake(&socket, &socket, handshake, 0x8000).split_read_write(); - let mut client = ApiClient::new(RpcClient::new(box_stream_read, box_stream_write)); + let mut client = ApiHelper::new(RpcStream::new(box_stream_read, box_stream_write)); - let req_id = client.send_whoami().await?; - let whoami = get_async(&mut client,-req_id,parse_whoami).await?.id; + let req_id = client.whoami_req_send().await?; + let whoami = get_async(&mut client, req_id, whoami_res_parse).await?.id; - println!("😊 server says hello to {}",whoami); + println!("😊 server says hello to {}", whoami); let mut line_buffer = String::new(); while let Ok(_) = std::io::stdin().read_line(&mut line_buffer) { - - let args : Vec = line_buffer + let args: Vec = line_buffer .replace("\n", "") .split_whitespace() .map(|arg| arg.to_string()) .collect(); match (args[0].as_str(), args.len()) { - ("exit",1) => { + ("exit", 1) => { client.rpc().close().await?; break; } - ("get",2) => { + ("whoami", 1) => { + let req_id = client.whoami_req_send().await?; + let whoami = get_async(&mut client, req_id, whoami_res_parse).await?.id; + println!("{}", whoami); + } + ("get", 2) => { let msg_id = if args[1] == "any" { "%TL34NIX8JpMJN+ubHWx6cRhIwEal8VqHdKVg2t6lFcg=.sha256".to_string() } else { args[1].clone() }; - let req_id = client.send_get(&msg_id).await?; - let msg = get_async(&mut client,-req_id,parse_message).await?; - println!("{:?}",msg); + let req_id = client.get_req_send(&msg_id).await?; + let msg = get_async(&mut client, req_id, message_res_parse).await?; + println!("{:?}", msg); } - ("user",2) => { - let user_id = if args[1] == "me" { - &whoami - } else { - &args[1] - }; + ("user", 2) => { + let user_id = if args[1] == "me" { &whoami } else { &args[1] }; - let args = CreateHistoryStreamArgs::new(&user_id); - let req_id = client.send_create_history_stream(&args).await?; - print_source_until_eof(&mut client, -req_id, parse_feed).await?; + let args = CreateHistoryStreamArgs::new(user_id.clone()); + let req_id = client.create_history_stream_req_send(&args).await?; + print_source_until_eof(&mut client, req_id, feed_res_parse).await?; } - ("feed",1) => { + ("feed", 1) => { let args = CreateStreamArgs::default(); let req_id = client.send_create_feed_stream(&args).await?; - print_source_until_eof(&mut client, -req_id, parse_feed).await?; + print_source_until_eof(&mut client, req_id, feed_res_parse).await?; } - ("latest",1) => { + ("latest", 1) => { let req_id = client.send_latest().await?; - print_source_until_eof(&mut client, -req_id, parse_latest).await?; + print_source_until_eof(&mut client, req_id, latest_res_parse).await?; } - ("private",2) => { - let user_id = if args[1] == "me" { - &whoami - } else { - &args[1] - }; + ("private", 2) => { + let user_id = if args[1] == "me" { &whoami } else { &args[1] }; - let show_private = |header: &Header, body: &[u8]| { - let msg = parse_feed(header,body)?.into_message()?; + let show_private = |body: &[u8]| { + let msg = feed_res_parse(body)?.into_message()?; if let serde_json::Value::String(content) = msg.content() { if is_privatebox(&content) { - let ret = privatebox_decipher(&content, &sk)? - .unwrap_or("".to_string()); + let ret = privatebox_decipher(&content, &sk)?.unwrap_or("".to_string()); return Ok(ret); } } return Ok("".to_string()); - }; + }; - let args = CreateHistoryStreamArgs::new(&user_id); - let req_id = client.send_create_history_stream(&args).await?; + let args = CreateHistoryStreamArgs::new(user_id.clone()); + let req_id = client.create_history_stream_req_send(&args).await?; - print_source_until_eof(&mut client, -req_id, show_private).await?; + print_source_until_eof(&mut client, req_id, show_private).await?; } - _ => println!("unknown command {}",line_buffer), + _ => println!("unknown command {}", line_buffer), } line_buffer.clear(); } Ok(()) -} \ No newline at end of file +} diff --git a/src/patchwork/error.rs b/src/api/error.rs similarity index 93% rename from src/patchwork/error.rs rename to src/api/error.rs index e6592f6..ddd9309 100644 --- a/src/patchwork/error.rs +++ b/src/api/error.rs @@ -54,6 +54,6 @@ impl std::fmt::Display for Error { write!(f, "{:?}", self) } } -impl std::error::Error for Error { } +impl std::error::Error for Error {} -pub type Result = std::result::Result; \ No newline at end of file +pub type Result = std::result::Result; diff --git a/src/patchwork/api.rs b/src/api/helper.rs similarity index 70% rename from src/patchwork/api.rs rename to src/api/helper.rs index 0a70cb2..df0f9fb 100644 --- a/src/patchwork/api.rs +++ b/src/api/helper.rs @@ -1,30 +1,27 @@ +use crate::feed::Message; +use crate::rpc::{BodyType, RequestNo, RpcStream, RpcType}; use async_std::io::{Read, Write}; use serde_json; -use std::str::FromStr; -use crate::rpc::{RpcClient, Header, RequestNo, RpcType}; -use crate::feed::Message; -use crate::feed::Feed; +use super::error::Result; -use super::error::{Error,Result}; - -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct ErrorRes { pub name: String, pub message: String, pub stack: String, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct WhoAmI { pub id: String, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct LatestUserMessage { pub id: String, pub sequence: u64, - pub ts: u64, + pub ts: f64, } // https://github.com/ssbc/ssb-db/blob/master/api.md @@ -154,10 +151,10 @@ impl CreateStreamArgs { } } -#[derive(Debug, Serialize)] -pub struct CreateHistoryStreamArgs<'a> { +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateHistoryStreamArgs { // id (FeedID, required): The id of the feed to fetch. - pub id: &'a str, + pub id: String, /// (number, default: 0): If seq > 0, then only stream messages with sequence numbers greater than seq. #[serde(skip_serializing_if = "Option::is_none")] @@ -179,8 +176,8 @@ pub struct CreateHistoryStreamArgs<'a> { pub limit: Option, } -impl<'a> CreateHistoryStreamArgs<'a> { - pub fn new(id: &'a str) -> Self { +impl CreateHistoryStreamArgs { + pub fn new(id: String) -> Self { Self { id, seq: None, @@ -217,95 +214,127 @@ impl<'a> CreateHistoryStreamArgs<'a> { } } +#[derive(Debug)] +pub enum ApiMethod { + WhoAmI, + Get, + CreateHistoryStream, + CreateFeedStream, + Latest, +} -fn parse_json<'a, T: serde::Deserialize<'a>>( - header: &'a Header, - body: &'a [u8], -) -> Result { - if header.is_end_or_error { - let error: ErrorRes = serde_json::from_slice(&body[..])?; - Err(Error::ServerMessage(format!("{:?}", error))) - } else { - let res: T = serde_json::from_slice(&body[..])?; - Ok(res) +impl ApiMethod { + pub fn selector(&self) -> &'static [&'static str] { + use ApiMethod::*; + match self { + WhoAmI => &["whoami"], + Get => &["get"], + CreateHistoryStream => &["createHistoryStream"], + CreateFeedStream => &["createFeedStream"], + Latest => &["latest"], + } + } + pub fn from_selector(s: &[&str]) -> Option { + use ApiMethod::*; + match s { + ["whoami"] => Some(WhoAmI), + ["get"] => Some(Get), + ["createHistoryStream"] => Some(CreateHistoryStream), + ["createFeedStream"] => Some(CreateFeedStream), + ["latest"] => Some(Latest), + _ => None, + } } } -pub fn parse_whoami(header: &Header, body: &[u8]) -> Result { - parse_json::(&header, body) +pub struct ApiHelper { + rpc: RpcStream, } -pub fn parse_message(header: &Header, body: &[u8]) -> Result { - if header.is_end_or_error { - let error: ErrorRes = serde_json::from_slice(&body[..])?; - Err(Error::ServerMessage(format!("{:?}", error))) - } else { - Ok(Message::from_slice(body)?) - } -} - -pub fn parse_feed(header: &Header, body: &[u8]) -> Result { - if header.is_end_or_error { - let error: ErrorRes = serde_json::from_slice(&body[..])?; - Err(Error::ServerMessage(format!("{:?}", error))) - } else { - Ok(Feed::from_str(&String::from_utf8_lossy(&body))?) - } -} - -pub fn parse_latest(header: &Header, body: &[u8]) -> Result { - parse_json::(&header, body) -} - -pub struct ApiClient { - rpc: RpcClient, -} - -impl ApiClient { - pub fn new(rpc: RpcClient) -> Self { +impl ApiHelper { + pub fn new(rpc: RpcStream) -> Self { Self { rpc } } - pub fn rpc(&mut self) -> &mut RpcClient { + pub fn rpc(&mut self) -> &mut RpcStream { &mut self.rpc } // whoami: sync // Get information about the current ssb-server user. - pub async fn send_whoami(&mut self) -> Result { + pub async fn whoami_req_send(&mut self) -> Result { let args: [&str; 0] = []; - let req_no = self.rpc.send(&["whoami"], RpcType::Async, &args).await?; + let req_no = self + .rpc + .send_request(ApiMethod::WhoAmI.selector(), RpcType::Async, &args) + .await?; Ok(req_no) } + pub async fn whoami_res_send(&mut self, req_no: RequestNo, id: String) -> Result<()> { + let body = serde_json::to_string(&WhoAmI { id })?; + Ok(self + .rpc + .send_response(req_no, RpcType::Async, BodyType::JSON, body.as_bytes()) + .await?) + } // get: async // Get a message by its hash-id. (sould start with %) - pub async fn send_get(&mut self, msg_id: &str) -> Result { - let req_no = self.rpc.send(&["get"], RpcType::Async, &msg_id).await?; + pub async fn get_req_send(&mut self, msg_id: &str) -> Result { + let req_no = self + .rpc + .send_request(ApiMethod::Get.selector(), RpcType::Async, &msg_id) + .await?; Ok(req_no) } + pub async fn get_res_send(&mut self, req_no: RequestNo, msg: &Message) -> Result<()> { + self.rpc + .send_response( + req_no, + RpcType::Async, + BodyType::JSON, + msg.to_string().as_bytes(), + ) + .await?; + Ok(()) + } + // createHistoryStream: source // (hist) Fetch messages from a specific user, ordered by sequence numbers. - pub async fn send_create_history_stream<'a>( + pub async fn create_history_stream_req_send( &mut self, - args: &'a CreateHistoryStreamArgs<'a>, + args: &CreateHistoryStreamArgs, ) -> Result { let req_no = self .rpc - .send(&["createHistoryStream"], RpcType::Source, &args) + .send_request( + ApiMethod::CreateHistoryStream.selector(), + RpcType::Source, + &args, + ) .await?; Ok(req_no) } + pub async fn feed_res_send(&mut self, req_no: RequestNo, feed: &str) -> Result<()> { + self.rpc + .send_response(req_no, RpcType::Async, BodyType::JSON, feed.as_bytes()) + .await?; + Ok(()) + } // createFeedStream: source // (feed) Fetch messages ordered by their claimed timestamps. pub async fn send_create_feed_stream<'a>( &mut self, - args: &'a CreateStreamArgs, + args: &CreateStreamArgs, ) -> Result { let req_no = self .rpc - .send(&["createFeedStream"], RpcType::Source, &args) + .send_request( + ApiMethod::CreateFeedStream.selector(), + RpcType::Source, + &args, + ) .await?; Ok(req_no) } @@ -314,8 +343,10 @@ impl ApiClient { // Get the seq numbers of the latest messages of all users in the database. pub async fn send_latest(&mut self) -> Result { let args: [&str; 0] = []; - let req_no = self.rpc.send(&["latest"], RpcType::Source, &args).await?; + let req_no = self + .rpc + .send_request(ApiMethod::Latest.selector(), RpcType::Async, &args) + .await?; Ok(req_no) } } - diff --git a/src/api/mod.rs b/src/api/mod.rs new file mode 100644 index 0000000..fbb83dc --- /dev/null +++ b/src/api/mod.rs @@ -0,0 +1,8 @@ +mod error; +mod helper; +pub mod msgs; + +pub use error::{Error, Result}; +pub use helper::{ + ApiHelper, ApiMethod, CreateHistoryStreamArgs, CreateStreamArgs, LatestUserMessage, WhoAmI, +}; diff --git a/src/patchwork/messagetypes.rs b/src/api/msgs.rs similarity index 84% rename from src/patchwork/messagetypes.rs rename to src/api/msgs.rs index 8982236..a4ec7df 100644 --- a/src/patchwork/messagetypes.rs +++ b/src/api/msgs.rs @@ -5,21 +5,33 @@ use std::collections::HashMap; pub type SsbHash = String; pub type SsbId = String; - -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Mention { pub link: SsbId, pub name: Option, } -#[derive(Debug, Deserialize)] -pub struct MessageContent { +#[derive(Debug, Serialize, Deserialize)] +pub struct Post { #[serde(rename = "type")] pub xtype: String, pub text: String, pub mentions: Option>, } +impl Post { + pub fn new(text: String, mentions: Option>) -> Self { + Post { + xtype: String::from("post"), + text, + mentions, + } + } + pub fn to_msg(&self) -> serde_json::Result { + serde_json::to_value(self) + } +} + #[derive(Debug, Deserialize)] pub struct PubAddress { pub host: Option, @@ -84,16 +96,7 @@ pub enum FeedTypedContent { #[serde(rename = "pub")] Pub { address: Option }, #[serde(rename = "post")] - Post { - text: Option, - post: Option, // the same than text - channel: Option, - mentions: Option, - root: Option, - branch: Option, - reply: Option>, - recps: Option, - }, + Post, #[serde(rename = "contact")] Contact { contact: Option, @@ -118,4 +121,3 @@ pub enum FeedTypedContent { #[serde(rename = "vote")] Vote { vote: Vote }, } - diff --git a/src/crypto/error.rs b/src/crypto/error.rs index 91e8745..25fe8b1 100644 --- a/src/crypto/error.rs +++ b/src/crypto/error.rs @@ -18,7 +18,6 @@ impl std::fmt::Display for Error { write!(f, "{:?}", self) } } -impl std::error::Error for Error { } +impl std::error::Error for Error {} pub type Result = std::result::Result; - diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index a8407d9..bd068ed 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -1,5 +1,7 @@ -mod sodium; mod error; +mod sodium; -pub use sodium::ToSodiumObject; -pub use error::{Error,Result}; \ No newline at end of file +pub use error::{Error, Result}; +pub use sodium::{ + ToSodiumObject, ToSsbId, CURVE_ED25519_SUFFIX, ED25519_SIGNATURE_SUFFIX, SHA256_SUFFIX, +}; diff --git a/src/crypto/sodium.rs b/src/crypto/sodium.rs index 96d58ce..4202e50 100644 --- a/src/crypto/sodium.rs +++ b/src/crypto/sodium.rs @@ -1,69 +1,74 @@ -use sodiumoxide::crypto::sign::ed25519; -use sodiumoxide::crypto::hash::sha256; use base64; +use sodiumoxide::crypto::hash::sha256; +use sodiumoxide::crypto::sign::ed25519; -use super::error::{Error,Result}; +use super::error::{Error, Result}; -const CURVE_ED25519_SUFFIX : &str = ".ed25519"; -const ED25519_SIGNATURE_SUFFIX : &str = ".sig.ed25519"; -const SHA256_SUFFIX : &str = ".sha256"; +pub const CURVE_ED25519_SUFFIX: &str = ".ed25519"; +pub const ED25519_SIGNATURE_SUFFIX: &str = ".sig.ed25519"; +pub const SHA256_SUFFIX: &str = ".sha256"; pub trait ToSodiumObject { fn to_ed25519_pk(&self) -> Result; fn to_ed25519_sk(&self) -> Result; fn to_ed25519_sk_no_suffix(&self) -> Result; - fn to_ed25519_signature(&self) -> Result; + fn to_ed25519_signature(&self) -> Result; fn to_sha256(&self) -> Result; } +pub trait ToSsbId { + fn to_ssb_id(&self) -> String; +} + +impl<'a> ToSsbId for ed25519::PublicKey { + fn to_ssb_id(&self) -> String { + format!("{}{}", base64::encode(self), CURVE_ED25519_SUFFIX) + } +} + impl ToSodiumObject for str { - fn to_ed25519_pk(self : &str) -> Result { + fn to_ed25519_pk(self: &str) -> Result { if !self.ends_with(CURVE_ED25519_SUFFIX) { return Err(Error::InvalidSuffix); } - - let key_len = self.len()-CURVE_ED25519_SUFFIX.len(); + + let key_len = self.len() - CURVE_ED25519_SUFFIX.len(); let bytes = base64::decode(&self[..key_len])?; - - ed25519::PublicKey::from_slice(&bytes) - .ok_or_else(|| Error::BadPublicKey) - } - - fn to_ed25519_sk(self : &str) -> Result { - if !self.ends_with(CURVE_ED25519_SUFFIX) { - return Err(Error::InvalidSuffix); - } - - let key_len = self.len()-CURVE_ED25519_SUFFIX.len(); - let bytes = base64::decode(&self[..key_len])?; - - ed25519::SecretKey::from_slice(&bytes) - .ok_or_else(|| Error::BadSecretKey) + + ed25519::PublicKey::from_slice(&bytes).ok_or_else(|| Error::BadPublicKey) } - fn to_ed25519_sk_no_suffix(self : &str) -> Result { + fn to_ed25519_sk(self: &str) -> Result { + if !self.ends_with(CURVE_ED25519_SUFFIX) { + return Err(Error::InvalidSuffix); + } + + let key_len = self.len() - CURVE_ED25519_SUFFIX.len(); + let bytes = base64::decode(&self[..key_len])?; + + ed25519::SecretKey::from_slice(&bytes).ok_or_else(|| Error::BadSecretKey) + } + + fn to_ed25519_sk_no_suffix(self: &str) -> Result { let bytes = base64::decode(&self[..])?; - - ed25519::SecretKey::from_slice(&bytes) - .ok_or_else(|| Error::BadSecretKey) + + ed25519::SecretKey::from_slice(&bytes).ok_or_else(|| Error::BadSecretKey) } - fn to_sha256(self : &str) -> Result { + fn to_sha256(self: &str) -> Result { if !self.ends_with(SHA256_SUFFIX) { return Err(Error::InvalidSuffix); } - let key = base64::decode(&self[..self.len()-SHA256_SUFFIX.len()])?; + let key = base64::decode(&self[..self.len() - SHA256_SUFFIX.len()])?; - sha256::Digest::from_slice(&key) - .ok_or(Error::InvalidDigest) + sha256::Digest::from_slice(&key).ok_or(Error::InvalidDigest) } - fn to_ed25519_signature(self : &str) -> Result { + fn to_ed25519_signature(self: &str) -> Result { if !self.ends_with(ED25519_SIGNATURE_SUFFIX) { return Err(Error::InvalidSuffix); } - let signature = base64::decode(&self[..self.len()-ED25519_SIGNATURE_SUFFIX.len()])?; + let signature = base64::decode(&self[..self.len() - ED25519_SIGNATURE_SUFFIX.len()])?; - ed25519::Signature::from_slice(&signature) - .ok_or(Error::CannotCreateSignature) + ed25519::Signature::from_slice(&signature).ok_or(Error::CannotCreateSignature) } } diff --git a/src/db/feeds.rs b/src/db/feeds.rs deleted file mode 100644 index 4dec7a4..0000000 --- a/src/db/feeds.rs +++ /dev/null @@ -1,324 +0,0 @@ -use std::io; -use std::path::PathBuf; -use std::fs::{File,OpenOptions}; -use std::io::SeekFrom; -use std::io::prelude::*; -use super::error::{Error,Result}; - -pub struct FeedsStorage { - path : PathBuf -} - -impl FeedsStorage { - fn filename(&self, user_id : &str) -> PathBuf { - let name = user_id.chars().map(|ch| match ch { - '+' => '-', - '/' => '_', - _ => ch, - }).collect::(); - let mut path = PathBuf::new(); - path.push(&self.path); - path.push(name); - - path - } - - pub fn new(path : PathBuf) -> Self{ - FeedsStorage { path } - } - pub fn user(&self, user_id : String) -> FeedStorage { - FeedStorage { - path : self.filename(&user_id) - } - } -} - -pub struct FeedStorage { - path : PathBuf, -} - -impl FeedStorage { - - /* - raw feed storage structure: - - last sequence in feed - 32 bits be - - * | message-len 32 bits be - | message - | message-len 32 bits be - */ - - pub fn append(&self, seq_no: u32, feed: &str) -> Result<()> { - let mut file = OpenOptions::new() - .create(true) - .read(true) - .write(true) - .open(&self.path)?; - - // check and update feed sequence number - let created = file.seek(SeekFrom::End(0))? == 0; - if created { - if seq_no != 1{ - return Err(Error::InvalidSequenceNo); - } - } else { - let file_seq_no = self.get_last_seq(&mut file)?; - if file_seq_no + 1 != seq_no { - return Err(Error::InvalidSequenceNo); - } - } - self.set_last_seq(&mut file,seq_no)?; - - file.seek(SeekFrom::End(0))?; - - // write feed size dummy - file.write_all( &(0 as u32).to_be_bytes()[..] )?; - - // write compressed feed - let offset = file.seek(SeekFrom::Current(0))?; - let mut wtr = snap::Writer::new(file); - io::copy(&mut feed.as_bytes(), &mut wtr)?; - - let mut file = wtr.into_inner().map_err( - |err| Error::CompressionError(format!("{:?}",err)) - )?; - let len = file.seek(SeekFrom::Current(0))?- offset; - - // write feed size - file.write_all( &(len as u32).to_be_bytes()[..] )?; - - file.seek(SeekFrom::End(-((8+len) as i64)))?; - file.write_all( &(len as u32).to_be_bytes()[..] )?; - - Ok(()) - } - - pub fn last_seq(&self) -> Result { - - if !self.path.exists() { - return Ok(0); - } - - let mut file = OpenOptions::new() - .read(true) - .open(&self.path)?; - self.get_last_seq(&mut file) - } - - fn get_last_seq(&self, file: &mut File) -> Result { - let mut file_seq_no = [0u8;4]; - file.seek(SeekFrom::Start(0))?; - file.read_exact(&mut file_seq_no[..])?; - Ok(u32::from_be_bytes(file_seq_no)) - } - - fn set_last_seq(&self, file: &mut File, seq_no: u32) -> Result<()> { - file.seek(SeekFrom::Start(0))?; - file.write_all( &seq_no.to_be_bytes()[..] )?; - Ok(()) - } - - pub fn iter(&self) -> Result { - let mut file = OpenOptions::new() - .read(true) - .open(&self.path)?; - - let last_seq_no = self.get_last_seq(&mut file)?; - - Ok(FeedStorageIterator{ - file, - current_seq_no : 0, - last_seq_no, - }) - } - - pub fn rev_iter(&self) -> Result { - let mut file = OpenOptions::new() - .read(true) - .open(&self.path)?; - - let last_seq_no = self.get_last_seq(&mut file)?; - file.seek(SeekFrom::End(-4))?; - - Ok(FeedStorageReverseIterator{ - file, - current_seq_no : last_seq_no, - }) - } - -} - -#[derive(PartialEq,Debug)] -pub struct Feed { - pub seq_no : u32, - pub value : String, -} - -pub struct FeedStorageIterator { - file : File, - current_seq_no : u32, - last_seq_no : u32, -} - -impl Iterator for FeedStorageIterator { - type Item = Result; - - // next() is the only required method - fn next(&mut self) -> Option { - - if self.current_seq_no >= self.last_seq_no { - return None; - } - - // read compressed size - let mut size_buf = [0u8;4]; - if let Err(err) = self.file.read_exact(&mut size_buf[..]) { - return Some(Err(Error::Io(err))); - } - let size = u32::from_be_bytes(size_buf); - - // read compressed data - let mut compressed = vec![0; size as usize]; - if let Err(err) = self.file.read_exact(&mut compressed[..]) { - return Some(Err(Error::Io(err))); - } - - let mut rdr = snap::Reader::new(&compressed[..]); - let mut plaintext : Vec = Vec::new(); - - if let Err(err) = io::copy(&mut rdr, &mut plaintext) { - return Some(Err(Error::Io(err))); - } - - // read compresed size again - if let Err(err) = self.file.read_exact(&mut size_buf[..]) { - return Some(Err(Error::Io(err))); - } - if u32::from_be_bytes(size_buf) != size { - return Some(Err(Error::MismatchReadingSecondSize)); - } - - self.current_seq_no += 1; - let ret = match String::from_utf8(plaintext) { - Err(err) => Err(Error::Utf8(err)), - Ok(value) => Ok(Feed { seq_no : self.current_seq_no, value }) - }; - Some(ret) - } -} - -pub struct FeedStorageReverseIterator { - file : File, - current_seq_no : u32, -} - -impl Iterator for FeedStorageReverseIterator { - type Item = Result; - - // next() is the only required method - fn next(&mut self) -> Option { - - if self.current_seq_no == 0 { - return None; - } - - // read compressed size - let mut size_buf = [0u8;4]; - if let Err(err) = self.file.read_exact(&mut size_buf[..]) { - return Some(Err(Error::Io(err))); - } - let size = u32::from_be_bytes(size_buf); - if let Err(err) = self.file.seek(SeekFrom::Current(-((size+8) as i64))) { - return Some(Err(Error::Io(err))); - } - - // read compresed size again - if let Err(err) = self.file.read_exact(&mut size_buf[..]) { - return Some(Err(Error::Io(err))); - } - if u32::from_be_bytes(size_buf) != size { - return Some(Err(Error::MismatchReadingSecondSize)); - } - - // read compressed data - let mut compressed = vec![0u8; size as usize]; - if let Err(err) = self.file.read_exact(&mut compressed[..]) { - return Some(Err(Error::Io(err))); - } - - let mut rdr = snap::Reader::new(&compressed[..]); - let mut plaintext : Vec = Vec::new(); - - if let Err(err) = io::copy(&mut rdr, &mut plaintext) { - return Some(Err(Error::Io(err))); - } - // prepare offset for the next read - if let Err(err) = self.file.seek(SeekFrom::Current(-((size+8) as i64))) { - return Some(Err(Error::Io(err))); - } - - let ret = match String::from_utf8(plaintext) { - Err(err) => Err(Error::Utf8(err)), - Ok(value) => Ok(Feed { seq_no : self.current_seq_no, value }) - }; - - self.current_seq_no -= 1; - - Some(ret) - } -} - -#[cfg(test)] -mod test { - use super::*; - use rand::distributions::Alphanumeric; - use rand::{thread_rng, Rng}; - use std::iter; - - fn rand_folder() -> Result { - let mut rng = thread_rng(); - let name: String = iter::repeat(()) - .map(|()| rng.sample(Alphanumeric)) - .take(12) - .collect(); - - let mut tmp_folder = std::env::temp_dir(); - tmp_folder.push(name); - - std::fs::create_dir(&tmp_folder)?; - - Ok(tmp_folder) - } - - #[test] - fn test_db_feeds() -> Result<()> { - let user_id = "@ZFWw+UclcUgYi081/C8lhgH+KQ9s7YJRoOYGnzxW/JQ=.ed25519"; - let feeds = FeedsStorage::new(rand_folder()?); - let feed = feeds.user(user_id.to_owned()); - - let f1 = Feed{seq_no:1, value:"123".to_string()}; - let f2 = Feed{seq_no:2, value:"8181".to_string()}; - let f3 = Feed{seq_no:3, value:"182881".to_string()}; - - assert_eq!(0,feed.last_seq()?); - feed.append(f1.seq_no, &f1.value)?; - assert_eq!(1,feed.last_seq()?); - feed.append(f2.seq_no, &f2.value)?; - assert_eq!(2,feed.last_seq()?); - feed.append(f3.seq_no, &f3.value)?; - assert_eq!(3,feed.last_seq()?); - - let mut it = feed.iter()?; - assert_eq!(it.next().unwrap()?,f1); - assert_eq!(it.next().unwrap()?,f2); - assert_eq!(it.next().unwrap()?,f3); - assert_eq!(it.next().is_none(),true); - - let mut rev_it = feed.rev_iter()?; - assert_eq!(rev_it.next().unwrap()?,f3); - assert_eq!(rev_it.next().unwrap()?,f2); - assert_eq!(rev_it.next().unwrap()?,f1); - assert_eq!(rev_it.next().is_none(),true); - - Ok(()) - } -} \ No newline at end of file diff --git a/src/db/mod.rs b/src/db/mod.rs deleted file mode 100644 index 249fd04..0000000 --- a/src/db/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod error; -pub mod feeds; - -pub use error::{Error,Result}; \ No newline at end of file diff --git a/src/discovery/error.rs b/src/discovery/error.rs new file mode 100644 index 0000000..05cb7f3 --- /dev/null +++ b/src/discovery/error.rs @@ -0,0 +1,27 @@ +#[derive(Debug)] +pub enum Error { + ParseInt(std::num::ParseIntError), + InvalidInviteCode, + CryptoFormat(crate::crypto::Error), +} + +impl From for Error { + fn from(err: crate::crypto::Error) -> Self { + Error::CryptoFormat(err) + } +} + +impl From for Error { + fn from(err: std::num::ParseIntError) -> Self { + Error::ParseInt(err) + } +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} +impl std::error::Error for Error {} + +pub type Result = std::result::Result; diff --git a/src/discovery/mod.rs b/src/discovery/mod.rs new file mode 100644 index 0000000..7f3b6a9 --- /dev/null +++ b/src/discovery/mod.rs @@ -0,0 +1,6 @@ +mod error; +mod network; +mod pubs; + +pub use network::ssb_net_id; +pub use pubs::Invite; diff --git a/src/discovery/network.rs b/src/discovery/network.rs new file mode 100644 index 0000000..6f3205c --- /dev/null +++ b/src/discovery/network.rs @@ -0,0 +1,6 @@ +use sodiumoxide::crypto::auth; + +pub const SSB_NET_ID: &str = "d4a1cb88a66f02f8db635ce26441cc5dac1b08420ceaac230839b755845a9ffb"; +pub fn ssb_net_id() -> auth::Key { + auth::Key::from_slice(&hex::decode(SSB_NET_ID).unwrap()).unwrap() +} diff --git a/src/patchwork/pubs.rs b/src/discovery/pubs.rs similarity index 68% rename from src/patchwork/pubs.rs rename to src/discovery/pubs.rs index a511d01..abbaaf5 100644 --- a/src/patchwork/pubs.rs +++ b/src/discovery/pubs.rs @@ -1,25 +1,25 @@ -use sodiumoxide::crypto::sign::ed25519; use crate::crypto::ToSodiumObject; +use sodiumoxide::crypto::sign::ed25519; -use super::error::{Error,Result}; +use super::error::{Error, Result}; pub struct Invite { - pub domain : String, - pub port : u16, + pub domain: String, + pub port: u16, pub pub_pk: ed25519::PublicKey, pub invite_sk: ed25519::SecretKey, } impl Invite { - pub fn from_code(code : &str) -> Result { - let domain_port_keys : Vec<_> = code.split(':').collect(); + pub fn from_code(code: &str) -> Result { + let domain_port_keys: Vec<_> = code.split(':').collect(); if domain_port_keys.len() != 3 { return Err(Error::InvalidInviteCode); } let domain = domain_port_keys[0].to_string(); let port = domain_port_keys[1].parse::()?; - let pk_sk :Vec<_> = domain_port_keys[2].split('~').collect(); + let pk_sk: Vec<_> = domain_port_keys[2].split('~').collect(); if pk_sk.len() != 2 { return Err(Error::InvalidInviteCode); @@ -27,9 +27,13 @@ impl Invite { let pub_pk = pk_sk[0][1..].to_ed25519_pk()?; let invite_sk = pk_sk[1][..].to_ed25519_sk_no_suffix()?; - Ok(Invite { domain, port, pub_pk, invite_sk }) + Ok(Invite { + domain, + port, + pub_pk, + invite_sk, + }) } - } #[cfg(test)] diff --git a/src/feed/base.rs b/src/feed/base.rs index b0c0890..abc0a02 100644 --- a/src/feed/base.rs +++ b/src/feed/base.rs @@ -1,12 +1,12 @@ -use std::str::FromStr; - use serde_json::Value; +use super::error::{Error, Result}; use super::message::Message; use super::ssb_sha256; -use super::error::{Error,Result}; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; -#[derive(Debug,Serialize,Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Feed { pub key: String, pub value: Value, @@ -14,18 +14,32 @@ pub struct Feed { pub rts: Option, } -impl Feed { - pub fn into_message(self) -> Result { - Message::from_value(self.value) +impl ToString for Feed { + fn to_string(&self) -> String { + serde_json::to_string(self).unwrap() } } -impl FromStr for Feed { - type Err = Error; - - fn from_str(s: &str) -> std::result::Result { - let feed : Feed = serde_json::from_str(&s)?; - let digest = format!("%{}.sha256",base64::encode(&ssb_sha256(&feed.value)?)); +impl Feed { + pub fn into_message(self) -> Result { + Message::from_value(self.value) + } + pub fn new(m: Message) -> Self { + let key = format!("%{}.sha256", base64::encode(&ssb_sha256(&m.value).unwrap())); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs_f64(); + Feed { + key, + value: m.value, + timestamp, + rts: None, + } + } + pub fn from_slice(s: &[u8]) -> Result { + let feed: Feed = serde_json::from_slice(&s)?; + let digest = format!("%{}.sha256", base64::encode(&ssb_sha256(&feed.value)?)); if digest != feed.key { return Err(Error::FeedDigestMismatch); @@ -38,11 +52,11 @@ impl FromStr for Feed { #[cfg(test)] mod test { use super::*; - + #[test] fn test_verify_feed_integrity() -> Result<()> { let feed = r#"{"key":"%Cg0ZpZ8cV85G8UIIropgBOvM8+Srlv9LSGDNGnpdK44=.sha256","value":{"previous":"%seUEAo7PTyA7vNwnOrmGIsUFfpyRzOvzGVv1QCb/Fz8=.sha256","author":"@BIbVppzlrNiRJogxDYz3glUS7G4s4D4NiXiPEAEzxdE=.ed25519","sequence":37,"timestamp":1439392020612,"hash":"sha256","content":{"type":"post","text":"@paul real time replies didn't work.","repliesTo":"%xWKunF6nXD7XMC+D4cjwDMZWmBnmRu69w9T25iLNa1Q=.sha256","mentions":["%7UKRfZb2u8al4tYWHqM55R9xpE/KKVh9U0M6BdugGt4=.sha256"],"recps":[{"link":"@hxGxqPrplLjRG2vtjQL87abX4QKqeLgCwQpS730nNwE=.ed25519","name":"paul"}]},"signature":"gGxSPdBJZxp6x5f3HzQGoQSeSdh/C5AtymIn+miWa+lcC6DdqpRSgaeH9KHeLf+/CKhU6REYIpWaLr4CKDMfCg==.sig.ed25519"},"timestamp":1573574678194,"rts":1439392020612}"#; - Feed::from_str(&feed)?; + Feed::from_slice(feed.as_bytes())?; Ok(()) } -} \ No newline at end of file +} diff --git a/src/feed/encoding.rs b/src/feed/encoding.rs index cffdc3c..db8f94b 100644 --- a/src/feed/encoding.rs +++ b/src/feed/encoding.rs @@ -1,6 +1,6 @@ +use super::error::Result; use serde_json::Value; use sodiumoxide::crypto::hash::sha256; -use super::error::Result; pub fn ssb_sha256(v: &Value) -> Result { let v8encoding = stringify_json(&v)? @@ -60,8 +60,8 @@ pub fn stringify_json(v: &Value) -> Result { Value::Number(value) => { let mut as_str = value.to_string(); if as_str.contains('e') && !as_str.contains("e-") { - as_str = as_str.replace("e","e+") - } + as_str = as_str.replace("e", "e+") + } buffer.push_str(&as_str); } Value::Bool(value) => { @@ -82,7 +82,7 @@ pub fn stringify_json(v: &Value) -> Result { mod test { use super::*; - const JSON : &str = r#"{"a":0,"b":1.1,"c":null,"d":true,"f":false,"g":{},"h":{"h1":1},"i":[],"j":[1],"k":[1,2]}"#; + const JSON: &str = r#"{"a":0,"b":1.1,"c":null,"d":true,"f":false,"g":{},"h":{"h1":1},"i":[],"j":[1],"k":[1,2]}"#; #[test] fn test_json_stringify() -> Result<()> { let v: Value = serde_json::from_str(JSON)?; @@ -124,8 +124,8 @@ mod test { fn test_msg_with_float_mantissa() -> Result<()> { let expected = "RUcldndjJUkEcZ5hX6zAj/xLlnh0n4BZ6ThJOW5RvIk="; let message = r#"{"previous":"%gbem82xZNVHbOM2pyOlxymsAfstdMFfGSoawWQtObX8=.sha256","author":"@TXKFQehlyoSn8UJAIVP/k2BjFINC591MlBC2e2d24mA=.ed25519","sequence":1557,"timestamp":1495245157893,"hash":"sha256","content":{"type":"post","transactionHash":9.691449834862513e+76,"address":7.073631810716965e+46,"event":"ActionAdded","text":"{\"actionID\":\"1\",\"amount\":\"0\",\"description\":\"Bind Ethereum events to Secure Scuttlebutt posts\"}}"},"signature":"/Qvm9ozEfl0Thyvs+mnwhLDReZ8xeKXA3hSXOxm53SFkLEnnJ+IF0l7LSqc56Y3vl8FwarJ6k0PGmcU3U8FMAw==.sig.ed25519"}"#; - let message_value: Value = serde_json::from_str(&message)?; - let current = base64::encode(&ssb_sha256(&message_value)?); + let message_value: Value = serde_json::from_str(&message)?; + let current = base64::encode(&ssb_sha256(&message_value)?); assert_eq!(expected, current); Ok(()) } @@ -134,10 +134,9 @@ mod test { fn test_msg_with_float_precision() -> Result<()> { let expected = "BUtTVIJyN5fUXzQy2uQfCCzlAg0s6laQQqFIu+kGnFM="; let message = r#"{"previous":"%ButTjV+H9VfONhX+lLbJb5LR+W14SFqbmjOfdMPZ5+4=.sha256","sequence":15034,"author":"@6ilZq3kN0F+dXFHAPjAwMm87JEb/VdB+LC9eIMW3sa0=.ed25519","timestamp":1567190273951.0159,"hash":"sha256","content":{"type":"vote","channel":null,"vote":{"link":"%GvtUsekEwsCj1cQ6+4Gihkm+ek99BhB537g1xUKjhsA=.sha256","value":1,"expression":"Like"}},"signature":"UkVfqDmBhHrDfMvFT8iUhEispAku/zbdXKCyRVlxYp2wNtJ4okwKE7hTkKhbiMVA7sGIV5dzHZyMotXCL46iDw==.sig.ed25519"}"#; - let message_value: Value = serde_json::from_str(&message)?; - let current = base64::encode(&ssb_sha256(&message_value)?); + let message_value: Value = serde_json::from_str(&message)?; + let current = base64::encode(&ssb_sha256(&message_value)?); assert_eq!(expected, current); Ok(()) } } - diff --git a/src/feed/error.rs b/src/feed/error.rs index 8b768bd..e7f9af4 100644 --- a/src/feed/error.rs +++ b/src/feed/error.rs @@ -44,7 +44,6 @@ impl std::fmt::Display for Error { write!(f, "{:?}", self) } } -impl std::error::Error for Error { } +impl std::error::Error for Error {} pub type Result = std::result::Result; - diff --git a/src/feed/message.rs b/src/feed/message.rs index 88bc249..6c86a6e 100644 --- a/src/feed/message.rs +++ b/src/feed/message.rs @@ -1,29 +1,30 @@ -use std::time::SystemTime; use std::str::FromStr; +use std::time::SystemTime; -use sodiumoxide::crypto::{sign::ed25519}; use serde_json::Value; +use sodiumoxide::crypto::sign::ed25519; +use super::error::{Error, Result}; +use super::{ssb_sha256, stringify_json}; use crate::crypto::ToSodiumObject; -use crate::patchwork::IdentitySecret; -use super::{stringify_json,ssb_sha256}; -use super::error::{Error,Result}; +use crate::keystore::OwnedIdentity; +use sodiumoxide::crypto::hash::sha256; -const MSG_PREVIOUS : &str = "previous"; -const MSG_AUTHOR : &str = "author"; -const MSG_SEQUENCE : &str = "sequence"; -const MSG_TIMESTAMP : &str = "timestamp"; -const MSG_HASH : &str = "hash"; -const MSG_CONTENT : &str = "content"; -const MSG_SIGNATURE : &str = "signature"; +const MSG_PREVIOUS: &str = "previous"; +const MSG_AUTHOR: &str = "author"; +const MSG_SEQUENCE: &str = "sequence"; +const MSG_TIMESTAMP: &str = "timestamp"; +const MSG_HASH: &str = "hash"; +const MSG_CONTENT: &str = "content"; +const MSG_SIGNATURE: &str = "signature"; macro_rules! cast { ($input:expr,$pth:path) => { match $input { Some($pth(x)) => Ok(x), - _ => Err(Error::InvalidJson), + _ => Err(Error::InvalidJson), }; - } + }; } macro_rules! cast_opt { @@ -32,88 +33,93 @@ macro_rules! cast_opt { None => Ok(None), Some(Value::Null) => Ok(None), Some($pth(x)) => Ok(Some(x)), - _ => Err(Error::InvalidJson) + _ => Err(Error::InvalidJson), }; + }; +} + +pub struct MessageId(sha256::Digest); +impl ToString for MessageId { + fn to_string(&self) -> String { + let digest = base64::encode(&self.0); + format!("%{}.sha256", digest) } } +impl AsRef<[u8]> for MessageId { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} -#[derive(Debug)] +#[derive(Debug, Deserialize)] pub struct Message { - value: serde_json::Value, + pub value: serde_json::Value, } impl Message { - - pub fn new(prev : Option<&Message>, identity: &IdentitySecret, content: Value) -> Result { - let mut value : serde_json::Map = serde_json::Map::new(); + pub fn sign(prev: Option<&Message>, identity: &OwnedIdentity, content: Value) -> Result { + let mut value: serde_json::Map = serde_json::Map::new(); if let Some(prev) = prev { - value.insert(MSG_PREVIOUS.to_string(), Value::String(prev.id()?)); + value.insert( + MSG_PREVIOUS.to_string(), + Value::String(prev.id().to_string()), + ); value.insert( MSG_SEQUENCE.to_string(), - Value::Number(serde_json::Number::from(prev.sequence()+1)) + Value::Number(serde_json::Number::from(prev.sequence() + 1)), ); } else { value.insert( MSG_SEQUENCE.to_string(), - Value::Number(serde_json::Number::from(1)) + Value::Number(serde_json::Number::from(1)), ); } - let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)? + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? .as_millis() as u64; let timestamp = Value::Number(serde_json::Number::from(timestamp)); - value.insert( - MSG_AUTHOR.to_string(), - Value::String(identity.id.clone()) - ); - value.insert( - MSG_TIMESTAMP.to_string(), - timestamp - ); - value.insert( - MSG_HASH.to_string(), - Value::String("sha256".to_string()) - ); - value.insert( - MSG_CONTENT.to_string(), - content - ); + value.insert(MSG_AUTHOR.to_string(), Value::String(identity.id.clone())); + value.insert(MSG_TIMESTAMP.to_string(), timestamp); + value.insert(MSG_HASH.to_string(), Value::String("sha256".to_string())); + value.insert(MSG_CONTENT.to_string(), content); let value = Value::Object(value); let to_sign_text = stringify_json(&value)?; - let mut value = cast!(Some(value),Value::Object)?; + let mut value = cast!(Some(value), Value::Object)?; let signature = ed25519::sign_detached(to_sign_text.as_bytes(), &identity.sk); value.insert( MSG_SIGNATURE.to_string(), - Value::String(format!("{}.sig.ed25519",base64::encode(&signature))) + Value::String(format!("{}.sig.ed25519", base64::encode(&signature))), ); - - Ok(Message { value : Value::Object(value) }) + + Ok(Message { + value: Value::Object(value), + }) } - pub fn from_slice(s : &[u8]) -> Result { + pub fn from_slice(s: &[u8]) -> Result { Self::from_value(serde_json::from_slice(&s)?) } - pub fn from_value(v : Value) -> Result { + pub fn from_value(v: Value) -> Result { + let mut v = cast!(Some(v), Value::Object)?; - let mut v = cast!(Some(v),Value::Object)?; - - // check if ok - cast_opt!(v.get(MSG_PREVIOUS),Value::String)?; - cast!(v.get(MSG_SEQUENCE),Value::Number)?; - cast!(v.get(MSG_SEQUENCE),Value::Number)?; - cast!(v.get(MSG_TIMESTAMP),Value::Number)?; - cast!(v.get(MSG_HASH),Value::String)?; + // check if ok + cast_opt!(v.get(MSG_PREVIOUS), Value::String)?; + cast!(v.get(MSG_SEQUENCE), Value::Number)?; + cast!(v.get(MSG_SEQUENCE), Value::Number)?; + cast!(v.get(MSG_TIMESTAMP), Value::Number)?; + cast!(v.get(MSG_HASH), Value::String)?; v.get(MSG_CONTENT).ok_or(Error::InvalidJson)?; // verify signature - let signature = cast!(v.remove(MSG_SIGNATURE),Value::String)?; - let author = cast!(v.get(MSG_AUTHOR),Value::String)?; + let signature = cast!(v.remove(MSG_SIGNATURE), Value::String)?; + let author = cast!(v.get(MSG_AUTHOR), Value::String)?; let sig = signature.to_ed25519_signature()?; let signer = author[1..].to_ed25519_pk()?; @@ -122,42 +128,43 @@ impl Message { if !ed25519::verify_detached(&sig, &signed_text.as_ref(), &signer) { return Err(Error::InvalidSignature); } - + // put signature back - let mut v = cast!(Some(value),Value::Object)?; + let mut v = cast!(Some(value), Value::Object)?; v.insert(MSG_SIGNATURE.to_string(), Value::String(signature)); - Ok(Message { value : Value::Object(v) }) - + Ok(Message { + value: Value::Object(v), + }) } - pub fn id(&self) -> Result { - let digest = base64::encode(&ssb_sha256(&self.value)?); - Ok(format!("%{}.sha256",digest)) + pub fn id(&self) -> MessageId { + MessageId(ssb_sha256(&self.value).unwrap()) } pub fn previous(&self) -> Option<&String> { - cast_opt!(self.value.get(MSG_PREVIOUS),Value::String) - .unwrap() + cast_opt!(self.value.get(MSG_PREVIOUS), Value::String).unwrap() } pub fn author(&self) -> &String { - cast!(self.value.get(MSG_AUTHOR),Value::String) + cast!(self.value.get(MSG_AUTHOR), Value::String).unwrap() + } + + pub fn sequence(&self) -> u64 { + cast!(self.value.get(MSG_SEQUENCE), Value::Number) + .unwrap() + .as_u64() .unwrap() } - - pub fn sequence(&self) -> u64 { - cast!(self.value.get(MSG_SEQUENCE),Value::Number) - .unwrap().as_u64().unwrap() - } - + pub fn timestamp(&self) -> f64 { - cast!(self.value.get(MSG_TIMESTAMP),Value::Number) - .unwrap().as_f64().unwrap() + cast!(self.value.get(MSG_TIMESTAMP), Value::Number) + .unwrap() + .as_f64() + .unwrap() } pub fn hash(&self) -> &String { - cast!(self.value.get(MSG_HASH),Value::String) - .unwrap() + cast!(self.value.get(MSG_HASH), Value::String).unwrap() } pub fn content(&self) -> &Value { @@ -165,10 +172,8 @@ impl Message { } pub fn signature(&self) -> &String { - cast!(self.value.get(MSG_SIGNATURE),Value::String) - .unwrap() + cast!(self.value.get(MSG_SIGNATURE), Value::String).unwrap() } - } impl FromStr for Message { @@ -188,25 +193,24 @@ impl ToString for Message { #[cfg(test)] mod test { use super::*; - + #[test] fn test_verify_known_msg_integrity() -> Result<()> { - let message_id ="%Cg0ZpZ8cV85G8UIIropgBOvM8+Srlv9LSGDNGnpdK44=.sha256"; + let message_id = "%Cg0ZpZ8cV85G8UIIropgBOvM8+Srlv9LSGDNGnpdK44=.sha256"; let message = r#"{"previous":"%seUEAo7PTyA7vNwnOrmGIsUFfpyRzOvzGVv1QCb/Fz8=.sha256","author":"@BIbVppzlrNiRJogxDYz3glUS7G4s4D4NiXiPEAEzxdE=.ed25519","sequence":37,"timestamp":1439392020612,"hash":"sha256","content":{"type":"post","text":"@paul real time replies didn't work.","repliesTo":"%xWKunF6nXD7XMC+D4cjwDMZWmBnmRu69w9T25iLNa1Q=.sha256","mentions":["%7UKRfZb2u8al4tYWHqM55R9xpE/KKVh9U0M6BdugGt4=.sha256"],"recps":[{"link":"@hxGxqPrplLjRG2vtjQL87abX4QKqeLgCwQpS730nNwE=.ed25519","name":"paul"}]},"signature":"gGxSPdBJZxp6x5f3HzQGoQSeSdh/C5AtymIn+miWa+lcC6DdqpRSgaeH9KHeLf+/CKhU6REYIpWaLr4CKDMfCg==.sig.ed25519"}"#; let msg = Message::from_str(&message)?; - assert_eq!(msg.id()?,message_id); + assert_eq!(msg.id().to_string(), message_id); Ok(()) } #[test] fn test_sign_verify() -> Result<()> { let content = Value::String("thistest".to_string()); - let id = IdentitySecret::new(); - let msg1 = Message::new(None,&id,content.clone())?.to_string(); + let id = OwnedIdentity::create(); + let msg1 = Message::sign(None, &id, content.clone())?.to_string(); let msg1 = Message::from_str(&msg1)?; - let msg2 = Message::new(Some(&msg1),&id,content)?.to_string(); + let msg2 = Message::sign(Some(&msg1), &id, content)?.to_string(); Message::from_str(&msg2)?; Ok(()) } - -} \ No newline at end of file +} diff --git a/src/feed/mod.rs b/src/feed/mod.rs index 5f8c792..cb04e94 100644 --- a/src/feed/mod.rs +++ b/src/feed/mod.rs @@ -1,11 +1,11 @@ -mod encoding; mod base; +mod encoding; +mod error; mod message; mod privatebox; -mod error; -pub use message::Message; pub use base::Feed; -pub use privatebox::{is_privatebox,privatebox_cipher,privatebox_decipher}; -pub use encoding::{ssb_sha256,stringify_json}; -pub use error::{Error,Result}; \ No newline at end of file +pub use encoding::{ssb_sha256, stringify_json}; +pub use error::{Error, Result}; +pub use message::Message; +pub use privatebox::{is_privatebox, privatebox_cipher, privatebox_decipher}; diff --git a/src/feed/privatebox.rs b/src/feed/privatebox.rs index 7619dec..fd3f545 100644 --- a/src/feed/privatebox.rs +++ b/src/feed/privatebox.rs @@ -1,58 +1,52 @@ use crate::crypto::ToSodiumObject; use sodiumoxide::crypto::sign::SecretKey; -use sodiumoxide::crypto::{sign::ed25519, scalarmult::curve25519, secretbox}; +use sodiumoxide::crypto::{scalarmult::curve25519, secretbox, sign::ed25519}; -use super::error::{Error,Result}; +use super::error::{Error, Result}; -pub const SUFFIX : &str = ".box"; +pub const SUFFIX: &str = ".box"; -pub const MAX_RECIPIENTS : u8 = 7; +pub const MAX_RECIPIENTS: u8 = 7; -const RECIPIENT_COUNT_LEN : usize = 1; -const ENCRYPTED_HEADER_LEN : usize = - RECIPIENT_COUNT_LEN - + secretbox::KEYBYTES - + secretbox::MACBYTES; +const RECIPIENT_COUNT_LEN: usize = 1; +const ENCRYPTED_HEADER_LEN: usize = RECIPIENT_COUNT_LEN + secretbox::KEYBYTES + secretbox::MACBYTES; -pub fn is_privatebox(text : &str) -> bool { - text.ends_with(SUFFIX) +pub fn is_privatebox(text: &str) -> bool { + text.ends_with(SUFFIX) } -pub fn privatebox_cipher(plaintext : &str, recipients: &[&str]) -> Result { - let recipients : crate::crypto::Result> = recipients +pub fn privatebox_cipher(plaintext: &str, recipients: &[&str]) -> Result { + let recipients: crate::crypto::Result> = recipients .iter() - .map(|id| { - id[1..].to_ed25519_pk() - }).collect(); - + .map(|id| id[1..].to_ed25519_pk()) + .collect(); + let recipients = recipients?; - let recipients_ref : Vec<_> = - recipients.iter().map(|r| r).collect(); + let recipients_ref: Vec<_> = recipients.iter().map(|r| r).collect(); - let ciphertext = cipher(plaintext.as_bytes(),&recipients_ref[..])?; + let ciphertext = cipher(plaintext.as_bytes(), &recipients_ref[..])?; - Ok(format!("{}{}",base64::encode(&ciphertext),SUFFIX)) -} + Ok(format!("{}{}", base64::encode(&ciphertext), SUFFIX)) +} -pub fn privatebox_decipher(ciphertext : &str, sk : &SecretKey) -> Result> { - let msg = &ciphertext.as_bytes()[..ciphertext.len()-SUFFIX.len()]; +pub fn privatebox_decipher(ciphertext: &str, sk: &SecretKey) -> Result> { + let msg = &ciphertext.as_bytes()[..ciphertext.len() - SUFFIX.len()]; let msg = base64::decode(msg)?; - let plaintext = decipher(&msg, &sk)? - .map(|msg| String::from_utf8_lossy(&msg).to_string()); + let plaintext = decipher(&msg, &sk)?.map(|msg| String::from_utf8_lossy(&msg).to_string()); Ok(plaintext) -} +} -fn cipher(plaintext : &[u8], recipients : &[&ed25519::PublicKey]) -> Result> { +fn cipher(plaintext: &[u8], recipients: &[&ed25519::PublicKey]) -> Result> { // Precondition checks if plaintext.is_empty() { return Err(Error::EmptyPlaintext); } - if recipients.is_empty() || recipients.len() > MAX_RECIPIENTS as usize{ + if recipients.is_empty() || recipients.len() > MAX_RECIPIENTS as usize { return Err(Error::BadRecipientCount); } @@ -61,7 +55,7 @@ fn cipher(plaintext : &[u8], recipients : &[&ed25519::PublicKey]) -> Result Result = Vec::with_capacity( - secretbox::NONCEBYTES - + ed25519::PUBLICKEYBYTES - + ENCRYPTED_HEADER_LEN*recipients.len() - + secretbox::MACBYTES+plaintext.len() + let mut buffer: Vec = Vec::with_capacity( + secretbox::NONCEBYTES + + ed25519::PUBLICKEYBYTES + + ENCRYPTED_HEADER_LEN * recipients.len() + + secretbox::MACBYTES + + plaintext.len(), ); buffer.extend_from_slice(&nonce[..]); @@ -87,20 +82,18 @@ fn cipher(plaintext : &[u8], recipients : &[&ed25519::PublicKey]) -> Result Result>> { - +fn decipher(ciphertext: &[u8], sk: &SecretKey) -> Result>> { let mut cursor = ciphertext; - + let nonce = secretbox::Nonce::from_slice(&cursor[..secretbox::NONCEBYTES]) .ok_or(Error::CannotReadNonce)?; cursor = &cursor[secretbox::NONCEBYTES..]; @@ -108,17 +101,16 @@ fn decipher(ciphertext : &[u8], sk : &SecretKey) -> Result>> { let h_pk = curve25519::GroupElement::from_slice(&cursor[..ed25519::PUBLICKEYBYTES]) .ok_or(Error::CannotReadNonce)?; cursor = &cursor[ed25519::PUBLICKEYBYTES..]; - + let key = curve25519::scalarmult(&sk.to_curve25519(), &h_pk) .and_then(|key| secretbox::Key::from_slice(&key[..]).ok_or(())) .map_err(|_| Error::CannotCreateKey)?; let mut header_no = 0; - while header_no < MAX_RECIPIENTS && cursor.len() > ENCRYPTED_HEADER_LEN+secretbox::MACBYTES { + while header_no < MAX_RECIPIENTS && cursor.len() > ENCRYPTED_HEADER_LEN + secretbox::MACBYTES { if let Ok(header) = secretbox::open(&cursor[..ENCRYPTED_HEADER_LEN], &nonce, &key) { - let encrypted_message_offset = ENCRYPTED_HEADER_LEN*(header[0]-header_no) as usize; - let y = secretbox::Key::from_slice(&header[1..]) - .ok_or(Error::CannotCreateKey)?; + let encrypted_message_offset = ENCRYPTED_HEADER_LEN * (header[0] - header_no) as usize; + let y = secretbox::Key::from_slice(&header[1..]).ok_or(Error::CannotCreateKey)?; let plaintext = secretbox::open(&cursor[encrypted_message_offset..], &nonce, &y) .map_err(|_| Error::FailedToDecipher)?; return Ok(Some(plaintext)); @@ -133,26 +125,26 @@ fn decipher(ciphertext : &[u8], sk : &SecretKey) -> Result>> { #[cfg(test)] mod test { use super::*; - use crate::patchwork::IdentitySecret; - + use crate::keystore::OwnedIdentity; + #[test] fn test_msg_cipher_to_one() -> Result<()> { let (u1_pk, u1_sk) = ed25519::gen_keypair(); let plaintext = "hola".as_bytes(); let ciphertext = cipher(plaintext, &[&u1_pk])?; let plaintext_1 = decipher(&ciphertext, &u1_sk)?.unwrap(); - assert_eq!(plaintext.to_vec(),plaintext_1); + assert_eq!(plaintext.to_vec(), plaintext_1); Ok(()) } #[test] fn test_msg_cipher_to_one_helper() -> Result<()> { - let id = IdentitySecret::new(); + let id = OwnedIdentity::create(); let plaintext = "holar"; let ciphertext = privatebox_cipher(plaintext, &[&id.id])?; - assert_eq!(is_privatebox(&ciphertext),true); + assert_eq!(is_privatebox(&ciphertext), true); let plaintext_1 = privatebox_decipher(&ciphertext, &id.sk)?.unwrap(); - assert_eq!(plaintext,plaintext_1); + assert_eq!(plaintext, plaintext_1); Ok(()) } @@ -163,21 +155,21 @@ mod test { let plaintext = "hola".as_bytes(); let ciphertext = cipher(plaintext, &[&u1_pk])?; let plaintext_1 = decipher(&ciphertext, &u1_sk)?; - assert_eq!(None,plaintext_1); + assert_eq!(None, plaintext_1); Ok(()) } #[test] fn test_msg_cipher_to_multiple() -> Result<()> { let u = (0..7).map(|_| ed25519::gen_keypair()).collect::>(); - let u_pk = u.iter().map(|(pk,_)| pk).collect::>(); + let u_pk = u.iter().map(|(pk, _)| pk).collect::>(); let plaintext = "hola".as_bytes(); let ciphertext = cipher(plaintext, &u_pk)?; - for (_,sk) in u.iter() { + for (_, sk) in u.iter() { let plaintext_1 = decipher(&ciphertext, sk)?.unwrap(); - assert_eq!(plaintext.to_vec(),plaintext_1); + assert_eq!(plaintext.to_vec(), plaintext_1); } Ok(()) diff --git a/src/db/error.rs b/src/keystore/error.rs similarity index 56% rename from src/db/error.rs rename to src/keystore/error.rs index c769210..7f71ead 100644 --- a/src/db/error.rs +++ b/src/keystore/error.rs @@ -1,11 +1,15 @@ #[derive(Debug)] pub enum Error { - InvalidSequenceNo, - MismatchReadingSecondSize, - CompressionError(String), - Utf8(std::string::FromUtf8Error), + HomeNotFound, + InvalidConfig, + CryptoFormat(crate::crypto::Error), Io(std::io::Error), } +impl From for Error { + fn from(err: crate::crypto::Error) -> Self { + Error::CryptoFormat(err) + } +} impl From for Error { fn from(err: std::io::Error) -> Self { @@ -13,17 +17,11 @@ impl From for Error { } } -impl From for Error { - fn from(err: std::string::FromUtf8Error) -> Self { - Error::Utf8(err) - } -} - impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{:?}", self) } } -impl std::error::Error for Error { } -pub type Result = std::result::Result; +impl std::error::Error for Error {} +pub type Result = std::result::Result; diff --git a/src/keystore/identity.rs b/src/keystore/identity.rs new file mode 100644 index 0000000..c9816d3 --- /dev/null +++ b/src/keystore/identity.rs @@ -0,0 +1,20 @@ +use crate::crypto::CURVE_ED25519_SUFFIX; +use sodiumoxide::crypto::sign::ed25519; + +#[derive(Debug, Clone)] +pub struct OwnedIdentity { + pub id: String, + pub pk: ed25519::PublicKey, + pub sk: ed25519::SecretKey, +} + +impl OwnedIdentity { + pub fn create() -> OwnedIdentity { + let (pk, sk) = ed25519::gen_keypair(); + OwnedIdentity { + pk, + sk, + id: format!("@{}{}", base64::encode(&pk), CURVE_ED25519_SUFFIX), + } + } +} diff --git a/src/keystore/mod.rs b/src/keystore/mod.rs new file mode 100644 index 0000000..6d3629f --- /dev/null +++ b/src/keystore/mod.rs @@ -0,0 +1,6 @@ +mod error; +mod identity; +pub mod patchwork; + +pub use identity::OwnedIdentity; +pub use patchwork::{from_patchwork_config, from_patchwork_local}; diff --git a/src/keystore/patchwork.rs b/src/keystore/patchwork.rs new file mode 100644 index 0000000..51cac70 --- /dev/null +++ b/src/keystore/patchwork.rs @@ -0,0 +1,51 @@ +use std::io; +use std::string::ToString; + +use crate::crypto::ToSodiumObject; + +use super::error::{Error, Result}; +use super::OwnedIdentity; + +pub const CURVE_ED25519: &str = "ed25519"; + +#[derive(Deserialize)] +struct JsonSSBSecret { + id: String, + curve: String, + public: String, + private: String, +} + +fn to_ioerr(err: T) -> io::Error { + io::Error::new(io::ErrorKind::Other, err.to_string()) +} + +pub fn from_patchwork_local() -> Result { + let home_dir = dirs::home_dir().ok_or(Error::HomeNotFound)?; + let local_key_file = format!("{}/.ssb/secret", home_dir.to_string_lossy()); + let content = std::fs::read_to_string(local_key_file)?; + Ok(from_patchwork_config(content)?) +} + +pub fn from_patchwork_config>(config: T) -> Result { + // strip all comments + let json = config + .as_ref() + .lines() + .filter(|line| !line.starts_with('#')) + .collect::>() + .join(""); + + // parse json + let secret: JsonSSBSecret = serde_json::from_str(json.as_ref()).map_err(to_ioerr)?; + + if secret.curve != CURVE_ED25519 { + return Err(Error::InvalidConfig); + } + + Ok(OwnedIdentity { + id: secret.id, + pk: secret.public.to_ed25519_pk()?, + sk: secret.private.to_ed25519_sk()?, + }) +} diff --git a/src/lib.rs b/src/lib.rs index e177173..89448b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,13 @@ extern crate kuska_handshake; + #[macro_use] extern crate serde; -extern crate serde_json; extern crate async_std; +extern crate serde_json; -pub mod rpc; +pub mod api; pub mod crypto; -pub mod patchwork; +pub mod discovery; pub mod feed; -pub mod db; +pub mod keystore; +pub mod rpc; diff --git a/src/patchwork/config.rs b/src/patchwork/config.rs deleted file mode 100644 index a7b30c0..0000000 --- a/src/patchwork/config.rs +++ /dev/null @@ -1,80 +0,0 @@ -use std::io; -use std::string::ToString; - -use sodiumoxide::crypto::sign::ed25519; -use sodiumoxide::crypto::auth; - -use crate::crypto::ToSodiumObject; - -use super::error::{Error,Result}; - -const CURVE_ED25519 : &str = "ed25519"; -pub const SSB_NET_ID : &str = "d4a1cb88a66f02f8db635ce26441cc5dac1b08420ceaac230839b755845a9ffb"; - -#[derive(Debug)] -pub struct IdentitySecret { - pub id: String, - pub pk: ed25519::PublicKey, - pub sk: ed25519::SecretKey, -} - -#[derive(Deserialize)] -struct JsonSSBSecret { - id: String, - curve: String, - public: String, - private: String, -} - -pub fn ssb_net_id() -> auth::Key { - auth::Key::from_slice(&hex::decode(SSB_NET_ID).unwrap()).unwrap() -} - -fn to_ioerr(err: T) -> io::Error { - io::Error::new(io::ErrorKind::Other, err.to_string()) -} - -#[allow(clippy::new_without_default)] -impl IdentitySecret { - - pub fn new() -> IdentitySecret { - let (pk, sk) = ed25519::gen_keypair(); - IdentitySecret { - pk, sk, - id : format!("@{}.{}",base64::encode(&pk),CURVE_ED25519), - } - } - - pub fn from_local_config() -> Result { - let home_dir = dirs::home_dir().ok_or(Error::HomeNotFound)?; - let local_key_file = format!("{}/.ssb/secret",home_dir.to_string_lossy()); - let content = std::fs::read_to_string(local_key_file)?; - Ok(IdentitySecret::from_config(content)?) - } - - pub fn from_config>(config: T) -> Result { - - // strip all comments - let json = config.as_ref() - .lines() - .filter(|line| !line.starts_with('#')) - .collect::>() - .join(""); - - // parse json - let secret : JsonSSBSecret = serde_json::from_str(json.as_ref()) - .map_err(to_ioerr)?; - - if secret.curve != CURVE_ED25519 { - return Err(Error::InvalidConfig); - } - - Ok(IdentitySecret { - id : secret.id, - pk : secret.public.to_ed25519_pk()?, - sk : secret.private.to_ed25519_sk()?, - }) - } -} - - diff --git a/src/patchwork/mod.rs b/src/patchwork/mod.rs deleted file mode 100644 index e355fde..0000000 --- a/src/patchwork/mod.rs +++ /dev/null @@ -1,12 +0,0 @@ -mod api; -mod config; -mod messagetypes; -mod error; -pub mod pubs; - -pub use api::{ - parse_feed, parse_latest, parse_message, parse_whoami, ApiClient, CreateHistoryStreamArgs, - CreateStreamArgs, -}; -pub use config::{ssb_net_id, IdentitySecret}; -pub use error::{Error,Result}; \ No newline at end of file diff --git a/src/rpc/client.rs b/src/rpc/client.rs deleted file mode 100644 index 646663c..0000000 --- a/src/rpc/client.rs +++ /dev/null @@ -1,212 +0,0 @@ - -use super::error::{Error,Result}; - -use async_std::io; -use async_std::prelude::*; - -use kuska_handshake::async_std::{ - BoxStreamRead, - BoxStreamWrite -}; - -pub type RequestNo = i32; - -const HEADER_SIZE : usize = 9; - -const RPC_HEADER_STREAM_FLAG : u8 = 1 << 3; -const RPC_HEADER_END_OR_ERROR_FLAG : u8 = 1 << 2; -const RPC_HEADER_BODY_TYPE_MASK : u8 = 0b11; - -#[derive(Debug,PartialEq)] -pub enum BodyType { - Binary, - UTF8, - JSON, -} - -#[derive(Debug,PartialEq)] -pub enum RpcType { - Async, - Source, -} - -impl RpcType { - pub fn rpc_id(&self) -> &'static str { - match self { - RpcType::Async => "async", - RpcType::Source => "source", - } - } -} - -#[derive(Debug,PartialEq)] -pub struct Header { - pub req_no : RequestNo, - pub is_stream : bool, - pub is_end_or_error : bool, - pub body_type : BodyType, - pub body_len : u32, -} - -impl Header { - pub fn from_slice(bytes: &[u8]) -> Result
{ - if bytes.len() < HEADER_SIZE { - return Err(Error::HeaderSizeTooSmall); - } - - let is_stream = (bytes[0] & RPC_HEADER_STREAM_FLAG) == RPC_HEADER_STREAM_FLAG; - let is_end_or_error = (bytes[0] & RPC_HEADER_END_OR_ERROR_FLAG) == RPC_HEADER_END_OR_ERROR_FLAG; - let body_type = match bytes[0] & RPC_HEADER_BODY_TYPE_MASK { - 0 => BodyType::Binary, - 1 => BodyType::UTF8, - 2 => BodyType::JSON, - _ => return Err(Error::InvalidBodyType), - }; - - let mut body_len_buff = [0u8;4]; - body_len_buff.copy_from_slice(&bytes[1..5]); - let body_len = u32::from_be_bytes(body_len_buff); - - let mut reqno_buff = [0u8;4]; - reqno_buff.copy_from_slice(&bytes[5..9]); - let req_no = i32::from_be_bytes(reqno_buff); - - Ok(Header{ - req_no, is_stream, is_end_or_error, body_type, body_len - }) - } - - pub fn to_array(&self) -> [u8;9] { - let mut flags : u8 = 0; - if self.is_end_or_error { - flags |= RPC_HEADER_END_OR_ERROR_FLAG; - } - if self.is_stream { - flags |= RPC_HEADER_STREAM_FLAG; - } - flags |= match self.body_type { - BodyType::Binary => 0, - BodyType::UTF8 => 1, - BodyType::JSON => 2, - }; - let len = self.body_len.to_be_bytes(); - let req_no = self.req_no.to_be_bytes(); - - let mut encoded = [0u8;9]; - encoded[0] = flags; - encoded[1..5].copy_from_slice(&len[..]); - encoded[5..9].copy_from_slice(&req_no[..]); - - encoded - } -} - -pub struct RpcClient { - box_reader : BoxStreamRead, - box_writer : BoxStreamWrite, - req_no : RequestNo, -} - -impl RpcClient { - - pub fn new(box_reader :BoxStreamRead, box_writer :BoxStreamWrite) -> RpcClient { - RpcClient { box_reader, box_writer, req_no : 0 } - } - - pub async fn recv(&mut self) -> Result<(Header,Vec)> { - let mut rpc_header_raw = [0u8;9]; - self.box_reader.read_exact(&mut rpc_header_raw[..]).await?; - let rpc_header = Header::from_slice(&rpc_header_raw[..])?; - - let mut rpc_body : Vec = vec![0;rpc_header.body_len as usize]; - self.box_reader.read_exact(&mut rpc_body[..]).await?; - - Ok((rpc_header,rpc_body)) - } - - pub async fn send(&mut self, name : &[&str], rpc_type: RpcType, args :&T) -> Result{ - - self.req_no+=1; - - let mut body = String::from("{\"name\":"); - body.push_str(&serde_json::to_string(&name)?); - body.push_str(",\"type\":\""); - body.push_str(rpc_type.rpc_id()); - body.push_str("\",\"args\":["); - body.push_str(&serde_json::to_string(&args)?); - body.push_str("]}"); - - let rpc_header = Header { - req_no : self.req_no, - is_stream : rpc_type == RpcType::Source, - is_end_or_error : false, - body_type : BodyType::JSON, - body_len : body.len() as u32, - }.to_array(); - - self.box_writer.write_all(&rpc_header[..]).await?; - self.box_writer.write_all(body.as_bytes()).await?; - self.box_writer.flush().await?; - - Ok(self.req_no) - } - - pub async fn send_cancel_stream(&mut self, req_no: RequestNo) -> Result<()> { - let body_bytes = b"true"; - - let rpc_header = Header { - req_no, - is_stream : true, - is_end_or_error : true, - body_type : BodyType::JSON, - body_len : body_bytes.len() as u32, - }.to_array(); - - self.box_writer.write_all(&rpc_header[..]).await?; - self.box_writer.write_all(&body_bytes[..]).await?; - Ok(()) - } - - pub async fn close(&mut self) -> Result<()> { - self.box_writer.goodbye().await?; - Ok(()) - } - -} - -#[cfg(test)] -mod test { - use super::{Header,BodyType}; - - #[test] - fn test_header_encoding_1() { - let h = Header::from_slice(&(Header{ - req_no : 5, - is_stream : true, - is_end_or_error : false, - body_type : BodyType::JSON, - body_len : 123, - }.to_array())[..]).unwrap(); - assert_eq!(h.req_no,5); - assert_eq!(h.is_stream, true); - assert_eq!(h.is_end_or_error, false); - assert_eq!(h.body_type, BodyType::JSON); - assert_eq!(h.body_len, 123); - } - - #[test] - fn test_header_encoding_2() { - let h = Header::from_slice(&(Header{ - req_no : -5, - is_stream : false, - is_end_or_error : true, - body_type : BodyType::Binary, - body_len : 2123, - }.to_array())[..]).unwrap(); - assert_eq!(h.req_no,-5); - assert_eq!(h.is_stream, false); - assert_eq!(h.is_end_or_error, true); - assert_eq!(h.body_type, BodyType::Binary); - assert_eq!(h.body_len, 2123); - } -} \ No newline at end of file diff --git a/src/rpc/error.rs b/src/rpc/error.rs index 9c0828a..c3f17ea 100644 --- a/src/rpc/error.rs +++ b/src/rpc/error.rs @@ -23,7 +23,6 @@ impl std::fmt::Display for Error { write!(f, "{:?}", self) } } -impl std::error::Error for Error { } +impl std::error::Error for Error {} pub type Result = std::result::Result; - diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 965416d..8119efe 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,5 +1,5 @@ -mod client; mod error; +mod stream; -pub use client::{RpcClient,Header, RequestNo, RpcType}; -pub use error::{Error,Result}; +pub use error::{Error, Result}; +pub use stream::{Body, BodyType, RecvMsg, RequestNo, RpcStream, RpcType}; diff --git a/src/rpc/stream.rs b/src/rpc/stream.rs new file mode 100644 index 0000000..945a30a --- /dev/null +++ b/src/rpc/stream.rs @@ -0,0 +1,310 @@ +use super::error::{Error, Result}; + +use async_std::io; +use async_std::prelude::*; + +use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite}; + +pub type RequestNo = i32; + +const HEADER_SIZE: usize = 9; + +const RPC_HEADER_STREAM_FLAG: u8 = 1 << 3; +const RPC_HEADER_END_OR_ERROR_FLAG: u8 = 1 << 2; +const RPC_HEADER_BODY_TYPE_MASK: u8 = 0b11; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum BodyType { + Binary, + UTF8, + JSON, +} + +#[derive(Deserialize)] +pub struct Body { + pub name: Vec, + #[serde(rename = "type")] + pub rpc_type: RpcType, + pub args: serde_json::Value, +} + +#[derive(Serialize)] +pub struct BodyRef<'a, T: serde::Serialize> { + pub name: &'a [&'a str], + #[serde(rename = "type")] + pub rpc_type: RpcType, + pub args: &'a T, +} + +#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq)] +pub enum RpcType { + #[serde(rename = "async")] + Async, + #[serde(rename = "source")] + Source, +} + +#[derive(Debug, PartialEq)] +pub struct Header { + pub req_no: RequestNo, + pub is_stream: bool, + pub is_end_or_error: bool, + pub body_type: BodyType, + pub body_len: u32, +} + +#[derive(Serialize, Deserialize)] +struct ErrorMessage<'a> { + name: &'a str, + message: &'a str, +} + +impl Header { + pub fn from_slice(bytes: &[u8]) -> Result
{ + if bytes.len() < HEADER_SIZE { + return Err(Error::HeaderSizeTooSmall); + } + + let is_stream = (bytes[0] & RPC_HEADER_STREAM_FLAG) == RPC_HEADER_STREAM_FLAG; + let is_end_or_error = + (bytes[0] & RPC_HEADER_END_OR_ERROR_FLAG) == RPC_HEADER_END_OR_ERROR_FLAG; + let body_type = match bytes[0] & RPC_HEADER_BODY_TYPE_MASK { + 0 => BodyType::Binary, + 1 => BodyType::UTF8, + 2 => BodyType::JSON, + _ => return Err(Error::InvalidBodyType), + }; + + let mut body_len_buff = [0u8; 4]; + body_len_buff.copy_from_slice(&bytes[1..5]); + let body_len = u32::from_be_bytes(body_len_buff); + + let mut reqno_buff = [0u8; 4]; + reqno_buff.copy_from_slice(&bytes[5..9]); + let req_no = i32::from_be_bytes(reqno_buff); + + Ok(Header { + req_no, + is_stream, + is_end_or_error, + body_type, + body_len, + }) + } + + pub fn to_array(&self) -> [u8; 9] { + let mut flags: u8 = 0; + if self.is_end_or_error { + flags |= RPC_HEADER_END_OR_ERROR_FLAG; + } + if self.is_stream { + flags |= RPC_HEADER_STREAM_FLAG; + } + flags |= match self.body_type { + BodyType::Binary => 0, + BodyType::UTF8 => 1, + BodyType::JSON => 2, + }; + let len = self.body_len.to_be_bytes(); + let req_no = self.req_no.to_be_bytes(); + + let mut encoded = [0u8; 9]; + encoded[0] = flags; + encoded[1..5].copy_from_slice(&len[..]); + encoded[5..9].copy_from_slice(&req_no[..]); + + encoded + } +} + +pub struct RpcStream { + box_reader: BoxStreamRead, + box_writer: BoxStreamWrite, + req_no: RequestNo, +} + +pub enum RecvMsg { + Request(Body), + ErrorResponse(String), + CancelStreamRespose(), + BodyResponse(Vec), +} + +impl RpcStream { + pub fn new(box_reader: BoxStreamRead, box_writer: BoxStreamWrite) -> RpcStream { + RpcStream { + box_reader, + box_writer, + req_no: 0, + } + } + + pub async fn recv(&mut self) -> Result<(RequestNo, RecvMsg)> { + let mut rpc_header_raw = [0u8; 9]; + self.box_reader.read_exact(&mut rpc_header_raw[..]).await?; + let rpc_header = Header::from_slice(&rpc_header_raw[..])?; + + let mut body_raw: Vec = vec![0; rpc_header.body_len as usize]; + self.box_reader.read_exact(&mut body_raw[..]).await?; + + if rpc_header.req_no > 0 { + let rpc_body = serde_json::from_slice(&body_raw)?; + Ok((rpc_header.req_no, RecvMsg::Request(rpc_body))) + } else if rpc_header.is_end_or_error { + if rpc_header.is_stream { + Ok((-rpc_header.req_no, RecvMsg::CancelStreamRespose())) + } else { + let err: ErrorMessage = serde_json::from_slice(&body_raw)?; + Ok(( + -rpc_header.req_no, + RecvMsg::ErrorResponse(err.message.to_string()), + )) + } + } else { + Ok((-rpc_header.req_no, RecvMsg::BodyResponse(body_raw))) + } + } + + pub async fn send_request( + &mut self, + name: &[&str], + rpc_type: RpcType, + args: &T, + ) -> Result { + self.req_no += 1; + + let body_str = serde_json::to_string(&BodyRef { + name, + rpc_type, + args: &[&args], + })?; + + let rpc_header = Header { + req_no: self.req_no, + is_stream: rpc_type == RpcType::Source, + is_end_or_error: false, + body_type: BodyType::JSON, + body_len: body_str.as_bytes().len() as u32, + } + .to_array(); + + self.box_writer.write_all(&rpc_header[..]).await?; + self.box_writer.write_all(body_str.as_bytes()).await?; + self.box_writer.flush().await?; + + Ok(self.req_no) + } + + pub async fn send_response( + &mut self, + req_no: RequestNo, + rpc_type: RpcType, + body_type: BodyType, + body: &[u8], + ) -> Result<()> { + let rpc_header = Header { + req_no: -req_no, + is_stream: rpc_type == RpcType::Source, + is_end_or_error: false, + body_type, + body_len: body.len() as u32, + } + .to_array(); + + self.box_writer.write_all(&rpc_header[..]).await?; + self.box_writer.write_all(body).await?; + self.box_writer.flush().await?; + + Ok(()) + } + + pub async fn send_error(&mut self, req_no: RequestNo, message: &str) -> Result<()> { + let body_bytes = serde_json::to_string(&ErrorMessage { + name: "Error", + message, + })?; + + let rpc_header = Header { + req_no: -req_no, + is_stream: false, + is_end_or_error: true, + body_type: BodyType::UTF8, + body_len: body_bytes.as_bytes().len() as u32, + } + .to_array(); + + self.box_writer.write_all(&rpc_header[..]).await?; + self.box_writer.write_all(body_bytes.as_bytes()).await?; + self.box_writer.flush().await?; + + Ok(()) + } + + pub async fn send_stream_eof(&mut self, req_no: RequestNo) -> Result<()> { + let body_bytes = b"true"; + + let rpc_header = Header { + req_no: -req_no, + is_stream: true, + is_end_or_error: true, + body_type: BodyType::JSON, + body_len: body_bytes.len() as u32, + } + .to_array(); + + self.box_writer.write_all(&rpc_header[..]).await?; + self.box_writer.write_all(&body_bytes[..]).await?; + self.box_writer.flush().await?; + Ok(()) + } + + pub async fn close(&mut self) -> Result<()> { + self.box_writer.goodbye().await?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::{BodyType, Header}; + + #[test] + fn test_header_encoding_1() { + let h = Header::from_slice( + &(Header { + req_no: 5, + is_stream: true, + is_end_or_error: false, + body_type: BodyType::JSON, + body_len: 123, + } + .to_array())[..], + ) + .unwrap(); + assert_eq!(h.req_no, 5); + assert_eq!(h.is_stream, true); + assert_eq!(h.is_end_or_error, false); + assert_eq!(h.body_type, BodyType::JSON); + assert_eq!(h.body_len, 123); + } + + #[test] + fn test_header_encoding_2() { + let h = Header::from_slice( + &(Header { + req_no: -5, + is_stream: false, + is_end_or_error: true, + body_type: BodyType::Binary, + body_len: 2123, + } + .to_array())[..], + ) + .unwrap(); + assert_eq!(h.req_no, -5); + assert_eq!(h.is_stream, false); + assert_eq!(h.is_end_or_error, true); + assert_eq!(h.body_type, BodyType::Binary); + assert_eq!(h.body_len, 2123); + } +}