Multiple RPC

This commit is contained in:
notplants 2022-01-05 10:58:07 -05:00
parent a076e4741f
commit 03da29e3f9
2 changed files with 70 additions and 54 deletions

View File

@ -4,8 +4,8 @@ use golgi::error::GolgiError;
use golgi::sbot::Sbot;
use async_std::stream::{StreamExt};
use futures::{pin_mut};
use futures::{pin_mut, TryStreamExt};
use golgi::messages::{SsbMessageContentType, SsbMessageValue};
async fn run() -> Result<(), GolgiError> {
@ -34,52 +34,52 @@ async fn run() -> Result<(), GolgiError> {
}
println!("reached end of stream");
// // create a history stream and convert it into a Vec<SsbMessageValue> using try_collect
// // (if there is any error in the results, it will be raised)
// let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?;
// let results : Vec<SsbMessageValue> = history_stream.try_collect().await?;
// for x in results {
// println!("x: {:?}", x);
// }
//
// // example to create a history stream and use a map to convert stream of SsbMessageValue
// // into a stream of KeyTypeTuple (local struct for storing message_key and message_type)
// #[derive(Debug)]
// struct KeyTypeTuple {
// message_key: String,
// message_type: SsbMessageContentType,
// };
// let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?;
// let type_stream = history_stream.map(|msg| {
// match msg {
// Ok(val) => {
// let message_type = val.get_message_type()?;
// let tuple = KeyTypeTuple {
// message_key: val.signature,
// message_type: message_type,
// };
// Ok(tuple)
// }
// Err(err) => {
// Err(err)
// }
// }
// });
// pin_mut!(type_stream); // needed for iteration
// println!("looping through type stream");
// while let Some(res) = type_stream.next().await {
// match res {
// Ok(value) => {
// println!("value: {:?}", value);
// },
// Err(err) => {
// println!("err: {:?}", err);
// }
// }
// }
// println!("reached end of type stream");
//
// // return Ok
// create a history stream and convert it into a Vec<SsbMessageValue> using try_collect
// (if there is any error in the results, it will be raised)
let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?;
let results : Vec<SsbMessageValue> = history_stream.try_collect().await?;
for x in results {
println!("x: {:?}", x);
}
// example to create a history stream and use a map to convert stream of SsbMessageValue
// into a stream of KeyTypeTuple (local struct for storing message_key and message_type)
#[derive(Debug)]
struct KeyTypeTuple {
message_key: String,
message_type: SsbMessageContentType,
};
let mut history_stream = sbot_client.create_history_stream(author.to_string()).await?;
let type_stream = history_stream.map(|msg| {
match msg {
Ok(val) => {
let message_type = val.get_message_type()?;
let tuple = KeyTypeTuple {
message_key: val.signature,
message_type: message_type,
};
Ok(tuple)
}
Err(err) => {
Err(err)
}
}
});
pin_mut!(type_stream); // needed for iteration
println!("looping through type stream");
while let Some(res) = type_stream.next().await {
match res {
Ok(value) => {
println!("value: {:?}", value);
},
Err(err) => {
println!("err: {:?}", err);
}
}
}
println!("reached end of type stream");
// return Ok
Ok(())
}

View File

@ -24,6 +24,11 @@ use crate::utils;
pub use kuska_ssb::api::dto::content::SubsetQuery;
use kuska_ssb::rpc::RequestNo;
pub struct SbotConnection {
client: ApiCaller<TcpStream>,
rpc_reader: RpcReader<TcpStream>,
}
/// The Scuttlebutt identity, keys and configuration parameters for connecting to a local sbot
/// instance, as well as handles for calling RPC methods and receiving responses.
pub struct Sbot {
@ -35,6 +40,7 @@ pub struct Sbot {
network_id: auth::Key,
client: ApiCaller<TcpStream>,
rpc_reader: RpcReader<TcpStream>,
sbot_connections: Vec<SbotConnection>,
}
impl Sbot {
@ -80,6 +86,7 @@ impl Sbot {
let rpc_reader = RpcReader::new(box_stream_read);
let client = ApiCaller::new(RpcWriter::new(box_stream_write));
let mut sbot_connections = Vec::new();
Ok(Self {
id,
@ -89,10 +96,11 @@ impl Sbot {
network_id,
client,
rpc_reader,
sbot_connections
})
}
pub async fn get_rpc_reader(&self, ip_port: Option<String>, net_id: Option<String>) -> Result<RpcReader<TcpStream>, GolgiError> {
pub async fn get_sbot_connection(&self, ip_port: Option<String>, net_id: Option<String>) -> Result<SbotConnection, GolgiError> {
let address = if ip_port.is_none() {
"127.0.0.1:8008".to_string()
} else {
@ -126,7 +134,12 @@ impl Sbot {
BoxStream::from_handshake(socket.clone(), socket, handshake, 0x8000).split_read_write();
let rpc_reader = RpcReader::new(box_stream_read);
Ok(rpc_reader)
let client = ApiCaller::new(RpcWriter::new(box_stream_write));
let sbot_connection = SbotConnection {
rpc_reader,
client,
};
Ok(sbot_connection)
}
/// Call the `partialReplication getSubset` RPC method and return a vector
@ -270,10 +283,10 @@ impl Sbot {
&mut self,
id: String,
) -> Result<impl Stream<Item = Result<SsbMessageValue, GolgiError>>, GolgiError> {
let mut sbot_connection = self.get_sbot_connection(None, None).await.unwrap();
let args = CreateHistoryStreamIn::new(id);
let req_id = self.client.create_history_stream_req_send(&args).await?;
let mut rpc_reader = self.get_rpc_reader(None, None).await.unwrap();
let history_stream = Sbot::get_source_stream(rpc_reader, req_id, utils::ssb_message_res_parse).await;
let req_id = sbot_connection.client.create_history_stream_req_send(&args).await?;
let history_stream = Sbot::get_source_stream(sbot_connection.rpc_reader, req_id, utils::ssb_message_res_parse).await;
Ok(history_stream)
}
@ -320,7 +333,10 @@ impl Sbot {
}
// if we find a CancelStreamResponse
// this is the end of the stream
RecvMsg::CancelStreamRespose() => break,
RecvMsg::CancelStreamRespose() => {
println!("found cancel stream");
break
},
// if we find an unknown response, we just continue the loop
_ => {}
}