diff --git a/Cargo.toml b/Cargo.toml index c41d15b..45203ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,9 +6,14 @@ edition = "2021" [dependencies] async-std = "1.10" bincode = "1.3" +chrono = "0.4" env_logger = "0.9" -golgi = { git = "https://git.coopcloud.tech/golgi-ssb/golgi.git" } +futures = "0.3" +#golgi = { git = "https://git.coopcloud.tech/golgi-ssb/golgi.git" } +golgi = { path = "../golgi" } log = "0.4" rocket = "0.5.0-rc.1" rocket_dyn_templates = { version = "0.1.0-rc.1", features = ["tera"] } +serde = "1" +serde_json = "1" sled = "0.34" diff --git a/src/db.rs b/src/db.rs index 8a75ce4..54c2e05 100644 --- a/src/db.rs +++ b/src/db.rs @@ -6,9 +6,19 @@ use std::{ path::Path, }; -use bincode::Options; -use log::{debug, info, warn}; -use sled::{Db, IVec, Result, Tree}; +use log::{debug, info}; +use serde::{Deserialize, Serialize}; +use sled::{Batch, Db, IVec, Result, Tree}; + +// The text and metadata of a Scuttlebutt root post. +#[derive(Debug, Deserialize, Serialize)] +pub struct Post { + pub key: String, + pub text: String, + pub date: String, + pub sequence: u64, + pub read: bool, +} #[derive(Debug, Clone, PartialEq, Eq)] struct IVecString { @@ -31,54 +41,96 @@ impl From for IVecString { } } +#[derive(Clone)] pub struct Database { /// Stores the sled database instance. db: Db, - /// Stores the public keys of all the feeds we are subscribed to. - feed_tree: Tree, - /// Stores the messages (content and metadata) for all the feeds we are subscribed to. - message_tree: Tree, + /// Stores the public keys of all the peers we are subscribed to. + peer_tree: Tree, + /// Stores the posts (content and metadata) for all the feeds we are subscribed to. + pub post_tree: Tree, } impl Database { - // TODO: return Result and use try operators - // implement simple custom error type pub fn init(path: &Path) -> Self { // Open the database at the given path. // The database will be created if it does not yet exist. // This code will panic if an IO error is encountered. info!("initialising the sled database"); let db = sled::open(path).expect("failed to open database"); - debug!("opening the 'feeds' database tree"); - let feed_tree = db - .open_tree("feeds") - .expect("failed to open database feeds tree"); - debug!("opening the 'messages' database tree"); - let message_tree = db - .open_tree("messages") - .expect("failed to open database messages tree"); + debug!("opening the 'peers' database tree"); + let peer_tree = db + .open_tree("peers") + .expect("failed to open database peers tree"); + debug!("opening the 'posts' database tree"); + let post_tree = db + .open_tree("posts") + .expect("failed to open database posts tree"); Database { db, - feed_tree, - message_tree, + peer_tree, + post_tree, } } - pub fn add_feed(&self, public_key: &str) -> Result> { - self.feed_tree.insert(&public_key, vec![0]) + pub fn add_peer(&self, public_key: &str) -> Result> { + self.peer_tree.insert(&public_key, vec![0]) } - pub fn remove_feed(&self, public_key: &str) -> Result> { - self.feed_tree.remove(&public_key) + pub fn remove_peer(&self, public_key: &str) -> Result> { + self.peer_tree.remove(&public_key) } - pub fn get_feeds(&self) -> Vec { - self.feed_tree + pub fn get_peers(&self) -> Vec { + self.peer_tree .iter() .keys() .map(|bytes| IVecString::from(bytes.unwrap())) .map(|ivec_string| ivec_string.string) .collect() } + + pub fn insert_post(&self, public_key: &str, post: Post) -> Result> { + let post_key = format!("{}_{}", public_key, post.key); + let post_bytes = bincode::serialize(&post).unwrap(); + + self.post_tree.insert(post_key.as_bytes(), post_bytes) + } + + pub fn insert_post_batch(&self, public_key: &str, posts: Vec) -> Result<()> { + let mut post_batch = Batch::default(); + + for post in posts { + let post_key = format!("{}_{}", public_key, post.key); + let post_bytes = bincode::serialize(&post).unwrap(); + + post_batch.insert(post_key.as_bytes(), post_bytes) + } + + self.post_tree.apply_batch(post_batch) + } + + pub fn get_posts(&self, public_key: &str) -> Result> { + let mut posts = Vec::new(); + + self.post_tree + .scan_prefix(public_key.as_bytes()) + .map(|post| post.unwrap()) + .for_each(|post| posts.push(bincode::deserialize(&post.1).unwrap())); + + Ok(posts) + } + + pub fn get_post(&self, public_key: &str, msg_id: &str) -> Result> { + let post_key = format!("{}_{}", public_key, msg_id); + + let post = self + .post_tree + .get(post_key.as_bytes()) + .unwrap() + .map(|post| bincode::deserialize(&post).unwrap()); + + Ok(post) + } } diff --git a/src/main.rs b/src/main.rs index d75bcd4..6f65f33 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,23 @@ mod db; mod sbot; +mod task_loop; mod utils; -use std::path::Path; +use std::{env, path::Path}; +use async_std::{channel, channel::Sender}; use log::{debug, info, warn}; -use rocket::{form::Form, get, launch, post, response::Redirect, routes, uri, FromForm, State}; +use rocket::{ + fairing::AdHoc, + form::Form, + fs::{relative, FileServer}, + get, launch, post, + response::Redirect, + routes, uri, FromForm, State, +}; use rocket_dyn_templates::{tera::Context, Template}; -use crate::db::Database; +use crate::{db::Database, task_loop::Task}; #[derive(FromForm)] struct Peer { @@ -16,27 +25,77 @@ struct Peer { } #[get("/")] -fn index(db: &State) -> Template { - let mut context = Context::new(); - let feeds = db.get_feeds(); - context.insert("feeds", &feeds); +async fn home(db: &State) -> Template { + let peers = db.get_peers(); - Template::render("index", &context.into_json()) + let mut context = Context::new(); + context.insert("peers", &peers); + + Template::render("home", &context.into_json()) +} + +#[get("/posts/")] +async fn posts(db: &State, public_key: &str) -> Template { + let peers = db.get_peers(); + let posts = db.get_posts(public_key).unwrap(); + + let mut context = Context::new(); + context.insert("selected_peer", &public_key); + context.insert("peers", &peers); + context.insert("posts", &posts); + + Template::render("home", &context.into_json()) +} + +#[get("/posts//")] +async fn post(db: &State, public_key: &str, msg_id: &str) -> Template { + let peers = db.get_peers(); + let posts = db.get_posts(public_key).unwrap(); + let post = db.get_post(public_key, msg_id).unwrap(); + + let mut context = Context::new(); + context.insert("selected_peer", &public_key); + context.insert("selected_post", &msg_id); + context.insert("peers", &peers); + context.insert("posts", &posts); + context.insert("post", &post); + + Template::render("home", &context.into_json()) } #[post("/subscribe", data = "")] -fn subscribe_form(db: &State, peer: Form) -> Redirect { - // validate the public key +async fn subscribe_form( + db: &State, + whoami: &State, + tx: &State>, + peer: Form, +) -> Redirect { if let Ok(_) = utils::validate_public_key(&peer.public_key) { debug!("public key {} is valid", &peer.public_key); - match db.add_feed(&peer.public_key) { + match db.add_peer(&peer.public_key) { Ok(_) => { - debug!("added {} to feed tree in database", &peer.public_key); - // check if we already follow the peer - // - if not, follow the peer and create a tree for the peer + debug!("added {} to peer tree in database", &peer.public_key); + // TODO: i don't think we actually want to follow... + // we might still have the data in our ssb db, even if we don't follow + match sbot::is_following(&whoami.public_key, &peer.public_key).await { + Ok(status) if status.as_str() == "false" => { + match sbot::follow_peer(&peer.public_key).await { + Ok(_) => debug!("followed {}", &peer.public_key), + Err(e) => warn!("failed to follow {}: {}", &peer.public_key, e), + } + } + Ok(status) if status.as_str() == "true" => { + debug!("we already follow {}", &peer.public_key) + } + _ => (), + } + let peer = peer.public_key.to_string(); + if let Err(e) = tx.send(Task::FetchAll(peer)).await { + warn!("task loop error: {}", e) + } } Err(_e) => warn!( - "failed to add {} to feed tree in database", + "failed to add {} to peer tree in database", &peer.public_key ), } @@ -44,7 +103,7 @@ fn subscribe_form(db: &State, peer: Form) -> Redirect { warn!("{} is invalid", &peer.public_key); } - Redirect::to(uri!(index)) + Redirect::to(uri!(home)) } #[post("/unsubscribe", data = "")] @@ -53,10 +112,10 @@ fn unsubscribe_form(db: &State, peer: Form) -> Redirect { match utils::validate_public_key(&peer.public_key) { Ok(_) => { debug!("public key {} is valid", &peer.public_key); - match db.remove_feed(&peer.public_key) { - Ok(_) => debug!("removed {} from feed tree in database", &peer.public_key), + match db.remove_peer(&peer.public_key) { + Ok(_) => debug!("removed {} from peer tree in database", &peer.public_key), Err(_e) => warn!( - "failed to remove {} from feed tree in database", + "failed to remove {} from peer tree in database", &peer.public_key ), } @@ -64,16 +123,42 @@ fn unsubscribe_form(db: &State, peer: Form) -> Redirect { Err(e) => warn!("{} is invalid: {}", &peer.public_key, e), } - Redirect::to(uri!(index)) + Redirect::to(uri!(home)) +} + +struct WhoAmI { + public_key: String, } #[launch] -fn rocket() -> _ { +async fn rocket() -> _ { env_logger::init(); + let public_key: String = sbot::whoami().await.expect("whoami sbot call failed"); + let whoami = WhoAmI { public_key }; + + let db = Database::init(Path::new("lykin_db")); + let db_clone = db.clone(); + + let (tx, rx) = channel::unbounded(); + let tx_clone = tx.clone(); + + task_loop::spawn(rx, db_clone).await; + info!("launching the web server"); rocket::build() - .manage(Database::init(Path::new("lykin"))) - .mount("/", routes![index, subscribe_form, unsubscribe_form]) + .manage(db) + .manage(whoami) + .manage(tx) + .mount( + "/", + routes![home, subscribe_form, unsubscribe_form, posts, post], + ) + .mount("/", FileServer::from(relative!("static"))) .attach(Template::fairing()) + .attach(AdHoc::on_shutdown("cancel task loop", |_| { + Box::pin(async move { + tx_clone.send(Task::Cancel).await; + }) + })) } diff --git a/src/sbot.rs b/src/sbot.rs index dc83b47..a82ec6a 100644 --- a/src/sbot.rs +++ b/src/sbot.rs @@ -1,16 +1,103 @@ // Scuttlebutt functionality. -use async_std::task; -use golgi::Sbot; +use async_std::stream::StreamExt; +use chrono::{NaiveDate, NaiveDateTime, TimeZone, Utc}; +use golgi::{ + api::friends::RelationshipQuery, + messages::{SsbMessageContentType, SsbMessageKVT}, + sbot::Keystore, + GolgiError, Sbot, +}; +use log::warn; +use serde_json::value::Value; + +use crate::db::Post; + +pub async fn whoami() -> Result { + let mut sbot = Sbot::init(Keystore::Patchwork, None, None) + .await + .map_err(|e| e.to_string())?; + + sbot.whoami().await.map_err(|e| e.to_string()) +} /// Follow a peer. -pub fn follow_peer(public_key: &str) -> Result { - task::block_on(async { - let mut sbot_client = Sbot::init(None, None).await.map_err(|e| e.to_string())?; +pub async fn follow_peer(public_key: &str) -> Result { + let mut sbot = Sbot::init(Keystore::Patchwork, None, None) + .await + .map_err(|e| e.to_string())?; - match sbot_client.follow(public_key).await { - Ok(_) => Ok("Followed peer".to_string()), - Err(e) => Err(format!("Failed to follow peer: {}", e)), - } - }) + sbot.follow(public_key).await.map_err(|e| e.to_string()) +} + +/// Check follow status. +/// +/// Is peer A (`public_key_a`) following peer B (`public_key_b`)? +pub async fn is_following(public_key_a: &str, public_key_b: &str) -> Result { + let mut sbot = Sbot::init(Keystore::Patchwork, None, None) + .await + .map_err(|e| e.to_string())?; + + let query = RelationshipQuery { + source: public_key_a.to_string(), + dest: public_key_b.to_string(), + }; + + sbot.friends_is_following(query) + .await + .map_err(|e| e.to_string()) +} + +pub async fn get_message_stream( + public_key: &str, +) -> impl futures::Stream> { + let mut sbot = Sbot::init(Keystore::Patchwork, None, None) + .await + .map_err(|e| e.to_string()) + .unwrap(); + + sbot.create_history_stream(public_key.to_string()) + .await + .unwrap() +} + +pub async fn get_root_posts( + history_stream: impl futures::Stream>, +) -> Vec { + let mut posts = Vec::new(); + + futures::pin_mut!(history_stream); + + while let Some(res) = history_stream.next().await { + match res { + Ok(msg) => { + if msg.value.is_message_type(SsbMessageContentType::Post) { + let content = msg.value.content.to_owned(); + if let Value::Object(map) = content { + if !map.contains_key("root") { + let text = map.get_key_value("text").unwrap(); + let timestamp_int = msg.value.timestamp.round() as i64 / 1000; + //let timestamp = Utc.timestamp(timestamp_int, 0); + let datetime = NaiveDateTime::from_timestamp(timestamp_int, 0); + //let datetime = timestamp.to_rfc2822(); + let date = datetime.format("%d %b %Y").to_string(); + posts.push(Post { + key: msg.key.to_owned(), + text: text.1.to_string(), + date, + sequence: msg.value.sequence, + read: false, + }) + } + } + } + } + Err(err) => { + // Print the `GolgiError` of this element to `stderr`. + warn!("err: {:?}", err); + } + } + } + + posts } diff --git a/src/task_loop.rs b/src/task_loop.rs new file mode 100644 index 0000000..3bfaaf3 --- /dev/null +++ b/src/task_loop.rs @@ -0,0 +1,36 @@ +use async_std::{channel::Receiver, task}; +use log::{debug, info, warn}; + +use crate::{db::Database, sbot}; + +pub enum Task { + Cancel, + FetchAll(String), +} + +pub async fn spawn(rx: Receiver, db: Database) { + task::spawn(async move { + while let Ok(task) = rx.recv().await { + match task { + // Fetch all messages authored by the given peer, filter + // the root posts and insert them into the peer tree of the + // database. + Task::FetchAll(peer) => { + let peer_msgs = sbot::get_message_stream(&peer).await; + let root_posts = sbot::get_root_posts(peer_msgs).await; + match db.insert_post_batch(&peer, root_posts) { + Ok(_) => debug!("inserted message batch into peer tree for {}", &peer), + Err(e) => warn!( + "failed to insert message batch into peer tree for {}: {}", + &peer, e + ), + } + } + Task::Cancel => { + info!("exiting task loop..."); + break; + } + } + } + }); +} diff --git a/static/icons/delete_post.png b/static/icons/delete_post.png new file mode 100644 index 0000000..572528d Binary files /dev/null and b/static/icons/delete_post.png differ diff --git a/static/icons/download.png b/static/icons/download.png new file mode 100644 index 0000000..6139c0e Binary files /dev/null and b/static/icons/download.png differ diff --git a/static/icons/icon_attributions b/static/icons/icon_attributions new file mode 100644 index 0000000..f990896 --- /dev/null +++ b/static/icons/icon_attributions @@ -0,0 +1 @@ +Download icons created by Kiranshastry - Flaticon \ No newline at end of file diff --git a/static/icons/read_post.png b/static/icons/read_post.png new file mode 100644 index 0000000..72b4f84 Binary files /dev/null and b/static/icons/read_post.png differ diff --git a/static/icons/unread_post.png b/static/icons/unread_post.png new file mode 100644 index 0000000..c2c8c73 Binary files /dev/null and b/static/icons/unread_post.png differ diff --git a/templates/home.html.tera b/templates/home.html.tera new file mode 100644 index 0000000..e7b320f --- /dev/null +++ b/templates/home.html.tera @@ -0,0 +1,131 @@ + + + + + lykin + + + + + + + +

lykin

+
+ +
+ +
+
+ {% if posts %} + + {% endif %} +
+
+ {% if post %} + {{ post.text }} + {% endif %} +
+
+ +