use std::fmt::Debug; use async_std::io::Read; use serde_json::Value; use serde::{Serialize, Deserialize}; use kuska_ssb::api::dto::WhoAmIOut; use kuska_ssb::feed::{Feed, Message}; use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader}; use kuska_ssb::api::dto::content::TypedMessage; use crate::error::GolgiError; use crate::messages::{SsbKVT, SsbMessageValue}; /// Function to parse an array of bytes (returned by an rpc call) into a KVT. /// /// # Arguments /// /// * `body` - An array of u8 to be parsed. pub fn kvt_res_parse(body: &[u8]) -> Result { let value: Value = serde_json::from_slice(&body)?; let kvt: SsbKVT = serde_json::from_value(value)?; Ok(kvt) } /// Function to parse an array of bytes (returned by an rpc call) into a String. /// /// # Arguments /// /// * `body` - An array of u8 to be parsed. pub fn string_res_parse(body: &[u8]) -> Result { // TODO: cleanup with proper error handling etc. Ok(std::str::from_utf8(body).unwrap().to_string()) } /// Function to parse an array of bytes (returned by an rpc call) into a serde_json::Value. /// /// # Arguments /// /// * `body` - An array of u8 to be parsed. pub fn json_res_parse(body: &[u8]) -> Result { let message: Value = serde_json::from_slice(body)?; Ok(message) } /// Function to parse an array of bytes (returned by an rpc call) into an SsbMessageValue /// /// # Arguments /// /// * `body` - An array of u8 to be parsed. pub fn ssb_message_res_parse(body: &[u8]) -> Result { let message: SsbMessageValue = serde_json::from_slice(body)?; Ok(message) } //pub fn publish_res_parse(body: &[u8]) -> Result { pub fn publish_res_parse(body: &[u8]) -> Result { //Ok(serde_json::from_slice(body)?) // TODO: cleanup with proper error handling etc. Ok(std::str::from_utf8(body).unwrap().to_string()) } pub fn whoami_res_parse(body: &[u8]) -> Result { Ok(serde_json::from_slice(body)?) } /// Takes in an rpc request number, and a handling function, /// and waits for an rpc response which matches the request number, /// and then calls the handling function on the response. /// /// # Arguments /// /// * `rpc_reader` - A `RpcReader` which can return Messages in a loop /// * `req_no` - A `RequestNo` of the response to listen for /// * `f` - A function which takes in an array of u8 and returns a Result. /// This is a function which parses the response from the RpcReader. T is a generic type, /// so this parse function can return multiple possible types (String, json, custom struct etc.) pub async fn get_async<'a, R, T, F>( rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> Result where R: Read + Unpin, F: Fn(&[u8]) -> Result, T: Debug, { loop { let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { return f(&body).map_err(|err| err); }, RecvMsg::ErrorResponse(message) => { return Err(GolgiError::Sbot(message)); }, RecvMsg::CancelStreamRespose() => { return Err(GolgiError::Sbot("sbot returned CancelStreamResponse before any content".to_string())); }, _ => {} } } } } /// Takes in an rpc request number, and a handling function, /// and calls the handling function on all RPC responses which match the request number, /// appending the result of each parsed message to a vector, /// until a CancelStreamResponse is found, marking the end of the stream, /// and then finally a result is returned. /// /// # Arguments /// /// * `rpc_reader` - A `RpcReader` which can return Messages in a loop /// * `req_no` - A `RequestNo` of the response to listen for /// * `f` - A function which takes in an array of u8 and returns a Result. /// This is a function which parses the response from the RpcReader. T is a generic type, /// so this parse function can return multiple possible types (String, json, custom struct etc.) pub async fn get_async_until_eof<'a, R, T, F>( rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> Result, GolgiError> where R: Read + Unpin, F: Fn(&[u8]) -> Result, T: Debug, { let mut messages: Vec = Vec::new();; loop { let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { let parsed_response: Result = f(&body); match parsed_response { Ok(parsed_message) => { messages.push(parsed_message); }, Err(err) => { return Err(err); } } } RecvMsg::ErrorResponse(message) => { return Err(GolgiError::Sbot(message)); } RecvMsg::CancelStreamRespose() => break, _ => {} } } } Ok(messages) } /// Takes in an rpc request number, and a handling function, /// and calls the handling function on all responses which match the request number, /// and prints out the result of the handling function. /// /// This is a function useful for debugging, and only prints the output. /// /// # Arguments /// /// * `rpc_reader` - A `RpcReader` which can return Messages in a loop /// * `req_no` - A `RequestNo` of the response to listen for /// * `f` - A function which takes in an array of u8 and returns a Result. /// This is a function which parses the response from the RpcReader. T is a generic type, /// so this parse function can return multiple possible types (String, json, custom struct etc.) pub async fn print_source_until_eof<'a, R, T, F>( rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> Result<(), GolgiError> where R: Read + Unpin, F: Fn(&[u8]) -> Result, T: Debug + serde::Deserialize<'a>, { loop { let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { let display = f(&body)?; println!("{:?}", display); } RecvMsg::ErrorResponse(message) => { return Err(GolgiError::Sbot(message)); } RecvMsg::CancelStreamRespose() => { break }, _ => {} } } } Ok(()) }