@ -1,26 +1,18 @@
[package] [package]
name = "golgi" name = "golgi"
version = "0.2.4" version = "0.1.0"
authors = ["glyph <>"]
edition = "2021" edition = "2021"
authors = ["Max Fowler <>", "Andrew Reid <>"]
readme = ""
description = "An asynchronous, experimental Scuttlebutt client library"
repository = ""
homepage = ""
license = "LGPL-3.0"
keywords = ["scuttlebutt", "ssb", "decentralized", "peer-for-peer", "p4p"]
exclude = ["git_hooks/", "examples/"]
[dependencies] [dependencies]
async-std = "1.10.0" async-std = "1.10.0"
async-stream = "0.3.2"
base64 = "0.13.0" base64 = "0.13.0"
futures = "0.3.21" futures = "0.3.18"
log = "0.4"
hex = "0.4.3" hex = "0.4.3"
kuska-handshake = { version = "0.2.0", features = ["async_std"] } kuska-handshake = { version = "0.2.0", features = ["async_std"] }
kuska-sodiumoxide = "0.2.5-0" kuska-sodiumoxide = "0.2.5-0"
kuska-ssb = { git = "" } # waiting for a pr merge upstream
serde = { version = "1", features = ["derive"] } kuska-ssb = { path = "../ssb" }
# try to replace with miniserde
serde = "1"
serde_json = "1" serde_json = "1"
sha2 = "0.10.2"

View File

View File

