Working on getsubset

This commit is contained in:
notplants 2021-12-27 16:43:44 -05:00
parent feae382ead
commit 807bb8700c
3 changed files with 149 additions and 35 deletions

View File

@ -11,42 +11,56 @@ async fn run() -> Result<(), GolgiError> {
let id = sbot_client.whoami().await?;
println!("{}", id);
/*
let name = TypedMessage::About {
about: id,
name: Some("golgi".to_string()),
title: None,
branch: None,
image: None,
description: None,
location: None,
start_datetime: None,
};
// /*
// let name = TypedMessage::About {
// about: id,
// name: Some("golgi".to_string()),
// title: None,
// branch: None,
// image: None,
// description: None,
// location: None,
// start_datetime: None,
// };
//
// let name_msg_ref = sbot_client.publish(name).await?;
// println!("{}", name_msg_ref);
let name_msg_ref = sbot_client.publish(name).await?;
println!("{}", name_msg_ref);
// let post = TypedMessage::Post {
// text: "golgi go womp womp".to_string(),
// mentions: None,
// };
//
// let post_msg_ref = sbot_client.publish(post).await?;
// println!("{}", post_msg_ref);
let post = TypedMessage::Post {
text: "golgi go womp womp".to_string(),
mentions: None,
};
let post_msg_ref = sbot_client.publish(post).await?;
println!("{}", post_msg_ref);
let post_msg_ref = sbot_client
.publish_description("this is a description")
.await?;
println!("description: {}", post_msg_ref);
*/
//
// let post_msg_ref = sbot_client
// .publish_description("this is a description")
// .await?;
// println!("description: {}", post_msg_ref);
// */
// let query = SubsetQuery::Type {
// op: "type".to_string(),
// string: "post".to_string(),
// };
let query = SubsetQuery::Type {
op: "type".to_string(),
string: "vote".to_string(),
op: "author".to_string(),
string: "@L/z54cbc8V1kL1/MiBhpEKuN3QJkSoZYNaukny3ghIs=.ed25519".to_string(),
};
let query_response = sbot_client.getsubset(query).await?;
println!("{}", query_response);
// println!("Calling log");
// let log_response = sbot_client.log().await?;
// println!("log: {}", log_response);
// println!("Calling create_history");
// let hist_response = sbot_client.create_history_stream("@L/z54cbc8V1kL1/MiBhpEKuN3QJkSoZYNaukny3ghIs=.ed25519".to_string()).await?;
// println!("hist: {:?}", hist_response);
Ok(())
}

View File

