2022-02-07 14:09:51 +00:00
|
|
|
//! Utility methods.
|
2021-12-29 16:31:53 +00:00
|
|
|
use std::fmt::Debug;
|
2022-01-04 20:00:05 +00:00
|
|
|
|
2022-01-05 19:05:59 +00:00
|
|
|
use async_std::{io::Read, net::TcpStream, stream::Stream};
|
|
|
|
use async_stream::stream;
|
2021-12-02 13:12:52 +00:00
|
|
|
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader};
|
2021-12-29 16:31:53 +00:00
|
|
|
use serde_json::Value;
|
2021-12-28 19:57:30 +00:00
|
|
|
|
2021-12-02 13:12:52 +00:00
|
|
|
use crate::error::GolgiError;
|
2021-12-30 01:08:55 +00:00
|
|
|
use crate::messages::{SsbMessageKVT, SsbMessageValue};
|
2021-12-02 13:12:52 +00:00
|
|
|
|
2022-02-14 15:00:10 +00:00
|
|
|
/// Parse an array of bytes (returned by an rpc call) into a `KVT`.
|
2021-12-28 19:57:30 +00:00
|
|
|
///
|
|
|
|
/// # Arguments
|
|
|
|
///
|
|
|
|
/// * `body` - An array of u8 to be parsed.
|
2021-12-30 01:08:55 +00:00
|
|
|
pub fn kvt_res_parse(body: &[u8]) -> Result<SsbMessageKVT, GolgiError> {
|
2021-12-29 16:31:53 +00:00
|
|
|
let value: Value = serde_json::from_slice(body)?;
|
2021-12-30 01:08:55 +00:00
|
|
|
let kvt: SsbMessageKVT = serde_json::from_value(value)?;
|
2021-12-28 19:57:30 +00:00
|
|
|
Ok(kvt)
|
|
|
|
}
|
|
|
|
|
2022-02-14 15:00:10 +00:00
|
|
|
/// Parse an array of bytes (returned by an rpc call) into a `String`.
|
2021-12-28 19:57:30 +00:00
|
|
|
///
|
|
|
|
/// # Arguments
|
|
|
|
///
|
|
|
|
/// * `body` - An array of u8 to be parsed.
|
2021-12-27 21:43:44 +00:00
|
|
|
pub fn string_res_parse(body: &[u8]) -> Result<String, GolgiError> {
|
2021-12-30 01:08:55 +00:00
|
|
|
Ok(std::str::from_utf8(body)?.to_string())
|
2021-12-27 21:43:44 +00:00
|
|
|
}
|
|
|
|
|
2022-02-14 15:00:10 +00:00
|
|
|
/// Parse an array of bytes (returned by an rpc call) into a `serde_json::Value`.
|
2021-12-28 19:57:30 +00:00
|
|
|
///
|
|
|
|
/// # Arguments
|
|
|
|
///
|
|
|
|
/// * `body` - An array of u8 to be parsed.
|
2021-12-28 15:24:35 +00:00
|
|
|
pub fn json_res_parse(body: &[u8]) -> Result<Value, GolgiError> {
|
|
|
|
let message: Value = serde_json::from_slice(body)?;
|
|
|
|
Ok(message)
|
|
|
|
}
|
|
|
|
|
2022-02-14 15:00:10 +00:00
|
|
|
/// Parse an array of bytes (returned by an rpc call) into an `SsbMessageValue`.
|
2021-12-28 19:57:30 +00:00
|
|
|
///
|
|
|
|
/// # Arguments
|
|
|
|
///
|
|
|
|
/// * `body` - An array of u8 to be parsed.
|
|
|
|
pub fn ssb_message_res_parse(body: &[u8]) -> Result<SsbMessageValue, GolgiError> {
|
|
|
|
let message: SsbMessageValue = serde_json::from_slice(body)?;
|
|
|
|
Ok(message)
|
|
|
|
}
|
|
|
|
|
2022-02-14 15:00:10 +00:00
|
|
|
/// Take in an RPC request number along with a handling function and wait for
|
|
|
|
/// an RPC response which matches the request number. Then, call the handling
|
|
|
|
/// function on the response.
|
2021-12-27 21:43:44 +00:00
|
|
|
///
|
|
|
|
/// # Arguments
|
|
|
|
///
|
|
|
|
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
|
|
|
|
/// * `req_no` - A `RequestNo` of the response to listen for
|
2022-02-14 15:00:10 +00:00
|
|
|
/// * `f` - A function which takes in an array of `u8` and returns a
|
|
|
|
/// `Result<T, GolgiError>`. This is a function which parses the response from
|
|
|
|
/// the `RpcReader`. `T` is a generic type, so this parse function can return
|
|
|
|
/// multiple possible types (`String`, JSON, custom struct etc.)
|
2021-12-02 13:12:52 +00:00
|
|
|
pub async fn get_async<'a, R, T, F>(
|
|
|
|
rpc_reader: &mut RpcReader<R>,
|
|
|
|
req_no: RequestNo,
|
|
|
|
f: F,
|
|
|
|
) -> Result<T, GolgiError>
|
2022-01-05 18:58:48 +00:00
|
|
|
where
|
|
|
|
R: Read + Unpin,
|
|
|
|
F: Fn(&[u8]) -> Result<T, GolgiError>,
|
|
|
|
T: Debug,
|
2021-12-02 13:12:52 +00:00
|
|
|
{
|
|
|
|
loop {
|
|
|
|
let (id, msg) = rpc_reader.recv().await?;
|
|
|
|
if id == req_no {
|
|
|
|
match msg {
|
|
|
|
RecvMsg::RpcResponse(_type, body) => {
|
|
|
|
return f(&body).map_err(|err| err);
|
2021-12-29 16:31:53 +00:00
|
|
|
}
|
2021-12-27 21:43:44 +00:00
|
|
|
RecvMsg::ErrorResponse(message) => {
|
|
|
|
return Err(GolgiError::Sbot(message));
|
2021-12-29 16:31:53 +00:00
|
|
|
}
|
2021-12-27 21:43:44 +00:00
|
|
|
RecvMsg::CancelStreamRespose() => {
|
2021-12-29 16:31:53 +00:00
|
|
|
return Err(GolgiError::Sbot(
|
|
|
|
"sbot returned CancelStreamResponse before any content".to_string(),
|
|
|
|
));
|
|
|
|
}
|
2021-12-28 19:57:30 +00:00
|
|
|
_ => {}
|
2021-12-27 21:43:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-14 15:00:10 +00:00
|
|
|
/// Take in an RPC request number along with a handling function and call
|
|
|
|
/// the handling function on all RPC responses which match the request number,
|
|
|
|
/// appending the result of each parsed message to a vector until a
|
|
|
|
/// `CancelStreamResponse` is found (marking the end of the stream).
|
2021-12-27 21:43:44 +00:00
|
|
|
///
|
|
|
|
/// # Arguments
|
|
|
|
///
|
|
|
|
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
|
|
|
|
/// * `req_no` - A `RequestNo` of the response to listen for
|
2022-02-14 15:00:10 +00:00
|
|
|
/// * `f` - A function which takes in an array of `u8` and returns a
|
|
|
|
/// `Result<T, GolgiError>`. This is a function which parses the response from
|
|
|
|
/// the `RpcReader`. `T` is a generic type, so this parse function can return
|
|
|
|
/// multiple possible types (`String`, JSON, custom struct etc.)
|
2021-12-30 01:08:55 +00:00
|
|
|
pub async fn get_source_until_eof<'a, R, T, F>(
|
2021-12-27 21:43:44 +00:00
|
|
|
rpc_reader: &mut RpcReader<R>,
|
|
|
|
req_no: RequestNo,
|
|
|
|
f: F,
|
|
|
|
) -> Result<Vec<T>, GolgiError>
|
2022-01-05 18:58:48 +00:00
|
|
|
where
|
|
|
|
R: Read + Unpin,
|
|
|
|
F: Fn(&[u8]) -> Result<T, GolgiError>,
|
|
|
|
T: Debug,
|
2021-12-27 21:43:44 +00:00
|
|
|
{
|
2021-12-29 16:31:53 +00:00
|
|
|
let mut messages: Vec<T> = Vec::new();
|
2021-12-27 21:43:44 +00:00
|
|
|
loop {
|
|
|
|
let (id, msg) = rpc_reader.recv().await?;
|
|
|
|
if id == req_no {
|
|
|
|
match msg {
|
|
|
|
RecvMsg::RpcResponse(_type, body) => {
|
|
|
|
let parsed_response: Result<T, GolgiError> = f(&body);
|
|
|
|
match parsed_response {
|
|
|
|
Ok(parsed_message) => {
|
|
|
|
messages.push(parsed_message);
|
2021-12-29 16:31:53 +00:00
|
|
|
}
|
2021-12-27 21:43:44 +00:00
|
|
|
Err(err) => {
|
|
|
|
return Err(err);
|
|
|
|
}
|
|
|
|
}
|
2021-12-02 13:12:52 +00:00
|
|
|
}
|
|
|
|
RecvMsg::ErrorResponse(message) => {
|
|
|
|
return Err(GolgiError::Sbot(message));
|
|
|
|
}
|
2021-12-27 21:43:44 +00:00
|
|
|
RecvMsg::CancelStreamRespose() => break,
|
2021-12-02 13:12:52 +00:00
|
|
|
_ => {}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-12-27 21:43:44 +00:00
|
|
|
Ok(messages)
|
2021-12-02 13:12:52 +00:00
|
|
|
}
|
|
|
|
|
2022-02-14 15:00:10 +00:00
|
|
|
/// Take in an RPC request number along with a handling function and call the
|
|
|
|
/// handling function on all responses which match the request number. Then,
|
|
|
|
/// prints out the result of the handling function.
|
2021-12-28 19:57:30 +00:00
|
|
|
///
|
2022-02-14 15:00:10 +00:00
|
|
|
/// This function useful for debugging and only prints the output.
|
2021-12-28 19:57:30 +00:00
|
|
|
///
|
|
|
|
/// # Arguments
|
|
|
|
///
|
|
|
|
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
|
|
|
|
/// * `req_no` - A `RequestNo` of the response to listen for
|
2022-02-14 15:00:10 +00:00
|
|
|
/// * `f` - A function which takes in an array of `u8` and returns a
|
|
|
|
/// `Result<T, GolgiError>`. This is a function which parses the response from
|
|
|
|
/// the `RpcReader`. `T` is a generic type, so this parse function can return
|
|
|
|
/// multiple possible types (`String`, JSON, custom struct etc.)
|
2021-12-02 13:12:52 +00:00
|
|
|
pub async fn print_source_until_eof<'a, R, T, F>(
|
|
|
|
rpc_reader: &mut RpcReader<R>,
|
|
|
|
req_no: RequestNo,
|
|
|
|
f: F,
|
|
|
|
) -> Result<(), GolgiError>
|
2022-01-05 18:58:48 +00:00
|
|
|
where
|
|
|
|
R: Read + Unpin,
|
|
|
|
F: Fn(&[u8]) -> Result<T, GolgiError>,
|
|
|
|
T: Debug + serde::Deserialize<'a>,
|
2021-12-02 13:12:52 +00:00
|
|
|
{
|
|
|
|
loop {
|
|
|
|
let (id, msg) = rpc_reader.recv().await?;
|
|
|
|
if id == req_no {
|
|
|
|
match msg {
|
|
|
|
RecvMsg::RpcResponse(_type, body) => {
|
|
|
|
let display = f(&body)?;
|
2021-12-28 19:57:30 +00:00
|
|
|
println!("{:?}", display);
|
2021-12-02 13:12:52 +00:00
|
|
|
}
|
|
|
|
RecvMsg::ErrorResponse(message) => {
|
|
|
|
return Err(GolgiError::Sbot(message));
|
|
|
|
}
|
2021-12-29 16:31:53 +00:00
|
|
|
RecvMsg::CancelStreamRespose() => break,
|
2021-12-02 13:12:52 +00:00
|
|
|
_ => {}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
2022-01-05 18:58:48 +00:00
|
|
|
}
|
|
|
|
|
2022-02-14 15:00:10 +00:00
|
|
|
/// Take in an RPC request number along with a handling function (parsing
|
|
|
|
/// results of type `T`) and produce an `async_std::stream::Stream` of results
|
|
|
|
/// of type `T`, where the handling function is called on all `RpcReader`
|
|
|
|
/// responses which match the request number.
|
2022-01-05 18:58:48 +00:00
|
|
|
///
|
|
|
|
/// # Arguments
|
|
|
|
///
|
|
|
|
/// * `req_no` - A `RequestNo` of the response to listen for
|
2022-02-14 15:00:10 +00:00
|
|
|
/// * `f` - A function which takes in an array of `u8` and returns a
|
|
|
|
/// `Result<T, GolgiError>`. This is a function which parses the response from
|
|
|
|
/// the `RpcReader`. `T` is a generic type, so this parse function can return
|
|
|
|
/// multiple possible types (`String`, JSON, custom struct etc.)
|
2022-01-05 18:58:48 +00:00
|
|
|
pub async fn get_source_stream<'a, F, T>(
|
|
|
|
mut rpc_reader: RpcReader<TcpStream>,
|
|
|
|
req_no: RequestNo,
|
|
|
|
f: F,
|
|
|
|
) -> impl Stream<Item = Result<T, GolgiError>>
|
|
|
|
where
|
|
|
|
F: Fn(&[u8]) -> Result<T, GolgiError>,
|
|
|
|
T: Debug + serde::Deserialize<'a>,
|
|
|
|
{
|
|
|
|
// we use the async_stream::stream macro to allow for creating a stream which calls async functions
|
|
|
|
// see https://users.rust-lang.org/t/how-to-create-async-std-stream-which-calls-async-function-in-poll-next/69760
|
|
|
|
let source_stream = stream! {
|
|
|
|
loop {
|
|
|
|
// get the next message from the rpc_reader
|
|
|
|
let (id, msg) = rpc_reader.recv().await?;
|
|
|
|
let x : i32 = id.clone();
|
|
|
|
// check if the next message from rpc_reader matches the req_no we are looking for
|
|
|
|
// if it matches, then this rpc response is for the given request
|
|
|
|
// and if it doesn't match, then we ignore it
|
|
|
|
if x == req_no {
|
|
|
|
match msg {
|
|
|
|
RecvMsg::RpcResponse(_type, body) => {
|
|
|
|
// parse an item of type T from the message body using the provided
|
|
|
|
// function for parsing
|
|
|
|
let item = f(&body)?;
|
|
|
|
// return Ok(item) as the next value in the stream
|
|
|
|
yield Ok(item)
|
|
|
|
}
|
|
|
|
RecvMsg::ErrorResponse(message) => {
|
|
|
|
// if an error is received
|
|
|
|
// return an Err(err) as the next value in the stream
|
|
|
|
yield Err(GolgiError::Sbot(message.to_string()));
|
|
|
|
}
|
|
|
|
// if we find a CancelStreamResponse
|
|
|
|
// this is the end of the stream
|
|
|
|
RecvMsg::CancelStreamRespose() => break,
|
|
|
|
// if we find an unknown response, we just continue the loop
|
|
|
|
_ => {}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
// finally return the stream object
|
|
|
|
source_stream
|
|
|
|
}
|