Implement get_about_message_stream

This commit is contained in:
notplants 2022-01-05 15:08:51 -05:00
parent 386642d6f1
commit aa60a6e136
3 changed files with 63 additions and 44 deletions

View File

@ -8,10 +8,10 @@ async fn run() -> Result<(), GolgiError> {
let mut sbot_client = Sbot::init(None, None).await?; let mut sbot_client = Sbot::init(None, None).await?;
let id = sbot_client.whoami().await?; let id = sbot_client.whoami().await?;
println!("{}", id); println!("whoami: {}", id);
let name = SsbMessageContent::About { let name = SsbMessageContent::About {
about: id, about: id.clone(),
name: Some("golgi".to_string()), name: Some("golgi".to_string()),
title: None, title: None,
branch: None, branch: None,
@ -22,7 +22,7 @@ async fn run() -> Result<(), GolgiError> {
}; };
let name_msg_ref = sbot_client.publish(name).await?; let name_msg_ref = sbot_client.publish(name).await?;
println!("{}", name_msg_ref); println!("name_msg_ref: {}", name_msg_ref);
let post = SsbMessageContent::Post { let post = SsbMessageContent::Post {
text: "golgi go womp womp".to_string(), text: "golgi go womp womp".to_string(),
@ -30,13 +30,18 @@ async fn run() -> Result<(), GolgiError> {
}; };
let post_msg_ref = sbot_client.publish(post).await?; let post_msg_ref = sbot_client.publish(post).await?;
println!("{}", post_msg_ref); println!("post_msg_ref: {}", post_msg_ref);
let post_msg_ref = sbot_client let post_msg_ref = sbot_client
.publish_description("this is a description") .publish_description("this is a description7")
.await?; .await?;
println!("description: {}", post_msg_ref); println!("description: {}", post_msg_ref);
let author: String = id.clone();
println!("author: {:?}", author);
let description = sbot_client.get_description(&author).await?;
println!("found description: {:?}", description);
Ok(()) Ok(())
} }

View File

@ -11,9 +11,9 @@ async fn run() -> Result<(), GolgiError> {
let mut sbot_client = Sbot::init(None, None).await?; let mut sbot_client = Sbot::init(None, None).await?;
let id = sbot_client.whoami().await?; let id = sbot_client.whoami().await?;
println!("{}", id); println!("whoami: {}", id);
let author = "@L/z54cbc8V1kL1/MiBhpEKuN3QJkSoZYNaukny3ghIs=.ed25519"; let author = id.clone();
// create a history stream // create a history stream
let history_stream = sbot_client let history_stream = sbot_client

View File

@ -1,5 +1,9 @@
//! Sbot type and associated methods. //! Sbot type and associated methods.
use async_std::{net::TcpStream, stream::Stream}; use async_std::{
net::TcpStream,
stream::{Stream, StreamExt},
};
use futures::TryStreamExt;
use kuska_handshake::async_std::BoxStream; use kuska_handshake::async_std::BoxStream;
use kuska_sodiumoxide::crypto::{auth, sign::ed25519}; use kuska_sodiumoxide::crypto::{auth, sign::ed25519};
@ -30,7 +34,7 @@ pub struct SbotConnection {
/// The Scuttlebutt identity, keys and configuration parameters for connecting to a local sbot /// The Scuttlebutt identity, keys and configuration parameters for connecting to a local sbot
/// instance, as well as handles for calling RPC methods and receiving responses. /// instance, as well as handles for calling RPC methods and receiving responses.
pub struct Sbot { pub struct Sbot {
id: String, pub id: String,
public_key: ed25519::PublicKey, public_key: ed25519::PublicKey,
private_key: ed25519::SecretKey, private_key: ed25519::SecretKey,
address: String, address: String,
@ -122,37 +126,41 @@ impl Sbot {
Ok(sbot_connection) Ok(sbot_connection)
} }
/// Call the `partialReplication getSubset` RPC method and return a vector /// Call the `partialReplication getSubset` RPC method
/// of messages as KVTs (key, value, timestamp). /// and return a Stream of Result<SsbMessageKVT, GolgiError>
// TODO: add args for `descending` and `page` (max number of msgs in response) // TODO: add args for `descending` and `page` (max number of msgs in response)
pub async fn get_subset( pub async fn get_subset_stream(
&mut self, &mut self,
query: SubsetQuery, query: SubsetQuery,
) -> Result<Vec<SsbMessageKVT>, GolgiError> { ) -> Result<impl Stream<Item = Result<SsbMessageKVT, GolgiError>>, GolgiError> {
let req_id = self let mut sbot_connection = self.get_sbot_connection().await?;
.sbot_connection let req_id = sbot_connection.client.getsubset_req_send(query).await?;
.client let get_subset_stream =
.getsubset_req_send(query) get_source_stream(sbot_connection.rpc_reader, req_id, utils::kvt_res_parse).await;
.await?; Ok(get_subset_stream)
utils::get_source_until_eof(
&mut self.sbot_connection.rpc_reader,
req_id,
utils::kvt_res_parse,
)
.await
} }
/// Call the `whoami` RPC method and return an `id`. /// Call the `whoami` RPC method and return an `id`.
pub async fn whoami(&mut self) -> Result<String, GolgiError> { pub async fn whoami(&mut self) -> Result<String, GolgiError> {
let req_id = self.sbot_connection.client.whoami_req_send().await?; let req_id = self.sbot_connection.client.whoami_req_send().await?;
utils::get_async( let result = utils::get_async(
&mut self.sbot_connection.rpc_reader, &mut self.sbot_connection.rpc_reader,
req_id, req_id,
utils::string_res_parse, utils::json_res_parse,
) )
.await .await?;
let id = result
.get("id")
.ok_or(GolgiError::Sbot(
"id key not found on whoami call".to_string(),
))?
.as_str()
.ok_or(GolgiError::Sbot(
"whoami returned non-string value".to_string(),
))?;
Ok(id.to_string())
} }
/// Call the `publish` RPC method and return a message reference. /// Call the `publish` RPC method and return a message reference.
@ -225,28 +233,30 @@ impl Sbot {
} }
/// Get the about messages for a particular user in order of recency. /// Get the about messages for a particular user in order of recency.
pub async fn get_about_messages( pub async fn get_about_message_stream(
&mut self, &mut self,
ssb_id: &str, ssb_id: &str,
) -> Result<Vec<SsbMessageValue>, GolgiError> { ) -> Result<impl Stream<Item = Result<SsbMessageValue, GolgiError>>, GolgiError> {
let query = SubsetQuery::Author { let query = SubsetQuery::Author {
op: "author".to_string(), op: "author".to_string(),
feed: ssb_id.to_string(), feed: ssb_id.to_string(),
}; };
let kvts: Vec<SsbMessageKVT> = self.get_subset(query).await?; let get_subset_kvt_stream = self.get_subset_stream(query).await?;
let messages: Vec<SsbMessageValue> = kvts.into_iter().map(|kvt| kvt.value).collect(); // map into Stream<Item=Result<SsbMessageValue, GolgiError>>
let ssb_message_stream = get_subset_kvt_stream.map(|msg| match msg {
Ok(val) => Ok(val.value),
Err(err) => Err(err),
});
// TODO: after fixing sbot regression, // TODO: after fixing sbot regression,
// change this subset query to filter by type about in addition to author // change this subset query to filter by type about in addition to author
// and remove this filter section // and remove this filter section
// filter down to about messages // filter down to about messages
let mut about_messages: Vec<SsbMessageValue> = messages let about_message_stream = ssb_message_stream.filter(|msg| match msg {
.into_iter() Ok(val) => val.is_message_type(SsbMessageContentType::About),
.filter(|msg| msg.is_message_type(SsbMessageContentType::About)) Err(_err) => false,
.collect(); });
// TODO: use subset query to order messages instead of doing it this way // return about message stream
about_messages.sort_by(|a, b| b.timestamp.partial_cmp(&a.timestamp).unwrap()); Ok(about_message_stream)
// return about messages
Ok(about_messages)
} }
/// Get value of latest about message with given key from given user /// Get value of latest about message with given key from given user
@ -255,8 +265,12 @@ impl Sbot {
ssb_id: &str, ssb_id: &str,
key: &str, key: &str,
) -> Result<Option<String>, GolgiError> { ) -> Result<Option<String>, GolgiError> {
// vector of about messages with most recent at the front of the vector // get about_message_stream
let about_messages = self.get_about_messages(ssb_id).await?; let about_message_stream = self.get_about_message_stream(ssb_id).await?;
// TODO: use subset query to order messages instead of doing it this way
let mut about_messages: Vec<SsbMessageValue> = about_message_stream.try_collect().await?;
about_messages.sort_by(|a, b| b.timestamp.partial_cmp(&a.timestamp).unwrap());
// now we have a vector of about messages with most recent at the front of the vector
// iterate through the vector looking for most recent about message with the given key // iterate through the vector looking for most recent about message with the given key
let latest_about = about_messages let latest_about = about_messages
.iter() .iter()
@ -287,7 +301,7 @@ impl Sbot {
self.get_latest_about_message(ssb_id, "name").await self.get_latest_about_message(ssb_id, "name").await
} }
/// Get lateset about description from given user /// Get latest about description from given user
/// ///
/// # Arguments /// # Arguments
/// ///
@ -303,7 +317,7 @@ impl Sbot {
&mut self, &mut self,
id: String, id: String,
) -> Result<impl Stream<Item = Result<SsbMessageValue, GolgiError>>, GolgiError> { ) -> Result<impl Stream<Item = Result<SsbMessageValue, GolgiError>>, GolgiError> {
let mut sbot_connection = self.get_sbot_connection().await.unwrap(); let mut sbot_connection = self.get_sbot_connection().await?;
let args = CreateHistoryStreamIn::new(id); let args = CreateHistoryStreamIn::new(id);
let req_id = sbot_connection let req_id = sbot_connection
.client .client