From 195eeb523ca8750a3fec14e3551a7fb6c8e30505 Mon Sep 17 00:00:00 2001 From: glyph Date: Fri, 18 Nov 2022 15:04:50 +0200 Subject: [PATCH 1/6] make timestamp field an option for SsbMessageKVT and derive eq where partialeq is already derived --- src/messages.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/messages.rs b/src/messages.rs index 1e6f08e..3231109 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -32,7 +32,7 @@ pub struct SsbMessageValue { } /// Message content types. -#[derive(Debug, PartialEq)] +#[derive(Debug, Eq, PartialEq)] #[allow(missing_docs)] pub enum SsbMessageContentType { About, @@ -99,6 +99,6 @@ impl SsbMessageValue { pub struct SsbMessageKVT { pub key: String, pub value: SsbMessageValue, - pub timestamp: f64, + pub timestamp: Option, pub rts: Option, } From 5c4b92a8bf3b51e5915b965a10886724e4322329 Mon Sep 17 00:00:00 2001 From: glyph Date: Fri, 18 Nov 2022 15:10:37 +0200 Subject: [PATCH 2/6] add tangles.thread rpc method --- src/api/mod.rs | 1 + src/api/tangles.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 src/api/tangles.rs diff --git a/src/api/mod.rs b/src/api/mod.rs index 9cbb18c..b27604c 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -6,6 +6,7 @@ pub mod history_stream; pub mod invite; pub mod private; pub mod publish; +pub mod tangles; pub mod whoami; pub use crate::sbot::*; diff --git a/src/api/tangles.rs b/src/api/tangles.rs new file mode 100644 index 0000000..9e1cb8b --- /dev/null +++ b/src/api/tangles.rs @@ -0,0 +1,66 @@ +//! Take a message reference and return a stream of all messages in the thread. +//! +//! Implements the following methods: +//! +//! - [`Sbot::tangles_thread`] + +use async_std::stream::Stream; +pub use kuska_ssb::api::dto::TanglesThread; + +use crate::{error::GolgiError, messages::SsbMessageKVT, sbot::Sbot, utils}; +//use crate::{error::GolgiError, messages::SsbMessageValue, sbot::Sbot, utils}; + +impl Sbot { + // TODO: update this example code + + /// Call the `tanglesThread` RPC method. Returns messages in the form + /// of KVTs (Key Value Timestamp). + /// + /// # Example + /// + /// ```rust + /// use async_std::stream::StreamExt; + /// use golgi::{ + /// Sbot, + /// GolgiError, + /// sbot::Keystore + /// }; + /// + /// async fn tangle() -> Result<(), GolgiError> { + /// let mut sbot_client = Sbot::init(Keystore::Patchwork, None, None).await?; + /// + /// let msg_key = "%kmXb3MXtBJaNugcEL/Q7G40DgcAkMNTj3yhmxKHjfCM=.sha256"; + /// + /// let args = TanglesThread::new(msg_key).keys_values(true, true); + /// let tangles_thread = sbot_client.tangles_thread(msg_id).await?; + /// + /// tangles_thread.for_each(|msg| { + /// match msg { + /// Ok(kvt) => println!("msg kvt: {:?}", kvt), + /// Err(e) => eprintln!("error: {}", e), + /// } + /// }).await; + /// + /// Ok(()) + /// } + /// ``` + pub async fn tangles_thread( + &mut self, + args: TanglesThread, + ) -> Result>, GolgiError> { + // ) -> Result>, GolgiError> { + let mut sbot_connection = self.get_sbot_connection().await?; + let req_id = sbot_connection + .client + .tangles_thread_req_send(&args) + .await?; + + println!("{}", req_id); + + let thread_stream = + utils::get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse) + //utils::get_source_stream(sbot_connection.rpc_reader, req_id, utils::ssb_message_res_parse) + .await; + Ok(thread_stream) + } +} From a08f3adbefc07210a760842238b360f07c261c53 Mon Sep 17 00:00:00 2001 From: glyph Date: Fri, 18 Nov 2022 15:10:56 +0200 Subject: [PATCH 3/6] add an example of calling the tangles.thread rpc method --- examples/tangles.rs | 95 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 examples/tangles.rs diff --git a/examples/tangles.rs b/examples/tangles.rs new file mode 100644 index 0000000..ddcf9ec --- /dev/null +++ b/examples/tangles.rs @@ -0,0 +1,95 @@ +use std::process; + +use async_std::stream::StreamExt; +use futures::TryStreamExt; + +use golgi::{ + api::{ + friends::{FriendsHops, RelationshipQuery}, + get_subset::{SubsetQuery, SubsetQueryOptions}, + history_stream::CreateHistoryStream, + tangles::TanglesThread, + }, + messages::{SsbMessageContentType, SsbMessageKVT}, + sbot::Keystore, + GolgiError, Sbot, +}; + +// Golgi is an asynchronous library so we must call it from within an +// async function. The `GolgiError` type encapsulates all possible +// error variants for the library. +async fn run() -> Result<(), GolgiError> { + // Attempt to initialise a connection to an sbot instance using the + // secret file at the Patchwork path and the default IP address, port + // and network key (aka. capabilities key). + //let mut sbot_client = Sbot::init(Keystore::Patchwork, None, None).await?; + let mut sbot_client = + Sbot::init(Keystore::GoSbot, Some("127.0.0.1:8021".to_string()), None).await?; + + // Call the `whoami` RPC method to retrieve the public key for the sbot + // identity. This is our 'local' public key. + let id = sbot_client.whoami().await?; + + // Print the public key (identity) to `stdout`. + println!("whoami: {}", id); + + /* TANGLES THREAD EXAMPLE */ + + // Define the message key that will be used to generate the tangle stream. + // This must be the key of the root message in the thread (ie. the first + // message of the thread). + let msg_key = "%vY53ethgEG1tNsKqmLm6isNSbsu+jwMf/UNGMfUiLm0=.sha256".to_string(); + + // Instantiate the arguments for the `tangles.thread` RPC by instantiating + // a new instant of the `TanglesThread` struct. + let args = TanglesThread::new(msg_key).keys_values(true, true); + + // It's also possible to define the maximum number of messages + // returned from the stream by setting the `limit` value: + // Note: a limit of 3 will return a maximum of 4 messages! The root + // message + 3 other messages in the thread. + // let args = TanglesThread::new(msg_key).keys_values(true, true).limit(3); + + // Create an ordered stream of all messages comprising the thread + // in which the given message resides. Messages are returned as KVTs + // (Key Value Timestamp). + let thread_stream = sbot_client.tangles_thread(args).await?; + + // Pin the stream to the stack to allow polling of the `future`. + futures::pin_mut!(thread_stream); + + println!("looping through stream"); + + // Iterate through each element in the stream and match on the `Result`. + // In this case, each element has type `Result`. + while let Some(res) = thread_stream.next().await { + match res { + Ok(kvt) => { + // Print the `SsbMessageKVT` of this element to `stdout`. + println!("kvt: {:?}", kvt); + } + Err(err) => { + // Print the `GolgiError` of this element to `stderr`. + eprintln!("err: {:?}", err); + } + } + } + + println!("reached end of stream"); + + // Take a look at the `examples/streams.rs` file in this repo for more + // ways of handling streams. + + Ok(()) +} + +// Enable an async main function and execute the `run()` function, +// catching any errors and printing them to `stderr` before exiting the +// process. +#[async_std::main] +async fn main() { + if let Err(e) = run().await { + eprintln!("Application error: {}", e); + process::exit(1); + } +} From 87f75fae949223514262d32dd2eaeba97ac6f293 Mon Sep 17 00:00:00 2001 From: glyph Date: Tue, 22 Nov 2022 09:44:36 +0200 Subject: [PATCH 4/6] remove debugging println --- src/api/tangles.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/api/tangles.rs b/src/api/tangles.rs index 9e1cb8b..c72d525 100644 --- a/src/api/tangles.rs +++ b/src/api/tangles.rs @@ -8,11 +8,8 @@ use async_std::stream::Stream; pub use kuska_ssb::api::dto::TanglesThread; use crate::{error::GolgiError, messages::SsbMessageKVT, sbot::Sbot, utils}; -//use crate::{error::GolgiError, messages::SsbMessageValue, sbot::Sbot, utils}; impl Sbot { - // TODO: update this example code - /// Call the `tanglesThread` RPC method. Returns messages in the form /// of KVTs (Key Value Timestamp). /// @@ -48,18 +45,14 @@ impl Sbot { &mut self, args: TanglesThread, ) -> Result>, GolgiError> { - // ) -> Result>, GolgiError> { let mut sbot_connection = self.get_sbot_connection().await?; let req_id = sbot_connection .client .tangles_thread_req_send(&args) .await?; - println!("{}", req_id); - let thread_stream = utils::get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse) - //utils::get_source_stream(sbot_connection.rpc_reader, req_id, utils::ssb_message_res_parse) .await; Ok(thread_stream) } From 8dac8b1a6285bd969fd8f39ddf4ba88c8eeb95c0 Mon Sep 17 00:00:00 2001 From: glyph Date: Tue, 22 Nov 2022 09:44:55 +0200 Subject: [PATCH 5/6] remove unneeded imports --- examples/tangles.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/examples/tangles.rs b/examples/tangles.rs index ddcf9ec..07834c5 100644 --- a/examples/tangles.rs +++ b/examples/tangles.rs @@ -1,19 +1,8 @@ use std::process; use async_std::stream::StreamExt; -use futures::TryStreamExt; -use golgi::{ - api::{ - friends::{FriendsHops, RelationshipQuery}, - get_subset::{SubsetQuery, SubsetQueryOptions}, - history_stream::CreateHistoryStream, - tangles::TanglesThread, - }, - messages::{SsbMessageContentType, SsbMessageKVT}, - sbot::Keystore, - GolgiError, Sbot, -}; +use golgi::{api::tangles::TanglesThread, sbot::Keystore, GolgiError, Sbot}; // Golgi is an asynchronous library so we must call it from within an // async function. The `GolgiError` type encapsulates all possible From f6b561ebde31b739c29d1142deed3dae2396ce82 Mon Sep 17 00:00:00 2001 From: glyph Date: Tue, 22 Nov 2022 09:49:49 +0200 Subject: [PATCH 6/6] clarify tangles.thread doc comment --- src/api/tangles.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/api/tangles.rs b/src/api/tangles.rs index c72d525..cfca04e 100644 --- a/src/api/tangles.rs +++ b/src/api/tangles.rs @@ -1,4 +1,5 @@ -//! Take a message reference and return a stream of all messages in the thread. +//! Take a reference to the root message of a thread and return a stream of all +//! messages in the thread. This includes the root message and all replies. //! //! Implements the following methods: //!