golgi/src/utils.rs

242 lines
8.8 KiB
Rust

//! Utility methods for `golgi`.
use std::fmt::Debug;
use async_std::{io::Read, net::TcpStream, stream::Stream};
use async_stream::stream;
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader};
use serde_json::Value;
use crate::error::GolgiError;
use crate::messages::{SsbMessageKVT, SsbMessageValue};
/// Parse the raw bytes of an rpc response into an `SsbMessageKVT`.
///
/// The bytes are first decoded into a generic `serde_json::Value`, which is
/// then converted into the KVT message type.
///
/// # Arguments
///
/// * `body` - The bytes (as returned by an rpc call) to be parsed.
///
/// # Errors
///
/// Returns a `GolgiError` if either the JSON decoding step or the conversion
/// into `SsbMessageKVT` fails.
pub fn kvt_res_parse(body: &[u8]) -> Result<SsbMessageKVT, GolgiError> {
    // Decode to a generic JSON value, then convert that value into the typed message.
    let kvt = serde_json::from_slice::<Value>(body)
        .and_then(serde_json::from_value::<SsbMessageKVT>)?;
    Ok(kvt)
}
/// Parse the raw bytes of an rpc response into an owned `String`.
///
/// # Arguments
///
/// * `body` - The bytes (as returned by an rpc call) to be parsed.
///
/// # Errors
///
/// Returns a `GolgiError` if `body` is not valid UTF-8.
pub fn string_res_parse(body: &[u8]) -> Result<String, GolgiError> {
    // Validate the bytes as UTF-8 first; only then copy them into a new String.
    let text = std::str::from_utf8(body)?;
    Ok(text.to_owned())
}
/// Parse the raw bytes of an rpc response into a generic `serde_json::Value`.
///
/// # Arguments
///
/// * `body` - The bytes (as returned by an rpc call) to be parsed.
///
/// # Errors
///
/// Returns a `GolgiError` if the bytes cannot be decoded as JSON.
pub fn json_res_parse(body: &[u8]) -> Result<Value, GolgiError> {
    Ok(serde_json::from_slice(body)?)
}
/// Parse the raw bytes of an rpc response into an `SsbMessageValue`.
///
/// # Arguments
///
/// * `body` - The bytes (as returned by an rpc call) to be parsed.
///
/// # Errors
///
/// Returns a `GolgiError` if the bytes cannot be deserialized into an
/// `SsbMessageValue`.
pub fn ssb_message_res_parse(body: &[u8]) -> Result<SsbMessageValue, GolgiError> {
    let parsed = serde_json::from_slice::<SsbMessageValue>(body)?;
    Ok(parsed)
}
/// Waits for the first rpc response matching a request number and parses it.
///
/// Messages are read from `rpc_reader` in a loop. Messages whose request
/// number differs from `req_no` are skipped. The first matching response
/// body is passed to `f` and the result of `f` is returned.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
///
/// # Errors
///
/// * Any error produced while receiving from `rpc_reader` is propagated.
/// * `GolgiError::Sbot` is returned if the matching message is an error response,
///   or if the stream is cancelled before any content was received.
/// * Any error returned by `f` is returned unchanged.
pub async fn get_async<'a, R, T, F>(
    rpc_reader: &mut RpcReader<R>,
    req_no: RequestNo,
    f: F,
) -> Result<T, GolgiError>
where
    R: Read + Unpin,
    F: Fn(&[u8]) -> Result<T, GolgiError>,
    T: Debug,
{
    loop {
        let (id, msg) = rpc_reader.recv().await?;
        // Messages belonging to other requests are ignored.
        if id != req_no {
            continue;
        }
        match msg {
            // `f` already yields `Result<T, GolgiError>`, so it is returned as-is.
            RecvMsg::RpcResponse(_type, body) => return f(&body),
            RecvMsg::ErrorResponse(message) => return Err(GolgiError::Sbot(message)),
            RecvMsg::CancelStreamRespose() => {
                return Err(GolgiError::Sbot(
                    "sbot returned CancelStreamResponse before any content".to_string(),
                ));
            }
            // Any other kind of message is skipped and the loop keeps waiting.
            _ => {}
        }
    }
}
/// Collects every parsed rpc response for a request number until the stream ends.
///
/// Messages are read from `rpc_reader` in a loop. Each response body matching
/// `req_no` is parsed with `f` and appended to a vector. A CancelStreamResponse
/// for `req_no` marks the end of the stream, at which point the collected
/// vector is returned.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
///
/// # Errors
///
/// * Any error produced while receiving from `rpc_reader` is propagated.
/// * `GolgiError::Sbot` is returned if a matching message is an error response.
/// * The first error returned by `f` aborts the collection and is returned unchanged;
///   messages collected up to that point are discarded.
pub async fn get_source_until_eof<'a, R, T, F>(
    rpc_reader: &mut RpcReader<R>,
    req_no: RequestNo,
    f: F,
) -> Result<Vec<T>, GolgiError>
where
    R: Read + Unpin,
    F: Fn(&[u8]) -> Result<T, GolgiError>,
    T: Debug,
{
    let mut messages: Vec<T> = Vec::new();
    loop {
        let (id, msg) = rpc_reader.recv().await?;
        // Messages belonging to other requests are ignored.
        if id != req_no {
            continue;
        }
        match msg {
            // A parse failure is propagated immediately via `?`.
            RecvMsg::RpcResponse(_type, body) => messages.push(f(&body)?),
            RecvMsg::ErrorResponse(message) => return Err(GolgiError::Sbot(message)),
            // End of stream: hand back everything collected so far.
            RecvMsg::CancelStreamRespose() => return Ok(messages),
            // Any other kind of message is skipped.
            _ => {}
        }
    }
}
/// Prints every parsed rpc response for a request number until the stream ends.
///
/// Messages are read from `rpc_reader` in a loop. Each response body matching
/// `req_no` is parsed with `f` and its `Debug` representation is written to
/// standard output. A CancelStreamResponse for `req_no` ends the loop.
///
/// This function is intended for debugging: nothing is returned besides
/// success or failure.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
///
/// # Errors
///
/// * Any error produced while receiving from `rpc_reader` is propagated.
/// * `GolgiError::Sbot` is returned if a matching message is an error response.
/// * Any error returned by `f` is returned unchanged.
pub async fn print_source_until_eof<'a, R, T, F>(
    rpc_reader: &mut RpcReader<R>,
    req_no: RequestNo,
    f: F,
) -> Result<(), GolgiError>
where
    R: Read + Unpin,
    F: Fn(&[u8]) -> Result<T, GolgiError>,
    T: Debug + serde::Deserialize<'a>,
{
    loop {
        let (id, msg) = rpc_reader.recv().await?;
        // Messages belonging to other requests are ignored.
        if id != req_no {
            continue;
        }
        match msg {
            RecvMsg::RpcResponse(_type, body) => {
                let parsed = f(&body)?;
                println!("{:?}", parsed);
            }
            RecvMsg::ErrorResponse(message) => return Err(GolgiError::Sbot(message)),
            // End of stream: nothing more to print.
            RecvMsg::CancelStreamRespose() => return Ok(()),
            // Any other kind of message is skipped.
            _ => {}
        }
    }
}
/// Takes in an rpc request number, and a handling function (parsing results of type T),
/// and produces an `async_std::stream::Stream`
/// of results of type T where the handling function is called
/// on all `rpc_reader` responses which match the request number.
///
/// The stream ends when a CancelStreamResponse matching `req_no` is received.
/// Messages whose request number differs from `req_no` are ignored.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` over a `TcpStream`; it is moved into the returned stream
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
pub async fn get_source_stream<'a, F, T>(
    mut rpc_reader: RpcReader<TcpStream>,
    req_no: RequestNo,
    f: F,
) -> impl Stream<Item = Result<T, GolgiError>>
where
    F: Fn(&[u8]) -> Result<T, GolgiError>,
    T: Debug + serde::Deserialize<'a>,
{
    // we use the async_stream::stream macro to allow for creating a stream which calls async functions
    // see https://users.rust-lang.org/t/how-to-create-async-std-stream-which-calls-async-function-in-poll-next/69760
    //
    // NOTE(review): the `?` operators below rely on `stream!` turning an error
    // into a final `Err` item of the stream — confirm against the async_stream docs.
    stream! {
        loop {
            // get the next message from the rpc_reader
            let (id, msg) = rpc_reader.recv().await?;
            // check if the next message from rpc_reader matches the req_no we are looking for
            // if it matches, then this rpc response is for the given request
            // and if it doesn't match, then we ignore it
            if id == req_no {
                match msg {
                    RecvMsg::RpcResponse(_type, body) => {
                        // parse an item of type T from the message body using the provided
                        // function for parsing
                        let item = f(&body)?;
                        // return Ok(item) as the next value in the stream
                        yield Ok(item);
                    }
                    RecvMsg::ErrorResponse(message) => {
                        // if an error is received
                        // return an Err(err) as the next value in the stream
                        //
                        // NOTE(review): the loop keeps reading after this error, unlike the
                        // non-streaming helpers which return — confirm this is intended.
                        yield Err(GolgiError::Sbot(message));
                    }
                    // if we find a CancelStreamResponse
                    // this is the end of the stream
                    RecvMsg::CancelStreamRespose() => break,
                    // if we find an unknown response, we just continue the loop
                    _ => {}
                }
            }
        }
    }
}