Change get_latest_about_message to use streams

This commit is contained in:
notplants 2022-01-12 12:32:09 -05:00
parent aa60a6e136
commit a3852d89c9

View File

@ -3,7 +3,7 @@ use async_std::{
net::TcpStream, net::TcpStream,
stream::{Stream, StreamExt}, stream::{Stream, StreamExt},
}; };
use futures::TryStreamExt; use futures::{pin_mut, TryStreamExt};
use kuska_handshake::async_std::BoxStream; use kuska_handshake::async_std::BoxStream;
use kuska_sodiumoxide::crypto::{auth, sign::ed25519}; use kuska_sodiumoxide::crypto::{auth, sign::ed25519};
@ -20,7 +20,7 @@ use crate::utils;
use crate::utils::get_source_stream; use crate::utils::get_source_stream;
// re-export types from kuska // re-export types from kuska
pub use kuska_ssb::api::dto::content::SubsetQuery; pub use kuska_ssb::api::dto::content::{SubsetQuery, SubsetQueryOptions};
/// A struct representing a connection with a running sbot. /// A struct representing a connection with a running sbot.
/// A client and an rpc_reader can together be used to make requests to the sbot /// A client and an rpc_reader can together be used to make requests to the sbot
@ -128,13 +128,19 @@ impl Sbot {
/// Call the `partialReplication getSubset` RPC method /// Call the `partialReplication getSubset` RPC method
/// and return a Stream of Result<SsbMessageKVT, GolgiError> /// and return a Stream of Result<SsbMessageKVT, GolgiError>
// TODO: add args for `descending` and `page` (max number of msgs in response) ///
/// # Arguments
///
/// * `query` - A `SubsetQuery` which specifies what filters to use.
/// * `option` - An Option<`SubsetQueryOptions`> which, if provided, adds additional
/// specifications to the query, such as specifying page limit and/or descending.
pub async fn get_subset_stream( pub async fn get_subset_stream(
&mut self, &mut self,
query: SubsetQuery, query: SubsetQuery,
options: Option<SubsetQueryOptions>
) -> Result<impl Stream<Item = Result<SsbMessageKVT, GolgiError>>, GolgiError> { ) -> Result<impl Stream<Item = Result<SsbMessageKVT, GolgiError>>, GolgiError> {
let mut sbot_connection = self.get_sbot_connection().await?; let mut sbot_connection = self.get_sbot_connection().await?;
let req_id = sbot_connection.client.getsubset_req_send(query).await?; let req_id = sbot_connection.client.getsubset_req_send(query, options).await?;
let get_subset_stream = let get_subset_stream =
get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse).await; get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse).await;
Ok(get_subset_stream) Ok(get_subset_stream)
@ -241,7 +247,13 @@ impl Sbot {
op: "author".to_string(), op: "author".to_string(),
feed: ssb_id.to_string(), feed: ssb_id.to_string(),
}; };
let get_subset_kvt_stream = self.get_subset_stream(query).await?; // specify that most recent messages should be returned first
let query_options = SubsetQueryOptions {
descending: Some(true),
keys: None,
page_limit: None
};
let get_subset_kvt_stream = self.get_subset_stream(query, Some(query_options)).await?;
// map into Stream<Item=Result<SsbMessageValue, GolgiError>> // map into Stream<Item=Result<SsbMessageValue, GolgiError>>
let ssb_message_stream = get_subset_kvt_stream.map(|msg| match msg { let ssb_message_stream = get_subset_kvt_stream.map(|msg| match msg {
Ok(val) => Ok(val.value), Ok(val) => Ok(val.value),
@ -267,28 +279,39 @@ impl Sbot {
) -> Result<Option<String>, GolgiError> { ) -> Result<Option<String>, GolgiError> {
// get about_message_stream // get about_message_stream
let about_message_stream = self.get_about_message_stream(ssb_id).await?; let about_message_stream = self.get_about_message_stream(ssb_id).await?;
// TODO: use subset query to order messages instead of doing it this way // now we have a stream of about messages with most recent at the front of the vector
let mut about_messages: Vec<SsbMessageValue> = about_message_stream.try_collect().await?; pin_mut!(about_message_stream);
about_messages.sort_by(|a, b| b.timestamp.partial_cmp(&a.timestamp).unwrap());
// now we have a vector of about messages with most recent at the front of the vector
// iterate through the vector looking for most recent about message with the given key // iterate through the vector looking for most recent about message with the given key
let latest_about = about_messages let latest_about_message: Result<SsbMessageValue, GolgiError> = about_message_stream
.iter()
// find the first msg that contains the field `key` // find the first msg that contains the field `key`
.find(|msg| msg.content.get(key).is_some()) .find(|res| {
// map the found msg (`Some(SsbMessageValue)`) to a `Some(Some(&Value))` match res {
.map(|msg| msg.content.get(key)) Ok(msg) => {
// flatten `Some(Some(&Value))` into `Some(&Value)` msg.content.get(key).is_some()
.flatten() },
// map `Some(&Value)` to `Some(Some(&str))` Err(_) => {
.map(|msg_val| msg_val.as_str()) false
// flatten `Some(Some(&str))` to `Some(&str)` }
.flatten() }
// map `Some(&str))` to `Some(String)` }).await.ok_or(GolgiError::Sbot("error while looking for about message with given key".to_string()))?;
.map(|msg_str| msg_str.to_string()); let latest_about_value = match latest_about_message {
Ok(msg) => {
msg
// SsbMessageValue -> Option<&Value>
.content.get(key)
// Option<&Value> -> Option<Option<&str>>
.map(|value| value.as_str())
// Option<Option<&str>> -> Option<&str>
.flatten()
// Option<&str> -> Option<String>
.map(|value| value.to_string())
}
Err(_) => {
None
}
};
// return value is either `Ok(Some(String))` or `Ok(None)` // return value is either `Ok(Some(String))` or `Ok(None)`
Ok(latest_about) Ok(latest_about_value)
} }
/// Get latest about name from given user /// Get latest about name from given user