@ -16,6 +16,7 @@ use kuska_ssb::{
discovery, keystore,
keystore::OwnedIdentity,
rpc::{RpcReader, RpcWriter},
feed::{Feed, Message}
};
use crate::error::GolgiError;
@ -95,6 +96,7 @@ impl Sbot {
pub async fn getsubset(&mut self, query: SubsetQuery) -> Result<String, GolgiError> {
let req_id = self.client.getsubset_req_send(query).await?;
println!("getting subbset!");
utils::get_async(&mut self.rpc_reader, req_id, utils::getsubset_res_parse).await
}
@ -107,6 +109,14 @@ impl Sbot {
.map(|whoami| whoami.id)
}
/// Call the `createLogStream` RPC method and return a Vec<Message>
pub async fn log(&mut self) -> Result<String, GolgiError> {
let req_id = self.client.log_req_send().await?;
utils::get_async(&mut self.rpc_reader, req_id, utils::getsubset_res_parse)
.await
}
/// Call the `publish` RPC method and return a message reference.
///
/// # Arguments
@ -161,10 +171,11 @@ impl Sbot {
*/
/// Call the `createHistoryStream` RPC method and print the output.
async fn create_history_stream(&mut self, id: String) -> Result<(), GolgiError> {
pub async fn create_history_stream(&mut self, id: String) -> Result<Vec<String>, GolgiError> {
let args = CreateHistoryStreamIn::new(id);
let req_id = self.client.create_history_stream_req_send(&args).await?;
// TODO: we should return a vector of messages instead of printing them
utils::print_source_until_eof(&mut self.rpc_reader, req_id, utils::feed_res_parse).await
// TODO: message_res_parse is currently throwing "Failed to serializ JSON slice"
// utils::get_async_until_eof(&mut self.rpc_reader, req_id, utils::message_res_parse).await
utils::get_async_until_eof(&mut self.rpc_reader, req_id, utils::string_res_parse).await
}
}

View File

@ -11,7 +11,7 @@ use std::fmt::Debug;
use async_std::io::Read;
use kuska_ssb::api::dto::WhoAmIOut;
use kuska_ssb::feed::Feed;
use kuska_ssb::feed::{Feed, Message};
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader};
use crate::error::GolgiError;
@ -25,6 +25,19 @@ pub fn feed_res_parse(body: &[u8]) -> Result<Feed, GolgiError> {
Ok(Feed::from_slice(body)?)
}
pub fn message_res_parse(body: &[u8]) -> Result<Message, GolgiError> {
let content = std::str::from_utf8(&body).unwrap().to_string();
println!("content: {:?}", content);
let message: Message = serde_json::from_slice(body)?;
Ok(message)
}
pub fn string_res_parse(body: &[u8]) -> Result<String, GolgiError> {
// TODO: cleanup with proper error handling etc.
Ok(std::str::from_utf8(body).unwrap().to_string())
}
//pub fn publish_res_parse(body: &[u8]) -> Result<PublishOut, GolgiError> {
pub fn publish_res_parse(body: &[u8]) -> Result<String, GolgiError> {
//Ok(serde_json::from_slice(body)?)
@ -36,6 +49,18 @@ pub fn whoami_res_parse(body: &[u8]) -> Result<WhoAmIOut, GolgiError> {
Ok(serde_json::from_slice(body)?)
}
/// Takes in an rpc request number, and a handling function,
/// and waits for an rpc response which matches the request number,
/// and then calls the handling function on the response.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
pub async fn get_async<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>,
req_no: RequestNo,
@ -46,20 +71,83 @@ where
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
{
println!("req_no: {:?}", req_no);
loop {
let (id, msg) = rpc_reader.recv().await?;
println!("id: {:?}", id);
println!("msg: {:?}", msg);
if id == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
let m = std::str::from_utf8(&body).unwrap().to_string();
println!("msg: {:?}", m);
return f(&body).map_err(|err| err);
},
RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message));
},
RecvMsg::CancelStreamRespose() => {
return Err(GolgiError::Sbot("No RPC response from sbot for request".to_string()));
},
_ => {
println!("empty");
}
}
}
}
}
/// Takes in an rpc request number, and a handling function,
/// and calls the handling function on all RPC responses which match the request number,
/// appending the result of each parsed message to a vector,
/// until a CancelStreamResponse is found, marking the end of the stream,
/// and then finally a result is returned.
///
/// # Arguments
///
/// * `rpc_reader` - A `RpcReader` which can return Messages in a loop
/// * `req_no` - A `RequestNo` of the response to listen for
/// * `f` - A function which takes in an array of u8 and returns a Result<T, GolgiError>.
/// This is a function which parses the response from the RpcReader. T is a generic type,
/// so this parse function can return multiple possible types (String, json, custom struct etc.)
pub async fn get_async_until_eof<'a, R, T, F>(
rpc_reader: &mut RpcReader<R>,
req_no: RequestNo,
f: F,
) -> Result<Vec<T>, GolgiError>
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
{
let mut messages: Vec<T> = Vec::new();;
loop {
let (id, msg) = rpc_reader.recv().await?;
if id == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
return f(&body).map_err(|err| err);
let parsed_response: Result<T, GolgiError> = f(&body);
match parsed_response {
Ok(parsed_message) => {
println!("parsed {:?}", parsed_message);
messages.push(parsed_message);
},
Err(err) => {
return Err(err);
}
}
let display = f(&body)?;
println!("{:?}", display);
}
RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message));
}
RecvMsg::CancelStreamRespose() => break,
_ => {}
}
}
}
Ok(messages)
}
pub async fn print_source_until_eof<'a, R, T, F>(
@ -78,12 +166,13 @@ where
match msg {
RecvMsg::RpcResponse(_type, body) => {
let display = f(&body)?;
println!("{:?}", display);
}
RecvMsg::ErrorResponse(message) => {
return Err(GolgiError::Sbot(message));
}
RecvMsg::CancelStreamRespose() => break,
RecvMsg::CancelStreamRespose() => {
break
},
_ => {}
}
}