Working on lifetimes

This commit is contained in:
notplants 2022-01-04 11:07:51 -05:00
parent c06b03ad54
commit 0548777948
4 changed files with 183 additions and 16 deletions

73
Cargo.lock generated
View File

@ -151,7 +151,17 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5"
dependencies = [
"async-stream-impl",
"async-stream-impl 0.2.1",
"futures-core",
]
[[package]]
name = "async-stream"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625"
dependencies = [
"async-stream-impl 0.3.2",
"futures-core",
]
@ -166,6 +176,17 @@ dependencies = [
"syn",
]
[[package]]
name = "async-stream-impl"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-task"
version = "4.0.3"
@ -478,12 +499,14 @@ name = "golgi"
version = "0.1.0"
dependencies = [
"async-std",
"async-stream 0.3.2",
"base64 0.13.0",
"futures",
"hex",
"kuska-handshake",
"kuska-sodiumoxide",
"kuska-ssb",
"rand",
"serde",
"serde_json",
]
@ -572,7 +595,7 @@ name = "kuska-ssb"
version = "0.2.0"
dependencies = [
"async-std",
"async-stream",
"async-stream 0.2.1",
"base64 0.11.0",
"dirs",
"futures",
@ -690,6 +713,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "ppv-lite86"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "proc-macro2"
version = "1.0.32"
@ -708,6 +737,46 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
version = "0.2.10"

View File

@ -15,3 +15,5 @@ kuska-sodiumoxide = "0.2.5-0"
kuska-ssb = { path = "../ssb" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
async-stream = "0.3.2"
rand = "0.8.4"

View File

@ -1,5 +1,7 @@
//! Sbot type and associated methods.
use async_std::net::TcpStream;
use futures::pin_mut;
use async_std::stream::StreamExt;
use kuska_handshake::async_std::BoxStream;
use kuska_sodiumoxide::crypto::{auth, sign::ed25519};
@ -16,6 +18,7 @@ use kuska_ssb::{
use crate::error::GolgiError;
use crate::messages::{SsbMessageKVT, SsbMessageContent, SsbMessageValue, SsbMessageContentType};
use crate::utils;
use crate::utils::get_source_stream;
// re-export types from kuska
pub use kuska_ssb::api::dto::content::SubsetQuery;
@ -228,9 +231,25 @@ impl Sbot {
pub async fn create_history_stream(
&mut self,
id: String,
) -> Result<Vec<SsbMessageValue>, GolgiError> {
) -> Result<i32, GolgiError> {
let args = CreateHistoryStreamIn::new(id);
let req_id = self.client.create_history_stream_req_send(&args).await?;
utils::get_source_until_eof(&mut self.rpc_reader, req_id, utils::ssb_message_res_parse).await
let source_stream = get_source_stream(&self.rpc_reader, req_id);
pin_mut!(source_stream); // needed for iteration
while let Some(res) = source_stream.next().await {
match res {
Ok(value) => {
println!("value: {:?}", value);
},
Err(err) => {
println!("err: {:?}", err);
}
}
}
println!("exit loop");
Ok(2)
// utils::get_source_until_eof(&mut self.rpc_reader, req_id, utils::ssb_message_res_parse).await
}
}

View File

@ -1,6 +1,12 @@
//! Utility methods for `golgi`.
use async_std::io::Read;
use std::fmt::Debug;
use async_std::task;
use std::time::Duration;
use async_std::net::TcpStream;
use async_std::stream::Stream;
use async_stream::stream;
use rand::distributions::{Distribution, Uniform};
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader};
use serde_json::Value;
@ -64,10 +70,10 @@ pub async fn get_async<'a, R, T, F>(
req_no: RequestNo,
f: F,
) -> Result<T, GolgiError>
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
{
loop {
let (id, msg) = rpc_reader.recv().await?;
@ -108,10 +114,10 @@ pub async fn get_source_until_eof<'a, R, T, F>(
req_no: RequestNo,
f: F,
) -> Result<Vec<T>, GolgiError>
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug,
{
let mut messages: Vec<T> = Vec::new();
loop {
@ -160,10 +166,10 @@ pub async fn print_source_until_eof<'a, R, T, F>(
req_no: RequestNo,
f: F,
) -> Result<(), GolgiError>
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug + serde::Deserialize<'a>,
where
R: Read + Unpin,
F: Fn(&[u8]) -> Result<T, GolgiError>,
T: Debug + serde::Deserialize<'a>,
{
loop {
let (id, msg) = rpc_reader.recv().await?;
@ -183,3 +189,74 @@ where
}
Ok(())
}
pub async fn get_async_num() -> i32 {
let millis = Uniform::from(0..10).sample(&mut rand::thread_rng());
println!("get_async_num will complete in {} ms", millis);
task::sleep(Duration::from_millis(millis)).await;
let i: i32 = rand::random();
i
}
// pub fn get_source_stream() -> impl Stream<Item = i32> {
// let s = stream! {
// for i in 0..3 {
// let x = get_async_num().await;
// yield x;
// }
// };
// s
// }
// pub fn get_source_stream(rpc_reader: &RpcReader<TcpStream>, req_no: RequestNo) -> impl Stream<Item = RequestNo> {
// let s = stream! {
// loop {
// let (id, msg) = rpc_reader.recv().await?;
// yield Ok(id);
// // if id == req_no {
// // match msg {
// // RecvMsg::RpcResponse(_type, body) => {
// // let display = f(&body)?;
// // println!("{:?}", display);
// // }
// // RecvMsg::ErrorResponse(message) => {
// // return Err(GolgiError::Sbot(message));
// // }
// // RecvMsg::CancelStreamRespose() => break,
// // _ => {}
// // }
// // }
// // }
// // yield x;
// }
// };
// s
// }
pub fn<'a> get_source_stream(mut rpc_reader: &<'a> RpcReader<TcpStream>, req_no: RequestNo) -> impl Stream<Item = Result<RequestNo, GolgiError>> {
let s = stream! {
loop {
let (id, msg) = rpc_reader.recv().await?;
yield Ok(id);
// if id == req_no {
// match msg {
// RecvMsg::RpcResponse(_type, body) => {
// let display = f(&body)?;
// println!("{:?}", display);
// }
// RecvMsg::ErrorResponse(message) => {
// return Err(GolgiError::Sbot(message));
// }
// RecvMsg::CancelStreamRespose() => break,
// _ => {}
// }
// }
// }
// yield x;
}
};
s
}