Working recv stream

This commit is contained in:
notplants 2022-01-04 12:01:22 -05:00
parent 0548777948
commit 58d133b5c8
2 changed files with 34 additions and 53 deletions

View File

@ -1,7 +1,9 @@
//! Sbot type and associated methods.
use std::fmt::Debug;
use async_std::net::TcpStream;
use futures::pin_mut;
use async_std::stream::StreamExt;
use async_std::stream::{Stream, StreamExt};
use async_stream::stream;
use kuska_handshake::async_std::BoxStream;
use kuska_sodiumoxide::crypto::{auth, sign::ed25519};
@ -12,16 +14,16 @@ use kuska_ssb::{
},
discovery, keystore,
keystore::OwnedIdentity,
rpc::{RpcReader, RpcWriter},
rpc::{RpcReader, RpcWriter, RecvMsg},
};
use crate::error::GolgiError;
use crate::messages::{SsbMessageKVT, SsbMessageContent, SsbMessageValue, SsbMessageContentType};
use crate::utils;
use crate::utils::get_source_stream;
// re-export types from kuska
pub use kuska_ssb::api::dto::content::SubsetQuery;
use kuska_ssb::rpc::RequestNo;
/// The Scuttlebutt identity, keys and configuration parameters for connecting to a local sbot
/// instance, as well as handles for calling RPC methods and receiving responses.
@ -234,7 +236,7 @@ impl Sbot {
) -> Result<i32, GolgiError> {
let args = CreateHistoryStreamIn::new(id);
let req_id = self.client.create_history_stream_req_send(&args).await?;
let source_stream = get_source_stream(&self.rpc_reader, req_id);
let source_stream = self.get_source_stream(req_id, utils::ssb_message_res_parse);
pin_mut!(source_stream); // needed for iteration
while let Some(res) = source_stream.next().await {
@ -252,4 +254,32 @@ impl Sbot {
Ok(2)
// utils::get_source_until_eof(&mut self.rpc_reader, req_id, utils::ssb_message_res_parse).await
}
pub fn get_source_stream<'a, F, T>(&'a mut self, req_no: RequestNo, f: F) -> impl Stream<Item = Result<RequestNo, GolgiError>> + 'a
where
F: Fn(&[u8]) -> Result<T, GolgiError> + 'a,
T: Debug + serde::Deserialize<'a>,
{
let s = stream! {
loop {
let (id, msg) = &self.rpc_reader.recv().await?;
let x : i32 = id.clone();
if x == req_no {
match msg {
RecvMsg::RpcResponse(_type, body) => {
let display = f(&body)?;
println!("display: {:?}", display);
yield Ok(x)
}
RecvMsg::ErrorResponse(message) => {
yield Err(GolgiError::Sbot(message.to_string()));
}
RecvMsg::CancelStreamRespose() => break,
_ => break
}
}
}
};
s
}
}

View File

@ -211,52 +211,3 @@ pub async fn get_async_num() -> i32 {
// s
// }
// pub fn get_source_stream(rpc_reader: &RpcReader<TcpStream>, req_no: RequestNo) -> impl Stream<Item = RequestNo> {
// let s = stream! {
// loop {
// let (id, msg) = rpc_reader.recv().await?;
// yield Ok(id);
// // if id == req_no {
// // match msg {
// // RecvMsg::RpcResponse(_type, body) => {
// // let display = f(&body)?;
// // println!("{:?}", display);
// // }
// // RecvMsg::ErrorResponse(message) => {
// // return Err(GolgiError::Sbot(message));
// // }
// // RecvMsg::CancelStreamRespose() => break,
// // _ => {}
// // }
// // }
// // }
// // yield x;
// }
// };
// s
// }
pub fn<'a> get_source_stream(mut rpc_reader: &<'a> RpcReader<TcpStream>, req_no: RequestNo) -> impl Stream<Item = Result<RequestNo, GolgiError>> {
let s = stream! {
loop {
let (id, msg) = rpc_reader.recv().await?;
yield Ok(id);
// if id == req_no {
// match msg {
// RecvMsg::RpcResponse(_type, body) => {
// let display = f(&body)?;
// println!("{:?}", display);
// }
// RecvMsg::ErrorResponse(message) => {
// return Err(GolgiError::Sbot(message));
// }
// RecvMsg::CancelStreamRespose() => break,
// _ => {}
// }
// }
// }
// yield x;
}
};
s
}