diff --git a/examples/ssb-client.rs b/examples/ssb-client.rs index d1216a5..10fea55 100644 --- a/examples/ssb-client.rs +++ b/examples/ssb-client.rs @@ -8,10 +8,10 @@ async fn run() -> Result<(), GolgiError> { let mut sbot_client = Sbot::init(None, None).await?; let id = sbot_client.whoami().await?; - println!("{}", id); + println!("whoami: {}", id); let name = SsbMessageContent::About { - about: id, + about: id.clone(), name: Some("golgi".to_string()), title: None, branch: None, @@ -22,7 +22,7 @@ async fn run() -> Result<(), GolgiError> { }; let name_msg_ref = sbot_client.publish(name).await?; - println!("{}", name_msg_ref); + println!("name_msg_ref: {}", name_msg_ref); let post = SsbMessageContent::Post { text: "golgi go womp womp".to_string(), @@ -30,13 +30,18 @@ async fn run() -> Result<(), GolgiError> { }; let post_msg_ref = sbot_client.publish(post).await?; - println!("{}", post_msg_ref); + println!("post_msg_ref: {}", post_msg_ref); let post_msg_ref = sbot_client - .publish_description("this is a description") + .publish_description("this is a description7") .await?; println!("description: {}", post_msg_ref); + let author: String = id.clone(); + println!("author: {:?}", author); + let description = sbot_client.get_description(&author).await?; + println!("found description: {:?}", description); + Ok(()) } diff --git a/examples/ssb-stream-example.rs b/examples/ssb-stream-example.rs index 22ef92b..8f1d8de 100644 --- a/examples/ssb-stream-example.rs +++ b/examples/ssb-stream-example.rs @@ -11,9 +11,9 @@ async fn run() -> Result<(), GolgiError> { let mut sbot_client = Sbot::init(None, None).await?; let id = sbot_client.whoami().await?; - println!("{}", id); + println!("whoami: {}", id); - let author = "@L/z54cbc8V1kL1/MiBhpEKuN3QJkSoZYNaukny3ghIs=.ed25519"; + let author = id.clone(); // create a history stream let history_stream = sbot_client diff --git a/src/sbot.rs b/src/sbot.rs index 361f3c6..b7e740e 100644 --- a/src/sbot.rs +++ b/src/sbot.rs @@ -1,5 +1,9 @@ //! Sbot type and associated methods. -use async_std::{net::TcpStream, stream::Stream}; +use async_std::{ + net::TcpStream, + stream::{Stream, StreamExt}, +}; +use futures::TryStreamExt; use kuska_handshake::async_std::BoxStream; use kuska_sodiumoxide::crypto::{auth, sign::ed25519}; @@ -30,7 +34,7 @@ pub struct SbotConnection { /// The Scuttlebutt identity, keys and configuration parameters for connecting to a local sbot /// instance, as well as handles for calling RPC methods and receiving responses. pub struct Sbot { - id: String, + pub id: String, public_key: ed25519::PublicKey, private_key: ed25519::SecretKey, address: String, @@ -122,37 +126,41 @@ impl Sbot { Ok(sbot_connection) } - /// Call the `partialReplication getSubset` RPC method and return a vector - /// of messages as KVTs (key, value, timestamp). + /// Call the `partialReplication getSubset` RPC method + /// and return a Stream of Result // TODO: add args for `descending` and `page` (max number of msgs in response) - pub async fn get_subset( + pub async fn get_subset_stream( &mut self, query: SubsetQuery, - ) -> Result, GolgiError> { - let req_id = self - .sbot_connection - .client - .getsubset_req_send(query) - .await?; - - utils::get_source_until_eof( - &mut self.sbot_connection.rpc_reader, - req_id, - utils::kvt_res_parse, - ) - .await + ) -> Result>, GolgiError> { + let mut sbot_connection = self.get_sbot_connection().await?; + let req_id = sbot_connection.client.getsubset_req_send(query).await?; + let get_subset_stream = + get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse).await; + Ok(get_subset_stream) } /// Call the `whoami` RPC method and return an `id`. pub async fn whoami(&mut self) -> Result { let req_id = self.sbot_connection.client.whoami_req_send().await?; - utils::get_async( + let result = utils::get_async( &mut self.sbot_connection.rpc_reader, req_id, - utils::string_res_parse, + utils::json_res_parse, ) - .await + .await?; + + let id = result + .get("id") + .ok_or(GolgiError::Sbot( + "id key not found on whoami call".to_string(), + ))? + .as_str() + .ok_or(GolgiError::Sbot( + "whoami returned non-string value".to_string(), + ))?; + Ok(id.to_string()) } /// Call the `publish` RPC method and return a message reference. @@ -225,28 +233,30 @@ impl Sbot { } /// Get the about messages for a particular user in order of recency. - pub async fn get_about_messages( + pub async fn get_about_message_stream( &mut self, ssb_id: &str, - ) -> Result, GolgiError> { + ) -> Result>, GolgiError> { let query = SubsetQuery::Author { op: "author".to_string(), feed: ssb_id.to_string(), }; - let kvts: Vec = self.get_subset(query).await?; - let messages: Vec = kvts.into_iter().map(|kvt| kvt.value).collect(); + let get_subset_kvt_stream = self.get_subset_stream(query).await?; + // map into Stream> + let ssb_message_stream = get_subset_kvt_stream.map(|msg| match msg { + Ok(val) => Ok(val.value), + Err(err) => Err(err), + }); // TODO: after fixing sbot regression, // change this subset query to filter by type about in addition to author // and remove this filter section // filter down to about messages - let mut about_messages: Vec = messages - .into_iter() - .filter(|msg| msg.is_message_type(SsbMessageContentType::About)) - .collect(); - // TODO: use subset query to order messages instead of doing it this way - about_messages.sort_by(|a, b| b.timestamp.partial_cmp(&a.timestamp).unwrap()); - // return about messages - Ok(about_messages) + let about_message_stream = ssb_message_stream.filter(|msg| match msg { + Ok(val) => val.is_message_type(SsbMessageContentType::About), + Err(_err) => false, + }); + // return about message stream + Ok(about_message_stream) } /// Get value of latest about message with given key from given user @@ -255,8 +265,12 @@ impl Sbot { ssb_id: &str, key: &str, ) -> Result, GolgiError> { - // vector of about messages with most recent at the front of the vector - let about_messages = self.get_about_messages(ssb_id).await?; + // get about_message_stream + let about_message_stream = self.get_about_message_stream(ssb_id).await?; + // TODO: use subset query to order messages instead of doing it this way + let mut about_messages: Vec = about_message_stream.try_collect().await?; + about_messages.sort_by(|a, b| b.timestamp.partial_cmp(&a.timestamp).unwrap()); + // now we have a vector of about messages with most recent at the front of the vector // iterate through the vector looking for most recent about message with the given key let latest_about = about_messages .iter() @@ -287,7 +301,7 @@ impl Sbot { self.get_latest_about_message(ssb_id, "name").await } - /// Get lateset about description from given user + /// Get latest about description from given user /// /// # Arguments /// @@ -303,7 +317,7 @@ impl Sbot { &mut self, id: String, ) -> Result>, GolgiError> { - let mut sbot_connection = self.get_sbot_connection().await.unwrap(); + let mut sbot_connection = self.get_sbot_connection().await?; let args = CreateHistoryStreamIn::new(id); let req_id = sbot_connection .client