diff --git a/Cargo.toml b/Cargo.toml index c3668f0..1bca071 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kuska-ssb" -version = "0.1.5" +version = "0.2.0" authors = ["Dhole ", "Adria Massanet "] edition = "2018" @@ -8,7 +8,7 @@ edition = "2018" name = "kuska_ssb" [dependencies] -kuska-handshake = { path = "../kuska-handshake", branch = "master" , features=["sync","async_std"] } +kuska-handshake = { git = "https://github.com/Kuska-ssb/kuska-handshake", branch = "master" , features=["sync","async_std"] } sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" } base64 = "0.11.0" hex = "0.4.0" @@ -22,6 +22,10 @@ dirs = "2.0" futures = "0.3.4" lazy_static = "1.4.0" rand = "0.7.3" +get_if_addrs = "0.5.3" +regex = "1.3.7" +once_cell = "1.3.1" +async-stream = "0.2.1" [[example]] name = "ssb-cli" diff --git a/examples/ssb-cli.rs b/examples/ssb-cli.rs index ed9dcbd..dec6292 100644 --- a/examples/ssb-cli.rs +++ b/examples/ssb-cli.rs @@ -8,19 +8,19 @@ extern crate structopt; use std::fmt::Debug; -use async_std::io::{Read, Write}; +use async_std::io::Read; use async_std::net::{TcpStream, UdpSocket}; use kuska_handshake::async_std::{handshake_client, BoxStream}; use kuska_ssb::api::{ dto::{CreateHistoryStreamIn, CreateStreamIn, LatestOut, WhoAmIOut}, - ApiHelper, + ApiCaller, }; use kuska_ssb::discovery::ssb_net_id; use kuska_ssb::feed::{is_privatebox, privatebox_decipher, Feed, Message}; use kuska_ssb::keystore::from_patchwork_local; use kuska_ssb::keystore::OwnedIdentity; -use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcStreamReader, RpcStreamWriter}; +use kuska_ssb::rpc::{RecvMsg, RequestNo, RpcReader, RpcWriter}; use regex::Regex; use sodiumoxide::crypto::sign::ed25519; @@ -70,15 +70,13 @@ impl std::fmt::Display for AppError { } } -async fn get_async<'a, R, W, T, F>( - rpc_reader : &mut RpcStreamReader, - client: &mut ApiHelper, +async fn get_async<'a, R, T, F>( + rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> SolarResult where R: Read + Unpin, - W: Write + Unpin, F: Fn(&[u8]) -> SolarResult, T: Debug, { @@ -100,15 +98,13 @@ where } } -async fn print_source_until_eof<'a, R, W, T, F>( - rpc_reader : &mut RpcStreamReader, - client: &mut ApiHelper, +async fn print_source_until_eof<'a, R, T, F>( + rpc_reader: &mut RpcReader, req_no: RequestNo, f: F, ) -> SolarResult<()> where R: Read + Unpin, - W: Write + Unpin, F: Fn(&[u8]) -> SolarResult, T: Debug + serde::Deserialize<'a>, { @@ -191,11 +187,13 @@ async fn main() -> SolarResult<()> { let (box_stream_read, box_stream_write) = BoxStream::from_handshake(&socket, &socket, handshake, 0x8000).split_read_write(); - let mut rpc_reader = RpcStreamReader::new(box_stream_read); - let mut client = ApiHelper::new(RpcStreamWriter::new(box_stream_write)); + let mut rpc_reader = RpcReader::new(box_stream_read); + let mut client = ApiCaller::new(RpcWriter::new(box_stream_write)); let req_id = client.whoami_req_send().await?; - let whoami = get_async(&mut rpc_reader, &mut client, req_id, whoami_res_parse).await?.id; + let whoami = get_async(&mut rpc_reader, req_id, whoami_res_parse) + .await? + .id; println!("😊 server says hello to {}", whoami); @@ -214,7 +212,9 @@ async fn main() -> SolarResult<()> { } ("whoami", 1) => { let req_id = client.whoami_req_send().await?; - let whoami = get_async(&mut rpc_reader, &mut client, req_id, whoami_res_parse).await?.id; + let whoami = get_async(&mut rpc_reader, req_id, whoami_res_parse) + .await? + .id; println!("{}", whoami); } ("get", 2) => { @@ -224,7 +224,7 @@ async fn main() -> SolarResult<()> { args[1].clone() }; let req_id = client.get_req_send(&msg_id).await?; - let msg = get_async(&mut rpc_reader, &mut client, req_id, message_res_parse).await?; + let msg = get_async(&mut rpc_reader, req_id, message_res_parse).await?; println!("{:?}", msg); } ("user", 2) => { @@ -232,16 +232,16 @@ async fn main() -> SolarResult<()> { let args = CreateHistoryStreamIn::new(user_id.clone()); let req_id = client.create_history_stream_req_send(&args).await?; - print_source_until_eof(&mut rpc_reader, &mut client, req_id, feed_res_parse).await?; + print_source_until_eof(&mut rpc_reader, req_id, feed_res_parse).await?; } ("feed", 1) => { let args = CreateStreamIn::default(); let req_id = client.create_feed_stream_req_send(&args).await?; - print_source_until_eof(&mut rpc_reader, &mut client, req_id, feed_res_parse).await?; + print_source_until_eof(&mut rpc_reader, req_id, feed_res_parse).await?; } ("latest", 1) => { let req_id = client.latest_req_send().await?; - print_source_until_eof(&mut rpc_reader, &mut client, req_id, latest_res_parse).await?; + print_source_until_eof(&mut rpc_reader, req_id, latest_res_parse).await?; } ("private", 2) => { let user_id = if args[1] == "me" { &whoami } else { &args[1] }; @@ -260,7 +260,7 @@ async fn main() -> SolarResult<()> { let args = CreateHistoryStreamIn::new(user_id.clone()); let req_id = client.create_history_stream_req_send(&args).await?; - print_source_until_eof(&mut rpc_reader, &mut client, req_id, show_private).await?; + print_source_until_eof(&mut rpc_reader, req_id, show_private).await?; } _ => println!("unknown command {}", line_buffer), } diff --git a/src/api/helper.rs b/src/api/helper.rs index 34aa757..ec36c92 100644 --- a/src/api/helper.rs +++ b/src/api/helper.rs @@ -1,5 +1,5 @@ use crate::feed::Message; -use crate::rpc::{Body, BodyType, RequestNo, RpcStreamWriter, RpcType}; +use crate::rpc::{Body, BodyType, RequestNo, RpcType, RpcWriter}; use async_std::io::Write; use super::dto; @@ -50,16 +50,16 @@ impl ApiMethod { } } -pub struct ApiHelper { - rpc: RpcStreamWriter, +pub struct ApiCaller { + rpc: RpcWriter, } -impl ApiHelper { - pub fn new(rpc: RpcStreamWriter) -> Self { +impl ApiCaller { + pub fn new(rpc: RpcWriter) -> Self { Self { rpc } } - pub fn rpc(&mut self) -> &mut RpcStreamWriter { + pub fn rpc(&mut self) -> &mut RpcWriter { &mut self.rpc } diff --git a/src/api/mod.rs b/src/api/mod.rs index 3bc1600..d280bdd 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -3,4 +3,4 @@ mod error; mod helper; pub use error::{Error, Result}; -pub use helper::{ApiHelper, ApiMethod}; +pub use helper::{ApiCaller, ApiMethod}; diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index ef4e933..b219115 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -5,5 +5,5 @@ pub use error::{Error, Result}; pub use sodium::{ ToSodiumObject, ToSsbId, CURVE_ED25519_SUFFIX, ED25519_SIGNATURE_SUFFIX, SHA256_SUFFIX, }; -pub use sodiumoxide::crypto::hash::sha256 as sha256; -pub use sodiumoxide::crypto::sign::ed25519 as ed25519; \ No newline at end of file +pub use sodiumoxide::crypto::hash::sha256; +pub use sodiumoxide::crypto::sign::ed25519; diff --git a/src/crypto/sodium.rs b/src/crypto/sodium.rs index ef007cb..49695ed 100644 --- a/src/crypto/sodium.rs +++ b/src/crypto/sodium.rs @@ -50,7 +50,6 @@ impl ToSodiumObject for str { ed25519::PublicKey::from_slice(&bytes).ok_or_else(|| Error::BadPublicKey) } fn to_ed25519_pk_no_suffix(self: &str) -> Result { - let bytes = base64::decode(&self)?; ed25519::PublicKey::from_slice(&bytes).ok_or_else(|| Error::BadPublicKey) diff --git a/src/discovery/error.rs b/src/discovery/error.rs index 05cb7f3..12b7d1f 100644 --- a/src/discovery/error.rs +++ b/src/discovery/error.rs @@ -2,7 +2,9 @@ pub enum Error { ParseInt(std::num::ParseIntError), InvalidInviteCode, + InvalidBroadcastMessage, CryptoFormat(crate::crypto::Error), + Io(std::io::Error), } impl From for Error { @@ -17,6 +19,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: std::io::Error) -> Self { + Error::Io(err) + } +} + impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{:?}", self) diff --git a/src/discovery/lan.rs b/src/discovery/lan.rs new file mode 100644 index 0000000..6ac787d --- /dev/null +++ b/src/discovery/lan.rs @@ -0,0 +1,92 @@ +#![allow(clippy::single_match)] + +use get_if_addrs::{get_if_addrs, IfAddr}; + +use log::warn; +use std::string::ToString; + +use async_std::net::{IpAddr, SocketAddr, UdpSocket}; + +use once_cell::sync::Lazy; +use regex::Regex; + +use crate::{crypto::ed25519, crypto::ToSodiumObject}; + +use super::error::{Error, Result}; + +pub static BROADCAST_REGEX: Lazy = Lazy::new(|| { + Regex::new(r"net:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):([0-9]+)~shs:([0-9a-zA-Z=/]+)").unwrap() +}); + +pub struct LanBroadcast { + destination: String, + packets: Vec<(SocketAddr, SocketAddr, String)>, +} + +impl LanBroadcast { + pub async fn new(id: &ed25519::PublicKey, rpc_port: u16) -> Result { + let server_pk = base64::encode(&id); + + let mut packets = Vec::new(); + + for if_addr in get_if_addrs()? { + let addrs = match if_addr.addr { + IfAddr::V4(v4) if !v4.is_loopback() && v4.broadcast.is_some() => { + Some((IpAddr::V4(v4.ip), IpAddr::V4(v4.broadcast.unwrap()))) + } + IfAddr::V6(v6) if !v6.is_loopback() && v6.broadcast.is_some() => { + Some((IpAddr::V6(v6.ip), IpAddr::V6(v6.broadcast.unwrap()))) + } + _ => None, + }; + + if let Some((local, broadcast)) = addrs { + let local_addr = SocketAddr::new(local, rpc_port); + let broadcast_addr = SocketAddr::new(broadcast, rpc_port); + let msg = format!("net:{}:{}~shs:{}", local, rpc_port, server_pk); + match UdpSocket::bind(SocketAddr::new(local, rpc_port)).await { + Ok(_) => packets.push((local_addr, broadcast_addr, msg)), + Err(err) => warn!("cannot broadcast to {:?} {:?}", local_addr, err), + }; + } + } + let destination = format!("255.255.255.255:{}", rpc_port); + Ok(LanBroadcast { + packets, + destination, + }) + } + pub async fn send(&self) { + for msg in &self.packets { + if let Ok(socket) = UdpSocket::bind(msg.0).await { + let _ = socket.set_broadcast(true); + match socket.send_to(msg.2.as_bytes(), &self.destination).await { + Err(err) => warn!(target:"solar", "Error broadcasting {}",err), + _ => {} + } + } + } + } + + pub fn parse(msg: &str) -> Option<(String, u32, ed25519::PublicKey)> { + let parse_shs = |addr: &str| -> Result<_> { + let captures = BROADCAST_REGEX + .captures(&addr) + .ok_or(Error::InvalidBroadcastMessage)?; + + let ip = captures[1].to_string(); + let port = captures[2].parse::()?; + let server_pk = captures[3].to_ed25519_pk_no_suffix()?; + + Ok((ip, port, server_pk)) + }; + + for addr in msg.split(';') { + if let Ok(shs) = parse_shs(addr) { + return Some(shs); + } + } + + None + } +} diff --git a/src/discovery/mod.rs b/src/discovery/mod.rs index 7f3b6a9..a634318 100644 --- a/src/discovery/mod.rs +++ b/src/discovery/mod.rs @@ -1,6 +1,8 @@ mod error; +mod lan; mod network; mod pubs; +pub use lan::LanBroadcast; pub use network::ssb_net_id; pub use pubs::Invite; diff --git a/src/lib.rs b/src/lib.rs index 89448b4..7948eb5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -extern crate kuska_handshake; +pub extern crate kuska_handshake as handshake; #[macro_use] extern crate serde; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 0129768..3a13171 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -2,4 +2,4 @@ mod error; mod stream; pub use error::{Error, Result}; -pub use stream::{Body, BodyType, RecvMsg, RequestNo, RpcStreamReader,RpcStreamWriter, RpcType}; +pub use stream::{Body, BodyType, RecvMsg, RequestNo, RpcReader, RpcType, RpcWriter}; diff --git a/src/rpc/stream.rs b/src/rpc/stream.rs index 92c1f04..87ed905 100644 --- a/src/rpc/stream.rs +++ b/src/rpc/stream.rs @@ -3,9 +3,8 @@ use super::error::{Error, Result}; use async_std::io; use async_std::prelude::*; use log::{trace, warn}; -use std::pin::Pin; -use std::task::{Context,Poll}; +use async_stream::stream; use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite}; pub type RequestNo = i32; @@ -127,11 +126,11 @@ impl Header { } } -pub struct RpcStreamReader { +pub struct RpcReader { box_reader: BoxStreamRead, } -pub struct RpcStreamWriter { +pub struct RpcWriter { box_writer: BoxStreamWrite, req_no: RequestNo, } @@ -145,11 +144,9 @@ pub enum RecvMsg { CancelStreamRespose(), } -impl RpcStreamReader { - pub fn new(box_reader: BoxStreamRead) -> RpcStreamReader { - RpcStreamReader { - box_reader - } +impl RpcReader { + pub fn new(box_reader: BoxStreamRead) -> RpcReader { + RpcReader { box_reader } } pub async fn recv(&mut self) -> Result<(RequestNo, RecvMsg)> { @@ -190,18 +187,19 @@ impl RpcStreamReader { )) } } -} -/* -impl Stream for RpcStreamReader { - type Item = (RequestNo, RecvMsg); - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - futures::ready!(self.recv()) + + pub fn into_stream(mut self) -> impl Stream { + stream! { + while let Ok(v) = self.recv().await { + yield v + } + } } } -*/ -impl RpcStreamWriter { - pub fn new(box_writer: BoxStreamWrite) -> RpcStreamWriter { - RpcStreamWriter { + +impl RpcWriter { + pub fn new(box_writer: BoxStreamWrite) -> RpcWriter { + RpcWriter { box_writer, req_no: 0, }