diff --git a/part_4_posts_streams/Cargo.toml b/part_4_posts_streams/Cargo.toml index f2f7445..e913fb0 100644 --- a/part_4_posts_streams/Cargo.toml +++ b/part_4_posts_streams/Cargo.toml @@ -6,11 +6,15 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-std = "1.10" bincode = "1.3" +chrono = "0.4" +futures = "0.3" golgi = { git = "https://git.coopcloud.tech/golgi-ssb/golgi.git" } log = "0.4" rocket = "0.5.0-rc.1" rocket_dyn_templates = { version = "0.1.0-rc.1", features = ["tera"] } serde = "1" +serde_json = "1" sled = "0.34" xdg = "2.4.1" diff --git a/part_4_posts_streams/README.md b/part_4_posts_streams/README.md index 74adaec..ada026e 100644 --- a/part_4_posts_streams/README.md +++ b/part_4_posts_streams/README.md @@ -17,6 +17,15 @@ Here's what we'll tackle in this fourth part of the series: - Add a post to the database - Add a post batch to the database +### Libraries + +The following libraries are introduced in this part: + + - [async-std](https://crates.io/crates/async-std) + - [chrono](https://crates.io/crates/chrono) + - [futures](https://crates.io/crates/futures) + - [serde_json](https://crates.io/crates/serde_json) + ### Create a Post Data Structure We'll begin by creating a `Post` struct to store data about each Scuttlebutt post we want to render in our application. The fields of our struct will diverge from the fields we expect in a Scuttlebutt post-type message. Open `src/db.rs` and add the following code (I've included code comments to further define each field): @@ -130,6 +139,16 @@ Now that we have the ability to obtain all messages authored by a specific peer, `src/sbot.rs` ```rust +use chrono::NaiveDateTime; +use golgi::{ + api::history_stream::CreateHistoryStream}, + messages::{SsbMessageContentType, SsbMessageKVT}, + sbot::GolgiError, +}; +use serde_json::value::Value; + +use crate::db::Post; + // Filter a stream of messages and return a vector of root posts. pub async fn get_root_posts( history_stream: impl futures::Stream>, @@ -279,6 +298,8 @@ On most occasions we'll find ourselves in a situation where we wish to add multi `src/db.rs` ```rust +use sled::Batch; + impl Database { // ... diff --git a/part_4_posts_streams/src/db.rs b/part_4_posts_streams/src/db.rs index f971a4e..c05a9d9 100644 --- a/part_4_posts_streams/src/db.rs +++ b/part_4_posts_streams/src/db.rs @@ -2,7 +2,7 @@ use std::path::Path; use log::{debug, info}; use serde::{Deserialize, Serialize}; -use sled::{Db, IVec, Result, Tree}; +use sled::{Batch, Db, IVec, Result, Tree}; /// Scuttlebutt peer data. #[derive(Debug, Deserialize, Serialize)] @@ -31,6 +31,49 @@ impl Peer { } } +/// The text and metadata of a Scuttlebutt root post. +#[derive(Debug, Deserialize, Serialize)] +pub struct Post { + /// The key of the post-type message, also known as a message reference. + pub key: String, + /// The text of the post (may be formatted as markdown). + pub text: String, + /// The date the post was published (e.g. 17 May 2021). + pub date: String, + /// The sequence number of the post-type message. + pub sequence: u64, + /// The read state of the post; true if read, false if unread. + pub read: bool, + /// The timestamp representing the date the post was published. + pub timestamp: i64, + /// The subject of the post, represented as the first 53 characters of + /// the post text. + pub subject: Option, +} + +impl Post { + // Create a new instance of the Post struct. A default value of `false` is + // set for `read`. + pub fn new( + key: String, + text: String, + date: String, + sequence: u64, + timestamp: i64, + subject: Option, + ) -> Post { + Post { + key, + text, + date, + sequence, + timestamp, + subject, + read: false, + } + } +} + /// An instance of the key-value database and relevant trees. #[allow(dead_code)] #[derive(Clone)] @@ -40,6 +83,9 @@ pub struct Database { /// A database tree containing Peer struct instances for all the peers /// we are subscribed to. peer_tree: Tree, + /// A database tree containing Post struct instances for all of the posts + /// we have downloaded from the peer to whom we subscribe. + pub post_tree: Tree, } impl Database { @@ -55,8 +101,16 @@ impl Database { let peer_tree = db .open_tree("peers") .expect("Failed to open 'peers' database tree"); + debug!("Opening 'posts' database tree"); + let post_tree = db + .open_tree("posts") + .expect("Failed to open 'posts' database tree"); - Database { db, peer_tree } + Database { + db, + peer_tree, + post_tree, + } } /// Add a peer to the database by inserting the public key into the peer @@ -78,4 +132,33 @@ impl Database { debug!("Removing peer {} from 'peers' database tree", &public_key); self.peer_tree.remove(&public_key).map(|_| ()) } + + /// Add a post to the database by inserting an instance of the Post struct + /// into the post tree. + pub fn add_post(&self, public_key: &str, post: Post) -> Result> { + let post_key = format!("{}_{}", public_key, post.key); + debug!("Serializing post data for {} to bincode", &post_key); + let post_bytes = bincode::serialize(&post).unwrap(); + + debug!("Inserting post {} into 'posts' database tree", &post_key); + self.post_tree.insert(post_key.as_bytes(), post_bytes) + } + + /// Add a batch of posts to the database by inserting a vector of instances + /// of the Post struct into the post tree. + pub fn add_post_batch(&self, public_key: &str, posts: Vec) -> Result<()> { + let mut post_batch = Batch::default(); + + for post in posts { + let post_key = format!("{}_{}", public_key, post.key); + debug!("Serializing post data for {} to bincode", &post_key); + let post_bytes = bincode::serialize(&post).unwrap(); + + debug!("Inserting post {} into 'posts' database tree", &post_key); + post_batch.insert(post_key.as_bytes(), post_bytes) + } + + debug!("Applying batch insertion into 'posts' database tree"); + self.post_tree.apply_batch(post_batch) + } } diff --git a/part_4_posts_streams/src/sbot.rs b/part_4_posts_streams/src/sbot.rs index 32300be..57c7e10 100644 --- a/part_4_posts_streams/src/sbot.rs +++ b/part_4_posts_streams/src/sbot.rs @@ -1,7 +1,17 @@ use std::env; -use golgi::{api::friends::RelationshipQuery, sbot::Keystore, Sbot}; +use async_std::stream::StreamExt; +use chrono::NaiveDateTime; +use golgi::{ + api::{friends::RelationshipQuery, history_stream::CreateHistoryStream}, + messages::{SsbMessageContentType, SsbMessageKVT}, + sbot::Keystore, + GolgiError, Sbot, +}; use log::{info, warn}; +use serde_json::value::Value; + +use crate::db::Post; /// Initialise a connection to a Scuttlebutt server. pub async fn init_sbot() -> Result { @@ -53,16 +63,6 @@ pub async fn unfollow_peer(public_key: &str) -> Result { sbot.unfollow(public_key).await.map_err(|e| e.to_string()) } -/// Return the name (self-identifier) for the peer associated with the given -/// public key. -/// -/// The public key of the peer will be returned if a name is not found. -pub async fn get_name(public_key: &str) -> Result { - let mut sbot = init_sbot().await?; - - sbot.get_name(public_key).await.map_err(|e| e.to_string()) -} - /// Check the follow status of a remote peer and follow them if not already /// following. pub async fn follow_if_not_following(remote_peer: &str) -> Result<(), String> { @@ -128,3 +128,86 @@ pub async fn unfollow_if_following(remote_peer: &str) -> Result<(), String> { Err(err_msg) } } + +/// Return a stream of messages authored by the given public key. +/// +/// This returns all messages regardless of type. +pub async fn get_message_stream( + public_key: &str, + sequence_number: u64, +) -> impl futures::Stream> { + let mut sbot = init_sbot().await.unwrap(); + + let history_stream_args = CreateHistoryStream::new(public_key.to_string()) + .keys_values(true, true) + .after_seq(sequence_number); + + sbot.create_history_stream(history_stream_args) + .await + .unwrap() +} + +/// Return the name (self-identifier) for the peer associated with the given +/// public key. +/// +/// The public key of the peer will be returned if a name is not found. +pub async fn get_name(public_key: &str) -> Result { + let mut sbot = init_sbot().await?; + + sbot.get_name(public_key).await.map_err(|e| e.to_string()) +} + +/// Filter a stream of messages and return a vector of root posts. +/// +/// Each returned vector element includes the key of the post, the content +/// text, the date the post was published, the sequence number of the post +/// and whether it is read or unread. +pub async fn get_root_posts( + history_stream: impl futures::Stream>, +) -> (u64, Vec) { + let mut latest_sequence = 0; + let mut posts = Vec::new(); + + futures::pin_mut!(history_stream); + + while let Some(res) = history_stream.next().await { + match res { + Ok(msg) => { + if msg.value.is_message_type(SsbMessageContentType::Post) { + let content = msg.value.content.to_owned(); + if let Value::Object(content_map) = content { + if !content_map.contains_key("root") { + latest_sequence = msg.value.sequence; + + let text = match content_map.get_key_value("text") { + Some(value) => value.1.to_string(), + None => String::from(""), + }; + let timestamp = msg.value.timestamp.round() as i64 / 1000; + let datetime = NaiveDateTime::from_timestamp(timestamp, 0); + let date = datetime.format("%d %b %Y").to_string(); + let subject = text.get(0..52).map(|s| s.to_string()); + + let post = Post::new( + msg.key.to_owned(), + text, + date, + msg.value.sequence, + timestamp, + subject, + ); + + posts.push(post) + } + } + } + } + Err(err) => { + // Print the `GolgiError` of this element to `stderr`. + warn!("err: {:?}", err); + } + } + } + + (latest_sequence, posts) +}