Solar sync

This commit is contained in:
adria0 2020-05-10 00:13:34 +02:00
parent 7cd204cad2
commit 8a0509327c
8 changed files with 63 additions and 34 deletions

View File

@ -8,11 +8,11 @@ edition = "2018"
name = "kuska_ssb" name = "kuska_ssb"
[dependencies] [dependencies]
kuska-handshake = { git = "https://github.com/Kuska-ssb/kuska-handshake", branch = "master" , features=["sync","async_std"] } kuska-handshake = { path = "../kuska-handshake", branch = "master" , features=["sync","async_std"] }
sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" } sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" }
base64 = "0.11.0" base64 = "0.11.0"
hex = "0.4.0" hex = "0.4.0"
async-std = { version = "1.4.0", features=["unstable","attributes"] } async-std = { version = "1.5.0", features=["unstable","attributes"] }
crossbeam = "0.7.3" crossbeam = "0.7.3"
log = "0.4.8" log = "0.4.8"
env_logger = "0.7.1" env_logger = "0.7.1"

View File

@ -20,7 +20,7 @@ use kuska_ssb::discovery::ssb_net_id;
use kuska_ssb::feed::{is_privatebox, privatebox_decipher, Feed, Message}; use kuska_ssb::feed::{is_privatebox, privatebox_decipher, Feed, Message};
use kuska_ssb::keystore::from_patchwork_local; use kuska_ssb::keystore::from_patchwork_local;
use kuska_ssb::keystore::OwnedIdentity; use kuska_ssb::keystore::OwnedIdentity;
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcStream}; use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcStreamReader, RpcStreamWriter};
use regex::Regex; use regex::Regex;
use sodiumoxide::crypto::sign::ed25519; use sodiumoxide::crypto::sign::ed25519;
@ -71,7 +71,8 @@ impl std::fmt::Display for AppError {
} }
async fn get_async<'a, R, W, T, F>( async fn get_async<'a, R, W, T, F>(
client: &mut ApiHelper<R, W>, rpc_reader : &mut RpcStreamReader<R>,
client: &mut ApiHelper<W>,
req_no: RequestNo, req_no: RequestNo,
f: F, f: F,
) -> SolarResult<T> ) -> SolarResult<T>
@ -82,7 +83,7 @@ where
T: Debug, T: Debug,
{ {
loop { loop {
let (id, msg) = client.rpc().recv().await?; let (id, msg) = rpc_reader.recv().await?;
if id == req_no { if id == req_no {
match msg { match msg {
RecvMsg::RpcResponse(_type, body) => { RecvMsg::RpcResponse(_type, body) => {
@ -100,7 +101,8 @@ where
} }
async fn print_source_until_eof<'a, R, W, T, F>( async fn print_source_until_eof<'a, R, W, T, F>(
client: &mut ApiHelper<R, W>, rpc_reader : &mut RpcStreamReader<R>,
client: &mut ApiHelper<W>,
req_no: RequestNo, req_no: RequestNo,
f: F, f: F,
) -> SolarResult<()> ) -> SolarResult<()>
@ -111,7 +113,7 @@ where
T: Debug + serde::Deserialize<'a>, T: Debug + serde::Deserialize<'a>,
{ {
loop { loop {
let (id, msg) = client.rpc().recv().await?; let (id, msg) = rpc_reader.recv().await?;
if id == req_no { if id == req_no {
match msg { match msg {
RecvMsg::RpcResponse(_type, body) => { RecvMsg::RpcResponse(_type, body) => {
@ -189,10 +191,11 @@ async fn main() -> SolarResult<()> {
let (box_stream_read, box_stream_write) = let (box_stream_read, box_stream_write) =
BoxStream::from_handshake(&socket, &socket, handshake, 0x8000).split_read_write(); BoxStream::from_handshake(&socket, &socket, handshake, 0x8000).split_read_write();
let mut client = ApiHelper::new(RpcStream::new(box_stream_read, box_stream_write)); let mut rpc_reader = RpcStreamReader::new(box_stream_read);
let mut client = ApiHelper::new(RpcStreamWriter::new(box_stream_write));
let req_id = client.whoami_req_send().await?; let req_id = client.whoami_req_send().await?;
let whoami = get_async(&mut client, req_id, whoami_res_parse).await?.id; let whoami = get_async(&mut rpc_reader, &mut client, req_id, whoami_res_parse).await?.id;
println!("😊 server says hello to {}", whoami); println!("😊 server says hello to {}", whoami);
@ -211,7 +214,7 @@ async fn main() -> SolarResult<()> {
} }
("whoami", 1) => { ("whoami", 1) => {
let req_id = client.whoami_req_send().await?; let req_id = client.whoami_req_send().await?;
let whoami = get_async(&mut client, req_id, whoami_res_parse).await?.id; let whoami = get_async(&mut rpc_reader, &mut client, req_id, whoami_res_parse).await?.id;
println!("{}", whoami); println!("{}", whoami);
} }
("get", 2) => { ("get", 2) => {
@ -221,7 +224,7 @@ async fn main() -> SolarResult<()> {
args[1].clone() args[1].clone()
}; };
let req_id = client.get_req_send(&msg_id).await?; let req_id = client.get_req_send(&msg_id).await?;
let msg = get_async(&mut client, req_id, message_res_parse).await?; let msg = get_async(&mut rpc_reader, &mut client, req_id, message_res_parse).await?;
println!("{:?}", msg); println!("{:?}", msg);
} }
("user", 2) => { ("user", 2) => {
@ -229,16 +232,16 @@ async fn main() -> SolarResult<()> {
let args = CreateHistoryStreamIn::new(user_id.clone()); let args = CreateHistoryStreamIn::new(user_id.clone());
let req_id = client.create_history_stream_req_send(&args).await?; let req_id = client.create_history_stream_req_send(&args).await?;
print_source_until_eof(&mut client, req_id, feed_res_parse).await?; print_source_until_eof(&mut rpc_reader, &mut client, req_id, feed_res_parse).await?;
} }
("feed", 1) => { ("feed", 1) => {
let args = CreateStreamIn::default(); let args = CreateStreamIn::default();
let req_id = client.create_feed_stream_req_send(&args).await?; let req_id = client.create_feed_stream_req_send(&args).await?;
print_source_until_eof(&mut client, req_id, feed_res_parse).await?; print_source_until_eof(&mut rpc_reader, &mut client, req_id, feed_res_parse).await?;
} }
("latest", 1) => { ("latest", 1) => {
let req_id = client.latest_req_send().await?; let req_id = client.latest_req_send().await?;
print_source_until_eof(&mut client, req_id, latest_res_parse).await?; print_source_until_eof(&mut rpc_reader, &mut client, req_id, latest_res_parse).await?;
} }
("private", 2) => { ("private", 2) => {
let user_id = if args[1] == "me" { &whoami } else { &args[1] }; let user_id = if args[1] == "me" { &whoami } else { &args[1] };
@ -257,7 +260,7 @@ async fn main() -> SolarResult<()> {
let args = CreateHistoryStreamIn::new(user_id.clone()); let args = CreateHistoryStreamIn::new(user_id.clone());
let req_id = client.create_history_stream_req_send(&args).await?; let req_id = client.create_history_stream_req_send(&args).await?;
print_source_until_eof(&mut client, req_id, show_private).await?; print_source_until_eof(&mut rpc_reader, &mut client, req_id, show_private).await?;
} }
_ => println!("unknown command {}", line_buffer), _ => println!("unknown command {}", line_buffer),
} }

View File

@ -96,7 +96,10 @@ pub enum TypedMessage {
#[serde(rename = "pub")] #[serde(rename = "pub")]
Pub { address: Option<PubAddress> }, Pub { address: Option<PubAddress> },
#[serde(rename = "post")] #[serde(rename = "post")]
Post { post: Post }, Post {
text: String,
mentions: Option<Vec<Mention>>,
},
#[serde(rename = "contact")] #[serde(rename = "contact")]
Contact { Contact {
contact: Option<SsbId>, contact: Option<SsbId>,

View File

@ -34,7 +34,7 @@ impl CreateHistoryStreamIn {
limit: None, limit: None,
} }
} }
pub fn starting_seq(self: Self, seq: u64) -> Self { pub fn after_seq(self: Self, seq: u64) -> Self {
Self { Self {
seq: Some(seq), seq: Some(seq),
..self ..self

View File

@ -1,7 +1,6 @@
use crate::feed::Message; use crate::feed::Message;
use crate::rpc::{Body, BodyType, RequestNo, RpcStream, RpcType}; use crate::rpc::{Body, BodyType, RequestNo, RpcStreamWriter, RpcType};
use async_std::io::{Read, Write}; use async_std::io::Write;
use serde_json;
use super::dto; use super::dto;
use super::error::Result; use super::error::Result;
@ -51,16 +50,16 @@ impl ApiMethod {
} }
} }
pub struct ApiHelper<R: Read + Unpin, W: Write + Unpin> { pub struct ApiHelper<W: Write + Unpin> {
rpc: RpcStream<R, W>, rpc: RpcStreamWriter<W>,
} }
impl<R: Read + Unpin, W: Write + Unpin> ApiHelper<R, W> { impl<W: Write + Unpin> ApiHelper<W> {
pub fn new(rpc: RpcStream<R, W>) -> Self { pub fn new(rpc: RpcStreamWriter<W>) -> Self {
Self { rpc } Self { rpc }
} }
pub fn rpc(&mut self) -> &mut RpcStream<R, W> { pub fn rpc(&mut self) -> &mut RpcStreamWriter<W> {
&mut self.rpc &mut self.rpc
} }

View File

@ -1,4 +1,3 @@
use base64;
use sodiumoxide::crypto::hash::sha256; use sodiumoxide::crypto::hash::sha256;
use sodiumoxide::crypto::sign::ed25519; use sodiumoxide::crypto::sign::ed25519;
@ -33,6 +32,12 @@ impl<'a> ToSsbId for ed25519::SecretKey {
} }
} }
impl<'a> ToSsbId for sha256::Digest {
fn to_ssb_id(&self) -> String {
format!("{}{}", base64::encode(self), SHA256_SUFFIX)
}
}
impl ToSodiumObject for str { impl ToSodiumObject for str {
fn to_ed25519_pk(self: &str) -> Result<ed25519::PublicKey> { fn to_ed25519_pk(self: &str) -> Result<ed25519::PublicKey> {
if !self.ends_with(CURVE_ED25519_SUFFIX) { if !self.ends_with(CURVE_ED25519_SUFFIX) {

View File

@ -2,4 +2,4 @@ mod error;
mod stream; mod stream;
pub use error::{Error, Result}; pub use error::{Error, Result};
pub use stream::{Body, BodyType, RecvMsg, RequestNo, RpcStream, RpcType}; pub use stream::{Body, BodyType, RecvMsg, RequestNo, RpcStreamReader,RpcStreamWriter, RpcType};

View File

@ -3,6 +3,8 @@ use super::error::{Error, Result};
use async_std::io; use async_std::io;
use async_std::prelude::*; use async_std::prelude::*;
use log::{trace, warn}; use log::{trace, warn};
use std::pin::Pin;
use std::task::{Context,Poll};
use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite}; use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite};
@ -125,8 +127,11 @@ impl Header {
} }
} }
pub struct RpcStream<R: io::Read + Unpin, W: io::Write + Unpin> { pub struct RpcStreamReader<R: io::Read + Unpin> {
box_reader: BoxStreamRead<R>, box_reader: BoxStreamRead<R>,
}
pub struct RpcStreamWriter<W: io::Write + Unpin> {
box_writer: BoxStreamWrite<W>, box_writer: BoxStreamWrite<W>,
req_no: RequestNo, req_no: RequestNo,
} }
@ -140,12 +145,10 @@ pub enum RecvMsg {
CancelStreamRespose(), CancelStreamRespose(),
} }
impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> { impl<R: io::Read + Unpin> RpcStreamReader<R> {
pub fn new(box_reader: BoxStreamRead<R>, box_writer: BoxStreamWrite<W>) -> RpcStream<R, W> { pub fn new(box_reader: BoxStreamRead<R>) -> RpcStreamReader<R> {
RpcStream { RpcStreamReader {
box_reader, box_reader
box_writer,
req_no: 0,
} }
} }
@ -187,6 +190,22 @@ impl<R: io::Read + Unpin, W: io::Write + Unpin> RpcStream<R, W> {
)) ))
} }
} }
}
/*
impl<R: io::Read + Unpin> Stream for RpcStreamReader<R> {
type Item = (RequestNo, RecvMsg);
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
futures::ready!(self.recv())
}
}
*/
impl<W: io::Write + Unpin> RpcStreamWriter<W> {
pub fn new(box_writer: BoxStreamWrite<W>) -> RpcStreamWriter<W> {
RpcStreamWriter {
box_writer,
req_no: 0,
}
}
pub async fn send_request<T: serde::Serialize>( pub async fn send_request<T: serde::Serialize>(
&mut self, &mut self,