Merge branch 'about-stream2' into get_profile_info

This commit is contained in:
notplants 2022-01-12 15:24:19 -05:00
commit 0055a1ed65
1 changed files with 8 additions and 11 deletions

View File

@ -128,7 +128,7 @@ impl Sbot {
} }
/// Call the `partialReplication getSubset` RPC method /// Call the `partialReplication getSubset` RPC method
/// and return a Stream of Result<SsbMessageKVT, GolgiError> /// and return a Stream of Result<SsbMessageValue, GolgiError>
/// ///
/// # Arguments /// # Arguments
/// ///
@ -139,14 +139,14 @@ impl Sbot {
&mut self, &mut self,
query: SubsetQuery, query: SubsetQuery,
options: Option<SubsetQueryOptions>, options: Option<SubsetQueryOptions>,
) -> Result<impl Stream<Item = Result<SsbMessageKVT, GolgiError>>, GolgiError> { ) -> Result<impl Stream<Item = Result<SsbMessageValue, GolgiError>>, GolgiError> {
let mut sbot_connection = self.get_sbot_connection().await?; let mut sbot_connection = self.get_sbot_connection().await?;
let req_id = sbot_connection let req_id = sbot_connection
.client .client
.getsubset_req_send(query, options) .getsubset_req_send(query, options)
.await?; .await?;
let get_subset_stream = let get_subset_stream =
get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse).await; get_source_stream(sbot_connection.rpc_reader, req_id, utils::ssb_message_res_parse).await;
Ok(get_subset_stream) Ok(get_subset_stream)
} }
@ -253,18 +253,15 @@ impl Sbot {
keys: None, keys: None,
page_limit: None, page_limit: None,
}; };
let get_subset_kvt_stream = self.get_subset_stream(query, Some(query_options)).await?; let get_subset_stream = self.get_subset_stream(query, Some(query_options)).await?;
// map into Stream<Item=Result<SsbMessageValue, GolgiError>>
let ssb_message_stream = get_subset_kvt_stream.map(|msg| match msg {
Ok(val) => Ok(val.value),
Err(err) => Err(err),
});
// TODO: after fixing sbot regression, // TODO: after fixing sbot regression,
// change this subset query to filter by type about in addition to author // change this subset query to filter by type about in addition to author
// and remove this filter section // and remove this filter section
// filter down to about messages // filter down to about messages
let about_message_stream = ssb_message_stream.filter(|msg| match msg { let about_message_stream = get_subset_stream.filter(|msg| match msg {
Ok(val) => val.is_message_type(SsbMessageContentType::About), Ok(val) => {
val.is_message_type(SsbMessageContentType::About)
}
Err(_err) => false, Err(_err) => false,
}); });
// return about message stream // return about message stream