golgi/src/utils.rs

191 lines
6.2 KiB
Rust
Raw Normal View History

2021-12-02 13:12:52 +00:00
/*
use kuska_handshake::async_std::BoxStream;
use kuska_sodiumoxide::crypto::sign::ed25519;
use kuska_ssb::discovery;
use kuska_ssb::keystore;
use kuska_ssb::keystore::OwnedIdentity;
*/
use std::fmt::Debug;
use async_std::io::Read;
2021-12-27 17:56:43 +00:00
use kuska_ssb::api::dto::WhoAmIOut;
2021-12-28 15:24:35 +00:00
use serde_json::Value;
2021-12-27 21:43:44 +00:00
use kuska_ssb::feed::{Feed, Message};
2021-12-02 13:12:52 +00:00
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader};
use crate::error::GolgiError;
2021-12-27 17:56:43 +00:00
pub fn getsubset_res_parse(body: &[u8]) -> Result<String, GolgiError> {
// TODO: cleanup with proper error handling etc.
Ok(std::str::from_utf8(body).unwrap().to_string())
}
2021-12-28 15:24:35 +00:00
// parses a kvt
2021-12-02 13:12:52 +00:00
/// Builds a `Feed` from a raw response body.
///
/// # Arguments
///
/// * `body` - The raw bytes handed to `Feed::from_slice`
///
/// # Errors
///
/// Any error produced by `Feed::from_slice` is converted into a `GolgiError`
/// by the `?` operator and returned.
pub fn feed_res_parse(body: &[u8]) -> Result<Feed, GolgiError> {
    let feed = Feed::from_slice(body)?;
    Ok(feed)
}
2021-12-27 21:43:44 +00:00
/// Deserializes a JSON response body into a `Message`.
///
/// The body is also echoed to stdout as a debugging aid.
///
/// # Arguments
///
/// * `body` - The raw bytes of the response body
///
/// # Errors
///
/// Returns the `GolgiError` converted from the `serde_json` error if the body
/// cannot be deserialized into a `Message`.
pub fn message_res_parse(body: &[u8]) -> Result<Message, GolgiError> {
    // Lossy decoding is used only for the debug output, so a body that is not
    // valid UTF-8 is handed to the JSON parser below rather than panicking here.
    let content = String::from_utf8_lossy(body);
    println!("content: {:?}", content);
    let message: Message = serde_json::from_slice(body)?;
    Ok(message)
}
pub fn string_res_parse(body: &[u8]) -> Result<String, GolgiError> {
// TODO: cleanup with proper error handling etc.
Ok(std::str::from_utf8(body).unwrap().to_string())
}
2021-12-28 15:24:35 +00:00
/// Deserializes a JSON response body into a generic `serde_json::Value`.
///
/// The body is also echoed to stdout as a debugging aid.
///
/// # Arguments
///
/// * `body` - The raw bytes of the response body
///
/// # Errors
///
/// Returns the `GolgiError` converted from the `serde_json` error if the body
/// is not valid JSON.
pub fn json_res_parse(body: &[u8]) -> Result<Value, GolgiError> {
    // Lossy decoding is used only for the debug output, so a body that is not
    // valid UTF-8 is handed to the JSON parser below rather than panicking here.
    let content = String::from_utf8_lossy(body);
    println!("content: {:?}", content);
    let message: Value = serde_json::from_slice(body)?;
    Ok(message)
}
2021-12-27 21:43:44 +00:00
2021-12-02 17:48:16 +00:00
//pub fn publish_res_parse(body: &[u8]) -> Result<PublishOut, GolgiError> {
pub fn publish_res_parse(body: &[u8]) -> Result<String, GolgiError> {
//Ok(serde_json::from_slice(body)?)
// TODO: cleanup with proper error handling etc.
Ok(std::str::from_utf8(body).unwrap().to_string())
}
2021-12-02 13:12:52 +00:00
/// Deserializes a JSON response body into a `WhoAmIOut`.
///
/// # Arguments
///
/// * `body` - The raw bytes of the response body
///
/// # Errors
///
/// Any `serde_json` deserialization error is converted into a `GolgiError`
/// by the `?` operator and returned.
pub fn whoami_res_parse(body: &[u8]) -> Result<WhoAmIOut, GolgiError> {
    let whoami: WhoAmIOut = serde_json::from_slice(body)?;
    Ok(whoami)
}
2021-12-27 21:43:44 +00:00
/// Takes in an rpc request number, and a handling function,
/// and waits for an rpc response which matches the request number,
/// and then calls the handling function on the response.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
2021-12-02 13:12:52 +00:00
pub async fn get_async<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>,
req_no: RequestNo,
f: F,
) -> Result<T, GolgiError>
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
{
2021-12-27 21:43:44 +00:00
println!("req_no: {:?}", req_no);
2021-12-02 13:12:52 +00:00
loop {
let (id, msg) = rpc_reader.recv().await?;
2021-12-27 21:43:44 +00:00
println!("id: {:?}", id);
println!("msg: {:?}", msg);
2021-12-02 13:12:52 +00:00
if id == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
2021-12-27 21:43:44 +00:00
let m = std::str::from_utf8(&body).unwrap().to_string();
println!("msg: {:?}", m);
2021-12-02 13:12:52 +00:00
return f(&body).map_err(|err| err);
2021-12-27 21:43:44 +00:00
},
RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message));
},
RecvMsg::CancelStreamRespose() => {
return Err(GolgiError::Sbot("No RPC response from sbot for request".to_string()));
},
_ => {
println!("empty");
}
}
}
}
}
/// Takes in an rpc request number, and a handling function,
/// and calls the handling function on all RPC responses which match the request number,
/// appending the result of each parsed message to a vector,
/// until a CancelStreamResponse is found, marking the end of the stream,
/// and then finally a result is returned.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
pub async fn get_async_until_eof<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>,
req_no: RequestNo,
f: F,
) -> Result<Vec<T>, GolgiError>
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
{
let mut messages: Vec<T> = Vec::new();;
loop {
let (id, msg) = rpc_reader.recv().await?;
if id == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
let parsed_response: Result<T, GolgiError> = f(&body);
match parsed_response {
Ok(parsed_message) => {
println!("parsed {:?}", parsed_message);
messages.push(parsed_message);
},
Err(err) => {
return Err(err);
}
}
let display = f(&body)?;
println!("{:?}", display);
2021-12-02 13:12:52 +00:00
}
RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message));
}
2021-12-27 21:43:44 +00:00
RecvMsg::CancelStreamRespose() => break,
2021-12-02 13:12:52 +00:00
_ => {}
}
}
}
2021-12-27 21:43:44 +00:00
Ok(messages)
2021-12-02 13:12:52 +00:00
}
/// Reads messages from `rpc_reader` until the stream for `req_no` ends,
/// running `f` on the body of every matching RPC response.
///
/// NOTE(review): despite the name, the parsed values are discarded and
/// nothing is printed here; `f` is only used to validate each body.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the responses to listen for
/// * `f` - A function which parses a response body into a `T`
///
/// # Errors
///
/// * Any error from `rpc_reader.recv()`, converted into a `GolgiError`
/// * The first error returned by `f`
/// * `GolgiError::Sbot` if a matching message is an `ErrorResponse`
pub async fn print_source_until_eof<'a, R, T, F>(
    rpc_reader: &mut RpcReader<R>,
    req_no: RequestNo,
    f: F,
) -> Result<(), GolgiError>
where
    R: Read + Unpin,
    F: Fn(&[u8]) -> Result<T, GolgiError>,
    T: Debug + serde::Deserialize<'a>,
{
    loop {
        let (id, msg) = rpc_reader.recv().await?;
        // Ignore traffic that belongs to other requests.
        if id != req_no {
            continue;
        }
        match msg {
            RecvMsg::RpcResponse(_type, body) => {
                let _parsed = f(&body)?;
            }
            RecvMsg::ErrorResponse(message) => return Err(GolgiError::Sbot(message)),
            // End of stream.
            RecvMsg::CancelStreamRespose() => return Ok(()),
            _ => {}
        }
    }
}