diff --git a/examples/tangles.rs b/examples/tangles.rs new file mode 100644 index 0000000..07834c5 --- /dev/null +++ b/examples/tangles.rs @@ -0,0 +1,84 @@ +use std::process; + +use async_std::stream::StreamExt; + +use golgi::{api::tangles::TanglesThread, sbot::Keystore, GolgiError, Sbot}; + +// Golgi is an asynchronous library so we must call it from within an +// async function. The `GolgiError` type encapsulates all possible +// error variants for the library. +async fn run() -> Result<(), GolgiError> { + // Attempt to initialise a connection to an sbot instance using the + // secret file at the Patchwork path and the default IP address, port + // and network key (aka. capabilities key). + //let mut sbot_client = Sbot::init(Keystore::Patchwork, None, None).await?; + let mut sbot_client = + Sbot::init(Keystore::GoSbot, Some("127.0.0.1:8021".to_string()), None).await?; + + // Call the `whoami` RPC method to retrieve the public key for the sbot + // identity. This is our 'local' public key. + let id = sbot_client.whoami().await?; + + // Print the public key (identity) to `stdout`. + println!("whoami: {}", id); + + /* TANGLES THREAD EXAMPLE */ + + // Define the message key that will be used to generate the tangle stream. + // This must be the key of the root message in the thread (ie. the first + // message of the thread). + let msg_key = "%vY53ethgEG1tNsKqmLm6isNSbsu+jwMf/UNGMfUiLm0=.sha256".to_string(); + + // Instantiate the arguments for the `tangles.thread` RPC by instantiating + // a new instant of the `TanglesThread` struct. + let args = TanglesThread::new(msg_key).keys_values(true, true); + + // It's also possible to define the maximum number of messages + // returned from the stream by setting the `limit` value: + // Note: a limit of 3 will return a maximum of 4 messages! The root + // message + 3 other messages in the thread. + // let args = TanglesThread::new(msg_key).keys_values(true, true).limit(3); + + // Create an ordered stream of all messages comprising the thread + // in which the given message resides. Messages are returned as KVTs + // (Key Value Timestamp). + let thread_stream = sbot_client.tangles_thread(args).await?; + + // Pin the stream to the stack to allow polling of the `future`. + futures::pin_mut!(thread_stream); + + println!("looping through stream"); + + // Iterate through each element in the stream and match on the `Result`. + // In this case, each element has type `Result`. + while let Some(res) = thread_stream.next().await { + match res { + Ok(kvt) => { + // Print the `SsbMessageKVT` of this element to `stdout`. + println!("kvt: {:?}", kvt); + } + Err(err) => { + // Print the `GolgiError` of this element to `stderr`. + eprintln!("err: {:?}", err); + } + } + } + + println!("reached end of stream"); + + // Take a look at the `examples/streams.rs` file in this repo for more + // ways of handling streams. + + Ok(()) +} + +// Enable an async main function and execute the `run()` function, +// catching any errors and printing them to `stderr` before exiting the +// process. +#[async_std::main] +async fn main() { + if let Err(e) = run().await { + eprintln!("Application error: {}", e); + process::exit(1); + } +} diff --git a/src/api/mod.rs b/src/api/mod.rs index 9cbb18c..b27604c 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -6,6 +6,7 @@ pub mod history_stream; pub mod invite; pub mod private; pub mod publish; +pub mod tangles; pub mod whoami; pub use crate::sbot::*; diff --git a/src/api/tangles.rs b/src/api/tangles.rs new file mode 100644 index 0000000..cfca04e --- /dev/null +++ b/src/api/tangles.rs @@ -0,0 +1,60 @@ +//! Take a reference to the root message of a thread and return a stream of all +//! messages in the thread. This includes the root message and all replies. +//! +//! Implements the following methods: +//! +//! - [`Sbot::tangles_thread`] + +use async_std::stream::Stream; +pub use kuska_ssb::api::dto::TanglesThread; + +use crate::{error::GolgiError, messages::SsbMessageKVT, sbot::Sbot, utils}; + +impl Sbot { + /// Call the `tanglesThread` RPC method. Returns messages in the form + /// of KVTs (Key Value Timestamp). + /// + /// # Example + /// + /// ```rust + /// use async_std::stream::StreamExt; + /// use golgi::{ + /// Sbot, + /// GolgiError, + /// sbot::Keystore + /// }; + /// + /// async fn tangle() -> Result<(), GolgiError> { + /// let mut sbot_client = Sbot::init(Keystore::Patchwork, None, None).await?; + /// + /// let msg_key = "%kmXb3MXtBJaNugcEL/Q7G40DgcAkMNTj3yhmxKHjfCM=.sha256"; + /// + /// let args = TanglesThread::new(msg_key).keys_values(true, true); + /// let tangles_thread = sbot_client.tangles_thread(msg_id).await?; + /// + /// tangles_thread.for_each(|msg| { + /// match msg { + /// Ok(kvt) => println!("msg kvt: {:?}", kvt), + /// Err(e) => eprintln!("error: {}", e), + /// } + /// }).await; + /// + /// Ok(()) + /// } + /// ``` + pub async fn tangles_thread( + &mut self, + args: TanglesThread, + ) -> Result>, GolgiError> { + let mut sbot_connection = self.get_sbot_connection().await?; + let req_id = sbot_connection + .client + .tangles_thread_req_send(&args) + .await?; + + let thread_stream = + utils::get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse) + .await; + Ok(thread_stream) + } +} diff --git a/src/messages.rs b/src/messages.rs index 1e6f08e..3231109 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -32,7 +32,7 @@ pub struct SsbMessageValue { } /// Message content types. -#[derive(Debug, PartialEq)] +#[derive(Debug, Eq, PartialEq)] #[allow(missing_docs)] pub enum SsbMessageContentType { About, @@ -99,6 +99,6 @@ impl SsbMessageValue { pub struct SsbMessageKVT { pub key: String, pub value: SsbMessageValue, - pub timestamp: f64, + pub timestamp: Option, pub rts: Option, }