diff --git a/examples/ssb-stream-example.rs b/examples/ssb-stream-example.rs new file mode 100644 index 0000000..2e82ea8 --- /dev/null +++ b/examples/ssb-stream-example.rs @@ -0,0 +1,97 @@ +use std::process; + +use golgi::error::GolgiError; + +use golgi::sbot::Sbot; +use async_std::stream::StreamExt; +use futures::pin_mut; + + +async fn run() -> Result<(), GolgiError> { + let mut sbot_client = Sbot::init(None, None).await?; + + let id = sbot_client.whoami().await?; + println!("{}", id); + // + // let name = SsbMessageContent::About { + // about: id, + // name: Some("golgi".to_string()), + // title: None, + // branch: None, + // image: None, + // description: None, + // location: None, + // start_datetime: None, + // }; + // + // let name_msg_ref = sbot_client.publish(name).await?; + // println!("{}", name_msg_ref); + // + // let post = SsbMessageContent::Post { + // text: "golgi go womp womp".to_string(), + // mentions: None, + // }; + // + // let post_msg_ref = sbot_client.publish(post).await?; + // println!("{}", post_msg_ref); + // + // let post_msg_ref = sbot_client.publish_description("this is a description").await?; + // println!("description: {}", post_msg_ref); + + let author = "@L/z54cbc8V1kL1/MiBhpEKuN3QJkSoZYNaukny3ghIs=.ed25519"; + + // let query = SubsetQuery::Author{ + // op: "author".to_string(), + // feed: author + // }; + // + // let kvts = sbot_client.get_subset(query).await?; + // + // for kvt in kvts { + // println!("kvt: {:?}", kvt); + // } + + // let about_messages = sbot_client.get_about_messages(author).await?; + // for msg in about_messages { + // println!("msg: {:?}", msg); + // } + // + // sbot_client.publish_name("this is my new name").await?; + // let name = sbot_client.get_name(author).await?; + // println!("name: {:?}", name); + // + // // sbot_client.publish_description("this is test description 99").await?; + // + // let description = sbot_client.get_description(author).await?; + // println!("desc: {:?}", description); + + // + // let description2 = sbot_client.get_latest_about_message(author, "description").await?; + // println!("desc2: {:?}", description2); + + let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?; + pin_mut!(history_stream); // needed for iteration + + while let Some(res) = history_stream.next().await { + match res { + Ok(value) => { + println!("value: {:?}", value); + }, + Err(err) => { + println!("err: {:?}", err); + } + } + + } + println!("exit loop"); + + Ok(()) +} + +#[async_std::main] +async fn main() { + if let Err(e) = run().await { + eprintln!("Application error: {}", e); + process::exit(1); + } +} diff --git a/src/sbot.rs b/src/sbot.rs index a3d876d..5022977 100644 --- a/src/sbot.rs +++ b/src/sbot.rs @@ -228,58 +228,68 @@ impl Sbot { self.get_latest_about_message(ssb_id, "description").await } - /// Call the `createHistoryStream` RPC method and return a vector - /// of SsbMessageValue. - pub async fn create_history_stream( - &mut self, + /// Call the `createHistoryStream` RPC method + /// and return a Stream of Result + pub async fn create_history_stream<'a>( + &'a mut self, id: String, - ) -> Result { + ) -> Result> + 'a, GolgiError> { let args = CreateHistoryStreamIn::new(id); let req_id = self.client.create_history_stream_req_send(&args).await?; - let source_stream = self.get_source_stream(req_id, utils::ssb_message_res_parse); - pin_mut!(source_stream); // needed for iteration - - while let Some(res) = source_stream.next().await { - match res { - Ok(value) => { - println!("value: {:?}", value); - }, - Err(err) => { - println!("err: {:?}", err); - } - } - - } - println!("exit loop"); - Ok(2) - // utils::get_source_until_eof(&mut self.rpc_reader, req_id, utils::ssb_message_res_parse).await + let history_stream = self.get_source_stream(req_id, utils::ssb_message_res_parse); + Ok(history_stream) } - pub fn get_source_stream<'a, F, T>(&'a mut self, req_no: RequestNo, f: F) -> impl Stream> + 'a + /// Takes in an rpc request number, and a handling function (parsing results of type T), + /// and produces an async_std::stream::Stream + /// of results of type T where the handling functions is called + /// on all rpc_reader responses which match the request number + /// + /// # Arguments + /// + /// * `req_no` - A `RequestNo` of the response to listen for + /// * `f` - A function which takes in an array of u8 and returns a Result. + /// This is a function which parses the response from the RpcReader. T is a generic type, + /// so this parse function can return multiple possible types (String, json, custom struct etc.) + pub fn get_source_stream<'a, F, T>(&'a mut self, req_no: RequestNo, f: F) -> impl Stream> + 'a where F: Fn(&[u8]) -> Result + 'a, - T: Debug + serde::Deserialize<'a>, + T: Debug + serde::Deserialize<'a> + 'a, { - let s = stream! { + // we use the async_stream::stream macro to allow for creating a stream which calls async functions + // see https://users.rust-lang.org/t/how-to-create-async-std-stream-which-calls-async-function-in-poll-next/69760 + let source_stream = stream! { loop { + // get the next message from the rpc_reader let (id, msg) = &self.rpc_reader.recv().await?; let x : i32 = id.clone(); + // check if the next message from rpc_reader matches the req_no we are looking for + // if it matches, then this rpc response is for the given request + // and if it doesn't match, then we ignore it if x == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { - let display = f(&body)?; - println!("display: {:?}", display); - yield Ok(x) + // parse an item of type T from the message body using the provided + // function for parsing + let item = f(&body)?; + // return Ok(item) as the next value in the stream + yield Ok(item) } RecvMsg::ErrorResponse(message) => { + // if an error is received + // return an Err(err) as the next value in the stream yield Err(GolgiError::Sbot(message.to_string())); } + // if we find a CancelStreamResponse + // this is the end of the stream RecvMsg::CancelStreamRespose() => break, - _ => break + // if we find an unknown response, we just continue the loop + _ => {} } } } }; - s + // finally return the stream object + source_stream } }