diff --git a/examples/ssb-stream-example.rs b/examples/ssb-stream-example.rs index 2e82ea8..3ed0fc4 100644 --- a/examples/ssb-stream-example.rs +++ b/examples/ssb-stream-example.rs @@ -3,8 +3,9 @@ use std::process; use golgi::error::GolgiError; use golgi::sbot::Sbot; -use async_std::stream::StreamExt; -use futures::pin_mut; +use async_std::stream::{Stream, StreamExt}; +use futures::{pin_mut, TryStreamExt}; +use golgi::messages::{SsbMessageContentType, SsbMessageValue}; async fn run() -> Result<(), GolgiError> { @@ -12,66 +13,15 @@ async fn run() -> Result<(), GolgiError> { let id = sbot_client.whoami().await?; println!("{}", id); - // - // let name = SsbMessageContent::About { - // about: id, - // name: Some("golgi".to_string()), - // title: None, - // branch: None, - // image: None, - // description: None, - // location: None, - // start_datetime: None, - // }; - // - // let name_msg_ref = sbot_client.publish(name).await?; - // println!("{}", name_msg_ref); - // - // let post = SsbMessageContent::Post { - // text: "golgi go womp womp".to_string(), - // mentions: None, - // }; - // - // let post_msg_ref = sbot_client.publish(post).await?; - // println!("{}", post_msg_ref); - // - // let post_msg_ref = sbot_client.publish_description("this is a description").await?; - // println!("description: {}", post_msg_ref); let author = "@L/z54cbc8V1kL1/MiBhpEKuN3QJkSoZYNaukny3ghIs=.ed25519"; - // let query = SubsetQuery::Author{ - // op: "author".to_string(), - // feed: author - // }; - // - // let kvts = sbot_client.get_subset(query).await?; - // - // for kvt in kvts { - // println!("kvt: {:?}", kvt); - // } - - // let about_messages = sbot_client.get_about_messages(author).await?; - // for msg in about_messages { - // println!("msg: {:?}", msg); - // } - // - // sbot_client.publish_name("this is my new name").await?; - // let name = sbot_client.get_name(author).await?; - // println!("name: {:?}", name); - // - // // sbot_client.publish_description("this is test description 99").await?; - // - // let description = sbot_client.get_description(author).await?; - // println!("desc: {:?}", description); - - // - // let description2 = sbot_client.get_latest_about_message(author, "description").await?; - // println!("desc2: {:?}", description2); - + // create a history stream let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?; - pin_mut!(history_stream); // needed for iteration + // loop through the results until the end of the stream + pin_mut!(history_stream); // needed for iteration + println!("looping through stream"); while let Some(res) = history_stream.next().await { match res { Ok(value) => { @@ -81,10 +31,55 @@ async fn run() -> Result<(), GolgiError> { println!("err: {:?}", err); } } - } - println!("exit loop"); + println!("reached end of stream"); + // create a history stream and convert it into a Vec using try_collect + // (if there is any error in the results, it will be raised) + let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?; + let results : Vec = history_stream.try_collect().await?; + for x in results { + println!("x: {:?}", x); + } + + // example to create a history stream and use a map to convert stream of SsbMessageValue + // into a stream of KeyTypeTuple (local struct for storing message_key and message_type) + #[derive(Debug)] + struct KeyTypeTuple { + message_key: String, + message_type: SsbMessageContentType, + }; + let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?; + let type_stream = history_stream.map(|msg| { + match msg { + Ok(val) => { + let message_type = val.get_message_type()?; + let tuple = KeyTypeTuple { + message_key: val.signature, + message_type: message_type, + }; + Ok(tuple) + } + Err(err) => { + Err(err) + } + } + }); + pin_mut!(type_stream); // needed for iteration + println!("looping through type stream"); + while let Some(res) = type_stream.next().await { + match res { + Ok(value) => { + println!("value: {:?}", value); + }, + Err(err) => { + println!("err: {:?}", err); + } + } + } + println!("reached end of type stream"); + + // return Ok Ok(()) }