From 40142600b933263d8d940b541d5296f2f384e72d Mon Sep 17 00:00:00 2001 From: glyph Date: Tue, 30 Aug 2022 08:48:28 +0100 Subject: [PATCH 1/5] remove ignore list rules for installments --- .gitignore | 7 ------- Cargo.toml | 6 +++--- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 6ed602a..eb5a316 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1 @@ -part_2_subscribe_form -part_3_database_follows -part_4_posts_streams -part_5_latest_posts -part_6_unread_delete -part_7_extension_ideas -notes target diff --git a/Cargo.toml b/Cargo.toml index dd40f7e..cf411f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = [ "part_1_sbot_rocket", - #"part_2_subscribe_form", - #"part_3_database_follows", - #"part_4_posts_streams" + "part_2_subscribe_form", + "part_3_database_follows", + "part_4_posts_streams" ] From 8a3daf1085ab92abd9f4fa4e685ba8961d9b64eb Mon Sep 17 00:00:00 2001 From: glyph Date: Mon, 5 Sep 2022 09:54:19 +0100 Subject: [PATCH 2/5] bring code up to date with samples in readme --- part_4_posts_streams/Cargo.toml | 4 ++ part_4_posts_streams/README.md | 21 +++++++ part_4_posts_streams/src/db.rs | 87 ++++++++++++++++++++++++- part_4_posts_streams/src/sbot.rs | 105 +++++++++++++++++++++++++++---- 4 files changed, 204 insertions(+), 13 deletions(-) diff --git a/part_4_posts_streams/Cargo.toml b/part_4_posts_streams/Cargo.toml index f2f7445..e913fb0 100644 --- a/part_4_posts_streams/Cargo.toml +++ b/part_4_posts_streams/Cargo.toml @@ -6,11 +6,15 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-std = "1.10" bincode = "1.3" +chrono = "0.4" +futures = "0.3" golgi = { git = "https://git.coopcloud.tech/golgi-ssb/golgi.git" } log = "0.4" rocket = "0.5.0-rc.1" rocket_dyn_templates = { version = "0.1.0-rc.1", features = ["tera"] } serde = "1" +serde_json = "1" sled = "0.34" xdg = "2.4.1" diff --git a/part_4_posts_streams/README.md b/part_4_posts_streams/README.md index 74adaec..ada026e 100644 --- a/part_4_posts_streams/README.md +++ b/part_4_posts_streams/README.md @@ -17,6 +17,15 @@ Here's what we'll tackle in this fourth part of the series: - Add a post to the database - Add a post batch to the database +### Libraries + +The following libraries are introduced in this part: + + - [async-std](https://crates.io/crates/async-std) + - [chrono](https://crates.io/crates/chrono) + - [futures](https://crates.io/crates/futures) + - [serde_json](https://crates.io/crates/serde_json) + ### Create a Post Data Structure We'll begin by creating a `Post` struct to store data about each Scuttlebutt post we want to render in our application. The fields of our struct will diverge from the fields we expect in a Scuttlebutt post-type message. Open `src/db.rs` and add the following code (I've included code comments to further define each field): @@ -130,6 +139,16 @@ Now that we have the ability to obtain all messages authored by a specific peer, `src/sbot.rs` ```rust +use chrono::NaiveDateTime; +use golgi::{ + api::history_stream::CreateHistoryStream}, + messages::{SsbMessageContentType, SsbMessageKVT}, + sbot::GolgiError, +}; +use serde_json::value::Value; + +use crate::db::Post; + // Filter a stream of messages and return a vector of root posts. pub async fn get_root_posts( history_stream: impl futures::Stream>, @@ -279,6 +298,8 @@ On most occasions we'll find ourselves in a situation where we wish to add multi `src/db.rs` ```rust +use sled::Batch; + impl Database { // ... diff --git a/part_4_posts_streams/src/db.rs b/part_4_posts_streams/src/db.rs index f971a4e..c05a9d9 100644 --- a/part_4_posts_streams/src/db.rs +++ b/part_4_posts_streams/src/db.rs @@ -2,7 +2,7 @@ use std::path::Path; use log::{debug, info}; use serde::{Deserialize, Serialize}; -use sled::{Db, IVec, Result, Tree}; +use sled::{Batch, Db, IVec, Result, Tree}; /// Scuttlebutt peer data. #[derive(Debug, Deserialize, Serialize)] @@ -31,6 +31,49 @@ impl Peer { } } +/// The text and metadata of a Scuttlebutt root post. +#[derive(Debug, Deserialize, Serialize)] +pub struct Post { + /// The key of the post-type message, also known as a message reference. + pub key: String, + /// The text of the post (may be formatted as markdown). + pub text: String, + /// The date the post was published (e.g. 17 May 2021). + pub date: String, + /// The sequence number of the post-type message. + pub sequence: u64, + /// The read state of the post; true if read, false if unread. + pub read: bool, + /// The timestamp representing the date the post was published. + pub timestamp: i64, + /// The subject of the post, represented as the first 53 characters of + /// the post text. + pub subject: Option, +} + +impl Post { + // Create a new instance of the Post struct. A default value of `false` is + // set for `read`. + pub fn new( + key: String, + text: String, + date: String, + sequence: u64, + timestamp: i64, + subject: Option, + ) -> Post { + Post { + key, + text, + date, + sequence, + timestamp, + subject, + read: false, + } + } +} + /// An instance of the key-value database and relevant trees. #[allow(dead_code)] #[derive(Clone)] @@ -40,6 +83,9 @@ pub struct Database { /// A database tree containing Peer struct instances for all the peers /// we are subscribed to. peer_tree: Tree, + /// A database tree containing Post struct instances for all of the posts + /// we have downloaded from the peer to whom we subscribe. + pub post_tree: Tree, } impl Database { @@ -55,8 +101,16 @@ impl Database { let peer_tree = db .open_tree("peers") .expect("Failed to open 'peers' database tree"); + debug!("Opening 'posts' database tree"); + let post_tree = db + .open_tree("posts") + .expect("Failed to open 'posts' database tree"); - Database { db, peer_tree } + Database { + db, + peer_tree, + post_tree, + } } /// Add a peer to the database by inserting the public key into the peer @@ -78,4 +132,33 @@ impl Database { debug!("Removing peer {} from 'peers' database tree", &public_key); self.peer_tree.remove(&public_key).map(|_| ()) } + + /// Add a post to the database by inserting an instance of the Post struct + /// into the post tree. + pub fn add_post(&self, public_key: &str, post: Post) -> Result> { + let post_key = format!("{}_{}", public_key, post.key); + debug!("Serializing post data for {} to bincode", &post_key); + let post_bytes = bincode::serialize(&post).unwrap(); + + debug!("Inserting post {} into 'posts' database tree", &post_key); + self.post_tree.insert(post_key.as_bytes(), post_bytes) + } + + /// Add a batch of posts to the database by inserting a vector of instances + /// of the Post struct into the post tree. + pub fn add_post_batch(&self, public_key: &str, posts: Vec) -> Result<()> { + let mut post_batch = Batch::default(); + + for post in posts { + let post_key = format!("{}_{}", public_key, post.key); + debug!("Serializing post data for {} to bincode", &post_key); + let post_bytes = bincode::serialize(&post).unwrap(); + + debug!("Inserting post {} into 'posts' database tree", &post_key); + post_batch.insert(post_key.as_bytes(), post_bytes) + } + + debug!("Applying batch insertion into 'posts' database tree"); + self.post_tree.apply_batch(post_batch) + } } diff --git a/part_4_posts_streams/src/sbot.rs b/part_4_posts_streams/src/sbot.rs index 32300be..57c7e10 100644 --- a/part_4_posts_streams/src/sbot.rs +++ b/part_4_posts_streams/src/sbot.rs @@ -1,7 +1,17 @@ use std::env; -use golgi::{api::friends::RelationshipQuery, sbot::Keystore, Sbot}; +use async_std::stream::StreamExt; +use chrono::NaiveDateTime; +use golgi::{ + api::{friends::RelationshipQuery, history_stream::CreateHistoryStream}, + messages::{SsbMessageContentType, SsbMessageKVT}, + sbot::Keystore, + GolgiError, Sbot, +}; use log::{info, warn}; +use serde_json::value::Value; + +use crate::db::Post; /// Initialise a connection to a Scuttlebutt server. pub async fn init_sbot() -> Result { @@ -53,16 +63,6 @@ pub async fn unfollow_peer(public_key: &str) -> Result { sbot.unfollow(public_key).await.map_err(|e| e.to_string()) } -/// Return the name (self-identifier) for the peer associated with the given -/// public key. -/// -/// The public key of the peer will be returned if a name is not found. -pub async fn get_name(public_key: &str) -> Result { - let mut sbot = init_sbot().await?; - - sbot.get_name(public_key).await.map_err(|e| e.to_string()) -} - /// Check the follow status of a remote peer and follow them if not already /// following. pub async fn follow_if_not_following(remote_peer: &str) -> Result<(), String> { @@ -128,3 +128,86 @@ pub async fn unfollow_if_following(remote_peer: &str) -> Result<(), String> { Err(err_msg) } } + +/// Return a stream of messages authored by the given public key. +/// +/// This returns all messages regardless of type. +pub async fn get_message_stream( + public_key: &str, + sequence_number: u64, +) -> impl futures::Stream> { + let mut sbot = init_sbot().await.unwrap(); + + let history_stream_args = CreateHistoryStream::new(public_key.to_string()) + .keys_values(true, true) + .after_seq(sequence_number); + + sbot.create_history_stream(history_stream_args) + .await + .unwrap() +} + +/// Return the name (self-identifier) for the peer associated with the given +/// public key. +/// +/// The public key of the peer will be returned if a name is not found. +pub async fn get_name(public_key: &str) -> Result { + let mut sbot = init_sbot().await?; + + sbot.get_name(public_key).await.map_err(|e| e.to_string()) +} + +/// Filter a stream of messages and return a vector of root posts. +/// +/// Each returned vector element includes the key of the post, the content +/// text, the date the post was published, the sequence number of the post +/// and whether it is read or unread. +pub async fn get_root_posts( + history_stream: impl futures::Stream>, +) -> (u64, Vec) { + let mut latest_sequence = 0; + let mut posts = Vec::new(); + + futures::pin_mut!(history_stream); + + while let Some(res) = history_stream.next().await { + match res { + Ok(msg) => { + if msg.value.is_message_type(SsbMessageContentType::Post) { + let content = msg.value.content.to_owned(); + if let Value::Object(content_map) = content { + if !content_map.contains_key("root") { + latest_sequence = msg.value.sequence; + + let text = match content_map.get_key_value("text") { + Some(value) => value.1.to_string(), + None => String::from(""), + }; + let timestamp = msg.value.timestamp.round() as i64 / 1000; + let datetime = NaiveDateTime::from_timestamp(timestamp, 0); + let date = datetime.format("%d %b %Y").to_string(); + let subject = text.get(0..52).map(|s| s.to_string()); + + let post = Post::new( + msg.key.to_owned(), + text, + date, + msg.value.sequence, + timestamp, + subject, + ); + + posts.push(post) + } + } + } + } + Err(err) => { + // Print the `GolgiError` of this element to `stderr`. + warn!("err: {:?}", err); + } + } + } + + (latest_sequence, posts) +} From 63fb07322f465c90e8b3916004051bec4b98a49e Mon Sep 17 00:00:00 2001 From: glyph Date: Mon, 5 Sep 2022 09:54:40 +0100 Subject: [PATCH 3/5] add part 5 draft progress --- part_5_task_loop/Cargo.toml | 17 ++ part_5_task_loop/README.md | 179 ++++++++++++++++++++++ part_5_task_loop/src/db.rs | 81 ++++++++++ part_5_task_loop/src/main.rs | 42 +++++ part_5_task_loop/src/routes.rs | 141 +++++++++++++++++ part_5_task_loop/src/sbot.rs | 130 ++++++++++++++++ part_5_task_loop/src/task_loop.rs | 35 +++++ part_5_task_loop/src/utils.rs | 32 ++++ part_5_task_loop/templates/base.html.tera | 21 +++ 9 files changed, 678 insertions(+) create mode 100644 part_5_task_loop/Cargo.toml create mode 100644 part_5_task_loop/README.md create mode 100644 part_5_task_loop/src/db.rs create mode 100644 part_5_task_loop/src/main.rs create mode 100644 part_5_task_loop/src/routes.rs create mode 100644 part_5_task_loop/src/sbot.rs create mode 100644 part_5_task_loop/src/task_loop.rs create mode 100644 part_5_task_loop/src/utils.rs create mode 100644 part_5_task_loop/templates/base.html.tera diff --git a/part_5_task_loop/Cargo.toml b/part_5_task_loop/Cargo.toml new file mode 100644 index 0000000..6316daf --- /dev/null +++ b/part_5_task_loop/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "part_5_task_loop" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-std = "1.10" +bincode = "1.3" +golgi = { git = "https://git.coopcloud.tech/golgi-ssb/golgi.git" } +log = "0.4" +rocket = "0.5.0-rc.1" +rocket_dyn_templates = { version = "0.1.0-rc.1", features = ["tera"] } +serde = "1" +sled = "0.34" +xdg = "2.4.1" diff --git a/part_5_task_loop/README.md b/part_5_task_loop/README.md new file mode 100644 index 0000000..136cc6a --- /dev/null +++ b/part_5_task_loop/README.md @@ -0,0 +1,179 @@ +# lykin tutorial + +## Part 5: Task Loop and Post Fetching + +### Introduction + +In the last installment we added support to our key-value database for dealing with Scuttlebutt posts and wrote code to create and filter streams of Scuttlebutt messages. Since our peers may have authored tens of thousands of messages, it's useful to create a way of fetching and filtering message streams as a background process. Today we'll do just that; writing a task loop that we can be invoked from our web application route handlers and used to execute potentially long-running processes. + +### Outline + +Here's what we'll tackle in this fifth part of the series: + + - Create an asynchronous task loop + - Create a message passing channel and spawn the task loop + - Write sbot-related task functions + - Fetch root posts on subscription + +### Libraries + +The following libraries are introduced in this part: + + - [`async-std`](https://crates.io/crates/async-std) + +### Create an Asynchronous Task Loop + +Let's start by defining a task type that enumerates the various tasks we might want to carry out. We'll create a separate module for our task loop: + +`src/task_loop.rs` + +```rust +pub enum Task { + Cancel, + FetchAllPosts(String), + FetchLatestName(String), +} +``` + +The `Task` enum is simple enough: we can fetch all the posts by a given peer (the `String` value is the public key of the peer we're interested in), fetch the latest name assigned to a peer or cancel the task loop. + +We're going to use a message passing approach in order to trigger tasks inside the loop. Let's write the basic loop code now, adding it below the `Task` we just defined, while also adding the necessary crate imports: + +```rust +use async_std::{channel::Receiver, task}; +use log::info; + +// Spawn an asynchronous loop which receives tasks over an unbounded channel +// and invokes task functions accordingly. +pub async fn spawn(rx: Receiver) { + task::spawn(async move { + while let Ok(task) = rx.recv().await { + match task { + // Fetch all messages authored by the given peer, filter + // the root posts and insert them into the posts tree of the + // database. + Task::FetchAllPosts(peer_id) => { + info!("Fetching all posts for peer: {}", peer_id); + } + // Fetch the latest name for the given peer and update the + // peer entry in the peers tree of the database. + Task::FetchLatestName(peer_id) => { + info!("Fetching latest name for peer: {}", peer_id); + } + // Break out of the task loop. + Task::Cancel => { + info!("Exiting task loop..."); + break; + } + } + } + }); +} +``` + +The loop spawning function is fairly simple: it takes the receiver half of a channel and expects messages of type `Task` to be delivered; it matches on the `Task` variant each time a message is received on the channel and acts accordingly. Writing an async loop like this means that we can call functions without blocking the execution of the rest of our program. This is a particularly useful in route handlers where we want to be able to trigger a task and then immediately respond to the request in order to keep the UI snappy and responsive. + +### Create Message Passing Channel and Spawn the Task Loop + +Let's return to the root of our application to create the message passing channel, spawn the task loop and add the channel transmitter to managed state: + +`src/main.rs` + +```rust +mod task_loop; + +use async_std::channel; +use log::info; +use rocket::fairing::AdHoc; + +use crate::task_loop::Task; + +#[launch] async fn rocket() -> _ { + // ... + + // Create the key-value database. + // ... + + // Create a message passing channel. + let (tx, rx) = channel::unbounded(); + let tx_clone = tx.clone(); + + // Spawn the task loop, passing in the receiver half of the channel. + info!("Spawning task loop"); + task_loop::spawn(rx).await; + + rocket::build() + .manage(db) + // Add the transmitter half of the channel to the managed state + // of the Rocket application. + .manage(tx) + // ... + // Send a task loop cancellation message when the application + // is shutting down. + .attach(AdHoc::on_shutdown("cancel task loop", |_| { + Box::pin(async move { + tx_clone.send(Task::Cancel).await.unwrap(); + }) + })) +} +``` + +Reviewing the code above: first an unbounded, asynchronous channel is created and split into transmitting (`tx`) and receiving (`rx`) ends, after which the transmitting channel is cloned. The task loop is then spawned and takes with it the receiving end of the channel. As we did previously with the `db` instance, the transmitting half of the channel is added to the managed state of the Rocket application; this will allow us to transmit tasks to the task loop from our web route handlers. And finaly, a shutdown handler is attached to the Rocket application in order to send a cancellation task to the task loop before the program ends. This ensures that the task loop closes cleanly. + +### Write Sbot-Related Task Functions + +Now it's time to write the functions that will be executed when the `FetchAllPosts` and `FetchLatestName` tasks are invoked. These functions will be responsible for retrieving data from the sbot and updating the database with the latest values: + +`src/task_loop.rs` + +```rust +async fn fetch_posts_and_update_db(db: &Database, peer_id: String, after_sequence: u64) { + let peer_msgs = sbot::get_message_stream(&peer_id, after_sequence).await; + let (latest_sequence, root_posts) = sbot::get_root_posts(peer_msgs).await; + + match db.add_post_batch(&peer_id, root_posts) { + Ok(_) => { + info!( + "Inserted batch of posts into database post tree for peer: {}", + &peer_id + ) + } + Err(e) => warn!( + "Failed to insert batch of posts into database post tree for peer: {}: {}", + &peer_id, e + ), + } + + // Update the value of the latest sequence number for + // the peer (this is stored in the database). + if let Ok(Some(peer)) = db.get_peer(&peer_id) { + db.add_peer(peer.set_latest_sequence(latest_sequence)) + .unwrap(); + } +} + +/// Request the name of the peer represented by the given public key (ID) +/// and update the existing entry in the database. +async fn fetch_name_and_update_db(db: &Database, peer_id: String) { + match sbot::get_name(&peer_id).await { + Ok(name) => { + if let Ok(Some(peer)) = db.get_peer(&peer_id) { + let updated_peer = peer.set_name(&name); + match db.add_peer(updated_peer) { + Ok(_) => info!("Updated name for peer: {}", &peer_id), + Err(e) => { + warn!("Failed to update name for peer: {}: {}", &peer_id, e) + } + } + } + } + Err(e) => warn!("Failed to fetch name for {}: {}", &peer_id, e), + } +} +``` + +### Fetch Root Posts on Subscription + +### Conclusion + +## Funding diff --git a/part_5_task_loop/src/db.rs b/part_5_task_loop/src/db.rs new file mode 100644 index 0000000..f971a4e --- /dev/null +++ b/part_5_task_loop/src/db.rs @@ -0,0 +1,81 @@ +use std::path::Path; + +use log::{debug, info}; +use serde::{Deserialize, Serialize}; +use sled::{Db, IVec, Result, Tree}; + +/// Scuttlebutt peer data. +#[derive(Debug, Deserialize, Serialize)] +pub struct Peer { + pub public_key: String, + pub name: String, +} + +impl Peer { + /// Create a new instance of the Peer struct using the given public + /// key. A default value is set for name. + pub fn new(public_key: &str) -> Peer { + Peer { + public_key: public_key.to_string(), + name: "".to_string(), + } + } + + /// Modify the name field of an instance of the Peer struct, leaving + /// the other values unchanged. + pub fn set_name(self, name: &str) -> Peer { + Self { + name: name.to_string(), + ..self + } + } +} + +/// An instance of the key-value database and relevant trees. +#[allow(dead_code)] +#[derive(Clone)] +pub struct Database { + /// The sled database instance. + db: Db, + /// A database tree containing Peer struct instances for all the peers + /// we are subscribed to. + peer_tree: Tree, +} + +impl Database { + /// Initialise the database by opening the database file, loading the + /// peers tree and returning an instantiated Database struct. + pub fn init(path: &Path) -> Self { + // Open the database at the given path. + // The database will be created if it does not yet exist. + // This code will panic if an IO error is encountered. + info!("Initialising sled database"); + let db = sled::open(path).expect("Failed to open database"); + debug!("Opening 'peers' database tree"); + let peer_tree = db + .open_tree("peers") + .expect("Failed to open 'peers' database tree"); + + Database { db, peer_tree } + } + + /// Add a peer to the database by inserting the public key into the peer + /// tree. + pub fn add_peer(&self, peer: Peer) -> Result> { + debug!("Serializing peer data for {} to bincode", &peer.public_key); + let peer_bytes = bincode::serialize(&peer).unwrap(); + + debug!( + "Inserting peer {} into 'peers' database tree", + &peer.public_key + ); + self.peer_tree.insert(&peer.public_key, peer_bytes) + } + + /// Remove a peer from the database, as represented by the given public + /// key. + pub fn remove_peer(&self, public_key: &str) -> Result<()> { + debug!("Removing peer {} from 'peers' database tree", &public_key); + self.peer_tree.remove(&public_key).map(|_| ()) + } +} diff --git a/part_5_task_loop/src/main.rs b/part_5_task_loop/src/main.rs new file mode 100644 index 0000000..40bb0e8 --- /dev/null +++ b/part_5_task_loop/src/main.rs @@ -0,0 +1,42 @@ +mod db; +mod routes; +mod sbot; +mod task_loop; +mod utils; + +use async_std::channel; +use log::info; +use rocket::{fairing::AdHoc, launch, routes}; +use rocket_dyn_templates::Template; +use xdg::BaseDirectories; + +use crate::{db::Database, routes::*, task_loop::Task}; + +#[launch] +async fn rocket() -> _ { + // Create the key-value database. + let xdg_dirs = BaseDirectories::with_prefix("lykin").unwrap(); + let db_path = xdg_dirs + .place_config_file("database") + .expect("cannot create database directory"); + let db = Database::init(&db_path); + + // Create a message passing channel. + let (tx, rx) = channel::unbounded(); + let tx_clone = tx.clone(); + + // Spawn the task loop, passing in the receiver half of the channel. + info!("Spawning task loop"); + task_loop::spawn(rx).await; + + rocket::build() + .manage(db) + .manage(tx) + .attach(Template::fairing()) + .mount("/", routes![home, subscribe_form, unsubscribe_form]) + .attach(AdHoc::on_shutdown("cancel task loop", |_| { + Box::pin(async move { + tx_clone.send(Task::Cancel).await.unwrap(); + }) + })) +} diff --git a/part_5_task_loop/src/routes.rs b/part_5_task_loop/src/routes.rs new file mode 100644 index 0000000..02de398 --- /dev/null +++ b/part_5_task_loop/src/routes.rs @@ -0,0 +1,141 @@ +use log::{info, warn}; +use rocket::{ + form::Form, + get, post, + request::FlashMessage, + response::{Flash, Redirect}, + uri, FromForm, State, +}; +use rocket_dyn_templates::{context, Template}; + +use crate::{ + db::{Database, Peer}, + sbot, utils, +}; + +#[derive(FromForm)] +pub struct PeerForm { + pub public_key: String, +} + +#[get("/")] +pub async fn home(flash: Option>) -> Template { + let whoami = match sbot::whoami().await { + Ok(id) => id, + Err(e) => format!("Error making `whoami` RPC call: {}. Please ensure the local go-sbot is running and refresh.", e), + }; + + Template::render("base", context! { whoami: whoami, flash: flash }) +} + +#[post("/subscribe", data = "")] +pub async fn subscribe_form( + db: &State, + peer: Form, +) -> Result> { + if let Err(e) = utils::validate_public_key(&peer.public_key) { + let validation_err_msg = format!("Public key {} is invalid: {}", &peer.public_key, e); + warn!("{}", validation_err_msg); + return Err(Flash::error(Redirect::to(uri!(home)), validation_err_msg)); + } else { + info!("Public key {} is valid", &peer.public_key); + // Retrieve the name of the peer to which we are subscribing. + let peer_name = match sbot::get_name(&peer.public_key).await { + Ok(name) => name, + Err(e) => { + warn!("Failed to fetch name for peer {}: {}", &peer.public_key, e); + // Return an empty string if an error occurs. + String::from("") + } + }; + let peer_info = Peer::new(&peer.public_key).set_name(&peer_name); + + match sbot::follow_if_not_following(&peer.public_key).await { + Ok(_) => { + // Add the peer to the database. + if db.add_peer(peer_info).is_ok() { + info!("Added {} to 'peers' database tree", &peer.public_key); + } else { + let err_msg = format!( + "Failed to add peer {} to 'peers' database tree", + &peer.public_key + ); + warn!("{}", err_msg); + return Err(Flash::error(Redirect::to(uri!(home)), err_msg)); + } + } + Err(e) => { + warn!("{}", e); + return Err(Flash::error(Redirect::to(uri!(home)), e)); + } + } + } + + Ok(Redirect::to(uri!(home))) +} + +#[post("/unsubscribe", data = "")] +pub async fn unsubscribe_form( + db: &State, + peer: Form, +) -> Result> { + if let Err(e) = utils::validate_public_key(&peer.public_key) { + let validation_err_msg = format!("Public key {} is invalid: {}", &peer.public_key, e); + warn!("{}", validation_err_msg); + return Err(Flash::error(Redirect::to(uri!(home)), validation_err_msg)); + } else { + info!("Public key {} is valid", &peer.public_key); + match sbot::unfollow_if_following(&peer.public_key).await { + Ok(_) => { + // Remove the peer from the database. + if db.remove_peer(&peer.public_key).is_ok() { + info!( + "Removed peer {} from 'peers' database tree", + &peer.public_key + ); + } else { + warn!( + "Failed to remove peer {} from 'peers' database tree", + &peer.public_key + ); + } + } + Err(e) => { + warn!("{}", e); + return Err(Flash::error(Redirect::to(uri!(home)), e)); + } + } + } + + Ok(Redirect::to(uri!(home))) +} + +/* +#[post("/subscribe", data = "")] +pub async fn subscribe_form(peer: Form) -> Result> { + if let Err(e) = utils::validate_public_key(&peer.public_key) { + let validation_err_msg = format!("Public key {} is invalid: {}", &peer.public_key, e); + warn!("{}", validation_err_msg); + return Err(Flash::error(Redirect::to(uri!(home)), validation_err_msg)); + } else { + info!("Public key {} is valid", &peer.public_key); + sbot::follow_if_not_following(&peer.public_key).await; + } + + Ok(Redirect::to(uri!(home))) +} + +#[post("/unsubscribe", data = "")] +pub async fn unsubscribe_form(peer: Form) -> Result> { + if let Err(e) = utils::validate_public_key(&peer.public_key) { + let validation_err_msg = format!("Public key {} is invalid: {}", &peer.public_key, e); + warn!("{}", validation_err_msg); + return Err(Flash::error(Redirect::to(uri!(home)), validation_err_msg)); + } else { + info!("Public key {} is valid", &peer.public_key); + sbot::unfollow_if_following(&peer.public_key).await; + } + + Ok(Redirect::to(uri!(home))) +} +*/ diff --git a/part_5_task_loop/src/sbot.rs b/part_5_task_loop/src/sbot.rs new file mode 100644 index 0000000..32300be --- /dev/null +++ b/part_5_task_loop/src/sbot.rs @@ -0,0 +1,130 @@ +use std::env; + +use golgi::{api::friends::RelationshipQuery, sbot::Keystore, Sbot}; +use log::{info, warn}; + +/// Initialise a connection to a Scuttlebutt server. +pub async fn init_sbot() -> Result { + let go_sbot_port = env::var("GO_SBOT_PORT").unwrap_or_else(|_| "8021".to_string()); + + let keystore = Keystore::GoSbot; + let ip_port = Some(format!("127.0.0.1:{}", go_sbot_port)); + let net_id = None; + + Sbot::init(keystore, ip_port, net_id) + .await + .map_err(|e| e.to_string()) +} + +/// Return the public key of the local sbot instance. +pub async fn whoami() -> Result { + let mut sbot = init_sbot().await?; + + sbot.whoami().await.map_err(|e| e.to_string()) +} + +/// Check follow status. +/// +/// Is peer A (`public_key_a`) following peer B (`public_key_b`)? +pub async fn is_following(public_key_a: &str, public_key_b: &str) -> Result { + let mut sbot = init_sbot().await?; + + let query = RelationshipQuery { + source: public_key_a.to_string(), + dest: public_key_b.to_string(), + }; + + sbot.friends_is_following(query) + .await + .map_err(|e| e.to_string()) +} + +/// Follow a peer. +pub async fn follow_peer(public_key: &str) -> Result { + let mut sbot = init_sbot().await?; + + sbot.follow(public_key).await.map_err(|e| e.to_string()) +} + +/// Unfollow a peer. +pub async fn unfollow_peer(public_key: &str) -> Result { + let mut sbot = init_sbot().await?; + + sbot.unfollow(public_key).await.map_err(|e| e.to_string()) +} + +/// Return the name (self-identifier) for the peer associated with the given +/// public key. +/// +/// The public key of the peer will be returned if a name is not found. +pub async fn get_name(public_key: &str) -> Result { + let mut sbot = init_sbot().await?; + + sbot.get_name(public_key).await.map_err(|e| e.to_string()) +} + +/// Check the follow status of a remote peer and follow them if not already +/// following. +pub async fn follow_if_not_following(remote_peer: &str) -> Result<(), String> { + if let Ok(whoami) = whoami().await { + match is_following(&whoami, remote_peer).await { + Ok(status) if status.as_str() == "false" => match follow_peer(remote_peer).await { + Ok(_) => { + info!("Followed peer {}", &remote_peer); + Ok(()) + } + Err(e) => { + let err_msg = format!("Failed to follow peer {}: {}", &remote_peer, e); + warn!("{}", err_msg); + Err(err_msg) + } + }, + Ok(status) if status.as_str() == "true" => { + info!( + "Already following peer {}. No further action taken", + &remote_peer + ); + Ok(()) + } + _ => Err( + "Failed to determine follow status: received unrecognised response from local sbot" + .to_string(), + ), + } + } else { + let err_msg = String::from("Received an error during `whoami` RPC call. Please ensure the go-sbot is running and try again"); + warn!("{}", err_msg); + Err(err_msg) + } +} + +/// Check the follow status of a remote peer and unfollow them if already +/// following. +pub async fn unfollow_if_following(remote_peer: &str) -> Result<(), String> { + if let Ok(whoami) = whoami().await { + match is_following(&whoami, remote_peer).await { + Ok(status) if status.as_str() == "true" => { + info!("Unfollowing peer {}", &remote_peer); + match unfollow_peer(remote_peer).await { + Ok(_) => { + info!("Unfollowed peer {}", &remote_peer); + Ok(()) + } + Err(e) => { + let err_msg = format!("Failed to unfollow peer {}: {}", &remote_peer, e); + warn!("{}", err_msg); + Err(err_msg) + } + } + } + _ => Err( + "Failed to determine follow status: received unrecognised response from local sbot" + .to_string(), + ), + } + } else { + let err_msg = String::from("Received an error during `whoami` RPC call. Please ensure the go-sbot is running and try again"); + warn!("{}", err_msg); + Err(err_msg) + } +} diff --git a/part_5_task_loop/src/task_loop.rs b/part_5_task_loop/src/task_loop.rs new file mode 100644 index 0000000..ccb68ca --- /dev/null +++ b/part_5_task_loop/src/task_loop.rs @@ -0,0 +1,35 @@ +use async_std::{channel::Receiver, task}; +use log::info; + +pub enum Task { + Cancel, + FetchAllPosts(String), + FetchLatestName(String), +} + +/// Spawn an asynchronous loop which receives tasks over an unbounded channel +/// and invokes task functions accordingly. +pub async fn spawn(rx: Receiver) { + task::spawn(async move { + while let Ok(task) = rx.recv().await { + match task { + // Fetch all messages authored by the given peer, filter + // the root posts and insert them into the posts tree of the + // database. + Task::FetchAllPosts(peer_id) => { + info!("Fetching all posts for peer: {}", peer_id); + } + // Fetch the latest name for the given peer and update the + // peer entry in the peers tree of the database. + Task::FetchLatestName(peer_id) => { + info!("Fetching latest name for peer: {}", peer_id); + } + // Break out of the task loop. + Task::Cancel => { + info!("Exiting task loop..."); + break; + } + } + } + }); +} diff --git a/part_5_task_loop/src/utils.rs b/part_5_task_loop/src/utils.rs new file mode 100644 index 0000000..8f2cccb --- /dev/null +++ b/part_5_task_loop/src/utils.rs @@ -0,0 +1,32 @@ +//! Public key validation. + +/// Ensure that the given public key is a valid ed25519 key. +/// +/// Return an error string if the key is invalid. +pub fn validate_public_key(public_key: &str) -> Result<(), String> { + // Ensure the ID starts with the correct sigil link. + if !public_key.starts_with('@') { + return Err("expected '@' sigil as first character".to_string()); + } + + // Find the dot index denoting the start of the algorithm definition tag. + let dot_index = match public_key.rfind('.') { + Some(index) => index, + None => return Err("no dot index was found".to_string()), + }; + + // Check the hashing algorithm (must end with ".ed25519"). + if !&public_key.ends_with(".ed25519") { + return Err("hashing algorithm must be ed25519".to_string()); + } + + // Obtain the base64 portion (substring) of the public key. + let base64_str = &public_key[1..dot_index]; + + // Ensure the length of the base64 encoded ed25519 public key is correct. + if base64_str.len() != 44 { + return Err("base64 data length is incorrect".to_string()); + } + + Ok(()) +} diff --git a/part_5_task_loop/templates/base.html.tera b/part_5_task_loop/templates/base.html.tera new file mode 100644 index 0000000..1eddb15 --- /dev/null +++ b/part_5_task_loop/templates/base.html.tera @@ -0,0 +1,21 @@ + + + + + lykin + + + +

lykin

+

{{ whoami }}

+
+ + + + +
+ {% if flash and flash.kind == "error" %} +

[ {{ flash.message }} ]

+ {% endif %} + + From 198ecafb78f9c48c94a0ab818210f8479ec9cf0b Mon Sep 17 00:00:00 2001 From: glyph Date: Mon, 5 Sep 2022 09:55:02 +0100 Subject: [PATCH 4/5] update tutorial list in readme; update manifest --- Cargo.lock | 762 ++++++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 3 +- README.md | 6 +- 3 files changed, 760 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb5ccb3..8e80916 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,6 +46,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "android_system_properties" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7ed72e1635e121ca3e79420540282af22da58be50de153d36f81ddc6b83aa9e" +dependencies = [ + "libc", +] + [[package]] name = "ansi_term" version = "0.12.1" @@ -288,6 +297,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "383d29d513d8764dcdc42ea295d979eb99c3c9f00607b3692cf68a431f7dca72" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -317,12 +335,27 @@ dependencies = [ "once_cell", ] +[[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "memchr", +] + [[package]] name = "bumpalo" version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.2.1" @@ -359,6 +392,43 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" +dependencies = [ + "iana-time-zone", + "js-sys", + "num-integer", + "num-traits", + "time 0.1.44", + "wasm-bindgen", + "winapi 0.3.9", +] + +[[package]] +name = "chrono-tz" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c39203181991a7dd4343b8005bd804e7a9a37afb8ac070e43771e8c820bbde" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f509c3a87b33437b05e2458750a0700e5bdd6956176773e6c7d6dd15a283a0c" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "cipher" version = "0.3.0" @@ -391,10 +461,16 @@ dependencies = [ "rand", "sha2", "subtle", - "time", + "time 0.3.13", "version_check", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + [[package]] name = "cpufeatures" version = "0.2.2" @@ -404,6 +480,29 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if 1.0.0", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "045ebe27666471bb549370b4b0b3e51b07f56325befa4284db65fc89c02511b1" +dependencies = [ + "autocfg", + "cfg-if 1.0.0", + "crossbeam-utils", + "memoffset", + "once_cell", + "scopeguard", +] + [[package]] name = "crossbeam-utils" version = "0.8.11" @@ -443,6 +542,12 @@ dependencies = [ "cipher", ] +[[package]] +name = "deunicode" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "850878694b7933ca4c9569d30a34b55031b9b139ee1fc7b94a527c4ef960d690" + [[package]] name = "devise" version = "0.3.1" @@ -497,6 +602,15 @@ dependencies = [ "dirs-sys", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-sys" version = "0.3.7" @@ -552,12 +666,69 @@ dependencies = [ "version_check", ] +[[package]] +name = "filetime" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94a7bbaa59354bc20dd75b67f23e2797b4490e9d6928203fb105c79e448c86c" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall", + "windows-sys", +] + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "fsevent" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ab7d1bd1bd33cc98b0889831b72da23c0aa4df9cec7e0702f46ecea04b35db6" +dependencies = [ + "bitflags", + "fsevent-sys", +] + +[[package]] +name = "fsevent-sys" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f41b048a94555da0f42f1d632e2e19510084fb8e303b0daa2816e733fb3644a0" +dependencies = [ + "libc", +] + +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +dependencies = [ + "bitflags", + "fuchsia-zircon-sys", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" + [[package]] name = "futures" version = "0.3.21" @@ -662,6 +833,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "gcc" version = "0.3.55" @@ -721,7 +901,7 @@ checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ "cfg-if 1.0.0", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", ] [[package]] @@ -740,6 +920,30 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" +[[package]] +name = "globset" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a1e17342619edbc21a964c2afbeb6c820c6a2560032872f397bb97ea127bd0a" +dependencies = [ + "aho-corasick", + "bstr", + "fnv", + "log", + "regex", +] + +[[package]] +name = "globwalk" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93e3af942408868f6934a7b85134a3230832b9977cf66125df2f9edcfce4ddcc" +dependencies = [ + "bitflags", + "ignore", + "walkdir", +] + [[package]] name = "gloo-timers" version = "0.2.4" @@ -863,6 +1067,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humansize" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02296996cb8796d7c6e3bc2d9211b7802812d36999a51bb754123ead7d37d026" + [[package]] name = "hyper" version = "0.14.20" @@ -887,6 +1097,37 @@ dependencies = [ "want", ] +[[package]] +name = "iana-time-zone" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad2bfd338099682614d3ee3fe0cd72e0b6a41ca6a87f6a74a3bd593c91650501" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "js-sys", + "wasm-bindgen", + "winapi 0.3.9", +] + +[[package]] +name = "ignore" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "713f1b139373f96a2e0ce3ac931cd01ee973c3c5dd7c40c0c2efe96ad2b6751d" +dependencies = [ + "crossbeam-utils", + "globset", + "lazy_static", + "log", + "memchr", + "regex", + "same-file", + "thread_local", + "walkdir", + "winapi-util", +] + [[package]] name = "indexmap" version = "1.9.1" @@ -904,6 +1145,26 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "inotify" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4816c66d2c8ae673df83366c18341538f234a26d65a9ecea5c348b453ac1d02f" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" @@ -913,6 +1174,15 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + [[package]] name = "itoa" version = "1.0.3" @@ -928,6 +1198,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "kuska-handshake" version = "0.2.0" @@ -960,7 +1240,7 @@ dependencies = [ "async-std", "async-stream 0.2.1", "base64 0.11.0", - "dirs", + "dirs 2.0.2", "futures", "get_if_addrs", "hex", @@ -989,6 +1269,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.129" @@ -1057,12 +1343,40 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "mio" +version = "0.6.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" +dependencies = [ + "cfg-if 0.1.10", + "fuchsia-zircon", + "fuchsia-zircon-sys", + "iovec", + "kernel32-sys", + "libc", + "log", + "miow", + "net2", + "slab", + "winapi 0.2.8", +] + [[package]] name = "mio" version = "0.8.4" @@ -1071,10 +1385,34 @@ checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] +[[package]] +name = "mio-extras" +version = "2.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" +dependencies = [ + "lazycell", + "log", + "mio 0.6.23", + "slab", +] + +[[package]] +name = "miow" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" +dependencies = [ + "kernel32-sys", + "net2", + "winapi 0.2.8", + "ws2_32-sys", +] + [[package]] name = "multer" version = "2.0.3" @@ -1095,6 +1433,63 @@ dependencies = [ "version_check", ] +[[package]] +name = "net2" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "normpath" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04aaf5e9cb0fbf883cc0423159eacdf96a9878022084b35c462c428cab73bcaf" +dependencies = [ + "winapi 0.3.9", +] + +[[package]] +name = "notify" +version = "4.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae03c8c853dba7bfd23e571ff0cff7bc9dceb40a4cd684cd1681824183f45257" +dependencies = [ + "bitflags", + "filetime", + "fsevent", + "fsevent-sys", + "inotify", + "libc", + "mio 0.6.23", + "mio-extras", + "walkdir", + "winapi 0.3.9", +] + +[[package]] +name = "num-integer" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.1" @@ -1132,6 +1527,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.5", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1139,7 +1545,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.3", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if 1.0.0", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.9", ] [[package]] @@ -1155,6 +1575,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "parse-zoneinfo" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c705f256449c60da65e11ff6626e0c16a0a0b96aaa348de61376b249bc340f41" +dependencies = [ + "regex", +] + [[package]] name = "part_1_sbot_rocket" version = "0.1.0" @@ -1163,6 +1592,63 @@ dependencies = [ "rocket", ] +[[package]] +name = "part_2_subscribe_form" +version = "0.1.0" +dependencies = [ + "golgi", + "log", + "rocket", + "rocket_dyn_templates", +] + +[[package]] +name = "part_3_database_follows" +version = "0.1.0" +dependencies = [ + "bincode", + "golgi", + "log", + "rocket", + "rocket_dyn_templates", + "serde", + "sled", + "xdg", +] + +[[package]] +name = "part_4_posts_streams" +version = "0.1.0" +dependencies = [ + "async-std", + "bincode", + "chrono", + "futures", + "golgi", + "log", + "rocket", + "rocket_dyn_templates", + "serde", + "serde_json", + "sled", + "xdg", +] + +[[package]] +name = "part_5_task_loop" +version = "0.1.0" +dependencies = [ + "async-std", + "bincode", + "golgi", + "log", + "rocket", + "rocket_dyn_templates", + "serde", + "sled", + "xdg", +] + [[package]] name = "pear" version = "0.2.3" @@ -1192,6 +1678,89 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pest" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b0560d531d1febc25a3c9398a62a71256c0178f2e3443baedd9ad4bb8c9deb4" +dependencies = [ + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "905708f7f674518498c1f8d644481440f476d39ca6ecae83319bba7c6c12da91" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5803d8284a629cc999094ecd630f55e91b561a1d1ba75e233b00ae13b91a69ad" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pest_meta" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1538eb784f07615c6d9a8ab061089c6c54a344c5b4301db51990ca1c241e8c04" +dependencies = [ + "once_cell", + "pest", + "sha-1", +] + +[[package]] +name = "phf" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56ac890c5e3ca598bbdeaa99964edb5b0258a583a9eb6ef4e89fc85d9224770" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1181c94580fa345f50f19d738aaa39c0ed30a600d95cb2d3e23f94266f14fbf" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_shared" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676" +dependencies = [ + "siphasher", + "uncased", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -1397,7 +1966,7 @@ dependencies = [ "memchr", "multer", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "rand", "ref-cast", @@ -1406,7 +1975,7 @@ dependencies = [ "serde", "state", "tempfile", - "time", + "time 0.3.13", "tokio", "tokio-stream", "tokio-util", @@ -1431,6 +2000,19 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "rocket_dyn_templates" +version = "0.1.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab13df598440527c200f46fb944dc55d8d67a1818b617eb5a3981dcd8b63fd2" +dependencies = [ + "glob", + "normpath", + "notify", + "rocket", + "tera", +] + [[package]] name = "rocket_http" version = "0.5.0-rc.2" @@ -1453,7 +2035,7 @@ dependencies = [ "smallvec", "stable-pattern", "state", - "time", + "time 0.3.13", "tokio", "uncased", ] @@ -1523,6 +2105,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha-1" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.2" @@ -1562,6 +2155,12 @@ dependencies = [ "libc", ] +[[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + [[package]] name = "slab" version = "0.4.7" @@ -1571,6 +2170,31 @@ dependencies = [ "autocfg", ] +[[package]] +name = "sled" +version = "0.34.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" +dependencies = [ + "crc32fast", + "crossbeam-epoch", + "crossbeam-utils", + "fs2", + "fxhash", + "libc", + "log", + "parking_lot 0.11.2", +] + +[[package]] +name = "slug" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3bc762e6a4b6c6fcaade73e77f9ebc6991b676f88bb2358bddb56560f073373" +dependencies = [ + "deunicode", +] + [[package]] name = "smallvec" version = "1.9.0" @@ -1642,6 +2266,28 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "tera" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d4685e72cb35f0eb74319c8fe2d3b61e93da5609841cde2cb87fcc3bea56d20" +dependencies = [ + "chrono", + "chrono-tz", + "globwalk", + "humansize", + "lazy_static", + "percent-encoding", + "pest", + "pest_derive", + "rand", + "regex", + "serde", + "serde_json", + "slug", + "unic-segment", +] + [[package]] name = "thiserror" version = "1.0.32" @@ -1671,6 +2317,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi 0.3.9", +] + [[package]] name = "time" version = "0.3.13" @@ -1699,7 +2356,7 @@ dependencies = [ "bytes", "libc", "memchr", - "mio", + "mio 0.8.4", "num_cpus", "once_cell", "pin-project-lite", @@ -1843,6 +2500,12 @@ dependencies = [ "serde", ] +[[package]] +name = "ucd-trie" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89570599c4fe5585de2b388aab47e99f7fa4e9238a1399f707a02e356058141c" + [[package]] name = "uncased" version = "0.9.7" @@ -1853,6 +2516,56 @@ dependencies = [ "version_check", ] +[[package]] +name = "unic-char-property" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8c57a407d9b6fa02b4795eb81c5b6652060a15a7903ea981f3d723e6c0be221" +dependencies = [ + "unic-char-range", +] + +[[package]] +name = "unic-char-range" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0398022d5f700414f6b899e10b8348231abf9173fa93144cbc1a43b9793c1fbc" + +[[package]] +name = "unic-common" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d7ff825a6a654ee85a63e80f92f054f904f21e7d12da4e22f9834a4aaa35bc" + +[[package]] +name = "unic-segment" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4ed5d26be57f84f176157270c112ef57b86debac9cd21daaabbe56db0f88f23" +dependencies = [ + "unic-ucd-segment", +] + +[[package]] +name = "unic-ucd-segment" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2079c122a62205b421f499da10f3ee0f7697f012f55b675e002483c73ea34700" +dependencies = [ + "unic-char-property", + "unic-char-range", + "unic-ucd-version", +] + +[[package]] +name = "unic-ucd-version" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96bd2f2237fe450fcd0a1d2f5f4e91711124f7857ba2e964247776ebeeb7b0c4" +dependencies = [ + "unic-common", +] + [[package]] name = "unicode-ident" version = "1.0.3" @@ -1924,6 +2637,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2031,6 +2750,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -2138,6 +2863,25 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + +[[package]] +name = "xdg" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4583db5cbd4c4c0303df2d15af80f0539db703fa1c68802d4cbbd2dd0f88f6" +dependencies = [ + "dirs 4.0.0", +] + [[package]] name = "yansi" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index cf411f0..3f4b37f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,5 +4,6 @@ members = [ "part_1_sbot_rocket", "part_2_subscribe_form", "part_3_database_follows", - "part_4_posts_streams" + "part_4_posts_streams", + "part_5_task_loop" ] diff --git a/README.md b/README.md index 95c9aae..1f31ae6 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,11 @@ Author: [@glyph](https://mycelial.technology/) - Part 2: Subscription Form and Key Validation - Part 3: Database and Follows - Part 4: Posts and Message Streams - - ... + - Part 5: Task Loop + - Part 6: Display Peers and Update UI + - Part 7: Fetch Latest Posts + - Part 8: Read, Unread and Delete Posts + - Part 9: Extension Ideas and Conclusion ## Links From b141b53648a51b52cc27c773e4d8a71952f40f76 Mon Sep 17 00:00:00 2001 From: glyph Date: Mon, 5 Sep 2022 13:58:17 +0100 Subject: [PATCH 5/5] add part 5 draft tutorial --- Cargo.lock | 3 + part_5_task_loop/Cargo.toml | 3 + part_5_task_loop/README.md | 188 +++++++++++++++++++++++++++--- part_5_task_loop/src/db.rs | 107 ++++++++++++++++- part_5_task_loop/src/main.rs | 3 +- part_5_task_loop/src/routes.rs | 43 ++----- part_5_task_loop/src/sbot.rs | 85 +++++++++++++- part_5_task_loop/src/task_loop.rs | 45 ++++++- 8 files changed, 421 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e80916..8a79719 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1640,11 +1640,14 @@ version = "0.1.0" dependencies = [ "async-std", "bincode", + "chrono", + "futures", "golgi", "log", "rocket", "rocket_dyn_templates", "serde", + "serde_json", "sled", "xdg", ] diff --git a/part_5_task_loop/Cargo.toml b/part_5_task_loop/Cargo.toml index 6316daf..efec41e 100644 --- a/part_5_task_loop/Cargo.toml +++ b/part_5_task_loop/Cargo.toml @@ -8,10 +8,13 @@ edition = "2021" [dependencies] async-std = "1.10" bincode = "1.3" +chrono = "0.4" +futures = "0.3" golgi = { git = "https://git.coopcloud.tech/golgi-ssb/golgi.git" } log = "0.4" rocket = "0.5.0-rc.1" rocket_dyn_templates = { version = "0.1.0-rc.1", features = ["tera"] } serde = "1" +serde_json = "1" sled = "0.34" xdg = "2.4.1" diff --git a/part_5_task_loop/README.md b/part_5_task_loop/README.md index 136cc6a..f132530 100644 --- a/part_5_task_loop/README.md +++ b/part_5_task_loop/README.md @@ -4,7 +4,7 @@ ### Introduction -In the last installment we added support to our key-value database for dealing with Scuttlebutt posts and wrote code to create and filter streams of Scuttlebutt messages. Since our peers may have authored tens of thousands of messages, it's useful to create a way of fetching and filtering message streams as a background process. Today we'll do just that; writing a task loop that we can be invoked from our web application route handlers and used to execute potentially long-running processes. +In the last installment we added support to our key-value database for dealing with Scuttlebutt posts and wrote code to create and filter streams of Scuttlebutt messages. Since our peers may have authored tens of thousands of messages, it's useful to create a way of fetching and filtering message streams as a background process. Today we'll do just that; writing a task loop that can be invoked from our web application route handlers and used to execute potentially long-running processes. ### Outline @@ -13,14 +13,9 @@ Here's what we'll tackle in this fifth part of the series: - Create an asynchronous task loop - Create a message passing channel and spawn the task loop - Write sbot-related task functions + - Pass database instance into task loop - Fetch root posts on subscription -### Libraries - -The following libraries are introduced in this part: - - - [`async-std`](https://crates.io/crates/async-std) - ### Create an Asynchronous Task Loop Let's start by defining a task type that enumerates the various tasks we might want to carry out. We'll create a separate module for our task loop: @@ -118,18 +113,64 @@ use crate::task_loop::Task; } ``` -Reviewing the code above: first an unbounded, asynchronous channel is created and split into transmitting (`tx`) and receiving (`rx`) ends, after which the transmitting channel is cloned. The task loop is then spawned and takes with it the receiving end of the channel. As we did previously with the `db` instance, the transmitting half of the channel is added to the managed state of the Rocket application; this will allow us to transmit tasks to the task loop from our web route handlers. And finaly, a shutdown handler is attached to the Rocket application in order to send a cancellation task to the task loop before the program ends. This ensures that the task loop closes cleanly. +Reviewing the code above: first an unbounded, asynchronous channel is created and split into transmitting (`tx`) and receiving (`rx`) ends, after which the transmitting channel is cloned. The task loop is then spawned and takes with it the receiving end of the channel. As we did previously with the `db` instance, the transmitting half of the channel is added to the managed state of the Rocket application; this will allow us to transmit tasks to the task loop from our web route handlers. Finally, a shutdown handler is attached to the Rocket application in order to send a cancellation task to the task loop before the program ends. This ensures that the task loop closes cleanly. ### Write Sbot-Related Task Functions -Now it's time to write the functions that will be executed when the `FetchAllPosts` and `FetchLatestName` tasks are invoked. These functions will be responsible for retrieving data from the sbot and updating the database with the latest values: +Before we can write the sbot-related task functions, we first need to add a method to our database code to allow the retrieval of data for a specific peer. Since we serialized the peer data as bincode before inserting it into the database, we need to deserialize the value after fetching it. + +`src/db.rs` + +```rust +impl Database { + // pub fn add_peer(&self, peer: Peer) -> Result> { + // ... + // } + + // Get a single peer from the peer tree, defined by the given public key. + // The byte value for the matching entry, if found, is deserialized from + // bincode into an instance of the Peer struct. + pub fn get_peer(&self, public_key: &str) -> Result> { + debug!( + "Retrieving peer data for {} from 'peers' database tree", + &public_key + ); + let peer = self + .peer_tree + .get(public_key.as_bytes()) + .unwrap() + .map(|peer| { + debug!("Deserializing peer data for {} from bincode", &public_key); + bincode::deserialize(&peer).unwrap() + }); + + Ok(peer) + } + + // ... +} +``` + +Now it's time to write the functions that will be executed when the `FetchAllPosts` and `FetchLatestName` tasks are invoked. These functions will be responsible for retrieving data from the sbot and updating the database with the latest values. We can keep our task loop neat and readable by separating this logic into functions: `src/task_loop.rs` ```rust +use log::warn; + +use crate::{Database, sbot}; + +// Retrieve a set of posts from the local sbot instance and add them to the +// posts tree of the database. +// +// A stream of messages is first requested for the peer represented by the +// given public key (ID), starting after the given sequence number. The root +// posts are filtered from the set of messages and added to the database as a +// batch. Finally, the value of the latest sequence for the peer is updated +// and saved to the existing database entry. async fn fetch_posts_and_update_db(db: &Database, peer_id: String, after_sequence: u64) { let peer_msgs = sbot::get_message_stream(&peer_id, after_sequence).await; - let (latest_sequence, root_posts) = sbot::get_root_posts(peer_msgs).await; + let (_latest_sequence, root_posts) = sbot::get_root_posts(peer_msgs).await; match db.add_post_batch(&peer_id, root_posts) { Ok(_) => { @@ -143,17 +184,10 @@ async fn fetch_posts_and_update_db(db: &Database, peer_id: String, after_sequenc &peer_id, e ), } - - // Update the value of the latest sequence number for - // the peer (this is stored in the database). - if let Ok(Some(peer)) = db.get_peer(&peer_id) { - db.add_peer(peer.set_latest_sequence(latest_sequence)) - .unwrap(); - } } -/// Request the name of the peer represented by the given public key (ID) -/// and update the existing entry in the database. +// Request the name of the peer represented by the given public key (ID) +// and update the existing entry in the database. async fn fetch_name_and_update_db(db: &Database, peer_id: String) { match sbot::get_name(&peer_id).await { Ok(name) => { @@ -172,8 +206,124 @@ async fn fetch_name_and_update_db(db: &Database, peer_id: String) { } ``` +These function calls can now be added to our task matching code in the task loop. Note that we also need to add the database instance as a parameter in the function isgnature: + +`src/task_loop.rs` + +```rust +pub async fn spawn(db: Database, rx: Receiver) { + task::spawn(async move { + while let Ok(task) = rx.recv().await { + match task { + Task::FetchAllPosts(peer_id) => { + info!("Fetching all posts for peer: {}", peer_id); + fetch_posts_and_update_db(&db, peer_id, 0).await; + } + Task::FetchLatestName(peer_id) => { + info!("Fetching latest name for peer: {}", peer_id); + fetch_name_and_update_db(&db, peer_id).await; + } + Task::Cancel => { + info!("Exiting task loop..."); + break; + } + } + } + }); +} +``` + +### Pass Database Instance Into Task Loop + +As it currently stands, our code will fail to compile because `task_loop::spawn()` expects a database instance which has not yet been provided. We need to revisit the code in the root of our application to clone the database and pass it into the task loop: + +`src/main.rs` + +```rust +#[launch] +async fn rocket() -> _ { + // ... + let db = Database::init(&db_path); + // Clone the database instance. + let db_clone = db.clone(); + + // Create a message passing channel. + let (tx, rx) = channel::unbounded(); + let tx_clone = tx.clone(); + + // Spawn the task loop. + info!("Spawning task loop"); + // Pass the clone database instance and the rx channel into the task loop. + task_loop::spawn(db_clone, rx).await; + + // ... +} +``` + ### Fetch Root Posts on Subscription +Great, the task loop is primed and ready for action. We are very close to being able to initiate tasks from the route handler(s) of our web application. Earlier in this installment of the tutorial we created a message passing channel in `src.main.rs` and added the transmission end of the channel to the managed state of our Rocket instance. We need to add the transmitter as a parameter of the `subscribe_form` function before we can invoke tasks: + +`src/routes.rs` + +```rust +use async_std::channel::Sender; + +use crate::task_loop::Task; + +#[post("/subscribe", data = "")] +pub async fn subscribe_form( + db: &State, + tx: &State>, + peer: Form, +) -> Result> { + info!("Subscribing to peer {}", &peer.public_key); + // ... +} +``` + +Now, when a subscription event occurs (ie. the subscribe form is submitted with a peer ID), we can trigger a task to fetch all the root posts for that peer and add them to the key-value database. Note that I've omitted most of the code we've already written from the sample below. The most important three lines are those beginning with `if let Err(e) = tx.send...`. + +```rust +#[post("/subscribe", data = "")] +pub async fn subscribe_form( + db: &State, + tx: &State>, + peer: Form, +) -> Result> { + // ... { + match sbot::follow_if_not_following(&peer.public_key).await { + Ok(_) => { + if db.add_peer(peer_info).is_ok() { + // ... + + // Fetch all root posts authored by the peer we're subscribing + // to. Posts will be added to the key-value database. + if let Err(e) = tx.send(Task::FetchAllPosts(peer_id)).await { + warn!("Task loop error: {}", e) + } + } else { + // ... + } + } + Err(e) => { + // ... + } + } + } + + Ok(Redirect::to(uri!(home))) +} +``` + ### Conclusion +In this installment we wrote an asynchronous task loop and `Task` type to be able to execute background processes in our application. We created task variants and functions for two primary operations: 1. fetching all the root posts for a peer and adding them to the key-value database, and 2. fetching the latest name assigned to a peer. We created a message passing channel, passed the receiving end to the task loop and the transmitting end to the managed state of our web application, and invoked the fetch-all task from our subscription route handler. + +The `Task` type and loop we wrote today can be easily extended by adding more variants. It's a part of the code we will return to in a future installment. + +In the next tutorial installment we'll focus on updating the web interface. We'll add more templates to create a modular layout, write some CSS and populate a list of peers from the data in our key-value store. Soon the application will begin to take shape! + ## Funding + +This work has been funded by a Scuttlebutt Community Grant. diff --git a/part_5_task_loop/src/db.rs b/part_5_task_loop/src/db.rs index f971a4e..b7d60a7 100644 --- a/part_5_task_loop/src/db.rs +++ b/part_5_task_loop/src/db.rs @@ -2,7 +2,7 @@ use std::path::Path; use log::{debug, info}; use serde::{Deserialize, Serialize}; -use sled::{Db, IVec, Result, Tree}; +use sled::{Batch, Db, IVec, Result, Tree}; /// Scuttlebutt peer data. #[derive(Debug, Deserialize, Serialize)] @@ -31,6 +31,49 @@ impl Peer { } } +/// The text and metadata of a Scuttlebutt root post. +#[derive(Debug, Deserialize, Serialize)] +pub struct Post { + /// The key of the post-type message, also known as a message reference. + pub key: String, + /// The text of the post (may be formatted as markdown). + pub text: String, + /// The date the post was published (e.g. 17 May 2021). + pub date: String, + /// The sequence number of the post-type message. + pub sequence: u64, + /// The read state of the post; true if read, false if unread. + pub read: bool, + /// The timestamp representing the date the post was published. + pub timestamp: i64, + /// The subject of the post, represented as the first 53 characters of + /// the post text. + pub subject: Option, +} + +impl Post { + // Create a new instance of the Post struct. A default value of `false` is + // set for `read`. + pub fn new( + key: String, + text: String, + date: String, + sequence: u64, + timestamp: i64, + subject: Option, + ) -> Post { + Post { + key, + text, + date, + sequence, + timestamp, + subject, + read: false, + } + } +} + /// An instance of the key-value database and relevant trees. #[allow(dead_code)] #[derive(Clone)] @@ -40,6 +83,9 @@ pub struct Database { /// A database tree containing Peer struct instances for all the peers /// we are subscribed to. peer_tree: Tree, + /// A database tree containing Post struct instances for all of the posts + /// we have downloaded from the peer to whom we subscribe. + pub post_tree: Tree, } impl Database { @@ -55,8 +101,16 @@ impl Database { let peer_tree = db .open_tree("peers") .expect("Failed to open 'peers' database tree"); + debug!("Opening 'posts' database tree"); + let post_tree = db + .open_tree("posts") + .expect("Failed to open 'posts' database tree"); - Database { db, peer_tree } + Database { + db, + peer_tree, + post_tree, + } } /// Add a peer to the database by inserting the public key into the peer @@ -72,10 +126,59 @@ impl Database { self.peer_tree.insert(&peer.public_key, peer_bytes) } + /// Get a single peer from the peer tree, defined by the given public key. + /// The byte value for the matching entry, if found, is deserialized from + /// bincode into an instance of the Peer struct. + pub fn get_peer(&self, public_key: &str) -> Result> { + debug!( + "Retrieving peer data for {} from 'peers' database tree", + &public_key + ); + let peer = self + .peer_tree + .get(public_key.as_bytes()) + .unwrap() + .map(|peer| { + debug!("Deserializing peer data for {} from bincode", &public_key); + bincode::deserialize(&peer).unwrap() + }); + + Ok(peer) + } + /// Remove a peer from the database, as represented by the given public /// key. pub fn remove_peer(&self, public_key: &str) -> Result<()> { debug!("Removing peer {} from 'peers' database tree", &public_key); self.peer_tree.remove(&public_key).map(|_| ()) } + + /// Add a post to the database by inserting an instance of the Post struct + /// into the post tree. + pub fn add_post(&self, public_key: &str, post: Post) -> Result> { + let post_key = format!("{}_{}", public_key, post.key); + debug!("Serializing post data for {} to bincode", &post_key); + let post_bytes = bincode::serialize(&post).unwrap(); + + debug!("Inserting post {} into 'posts' database tree", &post_key); + self.post_tree.insert(post_key.as_bytes(), post_bytes) + } + + /// Add a batch of posts to the database by inserting a vector of instances + /// of the Post struct into the post tree. + pub fn add_post_batch(&self, public_key: &str, posts: Vec) -> Result<()> { + let mut post_batch = Batch::default(); + + for post in posts { + let post_key = format!("{}_{}", public_key, post.key); + debug!("Serializing post data for {} to bincode", &post_key); + let post_bytes = bincode::serialize(&post).unwrap(); + + debug!("Inserting post {} into 'posts' database tree", &post_key); + post_batch.insert(post_key.as_bytes(), post_bytes) + } + + debug!("Applying batch insertion into 'posts' database tree"); + self.post_tree.apply_batch(post_batch) + } } diff --git a/part_5_task_loop/src/main.rs b/part_5_task_loop/src/main.rs index 40bb0e8..a5fe380 100644 --- a/part_5_task_loop/src/main.rs +++ b/part_5_task_loop/src/main.rs @@ -20,6 +20,7 @@ async fn rocket() -> _ { .place_config_file("database") .expect("cannot create database directory"); let db = Database::init(&db_path); + let db_clone = db.clone(); // Create a message passing channel. let (tx, rx) = channel::unbounded(); @@ -27,7 +28,7 @@ async fn rocket() -> _ { // Spawn the task loop, passing in the receiver half of the channel. info!("Spawning task loop"); - task_loop::spawn(rx).await; + task_loop::spawn(db_clone, rx).await; rocket::build() .manage(db) diff --git a/part_5_task_loop/src/routes.rs b/part_5_task_loop/src/routes.rs index 02de398..ec365b6 100644 --- a/part_5_task_loop/src/routes.rs +++ b/part_5_task_loop/src/routes.rs @@ -1,3 +1,4 @@ +use async_std::channel::Sender; use log::{info, warn}; use rocket::{ form::Form, @@ -10,7 +11,9 @@ use rocket_dyn_templates::{context, Template}; use crate::{ db::{Database, Peer}, - sbot, utils, + sbot, + task_loop::Task, + utils, }; #[derive(FromForm)] @@ -31,6 +34,7 @@ pub async fn home(flash: Option>) -> Template { #[post("/subscribe", data = "")] pub async fn subscribe_form( db: &State, + tx: &State>, peer: Form, ) -> Result> { if let Err(e) = utils::validate_public_key(&peer.public_key) { @@ -55,6 +59,13 @@ pub async fn subscribe_form( // Add the peer to the database. if db.add_peer(peer_info).is_ok() { info!("Added {} to 'peers' database tree", &peer.public_key); + let peer_id = peer.public_key.to_string(); + + // Fetch all root posts authored by the peer we're subscribing + // to. Posts will be added to the key-value database. + if let Err(e) = tx.send(Task::FetchAllPosts(peer_id)).await { + warn!("Task loop error: {}", e) + } } else { let err_msg = format!( "Failed to add peer {} to 'peers' database tree", @@ -109,33 +120,3 @@ pub async fn unsubscribe_form( Ok(Redirect::to(uri!(home))) } - -/* -#[post("/subscribe", data = "")] -pub async fn subscribe_form(peer: Form) -> Result> { - if let Err(e) = utils::validate_public_key(&peer.public_key) { - let validation_err_msg = format!("Public key {} is invalid: {}", &peer.public_key, e); - warn!("{}", validation_err_msg); - return Err(Flash::error(Redirect::to(uri!(home)), validation_err_msg)); - } else { - info!("Public key {} is valid", &peer.public_key); - sbot::follow_if_not_following(&peer.public_key).await; - } - - Ok(Redirect::to(uri!(home))) -} - -#[post("/unsubscribe", data = "")] -pub async fn unsubscribe_form(peer: Form) -> Result> { - if let Err(e) = utils::validate_public_key(&peer.public_key) { - let validation_err_msg = format!("Public key {} is invalid: {}", &peer.public_key, e); - warn!("{}", validation_err_msg); - return Err(Flash::error(Redirect::to(uri!(home)), validation_err_msg)); - } else { - info!("Public key {} is valid", &peer.public_key); - sbot::unfollow_if_following(&peer.public_key).await; - } - - Ok(Redirect::to(uri!(home))) -} -*/ diff --git a/part_5_task_loop/src/sbot.rs b/part_5_task_loop/src/sbot.rs index 32300be..2c7b218 100644 --- a/part_5_task_loop/src/sbot.rs +++ b/part_5_task_loop/src/sbot.rs @@ -1,7 +1,17 @@ use std::env; -use golgi::{api::friends::RelationshipQuery, sbot::Keystore, Sbot}; +use async_std::stream::StreamExt; +use chrono::NaiveDateTime; +use golgi::{ + api::{friends::RelationshipQuery, history_stream::CreateHistoryStream}, + messages::{SsbMessageContentType, SsbMessageKVT}, + sbot::Keystore, + GolgiError, Sbot, +}; use log::{info, warn}; +use serde_json::value::Value; + +use crate::db::Post; /// Initialise a connection to a Scuttlebutt server. pub async fn init_sbot() -> Result { @@ -128,3 +138,76 @@ pub async fn unfollow_if_following(remote_peer: &str) -> Result<(), String> { Err(err_msg) } } + +/// Return a stream of messages authored by the given public key. +/// +/// This returns all messages regardless of type. +pub async fn get_message_stream( + public_key: &str, + sequence_number: u64, +) -> impl futures::Stream> { + let mut sbot = init_sbot().await.unwrap(); + + let history_stream_args = CreateHistoryStream::new(public_key.to_string()) + .keys_values(true, true) + .after_seq(sequence_number); + + sbot.create_history_stream(history_stream_args) + .await + .unwrap() +} + +/// Filter a stream of messages and return a vector of root posts. +/// +/// Each returned vector element includes the key of the post, the content +/// text, the date the post was published, the sequence number of the post +/// and whether it is read or unread. +pub async fn get_root_posts( + history_stream: impl futures::Stream>, +) -> (u64, Vec) { + let mut latest_sequence = 0; + let mut posts = Vec::new(); + + futures::pin_mut!(history_stream); + + while let Some(res) = history_stream.next().await { + match res { + Ok(msg) => { + if msg.value.is_message_type(SsbMessageContentType::Post) { + let content = msg.value.content.to_owned(); + if let Value::Object(content_map) = content { + if !content_map.contains_key("root") { + latest_sequence = msg.value.sequence; + + let text = match content_map.get_key_value("text") { + Some(value) => value.1.to_string(), + None => String::from(""), + }; + let timestamp = msg.value.timestamp.round() as i64 / 1000; + let datetime = NaiveDateTime::from_timestamp(timestamp, 0); + let date = datetime.format("%d %b %Y").to_string(); + let subject = text.get(0..52).map(|s| s.to_string()); + + let post = Post::new( + msg.key.to_owned(), + text, + date, + msg.value.sequence, + timestamp, + subject, + ); + + posts.push(post) + } + } + } + } + Err(err) => { + // Print the `GolgiError` of this element to `stderr`. + warn!("err: {:?}", err); + } + } + } + + (latest_sequence, posts) +} diff --git a/part_5_task_loop/src/task_loop.rs b/part_5_task_loop/src/task_loop.rs index ccb68ca..d71282e 100644 --- a/part_5_task_loop/src/task_loop.rs +++ b/part_5_task_loop/src/task_loop.rs @@ -1,5 +1,44 @@ use async_std::{channel::Receiver, task}; -use log::info; +use log::{info, warn}; + +use crate::{sbot, Database}; + +async fn fetch_posts_and_update_db(db: &Database, peer_id: String, after_sequence: u64) { + let peer_msgs = sbot::get_message_stream(&peer_id, after_sequence).await; + let (_latest_sequence, root_posts) = sbot::get_root_posts(peer_msgs).await; + + match db.add_post_batch(&peer_id, root_posts) { + Ok(_) => { + info!( + "Inserted batch of posts into database post tree for peer: {}", + &peer_id + ) + } + Err(e) => warn!( + "Failed to insert batch of posts into database post tree for peer: {}: {}", + &peer_id, e + ), + } +} + +/// Request the name of the peer represented by the given public key (ID) +/// and update the existing entry in the database. +async fn fetch_name_and_update_db(db: &Database, peer_id: String) { + match sbot::get_name(&peer_id).await { + Ok(name) => { + if let Ok(Some(peer)) = db.get_peer(&peer_id) { + let updated_peer = peer.set_name(&name); + match db.add_peer(updated_peer) { + Ok(_) => info!("Updated name for peer: {}", &peer_id), + Err(e) => { + warn!("Failed to update name for peer: {}: {}", &peer_id, e) + } + } + } + } + Err(e) => warn!("Failed to fetch name for {}: {}", &peer_id, e), + } +} pub enum Task { Cancel, @@ -9,7 +48,7 @@ pub enum Task { /// Spawn an asynchronous loop which receives tasks over an unbounded channel /// and invokes task functions accordingly. -pub async fn spawn(rx: Receiver) { +pub async fn spawn(db: Database, rx: Receiver) { task::spawn(async move { while let Ok(task) = rx.recv().await { match task { @@ -18,11 +57,13 @@ pub async fn spawn(rx: Receiver) { // database. Task::FetchAllPosts(peer_id) => { info!("Fetching all posts for peer: {}", peer_id); + fetch_posts_and_update_db(&db, peer_id, 0).await; } // Fetch the latest name for the given peer and update the // peer entry in the peers tree of the database. Task::FetchLatestName(peer_id) => { info!("Fetching latest name for peer: {}", peer_id); + fetch_name_and_update_db(&db, peer_id).await; } // Break out of the task loop. Task::Cancel => {