use std::process; use async_std::stream::StreamExt; use futures::TryStreamExt; use golgi::{ api::{ get_subset::{SubsetQuery, SubsetQueryOptions}, history_stream::CreateHistoryStream, }, messages::{SsbMessageContentType, SsbMessageKVT}, sbot::Keystore, GolgiError, Sbot, }; // Golgi is an asynchronous library so we must call it from within an // async function. The `GolgiError` type encapsulates all possible // error variants for the library. async fn run() -> Result<(), GolgiError> { // Attempt to initialise a connection to an sbot instance using the // secret file at the Patchwork path and the default IP address, port // and network key (aka. capabilities key). let mut sbot_client = Sbot::init(Keystore::Patchwork, None, None).await?; // Call the `whoami` RPC method to retrieve the public key for the sbot // identity. This is our 'local' public key. let id = sbot_client.whoami().await?; // Print the public key (identity) to `stdout`. println!("whoami: {}", id); let author = id.clone(); /* HISTORY STREAM EXAMPLE */ let history_stream_args = CreateHistoryStream::new(author.to_string()); // Create an ordered stream of all messages authored by the `author` // identity. Messages are returned as KVTs (Key Value Timestamp). let history_stream = sbot_client .create_history_stream(history_stream_args) .await?; // Pin the stream to the stack to allow polling of the `future`. futures::pin_mut!(history_stream); println!("looping through stream"); // Iterate through each element in the stream and match on the `Result`. // In this case, each element has type `Result`. while let Some(res) = history_stream.next().await { match res { Ok(kvt) => { // Print the `SsbMessageKVT` of this element to `stdout`. println!("kvt: {:?}", kvt); } Err(err) => { // Print the `GolgiError` of this element to `stderr`. eprintln!("err: {:?}", err); } } } println!("reached end of stream"); // Create an ordered stream of all messages authored by the `author` // identity. let history_stream = sbot_client .create_history_stream(CreateHistoryStream::new(author.to_string())) .await?; // Collect the stream elements into a `Vec` using // `try_collect`. A `GolgiError` will be returned from the `run` // function if any element contains an error. let results: Vec = history_stream.try_collect().await?; // Loop through the `SsbMessageKVT` elements, printing each one // to `stdout`. for x in results { println!("x: {:?}", x); } // Create an ordered stream of all messages authored by the `author` // identity. let history_stream = sbot_client .create_history_stream(CreateHistoryStream::new(author.to_string())) .await?; // Iterate through the elements in the stream and use `map` to convert // each `SsbMessageKVT` element into a tuple of // `(String, SsbMessageContentType)`. This is an example of stream // conversion. let type_stream = history_stream.map(|msg| match msg { Ok(kvt) => { let message_type = kvt.value.get_message_type()?; // Return the message key and type. let tuple: (String, SsbMessageContentType) = (kvt.key, message_type); Ok(tuple) } Err(err) => Err(err), }); // Pin the stream to the stack to allow polling of the `future`. futures::pin_mut!(type_stream); println!("looping through type stream"); // Iterate through each element in the stream and match on the `Result`. // In this case, each element has type // `Result<(String, SsbMessageContentType), GolgiError>`. while let Some(res) = type_stream.next().await { match res { Ok(value) => { println!("value: {:?}", value); } Err(err) => { println!("err: {:?}", err); } } } println!("reached end of type stream"); /* SUBSET STREAM EXAMPLE */ // Compose a subset query for `post` message types. let post_query = SubsetQuery::Type { op: "type".to_string(), string: "post".to_string(), }; // Define the options for the query. let post_query_opts = SubsetQueryOptions { descending: Some(true), keys: None, page_limit: Some(5), }; // Return 5 `post` type messages from any author in descending order. let query_stream = sbot_client .get_subset_stream(post_query, Some(post_query_opts)) .await?; println!("looping through post query stream"); // Iterate over the stream and pretty-print each returned message // value while ignoring any errors. query_stream.for_each(|msg| match msg { Ok(val) => println!("{:#?}", val), Err(_) => (), }); println!("reached end of post query stream"); Ok(()) } // Enable an async main function and execute the `run()` function, // catching any errors and printing them to `stderr` before exiting the // process. #[async_std::main] async fn main() { if let Err(e) = run().await { eprintln!("Application error: {}", e); process::exit(1); } }