Working create_history_stream

This commit is contained in:
notplants 2022-01-04 14:09:49 -05:00
parent 58d133b5c8
commit 8ad65f785d
2 changed files with 137 additions and 30 deletions

View File

@ -0,0 +1,97 @@
use std::process;
use golgi::error::GolgiError;
use golgi::sbot::Sbot;
use async_std::stream::StreamExt;
use futures::pin_mut;
async fn run() -> Result<(), GolgiError> {
let mut sbot_client = Sbot::init(None, None).await?;
let id = sbot_client.whoami().await?;
println!("{}", id);
//
// let name = SsbMessageContent::About {
// about: id,
// name: Some("golgi".to_string()),
// title: None,
// branch: None,
// image: None,
// description: None,
// location: None,
// start_datetime: None,
// };
//
// let name_msg_ref = sbot_client.publish(name).await?;
// println!("{}", name_msg_ref);
//
// let post = SsbMessageContent::Post {
// text: "golgi go womp womp".to_string(),
// mentions: None,
// };
//
// let post_msg_ref = sbot_client.publish(post).await?;
// println!("{}", post_msg_ref);
//
// let post_msg_ref = sbot_client.publish_description("this is a description").await?;
// println!("description: {}", post_msg_ref);
let author = "@L/z54cbc8V1kL1/MiBhpEKuN3QJkSoZYNaukny3ghIs=.ed25519";
// let query = SubsetQuery::Author{
// op: "author".to_string(),
// feed: author
// };
//
// let kvts = sbot_client.get_subset(query).await?;
//
// for kvt in kvts {
// println!("kvt: {:?}", kvt);
// }
// let about_messages = sbot_client.get_about_messages(author).await?;
// for msg in about_messages {
// println!("msg: {:?}", msg);
// }
//
// sbot_client.publish_name("this is my new name").await?;
// let name = sbot_client.get_name(author).await?;
// println!("name: {:?}", name);
//
// // sbot_client.publish_description("this is test description 99").await?;
//
// let description = sbot_client.get_description(author).await?;
// println!("desc: {:?}", description);
//
// let description2 = sbot_client.get_latest_about_message(author, "description").await?;
// println!("desc2: {:?}", description2);
let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?;
pin_mut!(history_stream); // needed for iteration
while let Some(res) = history_stream.next().await {
match res {
Ok(value) => {
println!("value: {:?}", value);
},
Err(err) => {
println!("err: {:?}", err);
}
}
}
println!("exit loop");
Ok(())
}
#[async_std::main]
async fn main() {
if let Err(e) = run().await {
eprintln!("Application error: {}", e);
process::exit(1);
}
}

View File

@ -228,58 +228,68 @@ impl Sbot {
self.get_latest_about_message(ssb_id, "description").await
}
/// Call the `createHistoryStream` RPC method and return a vector
/// of SsbMessageValue.
pub async fn create_history_stream(
&mut self,
/// Call the `createHistoryStream` RPC method
/// and return a Stream of Result<SsbMessageValue, GolgiError>
pub async fn create_history_stream<'a>(
&'a mut self,
id: String,
) -> Result<i32, GolgiError> {
) -> Result<impl Stream<Item = Result<SsbMessageValue, GolgiError>> + 'a, GolgiError> {
let args = CreateHistoryStreamIn::new(id);
let req_id = self.client.create_history_stream_req_send(&args).await?;
let source_stream = self.get_source_stream(req_id, utils::ssb_message_res_parse);
pin_mut!(source_stream); // needed for iteration
while let Some(res) = source_stream.next().await {
match res {
Ok(value) => {
println!("value: {:?}", value);
},
Err(err) => {
println!("err: {:?}", err);
}
}
}
println!("exit loop");
Ok(2)
// utils::get_source_until_eof(&mut self.rpc_reader, req_id, utils::ssb_message_res_parse).await
let history_stream = self.get_source_stream(req_id, utils::ssb_message_res_parse);
Ok(history_stream)
}
pub fn get_source_stream<'a, F, T>(&'a mut self, req_no: RequestNo, f: F) -> impl Stream<Item = Result<RequestNo, GolgiError>> + 'a
/// Takes in an rpc request number, and a handling function (parsing results of type T),
/// and produces an async_std::stream::Stream
/// of results of type T where the handling functions is called
/// on all rpc_reader responses which match the request number
///
/// # Arguments
///
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
pub fn get_source_stream<'a, F, T>(&'a mut self, req_no: RequestNo, f: F) -> impl Stream<Item = Result<T, GolgiError>> + 'a
where
F: Fn(&[u8]) -> Result<T, GolgiError> + 'a,
T: Debug + serde::Deserialize<'a>,
T: Debug + serde::Deserialize<'a> + 'a,
{
let s = stream! {
// we use the async_stream::stream macro to allow for creating a stream which calls async functions
// see https://users.rust-lang.org/t/how-to-create-async-std-stream-which-calls-async-function-in-poll-next/69760
let source_stream = stream! {
loop {
// get the next message from the rpc_reader
let (id, msg) = &self.rpc_reader.recv().await?;
let x : i32 = id.clone();
// check if the next message from rpc_reader matches the req_no we are looking for
// if it matches, then this rpc response is for the given request
// and if it doesn't match, then we ignore it
if x == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
let display = f(&body)?;
println!("display: {:?}", display);
yield Ok(x)
// parse an item of type T from the message body using the provided
// function for parsing
let item = f(&body)?;
// return Ok(item) as the next value in the stream
yield Ok(item)
}
RecvMsg::ErrorResponse(message) => {
// if an error is received
// return an Err(err) as the next value in the stream
yield Err(GolgiError::Sbot(message.to_string()));
}
// if we find a CancelStreamResponse
// this is the end of the stream
RecvMsg::CancelStreamRespose() => break,
_ => break
// if we find an unknown response, we just continue the loop
_ => {}
}
}
}
};
s
// finally return the stream object
source_stream
}
}