diff --git a/Cargo.lock b/Cargo.lock index 323f046..c873136 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -151,7 +151,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5" dependencies = [ - "async-stream-impl", + "async-stream-impl 0.2.1", + "futures-core", +] + +[[package]] +name = "async-stream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +dependencies = [ + "async-stream-impl 0.3.2", "futures-core", ] @@ -166,6 +176,17 @@ dependencies = [ "syn", ] +[[package]] +name = "async-stream-impl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-task" version = "4.0.3" @@ -478,12 +499,14 @@ name = "golgi" version = "0.1.0" dependencies = [ "async-std", + "async-stream 0.3.2", "base64 0.13.0", "futures", "hex", "kuska-handshake", "kuska-sodiumoxide", "kuska-ssb", + "rand", "serde", "serde_json", ] @@ -572,7 +595,7 @@ name = "kuska-ssb" version = "0.2.0" dependencies = [ "async-std", - "async-stream", + "async-stream 0.2.1", "base64 0.11.0", "dirs", "futures", @@ -690,6 +713,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ppv-lite86" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" + [[package]] name = "proc-macro2" version = "1.0.32" @@ -708,6 +737,46 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" +dependencies = [ + "rand_core", +] + [[package]] name = "redox_syscall" version = "0.2.10" diff --git a/Cargo.toml b/Cargo.toml index ee3756f..d5eca42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,5 @@ kuska-sodiumoxide = "0.2.5-0" kuska-ssb = { path = "../ssb" } serde = { version = "1", features = ["derive"] } serde_json = "1" +async-stream = "0.3.2" +rand = "0.8.4" \ No newline at end of file diff --git a/src/sbot.rs b/src/sbot.rs index a7b2250..dc6def4 100644 --- a/src/sbot.rs +++ b/src/sbot.rs @@ -1,5 +1,7 @@ //! Sbot type and associated methods. use async_std::net::TcpStream; +use futures::pin_mut; +use async_std::stream::StreamExt; use kuska_handshake::async_std::BoxStream; use kuska_sodiumoxide::crypto::{auth, sign::ed25519}; @@ -16,6 +18,7 @@ use kuska_ssb::{ use crate::error::GolgiError; use crate::messages::{SsbMessageKVT, SsbMessageContent, SsbMessageValue, SsbMessageContentType}; use crate::utils; +use crate::utils::get_source_stream; // re-export types from kuska pub use kuska_ssb::api::dto::content::SubsetQuery; @@ -228,9 +231,25 @@ impl Sbot { pub async fn create_history_stream( &mut self, id: String, - ) -> Result, GolgiError> { + ) -> Result { let args = CreateHistoryStreamIn::new(id); let req_id = self.client.create_history_stream_req_send(&args).await?; - utils::get_source_until_eof(&mut self.rpc_reader, req_id, utils::ssb_message_res_parse).await + let source_stream = get_source_stream(&self.rpc_reader, req_id); + pin_mut!(source_stream); // needed for iteration + + while let Some(res) = source_stream.next().await { + match res { + Ok(value) => { + println!("value: {:?}", value); + }, + Err(err) => { + println!("err: {:?}", err); + } + } + + } + println!("exit loop"); + Ok(2) + // utils::get_source_until_eof(&mut self.rpc_reader, req_id, utils::ssb_message_res_parse).await } } diff --git a/src/utils.rs b/src/utils.rs index 34f5ec6..9a46288 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,12 @@ //! Utility methods for `golgi`. use async_std::io::Read; use std::fmt::Debug; +use async_std::task; +use std::time::Duration; +use async_std::net::TcpStream; +use async_std::stream::Stream; +use async_stream::stream; +use rand::distributions::{Distribution, Uniform}; use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader}; use serde_json::Value; @@ -64,10 +70,10 @@ pub async fn get_async<'a, R, T, F>( req_no: RequestNo, f: F, ) -> Result -where - R: Read + Unpin, - F: Fn(&[u8]) -> Result, - T: Debug, + where + R: Read + Unpin, + F: Fn(&[u8]) -> Result, + T: Debug, { loop { let (id, msg) = rpc_reader.recv().await?; @@ -108,10 +114,10 @@ pub async fn get_source_until_eof<'a, R, T, F>( req_no: RequestNo, f: F, ) -> Result, GolgiError> -where - R: Read + Unpin, - F: Fn(&[u8]) -> Result, - T: Debug, + where + R: Read + Unpin, + F: Fn(&[u8]) -> Result, + T: Debug, { let mut messages: Vec = Vec::new(); loop { @@ -160,10 +166,10 @@ pub async fn print_source_until_eof<'a, R, T, F>( req_no: RequestNo, f: F, ) -> Result<(), GolgiError> -where - R: Read + Unpin, - F: Fn(&[u8]) -> Result, - T: Debug + serde::Deserialize<'a>, + where + R: Read + Unpin, + F: Fn(&[u8]) -> Result, + T: Debug + serde::Deserialize<'a>, { loop { let (id, msg) = rpc_reader.recv().await?; @@ -183,3 +189,74 @@ where } Ok(()) } + + +pub async fn get_async_num() -> i32 { + let millis = Uniform::from(0..10).sample(&mut rand::thread_rng()); + println!("get_async_num will complete in {} ms", millis); + + task::sleep(Duration::from_millis(millis)).await; + let i: i32 = rand::random(); + i +} + + +// pub fn get_source_stream() -> impl Stream { +// let s = stream! { +// for i in 0..3 { +// let x = get_async_num().await; +// yield x; +// } +// }; +// s +// } + +// pub fn get_source_stream(rpc_reader: &RpcReader, req_no: RequestNo) -> impl Stream { +// let s = stream! { +// loop { +// let (id, msg) = rpc_reader.recv().await?; +// yield Ok(id); +// // if id == req_no { +// // match msg { +// // RecvMsg::RpcResponse(_type, body) => { +// // let display = f(&body)?; +// // println!("{:?}", display); +// // } +// // RecvMsg::ErrorResponse(message) => { +// // return Err(GolgiError::Sbot(message)); +// // } +// // RecvMsg::CancelStreamRespose() => break, +// // _ => {} +// // } +// // } +// // } +// // yield x; +// } +// }; +// s +// } + +pub fn<'a> get_source_stream(mut rpc_reader: &<'a> RpcReader, req_no: RequestNo) -> impl Stream> { + let s = stream! { + loop { + let (id, msg) = rpc_reader.recv().await?; + yield Ok(id); + // if id == req_no { + // match msg { + // RecvMsg::RpcResponse(_type, body) => { + // let display = f(&body)?; + // println!("{:?}", display); + // } + // RecvMsg::ErrorResponse(message) => { + // return Err(GolgiError::Sbot(message)); + // } + // RecvMsg::CancelStreamRespose() => break, + // _ => {} + // } + // } + // } + // yield x; + } + }; + s +} \ No newline at end of file