golgi/src/utils.rs

283 lines
9.5 KiB
Rust

use std::fmt::Debug;
use async_std::io::Read;
use serde_json::Value;
use serde::{Serialize, Deserialize};
use kuska_ssb::api::dto::WhoAmIOut;
use kuska_ssb::feed::{Feed, Message};
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader};
use kuska_ssb::api::dto::content::{Post, Vote};
use kuska_ssb::api::dto::content::TypedMessage;
use ssb_legacy_msg_data::LegacyF64;
use ssb_multiformats::multihash::Multihash;
use crate::error::GolgiError;
/// Data type representing the `value` of a message object (`KVT`). More information concerning the
/// data model can be found
/// in the [`Metadata` documentation](https://spec.scuttlebutt.nz/feed/messages.html#metadata).
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct SsbMessageValue {
pub previous: Option<Multihash>,
pub author: String,
pub sequence: u64,
pub timestamp: LegacyF64,
pub hash: String,
pub content: Value,
pub signature: String,
}
impl SsbMessageValue {
pub fn get_message_type(&self) -> String {
let msg_type: String = self.content.get("type").map(|msg_type| msg_type.to_string()).unwrap_or_else(|| "none".to_string());
msg_type
}
pub fn into_post(self) -> Result<Post, GolgiError> {
let p: Post = serde_json::from_value(self.content)?;
Ok(p)
}
pub fn into_vote(self) -> Result<Vote, GolgiError> {
let p: Vote = serde_json::from_value(self.content)?;
Ok(p)
}
// NOTE: this doesnt work, because the arms return two different types
// but maybe there is a way to do type inheritance?
// this SO post says you dont deserialize directly into enum variants
// https://stackoverflow.com/questions/59460464/how-do-i-use-serde-to-deserialize-into-a-specific-enum-variant
// pub fn into_typed_message(self) -> Result<TypedMessage, GolgiError> {
// let msg_type = self.get_message_type();
// let msg = match msg_type {
// "post" => {
// let p: Post = serde_json::from_value(self.content)?;
// p
// },
// "vote" => {
// let p: Vote = serde_json::from_value(self.content)?;
// p
// },
// _ => {
// Err(GolgiError::Sbot("No type included in message content".to_string()))
// }
// };
// Ok(msg)
// }
}
/// Data type representing the `value` of a message object (`KVT`). More information concerning the
/// data model can be found
/// in the [`Metadata` documentation](https://spec.scuttlebutt.nz/feed/messages.html#metadata).
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct KVT {
pub key: String,
pub value: SsbMessageValue,
pub timestamp: f64,
pub rts: Option<f64>,
}
/// Function to parse an array of bytes (returned by an rpc call) into a KVT.
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
pub fn kvt_res_parse(body: &[u8]) -> Result<KVT, GolgiError> {
let value: Value = serde_json::from_slice(&body)?;
let kvt: KVT = serde_json::from_value(value)?;
Ok(kvt)
}
/// Function to parse an array of bytes (returned by an rpc call) into a kuska_ssb::feed::Feed.
/// This data type is a KVT (TODO: link to explain this)
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
pub fn feed_res_parse(body: &[u8]) -> Result<Feed, GolgiError> {
let value: Value = serde_json::from_slice(&body)?;
let feed: Feed = serde_json::from_value(value)?;
Ok(feed)
}
/// Function to parse an array of bytes (returned by an rpc call) into a String.
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
pub fn string_res_parse(body: &[u8]) -> Result<String, GolgiError> {
// TODO: cleanup with proper error handling etc.
Ok(std::str::from_utf8(body).unwrap().to_string())
}
/// Function to parse an array of bytes (returned by an rpc call) into a serde_json::Value.
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
pub fn json_res_parse(body: &[u8]) -> Result<Value, GolgiError> {
let message: Value = serde_json::from_slice(body)?;
Ok(message)
}
/// Function to parse an array of bytes (returned by an rpc call) into an SsbMessageValue
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
pub fn ssb_message_res_parse(body: &[u8]) -> Result<SsbMessageValue, GolgiError> {
let message: SsbMessageValue = serde_json::from_slice(body)?;
Ok(message)
}
//pub fn publish_res_parse(body: &[u8]) -> Result<PublishOut, GolgiError> {
pub fn publish_res_parse(body: &[u8]) -> Result<String, GolgiError> {
//Ok(serde_json::from_slice(body)?)
// TODO: cleanup with proper error handling etc.
Ok(std::str::from_utf8(body).unwrap().to_string())
}
pub fn whoami_res_parse(body: &[u8]) -> Result<WhoAmIOut, GolgiError> {
Ok(serde_json::from_slice(body)?)
}
/// Takes in an rpc request number, and a handling function,
/// and waits for an rpc response which matches the request number,
/// and then calls the handling function on the response.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
pub async fn get_async<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>,
req_no: RequestNo,
f: F,
) -> Result<T, GolgiError>
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
{
loop {
let (id, msg) = rpc_reader.recv().await?;
if id == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
return f(&body).map_err(|err| err);
},
RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message));
},
RecvMsg::CancelStreamRespose() => {
return Err(GolgiError::Sbot("sbot returned CancelStreamResponse before any content".to_string()));
},
_ => {}
}
}
}
}
/// Takes in an rpc request number, and a handling function,
/// and calls the handling function on all RPC responses which match the request number,
/// appending the result of each parsed message to a vector,
/// until a CancelStreamResponse is found, marking the end of the stream,
/// and then finally a result is returned.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
pub async fn get_async_until_eof<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>,
req_no: RequestNo,
f: F,
) -> Result<Vec<T>, GolgiError>
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
{
let mut messages: Vec<T> = Vec::new();;
loop {
let (id, msg) = rpc_reader.recv().await?;
if id == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
let parsed_response: Result<T, GolgiError> = f(&body);
match parsed_response {
Ok(parsed_message) => {
messages.push(parsed_message);
},
Err(err) => {
return Err(err);
}
}
}
RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message));
}
RecvMsg::CancelStreamRespose() => break,
_ => {}
}
}
}
Ok(messages)
}
/// Takes in an rpc request number, and a handling function,
/// and calls the handling function on all responses which match the request number,
/// and prints out the result of the handling function.
///
/// This is a function useful for debugging, and only prints the output.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
pub async fn print_source_until_eof<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>,
req_no: RequestNo,
f: F,
) -> Result<(), GolgiError>
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug + serde::Deserialize<'a>,
{
loop {
let (id, msg) = rpc_reader.recv().await?;
if id == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
let display = f(&body)?;
println!("{:?}", display);
}
RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message));
}
RecvMsg::CancelStreamRespose() => {
break
},
_ => {}
}
}
}
Ok(())
}