//! Utility methods. use std::fmt::Debug; use async_std::{io::Read, net::TcpStream, stream::Stream}; use async_stream::stream; use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader}; use serde_json::Value; use crate::error::GolgiError; use crate::messages::{SsbMessageKVT, SsbMessageValue}; /// Parse an array of bytes (returned by an rpc call) into a `KVT`. /// /// # Arguments /// /// * `body` - An array of u8 to be parsed. pub fn kvt_res_parse(body: &[u8]) -> Result { let value: Value = serde_json::from_slice(body)?; let kvt: SsbMessageKVT = serde_json::from_value(value)?; Ok(kvt) } /// Parse an array of bytes (returned by an rpc call) into a `String`. /// /// # Arguments /// /// * `body` - An array of u8 to be parsed. pub fn string_res_parse(body: &[u8]) -> Result { Ok(std::str::from_utf8(body)?.to_string()) } /// Parse an array of bytes (returned by an rpc call) into a `serde_json::Value`. /// /// # Arguments /// /// * `body` - An array of u8 to be parsed. pub fn json_res_parse(body: &[u8]) -> Result { let message: Value = serde_json::from_slice(body)?; Ok(message) } /// Parse an array of bytes (returned by an rpc call) into an `SsbMessageValue`. /// /// # Arguments /// /// * `body` - An array of u8 to be parsed. pub fn ssb_message_res_parse(body: &[u8]) -> Result { let message: SsbMessageValue = serde_json::from_slice(body)?; Ok(message) } /// Take in an RPC request number along with a handling function and wait for /// an RPC response which matches the request number. Then, call the handling /// function on the response. /// /// # Arguments /// /// * `rpc_reader` - A `RpcReader` which can return Messages in a loop /// * `req_no` - A `RequestNo` of the response to listen for /// * `f` - A function which takes in an array of `u8` and returns a /// `Result`. This is a function which parses the response from /// the `RpcReader`. `T` is a generic type, so this parse function can return /// multiple possible types (`String`, JSON, custom struct etc.) pub async fn get_async<'a, R, T, F>( rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> Result where R: Read + Unpin, F: Fn(&[u8]) -> Result, T: Debug, { loop { let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { return f(&body); } RecvMsg::ErrorResponse(message) => { return Err(GolgiError::Sbot(message)); } RecvMsg::CancelStreamRespose() => { return Err(GolgiError::Sbot( "sbot returned CancelStreamResponse before any content".to_string(), )); } _ => {} } } } } /// Take in an RPC request number along with a handling function and call /// the handling function on all RPC responses which match the request number, /// appending the result of each parsed message to a vector until a /// `CancelStreamResponse` is found (marking the end of the stream). /// /// # Arguments /// /// * `rpc_reader` - A `RpcReader` which can return Messages in a loop /// * `req_no` - A `RequestNo` of the response to listen for /// * `f` - A function which takes in an array of `u8` and returns a /// `Result`. This is a function which parses the response from /// the `RpcReader`. `T` is a generic type, so this parse function can return /// multiple possible types (`String`, JSON, custom struct etc.) pub async fn get_source_until_eof<'a, R, T, F>( rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> Result, GolgiError> where R: Read + Unpin, F: Fn(&[u8]) -> Result, T: Debug, { let mut messages: Vec = Vec::new(); loop { let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { let parsed_response: Result = f(&body); match parsed_response { Ok(parsed_message) => { messages.push(parsed_message); } Err(err) => { return Err(err); } } } RecvMsg::ErrorResponse(message) => { return Err(GolgiError::Sbot(message)); } RecvMsg::CancelStreamRespose() => break, _ => {} } } } Ok(messages) } /// Take in an RPC request number along with a handling function and call the /// handling function on all responses which match the request number. Then, /// prints out the result of the handling function. /// /// This function useful for debugging and only prints the output. /// /// # Arguments /// /// * `rpc_reader` - A `RpcReader` which can return Messages in a loop /// * `req_no` - A `RequestNo` of the response to listen for /// * `f` - A function which takes in an array of `u8` and returns a /// `Result`. This is a function which parses the response from /// the `RpcReader`. `T` is a generic type, so this parse function can return /// multiple possible types (`String`, JSON, custom struct etc.) pub async fn print_source_until_eof<'a, R, T, F>( rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> Result<(), GolgiError> where R: Read + Unpin, F: Fn(&[u8]) -> Result, T: Debug + serde::Deserialize<'a>, { loop { let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { let display = f(&body)?; println!("{:?}", display); } RecvMsg::ErrorResponse(message) => { return Err(GolgiError::Sbot(message)); } RecvMsg::CancelStreamRespose() => break, _ => {} } } } Ok(()) } /// Take in an RPC request number along with a handling function (parsing /// results of type `T`) and produce an `async_std::stream::Stream` of results /// of type `T`, where the handling function is called on all `RpcReader` /// responses which match the request number. /// /// # Arguments /// /// * `req_no` - A `RequestNo` of the response to listen for /// * `f` - A function which takes in an array of `u8` and returns a /// `Result`. This is a function which parses the response from /// the `RpcReader`. `T` is a generic type, so this parse function can return /// multiple possible types (`String`, JSON, custom struct etc.) pub async fn get_source_stream<'a, F, T>( mut rpc_reader: RpcReader, req_no: RequestNo, f: F, ) -> impl Stream> where F: Fn(&[u8]) -> Result, T: Debug + serde::Deserialize<'a>, { // we use the async_stream::stream macro to allow for creating a stream which calls async functions // see https://users.rust-lang.org/t/how-to-create-async-std-stream-which-calls-async-function-in-poll-next/69760 let source_stream = stream! { loop { // get the next message from the rpc_reader let (id, msg) = rpc_reader.recv().await?; let x : i32 = id.clone(); // check if the next message from rpc_reader matches the req_no we are looking for // if it matches, then this rpc response is for the given request // and if it doesn't match, then we ignore it if x == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { // parse an item of type T from the message body using the provided // function for parsing let item = f(&body)?; // return Ok(item) as the next value in the stream yield Ok(item) } RecvMsg::ErrorResponse(message) => { // if an error is received // return an Err(err) as the next value in the stream yield Err(GolgiError::Sbot(message.to_string())); } // if we find a CancelStreamResponse // this is the end of the stream RecvMsg::CancelStreamRespose() => break, // if we find an unknown response, we just continue the loop _ => {} } } } }; // finally return the stream object source_stream }