@ -1,72 +1,18 @@
# golgi # golgi
_The Golgi complex (aka. Golgi apparatus or Golgi body) packages proteins _The Golgi complex (aka. Golgi apparatus or Golgi body) packages proteins into membrane-bound vesicles inside the cell before the vesicles are sent to their destination._
into membrane-bound vesicles inside the cell before the vesicles are sent
to their destination._
----- -----
## Introduction Golgi is an experimental Scuttlebutt client which uses the [kuska-ssb]( libraries and aims to provide a high-level API for interacting with an sbot instance. Development efforts are currently oriented towards [go-sbot]( interoperability.
Golgi is an asynchronous, experimental Scuttlebutt client that aims to
facilitate Scuttlebutt application development. It provides a high-level
API for interacting with an sbot instance and uses the
[kuska-ssb]( libraries to make RPC calls.
## Features
Golgi offers the ability to invoke individual RPC methods while also
providing a number of convenience methods which may involve multiple RPC
calls and / or the processing of data received from those calls. The
`Sbot` `struct` is the primary means of interacting with the library.
Features include the ability to publish messages of various kinds; to
retrieve messages (e.g. `about` and `description` messages) and formulate
queries; to follow, unfollow, block and unblock a peer; to query the social
graph; and to generate pub invite codes.
## Example Usage ## Example Usage
Basic usage is demonstrated below. Visit the [examples directory]( in the `golgi` repository for
more comprehensive examples.
```rust ```rust
use golgi::{GolgiError, Sbot, sbot::Keystore};
pub async fn run() -> Result<(), GolgiError> { pub async fn run() -> Result<(), GolgiError> {
// Attempt to initialise a connection to an sbot instance using the let mut sbot_client = Sbot::init(None, None).await?;
// secret file at the Patchwork path and the default IP address, port
// and network key (aka. capabilities key).
let mut sbot_client = Sbot::init(Keystore::Patchwork, None, None).await?;
// Call the `whoami` RPC method to retrieve the public key for the sbot
// identity.
let id = sbot_client.whoami().await?; let id = sbot_client.whoami().await?;
// Print the public key (identity) to `stdout`.
println!("{}", id); println!("{}", id);
// Compose an SSB post message type.
let post = SsbMessageContent::Post {
text: "Biology, eh?!".to_string(),
mentions: None,
// Publish the post.
let post_msg_reference = sbot_client.publish(post).await?;
// Print the reference (sigil-link) for the published post.
println!("{}", post_msg_reference);
} }
``` ```
## Authors
- [notplants](
- [glyph](
## License

View File

@ -0,0 +1,59 @@
View File

View File

@ -1,7 +1,6 @@
//! Custom error type. //! Custom error type for `golgi`.
use std::io::Error as IoError; use std::io::Error as IoError;
use std::str::Utf8Error;
use base64::DecodeError; use base64::DecodeError;
use kuska_handshake::async_std::Error as HandshakeError; use kuska_handshake::async_std::Error as HandshakeError;
@ -34,17 +33,8 @@ pub enum GolgiError {
Rpc(RpcError), Rpc(RpcError),
/// Go-sbot error. /// Go-sbot error.
Sbot(String), Sbot(String),
/// SSB sigil-link error.
/// JSON serialization or deserialization error. /// JSON serialization or deserialization error.
SerdeJson(JsonError), SerdeJson(JsonError),
/// Error decoding typed SSB message from content.
/// Error decoding UTF8 string from bytes
Utf8Parse {
/// The underlying parse error.
source: Utf8Error,
} }
impl std::error::Error for GolgiError {
@ -57,10 +47,7 @@ impl std::error::Error for GolgiError {
@ -57,10 +47,7 @@ impl std::error::Error for GolgiError {
GolgiError::Feed(ref err) => Some(err), GolgiError::Feed(ref err) => Some(err),
GolgiError::Rpc(ref err) => Some(err), GolgiError::Rpc(ref err) => Some(err),
GolgiError::Sbot(_) => None, GolgiError::Sbot(_) => None,
GolgiError::SigilLink(_) => None,
GolgiError::SerdeJson(ref err) => Some(err), GolgiError::SerdeJson(ref err) => Some(err),
GolgiError::ContentType(_) => None,
GolgiError::Utf8Parse { ref source } => Some(source),
} }
} }
} }
@ -71,24 +58,15 @@ impl std::fmt::Display for GolgiError {
// TODO: add context (what were we trying to decode?) // TODO: add context (what were we trying to decode?)
GolgiError::DecodeBase64(_) => write!(f, "Failed to decode base64"), GolgiError::DecodeBase64(_) => write!(f, "Failed to decode base64"),
GolgiError::Io { ref context, .. } => write!(f, "IO error: {}", context), GolgiError::Io { ref context, .. } => write!(f, "IO error: {}", context),
GolgiError::Handshake(ref err) => write!(f, "Handshake failure: {}", err), GolgiError::Handshake(ref err) => write!(f, "{}", err),
GolgiError::Api(ref err) => write!(f, "SSB API failure: {}", err), GolgiError::Api(ref err) => write!(f, "SSB API failure: {}", err),
GolgiError::Feed(ref err) => write!(f, "SSB feed error: {}", err), GolgiError::Feed(ref err) => write!(f, "SSB feed error: {}", err),
// TODO: improve this variant with a context message // TODO: improve this variant with a context message
// then have the core display msg be: "SSB RPC error: {}", context // then have the core display msg be: "SSB RPC error: {}", context
GolgiError::Rpc(ref err) => write!(f, "SSB RPC failure: {}", err), GolgiError::Rpc(ref err) => write!(f, "SSB RPC failure: {}", err),
GolgiError::Sbot(ref err) => write!(f, "Sbot encountered an error: {}", err), GolgiError::Sbot(ref err) => write!(f, "Sbot returned an error response: {}", err),
GolgiError::SigilLink(ref context) => write!(f, "SSB blob ID error: {}", context),
GolgiError::SerdeJson(_) => write!(f, "Failed to serialize JSON slice"), GolgiError::SerdeJson(_) => write!(f, "Failed to serialize JSON slice"),
//GolgiError::WhoAmI(ref err) => write!(f, "{}", err), //GolgiError::WhoAmI(ref err) => write!(f, "{}", err),
GolgiError::ContentType(ref err) => write!(
"Failed to decode typed message from SSB message content: {}",
GolgiError::Utf8Parse { source } => {
write!(f, "Failed to deserialize UTF8 from bytes: {}", source)
} }
} }
} }
@ -128,9 +106,3 @@ impl From<JsonError> for GolgiError {
GolgiError::SerdeJson(err) GolgiError::SerdeJson(err)
} }
} }
impl From<Utf8Error> for GolgiError {
fn from(err: Utf8Error) -> Self {
GolgiError::Utf8Parse { source: err }

View File

@ -2,78 +2,30 @@
//! # golgi //! # golgi
//! //!
//! _The Golgi complex (aka. Golgi apparatus or Golgi body) packages proteins //! _The Golgi complex (aka. Golgi apparatus or Golgi body) packages proteins into membrane-bound vesicles inside the cell before the vesicles are sent to their destination._
//! into membrane-bound vesicles inside the cell before the vesicles are sent
//! to their destination._
//! //!
//! ----- //! -----
//! //!
//! ## Introduction //! Golgi is an experimental Scuttlebutt client which uses the [kuska-ssb]( libraries and aims to provide a high-level API for interacting with an sbot instance. Development efforts are currently oriented towards [go-sbot]( interoperability.
//! Golgi is an asynchronous, experimental Scuttlebutt client that aims to
//! facilitate Scuttlebutt application development. It provides a high-level
//! API for interacting with an sbot instance and uses the
//! [kuska-ssb]( libraries to make RPC calls.
//! Development efforts are currently oriented towards
//! [go-sbot]( interoperability.
//! ## Features
//! Golgi offers the ability to invoke individual RPC methods while also
//! providing a number of convenience methods which may involve multiple RPC
//! calls and / or the processing of data received from those calls. The
//! [`Sbot`](crate::sbot::Sbot) `struct` is the primary means of interacting
//! with the library.
//! Features include the ability to publish messages of various kinds; to
//! retrieve messages (e.g. `about` and `description` messages) and formulate
//! queries; to follow, unfollow, block and unblock a peer; to query the social
//! graph; and to generate pub invite codes.
//! Visit the [API modules](crate::api) to view the available methods.
//! //!
//! ## Example Usage //! ## Example Usage
//! //!
//! Basic usage is demonstrated below. Visit the [examples directory]( in the `golgi` repository for
//! more comprehensive examples.
//! ```rust //! ```rust
//! use golgi::{messages::SsbMessageContent, GolgiError, Sbot, sbot::Keystore}; //! use golgi::GolgiError;
//! use golgi::sbot::Sbot;
//! //!
//! pub async fn run() -> Result<(), GolgiError> { //! pub async fn run() -> Result<(), GolgiError> {
//! // Attempt to connect to an sbot instance using the default IP address, //! let mut sbot_client = Sbot::init(None, None).await?;
//! // port and network key (aka. capabilities key).
//! let mut sbot_client = Sbot::init(Keystore::Patchwork, None, None).await?;
//! //!
//! // Call the `whoami` RPC method to retrieve the public key for the sbot
//! // identity.
//! let id = sbot_client.whoami().await?; //! let id = sbot_client.whoami().await?;
//! // Print the public key (identity) to `stdout`.
//! println!("{}", id); //! println!("{}", id);
//! //!
//! // Compose an SSB post message type.
//! let post = SsbMessageContent::Post {
//! text: "Biology, eh?!".to_string(),
//! mentions: None,
//! };
//! // Publish the post.
//! let post_msg_reference = sbot_client.publish(post).await?;
//! // Print the reference (sigil-link) for the published post.
//! println!("{}", post_msg_reference);
//! Ok(()) //! Ok(())
//! } //! }
//! ``` //! ```
pub mod api;
pub mod blobs;
pub mod error; pub mod error;
pub mod messages;
pub mod sbot; pub mod sbot;
pub mod utils; mod utils;
pub use crate::{error::GolgiError, sbot::Sbot}; pub use crate::error::GolgiError;
pub use kuska_ssb;

View File

@ -1,104 +0,0 @@
//! Message types and conversion methods.
use kuska_ssb::api::dto::content::TypedMessage;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt::Debug;
use crate::error::GolgiError;
/// `SsbMessageContent` is a type alias for `TypedMessage` from the `kuska_ssb` library.
/// It is aliased in golgi to fit the naming convention of the other message
/// types: `SsbMessageKVT` and `SsbMessageValue`.
/// See the [kuska source code]( for the type definition of `TypedMessage`.
pub type SsbMessageContent = TypedMessage;
/// The `value` of an SSB message (the `V` in `KVT`).
/// More information concerning the data model can be found in the
/// [`Metadata` documentation](
#[derive(Serialize, Deserialize, Debug)]
pub struct SsbMessageValue {
pub previous: Option<String>,
pub author: String,
pub sequence: u64,
pub timestamp: f64,
pub hash: String,
pub content: Value,
pub signature: String,
/// Message content types.
#[derive(Debug, Eq, PartialEq)]
pub enum SsbMessageContentType {
impl SsbMessageValue {
/// Get the type field of the message content as an enum, if found.
/// If no `type` field is found or the `type` field is not a string,
/// it returns an `Err(GolgiError::ContentType)`.
/// If a `type` field is found but with an unknown string,
/// it returns an `Ok(SsbMessageContentType::Unrecognized)`.
pub fn get_message_type(&self) -> Result<SsbMessageContentType, GolgiError> {
let msg_type = self
.ok_or_else(|| GolgiError::ContentType("type field not found".to_string()))?;
let mtype_str: &str = msg_type.as_str().ok_or_else(|| {
GolgiError::ContentType("type field value is not a string as expected".to_string())
let enum_type = match mtype_str {
"about" => SsbMessageContentType::About,
"post" => SsbMessageContentType::Post,
"vote" => SsbMessageContentType::Vote,
"contact" => SsbMessageContentType::Contact,
_ => SsbMessageContentType::Unrecognized,
/// Helper function which returns `true` if this message is of the given type,
/// and `false` if the type does not match or is not found.
pub fn is_message_type(&self, message_type: SsbMessageContentType) -> bool {
let self_message_type = self.get_message_type();
match self_message_type {
Ok(mtype) => mtype == message_type,
Err(_err) => false,
/// Convert the content JSON value into an `SsbMessageContent` `enum`,
/// using the `type` field as a tag to select which variant of the `enum`
/// to deserialize into.
/// See the [Serde docs on internally-tagged enum representations]( for further details.
pub fn into_ssb_message_content(self) -> Result<SsbMessageContent, GolgiError> {
let m: SsbMessageContent = serde_json::from_value(self.content)?;
/// An SSB message represented as a key-value-timestamp (`KVT`).
/// More information concerning the data model can be found in the
/// [`Metadata` documentation](
#[derive(Serialize, Deserialize, Debug)]
pub struct SsbMessageKVT {
pub key: String,
pub value: SsbMessageValue,
pub timestamp: Option<f64>,
pub rts: Option<f64>,

View File

@ -1,158 +1,73 @@
//! Sbot type and connection-related methods. //! Sbot type and associated methods.
use async_std::net::TcpStream; use async_std::net::TcpStream;
use kuska_handshake::async_std::BoxStream; use kuska_handshake::async_std::BoxStream;
use kuska_sodiumoxide::crypto::{auth, sign::ed25519}; use kuska_sodiumoxide::crypto::{auth, sign::ed25519};
use kuska_ssb::{ use kuska_ssb::{
api::ApiCaller, api::{
//content::{About, Post},
content::{SubsetQuery, TypedMessage},
discovery, keystore, discovery, keystore,
keystore::OwnedIdentity, keystore::OwnedIdentity,
rpc::{RpcReader, RpcWriter}, rpc::{RpcReader, RpcWriter},
}; };
use crate::error::GolgiError; use crate::error::GolgiError;
use crate::utils;
/// Keystore selector to specify the location of the secret file. /// The Scuttlebutt identity, keys and configuration parameters for connecting to a local sbot
/// /// instance, as well as handles for calling RPC methods and receiving responses.
/// This enum is used when initiating a connection with an sbot instance.
pub enum Keystore {
/// Patchwork default keystore path: `.ssb/secret` in the user's home directory.
/// GoSbot default keystore path: `.ssb-go/secret` in the user's home directory.
/// GoSbot keystore in a custom location
/// Patchwork keystore in a custom location
/// A struct representing a connection with a running sbot.
/// A client and an rpc_reader can together be used to make requests to the sbot
/// and read the responses.
/// Note there can be multiple SbotConnection at the same time.
pub struct SbotConnection {
/// Client for writing requests to go-bot
pub client: ApiCaller<TcpStream>,
/// RpcReader object for reading responses from go-sbot
pub rpc_reader: RpcReader<TcpStream>,
/// Holds the Scuttlebutt identity, keys and configuration parameters for
/// connecting to a local sbot and implements all Golgi API methods.
pub struct Sbot { pub struct Sbot {
/// The ID (public key value) of the account associated with the local sbot instance. id: String,
pub id: String,
public_key: ed25519::PublicKey, public_key: ed25519::PublicKey,
private_key: ed25519::SecretKey, private_key: ed25519::SecretKey,
address: String, address: String,
// aka caps key (scuttleverse identifier) // aka caps key (scuttleverse identifier)
network_id: auth::Key, network_id: auth::Key,
client: ApiCaller<TcpStream>,
rpc_reader: RpcReader<TcpStream>,
} }
impl Sbot { impl Sbot {
/// Initiate a connection with an sbot instance. Define the IP address, /// Initiate a connection with an sbot instance. Define the IP address, port and network key
/// port and network key for the sbot, then retrieve the public key, /// for the sbot, then retrieve the public key, private key (secret) and identity from the
/// private key (secret) and identity from the `.ssb-go/secret` file. /// `.ssb-go/secret` file. Open a TCP stream to the sbot and perform the secret handshake. If successful, create a box stream and split it into a writer and reader. Return RPC handles to the sbot as part of the `struct` output.
pub async fn init( pub async fn init(ip_port: Option<String>, net_id: Option<String>) -> Result<Sbot, GolgiError> {
keystore: Keystore, let address = if ip_port.is_none() {
ip_port: Option<String>,
net_id: Option<String>,
) -> Result<Sbot, GolgiError> {
let mut address = if ip_port.is_none() {
"".to_string() "".to_string()
} else { } else {
ip_port.unwrap() ip_port.unwrap()
}; };
if address.starts_with(':') {
address = format!("{}", address);
let network_id = if net_id.is_none() { let network_id = if net_id.is_none() {
discovery::ssb_net_id() discovery::ssb_net_id()
} else { } else {
auth::Key::from_slice(&hex::decode(net_id.unwrap()).unwrap()).unwrap() auth::Key::from_slice(&hex::decode(net_id.unwrap()).unwrap()).unwrap()
}; };
let OwnedIdentity { pk, sk, id } = match keystore { let OwnedIdentity { pk, sk, id } = keystore::from_gosbot_local()
Keystore::Patchwork => keystore::from_patchwork_local().await.map_err(|_err| { .await
GolgiError::Sbot( .expect("couldn't read local secret");
"sbot initialization error: couldn't read local patchwork secret from default location".to_string(),
Keystore::GoSbot => keystore::from_gosbot_local().await.map_err(|_err| {
"sbot initialization error: couldn't read local go-sbot secret from default location".to_string(),
Keystore::CustomGoSbot(key_path) => {
.map_err(|_err| {
"sbot initialization error: couldn't read local go-sbot secret from: {}",
Keystore::CustomPatchwork(key_path) => {
.map_err(|_err| {
"sbot initialization error: couldn't read local patchwork secret from: {}",
Ok(Self {
public_key: pk,
private_key: sk,
/// Creates a new connection with the sbot, using the address, network_id,
/// public_key and private_key supplied when Sbot was initialized.
/// Note that a single Sbot can have multiple SbotConnection at the same time.
pub async fn get_sbot_connection(&self) -> Result<SbotConnection, GolgiError> {
let address = self.address.clone();
let network_id = self.network_id.clone();
let public_key = self.public_key;
let private_key = self.private_key.clone();
Sbot::_get_sbot_connection_helper(address, network_id, public_key, private_key).await
/// Private helper function which creates a new connection with sbot,
/// but with all variables passed as arguments.
/// Open a TCP stream to the sbot and perform the secret handshake. If
/// successful, create a box stream and split it into a writer and reader.
/// Return RPC handles to the sbot as part of the `struct` output.
async fn _get_sbot_connection_helper(
address: String,
network_id: auth::Key,
public_key: ed25519::PublicKey,
private_key: ed25519::SecretKey,
) -> Result<SbotConnection, GolgiError> {
let socket = TcpStream::connect(&address) let socket = TcpStream::connect(&address)
.await .await
.map_err(|source| GolgiError::Io { .map_err(|source| GolgiError::Io {
source, source,
context: "failed to initiate tcp stream connection".to_string(), context: "socket error; failed to initiate tcp stream connection".to_string(),
})?; })?;
let handshake = kuska_handshake::async_std::handshake_client( let handshake = kuska_handshake::async_std::handshake_client(
&mut &socket, &mut &socket,
network_id.clone(), network_id.clone(),
public_key, pk,
private_key.clone(), sk.clone(),
public_key, pk,
) )
.await .await
.map_err(GolgiError::Handshake)?; .map_err(GolgiError::Handshake)?;
@ -162,7 +77,94 @@ impl Sbot {
let rpc_reader = RpcReader::new(box_stream_read); let rpc_reader = RpcReader::new(box_stream_read);
let client = ApiCaller::new(RpcWriter::new(box_stream_write)); let client = ApiCaller::new(RpcWriter::new(box_stream_write));
let sbot_connection = SbotConnection { rpc_reader, client };
Ok(sbot_connection) Ok(Self {
public_key: pk,
private_key: sk,
/// Call the `partialReplication getSubset` RPC method and return a vector
/// of messages as KVTs (key, value, timestamp).
// TODO: add args for `descending` and `page` (max number of msgs in response)
pub async fn getsubset(&mut self, query: SubsetQuery) -> Result<String, GolgiError> {
let req_id = self.client.getsubset_req_send(query).await?;
utils::get_async(&mut self.rpc_reader, req_id, utils::getsubset_res_parse).await
/// Call the `whoami` RPC method and return an `id`.
pub async fn whoami(&mut self) -> Result<String, GolgiError> {
let req_id = self.client.whoami_req_send().await?;
utils::get_async(&mut self.rpc_reader, req_id, utils::whoami_res_parse)
/// Call the `publish` RPC method and return a message reference.
/// # Arguments
/// * `msg` - A `TypedMessage` `enum` whose variants include `Pub`, `Post`, `Contact`, `About`,
/// `Channel` and `Vote`. See the `kuska_ssb` documentation for further details such as field
/// names and accepted values for each variant.
pub async fn publish(&mut self, msg: TypedMessage) -> Result<String, GolgiError> {
let req_id = self.client.publish_req_send(msg).await?;
utils::get_async(&mut self.rpc_reader, req_id, utils::publish_res_parse).await
/// Wrapper for publish which constructs and publishes a post message appropriately from a string.
/// # Arguments
/// * `text` - A reference to a string slice which represents the text to be published in the post
pub async fn publish_post(&mut self, text: &str) -> Result<String, GolgiError> {
let msg = TypedMessage::Post {
text: text.to_string(),
mentions: None,
/// Wrapper for publish which constructs and publishes an about description message appropriately from a string.
/// # Arguments
/// * `description` - A reference to a string slice which represents the text to be published as an about description.
pub async fn publish_description(&mut self, description: &str) -> Result<String, GolgiError> {
let msg = TypedMessage::About {
name: None,
title: None,
branch: None,
image: None,
description: Some(description.to_string()),
location: None,
start_datetime: None,
pub async fn publish_post(&mut self, post: Post) -> Result<String, GolgiError> {
let req_id = self.client.publish_req_send(post).await?;
utils::get_async(&mut self.rpc_reader, req_id, utils::publish_res_parse).await
/// Call the `createHistoryStream` RPC method and print the output.
async fn create_history_stream(&mut self, id: String) -> Result<(), GolgiError> {
let args = CreateHistoryStreamIn::new(id);
let req_id = self.client.create_history_stream_req_send(&args).await?;
// TODO: we should return a vector of messages instead of printing them
utils::print_source_until_eof(&mut self.rpc_reader, req_id, utils::feed_res_parse).await
} }
} }

View File

@ -1,66 +1,41 @@
//! Utility methods. /*
/*
use kuska_sodiumoxide::crypto::sign::ed25519;
use kuska_ssb::discovery;
use kuska_ssb::keystore;
use kuska_ssb::keystore::OwnedIdentity;
use std::fmt::Debug; use std::fmt::Debug;
use async_std::{io::Read, net::TcpStream, stream::Stream}; use async_std::io::Read;
use async_stream::stream;
use kuska_ssb::api::dto::WhoAmIOut;
use kuska_ssb::feed::Feed;
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader}; use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader};
use serde_json::Value;
use crate::error::GolgiError; use crate::error::GolgiError;
use crate::messages::{SsbMessageKVT, SsbMessageValue};
/// Parse an array of bytes (returned by an rpc call) into a `KVT`. pub fn getsubset_res_parse(body: &[u8]) -> Result<String, GolgiError> {
/// // TODO: cleanup with proper error handling etc.
/// # Arguments Ok(std::str::from_utf8(body).unwrap().to_string())
/// * `body` - An array of u8 to be parsed.
pub fn kvt_res_parse(body: &[u8]) -> Result<SsbMessageKVT, GolgiError> {
let value: Value = serde_json::from_slice(body)?;
let kvt: SsbMessageKVT = serde_json::from_value(value)?;
} }
/// Parse an array of bytes (returned by an rpc call) into a `String`. pub fn feed_res_parse(body: &[u8]) -> Result<Feed, GolgiError> {
/// Ok(Feed::from_slice(body)?)
/// # Arguments
/// * `body` - An array of u8 to be parsed.
pub fn string_res_parse(body: &[u8]) -> Result<String, GolgiError> {
} }
/// Parse an array of bytes (returned by an rpc call) into a `serde_json::Value`. //pub fn publish_res_parse(body: &[u8]) -> Result<PublishOut, GolgiError> {
/// pub fn publish_res_parse(body: &[u8]) -> Result<String, GolgiError> {
/// # Arguments //Ok(serde_json::from_slice(body)?)
/// // TODO: cleanup with proper error handling etc.
/// * `body` - An array of u8 to be parsed. Ok(std::str::from_utf8(body).unwrap().to_string())
pub fn json_res_parse(body: &[u8]) -> Result<Value, GolgiError> {
let message: Value = serde_json::from_slice(body)?;
} }
/// Parse an array of bytes (returned by an rpc call) into an `SsbMessageValue`. pub fn whoami_res_parse(body: &[u8]) -> Result<WhoAmIOut, GolgiError> {
/// Ok(serde_json::from_slice(body)?)
/// # Arguments
/// * `body` - An array of u8 to be parsed.
pub fn ssb_message_res_parse(body: &[u8]) -> Result<SsbMessageValue, GolgiError> {
let message: SsbMessageValue = serde_json::from_slice(body)?;
} }
/// Take in an RPC request number along with a handling function and wait for
/// an RPC response which matches the request number. Then, call the handling
/// function on the response.
/// # Arguments
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of `u8` and returns a
/// `Result<T, GolgiError>`. This is a function which parses the response from
/// the `RpcReader`. `T` is a generic type, so this parse function can return
/// multiple possible types (`String`, JSON, custom struct etc.)
pub async fn get_async<'a, R, T, F>( pub async fn get_async<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>, rpc_reader: &mut RpcReader<R>,
req_no: RequestNo, req_no: RequestNo,
@ -76,86 +51,17 @@ where
if id == req_no { if id == req_no {
match msg { match msg {
RecvMsg::RpcResponse(_type, body) => { RecvMsg::RpcResponse(_type, body) => {
return f(&body); return f(&body).map_err(|err| err);
} }
RecvMsg::ErrorResponse(message) => { RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message)); return Err(GolgiError::Sbot(message));
} }
RecvMsg::CancelStreamRespose() => {
return Err(GolgiError::Sbot(
"sbot returned CancelStreamResponse before any content".to_string(),
_ => {} _ => {}
} }
} }
} }
} }
/// Take in an RPC request number along with a handling function and call
/// the handling function on all RPC responses which match the request number,
/// appending the result of each parsed message to a vector until a
/// `CancelStreamResponse` is found (marking the end of the stream).
/// # Arguments
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of `u8` and returns a
/// `Result<T, GolgiError>`. This is a function which parses the response from
/// the `RpcReader`. `T` is a generic type, so this parse function can return
/// multiple possible types (`String`, JSON, custom struct etc.)
pub async fn get_source_until_eof<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>,
req_no: RequestNo,
f: F,
) -> Result<Vec<T>, GolgiError>
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
let mut messages: Vec<T> = Vec::new();
loop {
let (id, msg) = rpc_reader.recv().await?;
if id == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
let parsed_response: Result<T, GolgiError> = f(&body);
match parsed_response {
Ok(parsed_message) => {
Err(err) => {
return Err(err);
RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message));
RecvMsg::CancelStreamRespose() => break,
_ => {}
/// Take in an RPC request number along with a handling function and call the
/// handling function on all responses which match the request number. Then,
/// prints out the result of the handling function.
/// This function useful for debugging and only prints the output.
/// # Arguments
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of `u8` and returns a
/// `Result<T, GolgiError>`. This is a function which parses the response from
/// the `RpcReader`. `T` is a generic type, so this parse function can return
/// multiple possible types (`String`, JSON, custom struct etc.)
pub async fn print_source_until_eof<'a, R, T, F>( pub async fn print_source_until_eof<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>, rpc_reader: &mut RpcReader<R>,
req_no: RequestNo, req_no: RequestNo,
@ -184,61 +90,3 @@ where
} }
Ok(()) Ok(())
} }
/// Take in an RPC request number along with a handling function (parsing
/// results of type `T`) and produce an `async_std::stream::Stream` of results
/// of type `T`, where the handling function is called on all `RpcReader`
/// responses which match the request number.
/// # Arguments
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of `u8` and returns a
/// `Result<T, GolgiError>`. This is a function which parses the response from
/// the `RpcReader`. `T` is a generic type, so this parse function can return
/// multiple possible types (`String`, JSON, custom struct etc.)
pub async fn get_source_stream<'a, F, T>(
mut rpc_reader: RpcReader<TcpStream>,
req_no: RequestNo,
f: F,
) -> impl Stream<Item = Result<T, GolgiError>>
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug + serde::Deserialize<'a>,
// we use the async_stream::stream macro to allow for creating a stream which calls async functions
// see
let source_stream = stream! {
loop {
// get the next message from the rpc_reader
let (id, msg) = rpc_reader.recv().await?;
let x : i32 = id.clone();
// check if the next message from rpc_reader matches the req_no we are looking for
// if it matches, then this rpc response is for the given request
// and if it doesn't match, then we ignore it
if x == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
// parse an item of type T from the message body using the provided
// function for parsing
let item = f(&body)?;
// return Ok(item) as the next value in the stream
yield Ok(item)
RecvMsg::ErrorResponse(message) => {
// if an error is received
// return an Err(err) as the next value in the stream
yield Err(GolgiError::Sbot(message.to_string()));
// if we find a CancelStreamResponse
// this is the end of the stream
RecvMsg::CancelStreamRespose() => break,
// if we find an unknown response, we just continue the loop
_ => {}
// finally return the stream object