diff --git a/examples/ssb-stream-example.rs b/examples/ssb-stream-example.rs index 3f056e7..077bb39 100644 --- a/examples/ssb-stream-example.rs +++ b/examples/ssb-stream-example.rs @@ -4,8 +4,8 @@ use golgi::error::GolgiError; use golgi::sbot::Sbot; use async_std::stream::{StreamExt}; -use futures::{pin_mut}; - +use futures::{pin_mut, TryStreamExt}; +use golgi::messages::{SsbMessageContentType, SsbMessageValue}; async fn run() -> Result<(), GolgiError> { @@ -34,52 +34,52 @@ async fn run() -> Result<(), GolgiError> { } println!("reached end of stream"); - // // create a history stream and convert it into a Vec using try_collect - // // (if there is any error in the results, it will be raised) - // let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?; - // let results : Vec = history_stream.try_collect().await?; - // for x in results { - // println!("x: {:?}", x); - // } - // - // // example to create a history stream and use a map to convert stream of SsbMessageValue - // // into a stream of KeyTypeTuple (local struct for storing message_key and message_type) - // #[derive(Debug)] - // struct KeyTypeTuple { - // message_key: String, - // message_type: SsbMessageContentType, - // }; - // let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?; - // let type_stream = history_stream.map(|msg| { - // match msg { - // Ok(val) => { - // let message_type = val.get_message_type()?; - // let tuple = KeyTypeTuple { - // message_key: val.signature, - // message_type: message_type, - // }; - // Ok(tuple) - // } - // Err(err) => { - // Err(err) - // } - // } - // }); - // pin_mut!(type_stream); // needed for iteration - // println!("looping through type stream"); - // while let Some(res) = type_stream.next().await { - // match res { - // Ok(value) => { - // println!("value: {:?}", value); - // }, - // Err(err) => { - // println!("err: {:?}", err); - // } - // } - // } - // println!("reached end of type stream"); - // - // // return Ok + // create a history stream and convert it into a Vec using try_collect + // (if there is any error in the results, it will be raised) + let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?; + let results : Vec = history_stream.try_collect().await?; + for x in results { + println!("x: {:?}", x); + } + + // example to create a history stream and use a map to convert stream of SsbMessageValue + // into a stream of KeyTypeTuple (local struct for storing message_key and message_type) + #[derive(Debug)] + struct KeyTypeTuple { + message_key: String, + message_type: SsbMessageContentType, + }; + let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?; + let type_stream = history_stream.map(|msg| { + match msg { + Ok(val) => { + let message_type = val.get_message_type()?; + let tuple = KeyTypeTuple { + message_key: val.signature, + message_type: message_type, + }; + Ok(tuple) + } + Err(err) => { + Err(err) + } + } + }); + pin_mut!(type_stream); // needed for iteration + println!("looping through type stream"); + while let Some(res) = type_stream.next().await { + match res { + Ok(value) => { + println!("value: {:?}", value); + }, + Err(err) => { + println!("err: {:?}", err); + } + } + } + println!("reached end of type stream"); + + // return Ok Ok(()) } diff --git a/src/sbot.rs b/src/sbot.rs index f64f614..998b498 100644 --- a/src/sbot.rs +++ b/src/sbot.rs @@ -24,6 +24,11 @@ use crate::utils; pub use kuska_ssb::api::dto::content::SubsetQuery; use kuska_ssb::rpc::RequestNo; +pub struct SbotConnection { + client: ApiCaller, + rpc_reader: RpcReader, +} + /// The Scuttlebutt identity, keys and configuration parameters for connecting to a local sbot /// instance, as well as handles for calling RPC methods and receiving responses. pub struct Sbot { @@ -35,6 +40,7 @@ pub struct Sbot { network_id: auth::Key, client: ApiCaller, rpc_reader: RpcReader, + sbot_connections: Vec, } impl Sbot { @@ -80,6 +86,7 @@ impl Sbot { let rpc_reader = RpcReader::new(box_stream_read); let client = ApiCaller::new(RpcWriter::new(box_stream_write)); + let mut sbot_connections = Vec::new(); Ok(Self { id, @@ -89,10 +96,11 @@ impl Sbot { network_id, client, rpc_reader, + sbot_connections }) } - pub async fn get_rpc_reader(&self, ip_port: Option, net_id: Option) -> Result, GolgiError> { + pub async fn get_sbot_connection(&self, ip_port: Option, net_id: Option) -> Result { let address = if ip_port.is_none() { "127.0.0.1:8008".to_string() } else { @@ -126,7 +134,12 @@ impl Sbot { BoxStream::from_handshake(socket.clone(), socket, handshake, 0x8000).split_read_write(); let rpc_reader = RpcReader::new(box_stream_read); - Ok(rpc_reader) + let client = ApiCaller::new(RpcWriter::new(box_stream_write)); + let sbot_connection = SbotConnection { + rpc_reader, + client, + }; + Ok(sbot_connection) } /// Call the `partialReplication getSubset` RPC method and return a vector @@ -270,10 +283,10 @@ impl Sbot { &mut self, id: String, ) -> Result>, GolgiError> { + let mut sbot_connection = self.get_sbot_connection(None, None).await.unwrap(); let args = CreateHistoryStreamIn::new(id); - let req_id = self.client.create_history_stream_req_send(&args).await?; - let mut rpc_reader = self.get_rpc_reader(None, None).await.unwrap(); - let history_stream = Sbot::get_source_stream(rpc_reader, req_id, utils::ssb_message_res_parse).await; + let req_id = sbot_connection.client.create_history_stream_req_send(&args).await?; + let history_stream = Sbot::get_source_stream(sbot_connection.rpc_reader, req_id, utils::ssb_message_res_parse).await; Ok(history_stream) } @@ -320,7 +333,10 @@ impl Sbot { } // if we find a CancelStreamResponse // this is the end of the stream - RecvMsg::CancelStreamRespose() => break, + RecvMsg::CancelStreamRespose() => { + println!("found cancel stream"); + break + }, // if we find an unknown response, we just continue the loop _ => {} }