From 5c4b92a8bf3b51e5915b965a10886724e4322329 Mon Sep 17 00:00:00 2001 From: glyph Date: Fri, 18 Nov 2022 15:10:37 +0200 Subject: [PATCH] add tangles.thread rpc method --- src/api/mod.rs | 1 + src/api/tangles.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 src/api/tangles.rs diff --git a/src/api/mod.rs b/src/api/mod.rs index 9cbb18c..b27604c 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -6,6 +6,7 @@ pub mod history_stream; pub mod invite; pub mod private; pub mod publish; +pub mod tangles; pub mod whoami; pub use crate::sbot::*; diff --git a/src/api/tangles.rs b/src/api/tangles.rs new file mode 100644 index 0000000..9e1cb8b --- /dev/null +++ b/src/api/tangles.rs @@ -0,0 +1,66 @@ +//! Take a message reference and return a stream of all messages in the thread. +//! +//! Implements the following methods: +//! +//! - [`Sbot::tangles_thread`] + +use async_std::stream::Stream; +pub use kuska_ssb::api::dto::TanglesThread; + +use crate::{error::GolgiError, messages::SsbMessageKVT, sbot::Sbot, utils}; +//use crate::{error::GolgiError, messages::SsbMessageValue, sbot::Sbot, utils}; + +impl Sbot { + // TODO: update this example code + + /// Call the `tanglesThread` RPC method. Returns messages in the form + /// of KVTs (Key Value Timestamp). + /// + /// # Example + /// + /// ```rust + /// use async_std::stream::StreamExt; + /// use golgi::{ + /// Sbot, + /// GolgiError, + /// sbot::Keystore + /// }; + /// + /// async fn tangle() -> Result<(), GolgiError> { + /// let mut sbot_client = Sbot::init(Keystore::Patchwork, None, None).await?; + /// + /// let msg_key = "%kmXb3MXtBJaNugcEL/Q7G40DgcAkMNTj3yhmxKHjfCM=.sha256"; + /// + /// let args = TanglesThread::new(msg_key).keys_values(true, true); + /// let tangles_thread = sbot_client.tangles_thread(msg_id).await?; + /// + /// tangles_thread.for_each(|msg| { + /// match msg { + /// Ok(kvt) => println!("msg kvt: {:?}", kvt), + /// Err(e) => eprintln!("error: {}", e), + /// } + /// }).await; + /// + /// Ok(()) + /// } + /// ``` + pub async fn tangles_thread( + &mut self, + args: TanglesThread, + ) -> Result>, GolgiError> { + // ) -> Result>, GolgiError> { + let mut sbot_connection = self.get_sbot_connection().await?; + let req_id = sbot_connection + .client + .tangles_thread_req_send(&args) + .await?; + + println!("{}", req_id); + + let thread_stream = + utils::get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse) + //utils::get_source_stream(sbot_connection.rpc_reader, req_id, utils::ssb_message_res_parse) + .await; + Ok(thread_stream) + } +}