diff --git a/src/sbot.rs b/src/sbot.rs index b7e740e..21b87ca 100644 --- a/src/sbot.rs +++ b/src/sbot.rs @@ -3,7 +3,7 @@ use async_std::{ net::TcpStream, stream::{Stream, StreamExt}, }; -use futures::TryStreamExt; +use futures::{pin_mut, TryStreamExt}; use kuska_handshake::async_std::BoxStream; use kuska_sodiumoxide::crypto::{auth, sign::ed25519}; @@ -20,7 +20,7 @@ use crate::utils; use crate::utils::get_source_stream; // re-export types from kuska -pub use kuska_ssb::api::dto::content::SubsetQuery; +pub use kuska_ssb::api::dto::content::{SubsetQuery, SubsetQueryOptions}; /// A struct representing a connection with a running sbot. /// A client and an rpc_reader can together be used to make requests to the sbot @@ -128,13 +128,19 @@ impl Sbot { /// Call the `partialReplication getSubset` RPC method /// and return a Stream of Result - // TODO: add args for `descending` and `page` (max number of msgs in response) + /// + /// # Arguments + /// + /// * `query` - A `SubsetQuery` which specifies what filters to use. + /// * `option` - An Option<`SubsetQueryOptions`> which, if provided, adds additional + /// specifications to the query, such as specifying page limit and/or descending. pub async fn get_subset_stream( &mut self, query: SubsetQuery, + options: Option ) -> Result>, GolgiError> { let mut sbot_connection = self.get_sbot_connection().await?; - let req_id = sbot_connection.client.getsubset_req_send(query).await?; + let req_id = sbot_connection.client.getsubset_req_send(query, options).await?; let get_subset_stream = get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse).await; Ok(get_subset_stream) @@ -241,7 +247,13 @@ impl Sbot { op: "author".to_string(), feed: ssb_id.to_string(), }; - let get_subset_kvt_stream = self.get_subset_stream(query).await?; + // specify that most recent messages should be returned first + let query_options = SubsetQueryOptions { + descending: Some(true), + keys: None, + page_limit: None + }; + let get_subset_kvt_stream = self.get_subset_stream(query, Some(query_options)).await?; // map into Stream> let ssb_message_stream = get_subset_kvt_stream.map(|msg| match msg { Ok(val) => Ok(val.value), @@ -267,28 +279,39 @@ impl Sbot { ) -> Result, GolgiError> { // get about_message_stream let about_message_stream = self.get_about_message_stream(ssb_id).await?; - // TODO: use subset query to order messages instead of doing it this way - let mut about_messages: Vec = about_message_stream.try_collect().await?; - about_messages.sort_by(|a, b| b.timestamp.partial_cmp(&a.timestamp).unwrap()); - // now we have a vector of about messages with most recent at the front of the vector + // now we have a stream of about messages with most recent at the front of the vector + pin_mut!(about_message_stream); // iterate through the vector looking for most recent about message with the given key - let latest_about = about_messages - .iter() + let latest_about_message: Result = about_message_stream // find the first msg that contains the field `key` - .find(|msg| msg.content.get(key).is_some()) - // map the found msg (`Some(SsbMessageValue)`) to a `Some(Some(&Value))` - .map(|msg| msg.content.get(key)) - // flatten `Some(Some(&Value))` into `Some(&Value)` - .flatten() - // map `Some(&Value)` to `Some(Some(&str))` - .map(|msg_val| msg_val.as_str()) - // flatten `Some(Some(&str))` to `Some(&str)` - .flatten() - // map `Some(&str))` to `Some(String)` - .map(|msg_str| msg_str.to_string()); - + .find(|res| { + match res { + Ok(msg) => { + msg.content.get(key).is_some() + }, + Err(_) => { + false + } + } + }).await.ok_or(GolgiError::Sbot("error while looking for about message with given key".to_string()))?; + let latest_about_value = match latest_about_message { + Ok(msg) => { + msg + // SsbMessageValue -> Option<&Value> + .content.get(key) + // Option<&Value> -> Option> + .map(|value| value.as_str()) + // Option> -> Option<&str> + .flatten() + // Option<&str> -> Option + .map(|value| value.to_string()) + } + Err(_) => { + None + } + }; // return value is either `Ok(Some(String))` or `Ok(None)` - Ok(latest_about) + Ok(latest_about_value) } /// Get latest about name from given user