use std::fmt::Debug;

use async_std::io::Read;
use serde_json::Value;
use serde::{Serialize, Deserialize};

use kuska_ssb::api::dto::WhoAmIOut;
use kuska_ssb::feed::{Feed, Message};
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader};
use kuska_ssb::api::dto::content::TypedMessage;
use ssb_legacy_msg_data::LegacyF64;
use ssb_multiformats::multihash::Multihash;

use crate::error::GolgiError;

/// Data type representing the `value` of a message object (`KVT`). More information concerning the
/// data model can be found
/// in the [`Metadata` documentation](https://spec.scuttlebutt.nz/feed/messages.html#metadata).
///
/// Deserialization rejects any field that is not declared here
/// (`deny_unknown_fields`).
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct SsbMessageValue {
    pub previous: Option<Multihash>,
    pub author: String,
    pub sequence: u64,
    pub timestamp: LegacyF64,
    pub hash: String,
    pub content: Value,
    pub signature: String,
}

impl SsbMessageValue {
    /// Returns the `type` entry of the message content rendered as a string,
    /// or `"none"` when the content has no `type` entry.
    ///
    /// The entry is rendered with `serde_json::Value::to_string`, i.e. as
    /// JSON text, so a string type such as `post` is returned wrapped in
    /// double quotes (`"\"post\""`).
    // NOTE(review): the JSON quoting looks unintended; confirm how callers
    // compare the result before switching to `Value::as_str`.
    pub fn get_message_type(&self) -> String {
        match self.content.get("type") {
            Some(kind) => kind.to_string(),
            None => "none".to_string(),
        }
    }

    /// Consumes the message and deserializes its `content` into a
    /// `TypedMessage`.
    ///
    /// Returns an error if `content` does not deserialize into any
    /// `TypedMessage` variant.
    pub fn into_typed_message(self) -> Result<TypedMessage, GolgiError> {
        serde_json::from_value::<TypedMessage>(self.content).map_err(GolgiError::from)
    }

    /// Consumes the message and converts it into a `TypedSsbMessageValue`,
    /// which carries every original field plus the `TypedMessage` parsed from
    /// `content`.
    ///
    /// Returns an error if `content` cannot be parsed by
    /// `get_typed_message_from_value`.
    pub fn into_typed_message_value(self) -> Result<TypedSsbMessageValue, GolgiError> {
        let SsbMessageValue {
            previous,
            author,
            sequence,
            timestamp,
            hash,
            content,
            signature,
        } = self;

        // The raw content is kept in the result, so parsing works on a copy.
        let typed_message = get_typed_message_from_value(content.clone())?;

        Ok(TypedSsbMessageValue {
            previous,
            author,
            sequence,
            timestamp,
            hash,
            content,
            typed_message,
            signature,
        })
    }
}

/// Function to parse a TypedMessage from an ssb message content field.
/// TypedMessage has a tag field, named "type", which instructs serde to choose which variant
/// to attempt to deserialize by looking at the value of the type field.
///
/// See documentation here: <https://serde.rs/enum-representations.html#internally-tagged>
///
/// # Arguments
///
/// * `value` - A serde value to be parsed into a TypedMessage.
///
/// # Errors
///
/// Returns `GolgiError::ContentTypeDecode` when `value` has no `type` entry
/// or when that entry is not a JSON string, and propagates the serde error
/// when the value does not deserialize into a `TypedMessage`.
pub fn get_typed_message_from_value(value: Value) -> Result<TypedMessage, GolgiError> {
    // Build the error lazily so the happy path does not allocate a String.
    let message_type = value
        .get("type")
        .ok_or_else(|| GolgiError::ContentTypeDecode("no type field in content".to_string()))?;

    // Only string tags are accepted; check this before deserializing.
    if !message_type.is_string() {
        return Err(GolgiError::ContentTypeDecode(
            "invalid type field in content".to_string(),
        ));
    }

    let typed_message: TypedMessage = serde_json::from_value(value)?;
    Ok(typed_message)
}

/// Data type representing the `value` of a message object (`KVT`),
/// with an additional field `typed_message` which contains a `TypedMessage` object
/// made by deserializing the `content` field.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TypedSsbMessageValue {
    pub previous: Option<Multihash>,
    pub author: String,
    pub sequence: u64,
    pub timestamp: LegacyF64,
    pub hash: String,
    pub content: Value,
    pub typed_message: TypedMessage,
    pub signature: String,
}

/// Data type representing a complete message object (`KVT`): the message
/// `key`, the message `value` (see `SsbMessageValue`) and a `timestamp`,
/// plus an optional `rts` number. More information concerning the
/// data model can be found
/// in the [`Metadata` documentation](https://spec.scuttlebutt.nz/feed/messages.html#metadata).
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct KVT {
    pub key: String,
    pub value: SsbMessageValue,
    pub timestamp: f64,
    // NOTE(review): presumably a "received" timestamp; confirm against the sbot output.
    pub rts: Option<f64>,
}

/// Function to parse an array of bytes (returned by an rpc call) into a KVT.
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
///
/// # Errors
///
/// Returns an error when `body` is not valid JSON or does not match the
/// `KVT` structure.
pub fn kvt_res_parse(body: &[u8]) -> Result<KVT, GolgiError> {
    let value: Value = serde_json::from_slice(body)?;
    let kvt: KVT = serde_json::from_value(value)?;
    Ok(kvt)
}

/// Function to parse an array of bytes (returned by an rpc call) into a kuska_ssb::feed::Feed.
/// This data type is a KVT (TODO: link to explain this)
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
///
/// # Errors
///
/// Returns an error when `body` is not valid JSON or does not deserialize
/// into a `Feed`.
pub fn feed_res_parse(body: &[u8]) -> Result<Feed, GolgiError> {
    let value: Value = serde_json::from_slice(body)?;
    let feed: Feed = serde_json::from_value(value)?;
    Ok(feed)
}

