solar sync
This commit is contained in:
parent
8a0509327c
commit
cebb24b821
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "kuska-ssb"
|
name = "kuska-ssb"
|
||||||
version = "0.1.5"
|
version = "0.2.0"
|
||||||
authors = ["Dhole <dhole@riseup.net>", "Adria Massanet <adria@codecontext.io>"]
|
authors = ["Dhole <dhole@riseup.net>", "Adria Massanet <adria@codecontext.io>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
@ -8,7 +8,7 @@ edition = "2018"
|
|||||||
name = "kuska_ssb"
|
name = "kuska_ssb"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
kuska-handshake = { path = "../kuska-handshake", branch = "master" , features=["sync","async_std"] }
|
kuska-handshake = { git = "https://github.com/Kuska-ssb/kuska-handshake", branch = "master" , features=["sync","async_std"] }
|
||||||
sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" }
|
sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" }
|
||||||
base64 = "0.11.0"
|
base64 = "0.11.0"
|
||||||
hex = "0.4.0"
|
hex = "0.4.0"
|
||||||
@ -22,6 +22,10 @@ dirs = "2.0"
|
|||||||
futures = "0.3.4"
|
futures = "0.3.4"
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
rand = "0.7.3"
|
rand = "0.7.3"
|
||||||
|
get_if_addrs = "0.5.3"
|
||||||
|
regex = "1.3.7"
|
||||||
|
once_cell = "1.3.1"
|
||||||
|
async-stream = "0.2.1"
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
name = "ssb-cli"
|
name = "ssb-cli"
|
||||||
|
@ -8,19 +8,19 @@ extern crate structopt;
|
|||||||
|
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
|
||||||
use async_std::io::{Read, Write};
|
use async_std::io::Read;
|
||||||
use async_std::net::{TcpStream, UdpSocket};
|
use async_std::net::{TcpStream, UdpSocket};
|
||||||
|
|
||||||
use kuska_handshake::async_std::{handshake_client, BoxStream};
|
use kuska_handshake::async_std::{handshake_client, BoxStream};
|
||||||
use kuska_ssb::api::{
|
use kuska_ssb::api::{
|
||||||
dto::{CreateHistoryStreamIn, CreateStreamIn, LatestOut, WhoAmIOut},
|
dto::{CreateHistoryStreamIn, CreateStreamIn, LatestOut, WhoAmIOut},
|
||||||
ApiHelper,
|
ApiCaller,
|
||||||
};
|
};
|
||||||
use kuska_ssb::discovery::ssb_net_id;
|
use kuska_ssb::discovery::ssb_net_id;
|
||||||
use kuska_ssb::feed::{is_privatebox, privatebox_decipher, Feed, Message};
|
use kuska_ssb::feed::{is_privatebox, privatebox_decipher, Feed, Message};
|
||||||
use kuska_ssb::keystore::from_patchwork_local;
|
use kuska_ssb::keystore::from_patchwork_local;
|
||||||
use kuska_ssb::keystore::OwnedIdentity;
|
use kuska_ssb::keystore::OwnedIdentity;
|
||||||
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcStreamReader, RpcStreamWriter};
|
use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader, RpcWriter};
|
||||||
|
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
use sodiumoxide::crypto::sign::ed25519;
|
||||||
@ -70,15 +70,13 @@ impl std::fmt::Display for AppError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_async<'a, R, W, T, F>(
|
async fn get_async<'a, R, T, F>(
|
||||||
rpc_reader : &mut RpcStreamReader<R>,
|
rpc_reader: &mut RpcReader<R>,
|
||||||
client: &mut ApiHelper<W>,
|
|
||||||
req_no: RequestNo,
|
req_no: RequestNo,
|
||||||
f: F,
|
f: F,
|
||||||
) -> SolarResult<T>
|
) -> SolarResult<T>
|
||||||
where
|
where
|
||||||
R: Read + Unpin,
|
R: Read + Unpin,
|
||||||
W: Write + Unpin,
|
|
||||||
F: Fn(&[u8]) -> SolarResult<T>,
|
F: Fn(&[u8]) -> SolarResult<T>,
|
||||||
T: Debug,
|
T: Debug,
|
||||||
{
|
{
|
||||||
@ -100,15 +98,13 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn print_source_until_eof<'a, R, W, T, F>(
|
async fn print_source_until_eof<'a, R, T, F>(
|
||||||
rpc_reader : &mut RpcStreamReader<R>,
|
rpc_reader: &mut RpcReader<R>,
|
||||||
client: &mut ApiHelper<W>,
|
|
||||||
req_no: RequestNo,
|
req_no: RequestNo,
|
||||||
f: F,
|
f: F,
|
||||||
) -> SolarResult<()>
|
) -> SolarResult<()>
|
||||||
where
|
where
|
||||||
R: Read + Unpin,
|
R: Read + Unpin,
|
||||||
W: Write + Unpin,
|
|
||||||
F: Fn(&[u8]) -> SolarResult<T>,
|
F: Fn(&[u8]) -> SolarResult<T>,
|
||||||
T: Debug + serde::Deserialize<'a>,
|
T: Debug + serde::Deserialize<'a>,
|
||||||
{
|
{
|
||||||
@ -191,11 +187,13 @@ async fn main() -> SolarResult<()> {
|
|||||||
let (box_stream_read, box_stream_write) =
|
let (box_stream_read, box_stream_write) =
|
||||||
BoxStream::from_handshake(&socket, &socket, handshake, 0x8000).split_read_write();
|
BoxStream::from_handshake(&socket, &socket, handshake, 0x8000).split_read_write();
|
||||||
|
|
||||||
let mut rpc_reader = RpcStreamReader::new(box_stream_read);
|
let mut rpc_reader = RpcReader::new(box_stream_read);
|
||||||
let mut client = ApiHelper::new(RpcStreamWriter::new(box_stream_write));
|
let mut client = ApiCaller::new(RpcWriter::new(box_stream_write));
|
||||||
|
|
||||||
let req_id = client.whoami_req_send().await?;
|
let req_id = client.whoami_req_send().await?;
|
||||||
let whoami = get_async(&mut rpc_reader, &mut client, req_id, whoami_res_parse).await?.id;
|
let whoami = get_async(&mut rpc_reader, req_id, whoami_res_parse)
|
||||||
|
.await?
|
||||||
|
.id;
|
||||||
|
|
||||||
println!("😊 server says hello to {}", whoami);
|
println!("😊 server says hello to {}", whoami);
|
||||||
|
|
||||||
@ -214,7 +212,9 @@ async fn main() -> SolarResult<()> {
|
|||||||
}
|
}
|
||||||
("whoami", 1) => {
|
("whoami", 1) => {
|
||||||
let req_id = client.whoami_req_send().await?;
|
let req_id = client.whoami_req_send().await?;
|
||||||
let whoami = get_async(&mut rpc_reader, &mut client, req_id, whoami_res_parse).await?.id;
|
let whoami = get_async(&mut rpc_reader, req_id, whoami_res_parse)
|
||||||
|
.await?
|
||||||
|
.id;
|
||||||
println!("{}", whoami);
|
println!("{}", whoami);
|
||||||
}
|
}
|
||||||
("get", 2) => {
|
("get", 2) => {
|
||||||
@ -224,7 +224,7 @@ async fn main() -> SolarResult<()> {
|
|||||||
args[1].clone()
|
args[1].clone()
|
||||||
};
|
};
|
||||||
let req_id = client.get_req_send(&msg_id).await?;
|
let req_id = client.get_req_send(&msg_id).await?;
|
||||||
let msg = get_async(&mut rpc_reader, &mut client, req_id, message_res_parse).await?;
|
let msg = get_async(&mut rpc_reader, req_id, message_res_parse).await?;
|
||||||
println!("{:?}", msg);
|
println!("{:?}", msg);
|
||||||
}
|
}
|
||||||
("user", 2) => {
|
("user", 2) => {
|
||||||
@ -232,16 +232,16 @@ async fn main() -> SolarResult<()> {
|
|||||||
|
|
||||||
let args = CreateHistoryStreamIn::new(user_id.clone());
|
let args = CreateHistoryStreamIn::new(user_id.clone());
|
||||||
let req_id = client.create_history_stream_req_send(&args).await?;
|
let req_id = client.create_history_stream_req_send(&args).await?;
|
||||||
print_source_until_eof(&mut rpc_reader, &mut client, req_id, feed_res_parse).await?;
|
print_source_until_eof(&mut rpc_reader, req_id, feed_res_parse).await?;
|
||||||
}
|
}
|
||||||
("feed", 1) => {
|
("feed", 1) => {
|
||||||
let args = CreateStreamIn::default();
|
let args = CreateStreamIn::default();
|
||||||
let req_id = client.create_feed_stream_req_send(&args).await?;
|
let req_id = client.create_feed_stream_req_send(&args).await?;
|
||||||
print_source_until_eof(&mut rpc_reader, &mut client, req_id, feed_res_parse).await?;
|
print_source_until_eof(&mut rpc_reader, req_id, feed_res_parse).await?;
|
||||||
}
|
}
|
||||||
("latest", 1) => {
|
("latest", 1) => {
|
||||||
let req_id = client.latest_req_send().await?;
|
let req_id = client.latest_req_send().await?;
|
||||||
print_source_until_eof(&mut rpc_reader, &mut client, req_id, latest_res_parse).await?;
|
print_source_until_eof(&mut rpc_reader, req_id, latest_res_parse).await?;
|
||||||
}
|
}
|
||||||
("private", 2) => {
|
("private", 2) => {
|
||||||
let user_id = if args[1] == "me" { &whoami } else { &args[1] };
|
let user_id = if args[1] == "me" { &whoami } else { &args[1] };
|
||||||
@ -260,7 +260,7 @@ async fn main() -> SolarResult<()> {
|
|||||||
let args = CreateHistoryStreamIn::new(user_id.clone());
|
let args = CreateHistoryStreamIn::new(user_id.clone());
|
||||||
let req_id = client.create_history_stream_req_send(&args).await?;
|
let req_id = client.create_history_stream_req_send(&args).await?;
|
||||||
|
|
||||||
print_source_until_eof(&mut rpc_reader, &mut client, req_id, show_private).await?;
|
print_source_until_eof(&mut rpc_reader, req_id, show_private).await?;
|
||||||
}
|
}
|
||||||
_ => println!("unknown command {}", line_buffer),
|
_ => println!("unknown command {}", line_buffer),
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use crate::feed::Message;
|
use crate::feed::Message;
|
||||||
use crate::rpc::{Body, BodyType, RequestNo, RpcStreamWriter, RpcType};
|
use crate::rpc::{Body, BodyType, RequestNo, RpcType, RpcWriter};
|
||||||
use async_std::io::Write;
|
use async_std::io::Write;
|
||||||
|
|
||||||
use super::dto;
|
use super::dto;
|
||||||
@ -50,16 +50,16 @@ impl ApiMethod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ApiHelper<W: Write + Unpin> {
|
pub struct ApiCaller<W: Write + Unpin> {
|
||||||
rpc: RpcStreamWriter<W>,
|
rpc: RpcWriter<W>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<W: Write + Unpin> ApiHelper<W> {
|
impl<W: Write + Unpin> ApiCaller<W> {
|
||||||
pub fn new(rpc: RpcStreamWriter<W>) -> Self {
|
pub fn new(rpc: RpcWriter<W>) -> Self {
|
||||||
Self { rpc }
|
Self { rpc }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rpc(&mut self) -> &mut RpcStreamWriter<W> {
|
pub fn rpc(&mut self) -> &mut RpcWriter<W> {
|
||||||
&mut self.rpc
|
&mut self.rpc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,4 +3,4 @@ mod error;
|
|||||||
mod helper;
|
mod helper;
|
||||||
|
|
||||||
pub use error::{Error, Result};
|
pub use error::{Error, Result};
|
||||||
pub use helper::{ApiHelper, ApiMethod};
|
pub use helper::{ApiCaller, ApiMethod};
|
||||||
|
@ -5,5 +5,5 @@ pub use error::{Error, Result};
|
|||||||
pub use sodium::{
|
pub use sodium::{
|
||||||
ToSodiumObject, ToSsbId, CURVE_ED25519_SUFFIX, ED25519_SIGNATURE_SUFFIX, SHA256_SUFFIX,
|
ToSodiumObject, ToSsbId, CURVE_ED25519_SUFFIX, ED25519_SIGNATURE_SUFFIX, SHA256_SUFFIX,
|
||||||
};
|
};
|
||||||
pub use sodiumoxide::crypto::hash::sha256 as sha256;
|
pub use sodiumoxide::crypto::hash::sha256;
|
||||||
pub use sodiumoxide::crypto::sign::ed25519 as ed25519;
|
pub use sodiumoxide::crypto::sign::ed25519;
|
||||||
|
@ -50,7 +50,6 @@ impl ToSodiumObject for str {
|
|||||||
ed25519::PublicKey::from_slice(&bytes).ok_or_else(|| Error::BadPublicKey)
|
ed25519::PublicKey::from_slice(&bytes).ok_or_else(|| Error::BadPublicKey)
|
||||||
}
|
}
|
||||||
fn to_ed25519_pk_no_suffix(self: &str) -> Result<ed25519::PublicKey> {
|
fn to_ed25519_pk_no_suffix(self: &str) -> Result<ed25519::PublicKey> {
|
||||||
|
|
||||||
let bytes = base64::decode(&self)?;
|
let bytes = base64::decode(&self)?;
|
||||||
|
|
||||||
ed25519::PublicKey::from_slice(&bytes).ok_or_else(|| Error::BadPublicKey)
|
ed25519::PublicKey::from_slice(&bytes).ok_or_else(|| Error::BadPublicKey)
|
||||||
|
@ -2,7 +2,9 @@
|
|||||||
pub enum Error {
|
pub enum Error {
|
||||||
ParseInt(std::num::ParseIntError),
|
ParseInt(std::num::ParseIntError),
|
||||||
InvalidInviteCode,
|
InvalidInviteCode,
|
||||||
|
InvalidBroadcastMessage,
|
||||||
CryptoFormat(crate::crypto::Error),
|
CryptoFormat(crate::crypto::Error),
|
||||||
|
Io(std::io::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<crate::crypto::Error> for Error {
|
impl From<crate::crypto::Error> for Error {
|
||||||
@ -17,6 +19,12 @@ impl From<std::num::ParseIntError> for Error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<std::io::Error> for Error {
|
||||||
|
fn from(err: std::io::Error) -> Self {
|
||||||
|
Error::Io(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for Error {
|
impl std::fmt::Display for Error {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
write!(f, "{:?}", self)
|
write!(f, "{:?}", self)
|
||||||
|
92
src/discovery/lan.rs
Normal file
92
src/discovery/lan.rs
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
#![allow(clippy::single_match)]
|
||||||
|
|
||||||
|
use get_if_addrs::{get_if_addrs, IfAddr};
|
||||||
|
|
||||||
|
use log::warn;
|
||||||
|
use std::string::ToString;
|
||||||
|
|
||||||
|
use async_std::net::{IpAddr, SocketAddr, UdpSocket};
|
||||||
|
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use regex::Regex;
|
||||||
|
|
||||||
|
use crate::{crypto::ed25519, crypto::ToSodiumObject};
|
||||||
|
|
||||||
|
use super::error::{Error, Result};
|
||||||
|
|
||||||
|
pub static BROADCAST_REGEX: Lazy<Regex> = Lazy::new(|| {
|
||||||
|
Regex::new(r"net:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):([0-9]+)~shs:([0-9a-zA-Z=/]+)").unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
pub struct LanBroadcast {
|
||||||
|
destination: String,
|
||||||
|
packets: Vec<(SocketAddr, SocketAddr, String)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LanBroadcast {
|
||||||
|
pub async fn new(id: &ed25519::PublicKey, rpc_port: u16) -> Result<Self> {
|
||||||
|
let server_pk = base64::encode(&id);
|
||||||
|
|
||||||
|
let mut packets = Vec::new();
|
||||||
|
|
||||||
|
for if_addr in get_if_addrs()? {
|
||||||
|
let addrs = match if_addr.addr {
|
||||||
|
IfAddr::V4(v4) if !v4.is_loopback() && v4.broadcast.is_some() => {
|
||||||
|
Some((IpAddr::V4(v4.ip), IpAddr::V4(v4.broadcast.unwrap())))
|
||||||
|
}
|
||||||
|
IfAddr::V6(v6) if !v6.is_loopback() && v6.broadcast.is_some() => {
|
||||||
|
Some((IpAddr::V6(v6.ip), IpAddr::V6(v6.broadcast.unwrap())))
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some((local, broadcast)) = addrs {
|
||||||
|
let local_addr = SocketAddr::new(local, rpc_port);
|
||||||
|
let broadcast_addr = SocketAddr::new(broadcast, rpc_port);
|
||||||
|
let msg = format!("net:{}:{}~shs:{}", local, rpc_port, server_pk);
|
||||||
|
match UdpSocket::bind(SocketAddr::new(local, rpc_port)).await {
|
||||||
|
Ok(_) => packets.push((local_addr, broadcast_addr, msg)),
|
||||||
|
Err(err) => warn!("cannot broadcast to {:?} {:?}", local_addr, err),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let destination = format!("255.255.255.255:{}", rpc_port);
|
||||||
|
Ok(LanBroadcast {
|
||||||
|
packets,
|
||||||
|
destination,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
pub async fn send(&self) {
|
||||||
|
for msg in &self.packets {
|
||||||
|
if let Ok(socket) = UdpSocket::bind(msg.0).await {
|
||||||
|
let _ = socket.set_broadcast(true);
|
||||||
|
match socket.send_to(msg.2.as_bytes(), &self.destination).await {
|
||||||
|
Err(err) => warn!(target:"solar", "Error broadcasting {}",err),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse(msg: &str) -> Option<(String, u32, ed25519::PublicKey)> {
|
||||||
|
let parse_shs = |addr: &str| -> Result<_> {
|
||||||
|
let captures = BROADCAST_REGEX
|
||||||
|
.captures(&addr)
|
||||||
|
.ok_or(Error::InvalidBroadcastMessage)?;
|
||||||
|
|
||||||
|
let ip = captures[1].to_string();
|
||||||
|
let port = captures[2].parse::<u32>()?;
|
||||||
|
let server_pk = captures[3].to_ed25519_pk_no_suffix()?;
|
||||||
|
|
||||||
|
Ok((ip, port, server_pk))
|
||||||
|
};
|
||||||
|
|
||||||
|
for addr in msg.split(';') {
|
||||||
|
if let Ok(shs) = parse_shs(addr) {
|
||||||
|
return Some(shs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,8 @@
|
|||||||
mod error;
|
mod error;
|
||||||
|
mod lan;
|
||||||
mod network;
|
mod network;
|
||||||
mod pubs;
|
mod pubs;
|
||||||
|
|
||||||
|
pub use lan::LanBroadcast;
|
||||||
pub use network::ssb_net_id;
|
pub use network::ssb_net_id;
|
||||||
pub use pubs::Invite;
|
pub use pubs::Invite;
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
extern crate kuska_handshake;
|
pub extern crate kuska_handshake as handshake;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate serde;
|
extern crate serde;
|
||||||
|
@ -2,4 +2,4 @@ mod error;
|
|||||||
mod stream;
|
mod stream;
|
||||||
|
|
||||||
pub use error::{Error, Result};
|
pub use error::{Error, Result};
|
||||||
pub use stream::{Body, BodyType, RecvMsg, RequestNo, RpcStreamReader,RpcStreamWriter, RpcType};
|
pub use stream::{Body, BodyType, RecvMsg, RequestNo, RpcReader, RpcType, RpcWriter};
|
||||||
|
@ -3,9 +3,8 @@ use super::error::{Error, Result};
|
|||||||
use async_std::io;
|
use async_std::io;
|
||||||
use async_std::prelude::*;
|
use async_std::prelude::*;
|
||||||
use log::{trace, warn};
|
use log::{trace, warn};
|
||||||
use std::pin::Pin;
|
|
||||||
use std::task::{Context,Poll};
|
|
||||||
|
|
||||||
|
use async_stream::stream;
|
||||||
use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite};
|
use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite};
|
||||||
|
|
||||||
pub type RequestNo = i32;
|
pub type RequestNo = i32;
|
||||||
@ -127,11 +126,11 @@ impl Header {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RpcStreamReader<R: io::Read + Unpin> {
|
pub struct RpcReader<R: io::Read + Unpin> {
|
||||||
box_reader: BoxStreamRead<R>,
|
box_reader: BoxStreamRead<R>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RpcStreamWriter<W: io::Write + Unpin> {
|
pub struct RpcWriter<W: io::Write + Unpin> {
|
||||||
box_writer: BoxStreamWrite<W>,
|
box_writer: BoxStreamWrite<W>,
|
||||||
req_no: RequestNo,
|
req_no: RequestNo,
|
||||||
}
|
}
|
||||||
@ -145,11 +144,9 @@ pub enum RecvMsg {
|
|||||||
CancelStreamRespose(),
|
CancelStreamRespose(),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: io::Read + Unpin> RpcStreamReader<R> {
|
impl<R: io::Read + Unpin> RpcReader<R> {
|
||||||
pub fn new(box_reader: BoxStreamRead<R>) -> RpcStreamReader<R> {
|
pub fn new(box_reader: BoxStreamRead<R>) -> RpcReader<R> {
|
||||||
RpcStreamReader {
|
RpcReader { box_reader }
|
||||||
box_reader
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn recv(&mut self) -> Result<(RequestNo, RecvMsg)> {
|
pub async fn recv(&mut self) -> Result<(RequestNo, RecvMsg)> {
|
||||||
@ -190,18 +187,19 @@ impl<R: io::Read + Unpin> RpcStreamReader<R> {
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
/*
|
pub fn into_stream(mut self) -> impl Stream<Item = (RequestNo, RecvMsg)> {
|
||||||
impl<R: io::Read + Unpin> Stream for RpcStreamReader<R> {
|
stream! {
|
||||||
type Item = (RequestNo, RecvMsg);
|
while let Ok(v) = self.recv().await {
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
yield v
|
||||||
futures::ready!(self.recv())
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
impl<W: io::Write + Unpin> RpcStreamWriter<W> {
|
impl<W: io::Write + Unpin> RpcWriter<W> {
|
||||||
pub fn new(box_writer: BoxStreamWrite<W>) -> RpcStreamWriter<W> {
|
pub fn new(box_writer: BoxStreamWrite<W>) -> RpcWriter<W> {
|
||||||
RpcStreamWriter {
|
RpcWriter {
|
||||||
box_writer,
|
box_writer,
|
||||||
req_no: 0,
|
req_no: 0,
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user