diff --git a/src/error.rs b/src/error.rs index 9d93d51..ac8fab5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,7 @@ //! Custom error type for `golgi`. use std::io::Error as IoError; +use std::str::Utf8Error; use base64::DecodeError; use kuska_handshake::async_std::Error as HandshakeError; @@ -37,6 +38,11 @@ pub enum GolgiError { SerdeJson(JsonError), /// Error decoding typed ssb message from content. ContentTypeDecode(String), + /// Error decoding UTF8 string from bytes + Utf8Parse { + /// The underlying parse error. + source: Utf8Error, + }, } impl std::error::Error for GolgiError { @@ -50,7 +56,8 @@ impl std::error::Error for GolgiError { GolgiError::Rpc(ref err) => Some(err), GolgiError::Sbot(_) => None, GolgiError::SerdeJson(ref err) => Some(err), - GolgiError::ContentTypeDecode(ref _err) => None, + GolgiError::ContentTypeDecode(_) => None, + GolgiError::Utf8Parse{ ref source} => Some(source), } } } @@ -75,6 +82,7 @@ impl std::fmt::Display for GolgiError { "Failed to decode typed message from ssb message content: {}", err ), + GolgiError::Utf8Parse{ source } => write!(f, "Failed to deserialize UTF8 from bytes: {}", source), } } } @@ -114,3 +122,9 @@ impl From for GolgiError { GolgiError::SerdeJson(err) } } + +impl From for GolgiError { + fn from(err: Utf8Error) -> Self { + GolgiError::Utf8Parse { source: err} + } +} diff --git a/src/messages.rs b/src/messages.rs index ee39a07..d422111 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -58,7 +58,7 @@ impl SsbMessageValue { #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] #[allow(missing_docs)] -pub struct SsbKVT { +pub struct SsbMessageKVT { pub key: String, pub value: SsbMessageValue, pub timestamp: f64, diff --git a/src/sbot.rs b/src/sbot.rs index 4ff8608..8610410 100644 --- a/src/sbot.rs +++ b/src/sbot.rs @@ -14,7 +14,7 @@ use kuska_ssb::{ }; use crate::error::GolgiError; -use crate::messages::{SsbKVT, SsbMessageContent, SsbMessageValue}; +use crate::messages::{SsbMessageKVT, SsbMessageContent, SsbMessageValue}; use crate::utils; /// The Scuttlebutt identity, keys and configuration parameters for connecting to a local sbot @@ -88,10 +88,10 @@ impl Sbot { /// Call the `partialReplication getSubset` RPC method and return a vector /// of messages as KVTs (key, value, timestamp). // TODO: add args for `descending` and `page` (max number of msgs in response) - pub async fn get_subset(&mut self, query: SubsetQuery) -> Result, GolgiError> { + pub async fn get_subset(&mut self, query: SubsetQuery) -> Result, GolgiError> { let req_id = self.client.getsubset_req_send(query).await?; - utils::get_async_until_eof(&mut self.rpc_reader, req_id, utils::kvt_res_parse).await + utils::get_source_until_eof(&mut self.rpc_reader, req_id, utils::kvt_res_parse).await } /// Call the `whoami` RPC method and return an `id`. @@ -101,13 +101,6 @@ impl Sbot { utils::get_async(&mut self.rpc_reader, req_id, utils::string_res_parse).await } - /// Call the `createLogStream` RPC method and return a Vec - pub async fn log(&mut self) -> Result { - let req_id = self.client.log_req_send().await?; - - utils::get_async(&mut self.rpc_reader, req_id, utils::string_res_parse).await - } - /// Call the `publish` RPC method and return a message reference. /// /// # Arguments @@ -161,6 +154,6 @@ impl Sbot { ) -> Result, GolgiError> { let args = CreateHistoryStreamIn::new(id); let req_id = self.client.create_history_stream_req_send(&args).await?; - utils::get_async_until_eof(&mut self.rpc_reader, req_id, utils::ssb_message_res_parse).await + utils::get_source_until_eof(&mut self.rpc_reader, req_id, utils::ssb_message_res_parse).await } } diff --git a/src/utils.rs b/src/utils.rs index 1ab449e..c840610 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -6,16 +6,16 @@ use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader}; use serde_json::Value; use crate::error::GolgiError; -use crate::messages::{SsbKVT, SsbMessageValue}; +use crate::messages::{SsbMessageKVT, SsbMessageValue}; /// Function to parse an array of bytes (returned by an rpc call) into a KVT. /// /// # Arguments /// /// * `body` - An array of u8 to be parsed. -pub fn kvt_res_parse(body: &[u8]) -> Result { +pub fn kvt_res_parse(body: &[u8]) -> Result { let value: Value = serde_json::from_slice(body)?; - let kvt: SsbKVT = serde_json::from_value(value)?; + let kvt: SsbMessageKVT = serde_json::from_value(value)?; Ok(kvt) } @@ -25,8 +25,7 @@ pub fn kvt_res_parse(body: &[u8]) -> Result { /// /// * `body` - An array of u8 to be parsed. pub fn string_res_parse(body: &[u8]) -> Result { - // TODO: cleanup with proper error handling etc. - Ok(std::str::from_utf8(body).unwrap().to_string()) + Ok(std::str::from_utf8(body)?.to_string()) } /// Function to parse an array of bytes (returned by an rpc call) into a serde_json::Value. @@ -104,7 +103,7 @@ where /// * `f` - A function which takes in an array of u8 and returns a Result. /// This is a function which parses the response from the RpcReader. T is a generic type, /// so this parse function can return multiple possible types (String, json, custom struct etc.) -pub async fn get_async_until_eof<'a, R, T, F>( +pub async fn get_source_until_eof<'a, R, T, F>( rpc_reader: &mut RpcReader, req_no: RequestNo, f: F,