/// Function to parse an array of bytes (returned by an rpc call) into a String.
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
///
/// # Errors
///
/// Returns `GolgiError::Sbot` when `body` is not valid UTF-8. The bytes come
/// from the network, so malformed input is reported instead of panicking.
pub fn string_res_parse(body: &[u8]) -> Result<String, GolgiError> {
    std::str::from_utf8(body)
        .map(str::to_owned)
        .map_err(|err| GolgiError::Sbot(format!("response body is not valid UTF-8: {}", err)))
}

/// Function to parse an array of bytes (returned by an rpc call) into a serde_json::Value.
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
///
/// # Errors
///
/// Returns an error when `body` is not valid JSON.
pub fn json_res_parse(body: &[u8]) -> Result<Value, GolgiError> {
    let message: Value = serde_json::from_slice(body)?;
    Ok(message)
}

/// Function to parse an array of bytes (returned by an rpc call) into an SsbMessageValue
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
///
/// # Errors
///
/// Returns an error when `body` is not valid JSON or does not match the
/// `SsbMessageValue` structure.
pub fn ssb_message_res_parse(body: &[u8]) -> Result<SsbMessageValue, GolgiError> {
    let message: SsbMessageValue = serde_json::from_slice(body)?;
    Ok(message)
}

/// Function to parse the response of a publish rpc call into a String.
///
/// The response is returned as raw text; a JSON-decoding variant was
/// sketched here earlier but left disabled.
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
///
/// # Errors
///
/// Returns `GolgiError::Sbot` when `body` is not valid UTF-8.
pub fn publish_res_parse(body: &[u8]) -> Result<String, GolgiError> {
    string_res_parse(body)
}

/// Function to parse the response of a whoami rpc call into a `WhoAmIOut`.
///
/// # Arguments
///
/// * `body` - An array of u8 to be parsed.
///
/// # Errors
///
/// Returns an error when `body` does not deserialize into a `WhoAmIOut`.
pub fn whoami_res_parse(body: &[u8]) -> Result<WhoAmIOut, GolgiError> {
    Ok(serde_json::from_slice(body)?)
}

/// Takes in an rpc request number, and a handling function,
/// and waits for an rpc response which matches the request number,
/// and then calls the handling function on the response.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result.
///   This is a function which parses the response from the RpcReader. T is a generic type,
///   so this parse function can return multiple possible types (String, json, custom struct etc.)
///
/// # Errors
///
/// Returns the error produced by `rpc_reader.recv()` or by `f`, and
/// `GolgiError::Sbot` when the matching response is an error response or a
/// stream cancellation that arrives before any content.
pub async fn get_async<'a, R, T, F>(
    rpc_reader: &mut RpcReader<R>,
    req_no: RequestNo,
    f: F,
) -> Result<T, GolgiError>
where
    R: Read + Unpin,
    F: Fn(&[u8]) -> Result<T, GolgiError>,
    T: Debug,
{
    loop {
        let (id, msg) = rpc_reader.recv().await?;

        // Messages that answer other requests are skipped.
        if id != req_no {
            continue;
        }

        match msg {
            RecvMsg::RpcResponse(_type, body) => return f(&body),
            RecvMsg::ErrorResponse(message) => return Err(GolgiError::Sbot(message)),
            RecvMsg::CancelStreamRespose() => {
                return Err(GolgiError::Sbot(
                    "sbot returned CancelStreamResponse before any content".to_string(),
                ));
            }
            // Any other message kind is ignored; keep waiting.
            _ => {}
        }
    }
}

/// Takes in an rpc request number, and a handling function,
/// and calls the handling function on all RPC responses which match the request number,
/// appending the result of each parsed message to a vector,
/// until a CancelStreamResponse is found, marking the end of the stream,
/// and then finally a result is returned.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result.
///   This is a function which parses the response from the RpcReader. T is a generic type,
///   so this parse function can return multiple possible types (String, json, custom struct etc.)
pub async fn get_async_until_eof<'a, R, T, F>( rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> Result, GolgiError> where R: Read + Unpin, F: Fn(&[u8]) -> Result, T: Debug, { let mut messages: Vec = Vec::new();; loop { let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { let parsed_response: Result = f(&body); match parsed_response { Ok(parsed_message) => { messages.push(parsed_message); }, Err(err) => { return Err(err); } } } RecvMsg::ErrorResponse(message) => { return Err(GolgiError::Sbot(message)); } RecvMsg::CancelStreamRespose() => break, _ => {} } } } Ok(messages) } /// Takes in an rpc request number, and a handling function, /// and calls the handling function on all responses which match the request number, /// and prints out the result of the handling function. /// /// This is a function useful for debugging, and only prints the output. /// /// # Arguments /// /// * `rpc_reader` - A `RpcReader` which can return Messages in a loop /// * `req_no` - A `RequestNo` of the response to listen for /// * `f` - A function which takes in an array of u8 and returns a Result. /// This is a function which parses the response from the RpcReader. T is a generic type, /// so this parse function can return multiple possible types (String, json, custom struct etc.) pub async fn print_source_until_eof<'a, R, T, F>( rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> Result<(), GolgiError> where R: Read + Unpin, F: Fn(&[u8]) -> Result, T: Debug + serde::Deserialize<'a>, { loop { let (id, msg) = rpc_reader.recv().await?; if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { let display = f(&body)?; println!("{:?}", display); } RecvMsg::ErrorResponse(message) => { return Err(GolgiError::Sbot(message)); } RecvMsg::CancelStreamRespose() => { break }, _ => {} } } } Ok(()) }