add task loop, basic viewer functionality and routes

This commit is contained in:
mycognosist 2022-06-13 08:24:55 +01:00
parent 5012910c25
commit 23ce3f00f0
8 changed files with 3024 additions and 132 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
src/*.bak

2855
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -10,6 +10,14 @@ use log::{debug, info};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sled::{Batch, Db, IVec, Result, Tree}; use sled::{Batch, Db, IVec, Result, Tree};
// The latest sequence number and name of a Scuttlebutt peer.
#[derive(Debug, Deserialize, Serialize)]
pub struct Peer {
pub latest_sequence: u64,
//pub name: String,
//pub posts: u16,
}
// The text and metadata of a Scuttlebutt root post. // The text and metadata of a Scuttlebutt root post.
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct Post { pub struct Post {
@ -133,4 +141,6 @@ impl Database {
Ok(post) Ok(post)
} }
// TODO: remove_post
} }

View File

@ -1,133 +1,24 @@
mod db; mod db;
mod routes;
mod sbot; mod sbot;
mod task_loop; mod task_loop;
mod utils; mod utils;
use std::{env, path::Path}; use std::{env, path::Path};
use async_std::{channel, channel::Sender}; use async_std::channel;
use log::{debug, info, warn}; use log::info;
use rocket::{ use rocket::{
fairing::AdHoc, fairing::AdHoc,
form::Form,
fs::{relative, FileServer}, fs::{relative, FileServer},
get, launch, post, launch, routes,
response::Redirect,
routes, uri, FromForm, State,
}; };
use rocket_dyn_templates::{tera::Context, Template}; use rocket_dyn_templates::Template;
use crate::{db::Database, task_loop::Task}; use crate::{db::Database, routes::*, task_loop::Task};
#[derive(FromForm)] pub struct WhoAmI {
struct Peer { pub public_key: String,
public_key: String,
}
#[get("/")]
async fn home(db: &State<Database>) -> Template {
let peers = db.get_peers();
let mut context = Context::new();
context.insert("peers", &peers);
Template::render("home", &context.into_json())
}
#[get("/posts/<public_key>")]
async fn posts(db: &State<Database>, public_key: &str) -> Template {
let peers = db.get_peers();
let posts = db.get_posts(public_key).unwrap();
let mut context = Context::new();
context.insert("selected_peer", &public_key);
context.insert("peers", &peers);
context.insert("posts", &posts);
Template::render("home", &context.into_json())
}
#[get("/posts/<public_key>/<msg_id>")]
async fn post(db: &State<Database>, public_key: &str, msg_id: &str) -> Template {
let peers = db.get_peers();
let posts = db.get_posts(public_key).unwrap();
let post = db.get_post(public_key, msg_id).unwrap();
let mut context = Context::new();
context.insert("selected_peer", &public_key);
context.insert("selected_post", &msg_id);
context.insert("peers", &peers);
context.insert("posts", &posts);
context.insert("post", &post);
Template::render("home", &context.into_json())
}
#[post("/subscribe", data = "<peer>")]
async fn subscribe_form(
db: &State<Database>,
whoami: &State<WhoAmI>,
tx: &State<Sender<Task>>,
peer: Form<Peer>,
) -> Redirect {
if let Ok(_) = utils::validate_public_key(&peer.public_key) {
debug!("public key {} is valid", &peer.public_key);
match db.add_peer(&peer.public_key) {
Ok(_) => {
debug!("added {} to peer tree in database", &peer.public_key);
// TODO: i don't think we actually want to follow...
// we might still have the data in our ssb db, even if we don't follow
match sbot::is_following(&whoami.public_key, &peer.public_key).await {
Ok(status) if status.as_str() == "false" => {
match sbot::follow_peer(&peer.public_key).await {
Ok(_) => debug!("followed {}", &peer.public_key),
Err(e) => warn!("failed to follow {}: {}", &peer.public_key, e),
}
}
Ok(status) if status.as_str() == "true" => {
debug!("we already follow {}", &peer.public_key)
}
_ => (),
}
let peer = peer.public_key.to_string();
if let Err(e) = tx.send(Task::FetchAll(peer)).await {
warn!("task loop error: {}", e)
}
}
Err(_e) => warn!(
"failed to add {} to peer tree in database",
&peer.public_key
),
}
} else {
warn!("{} is invalid", &peer.public_key);
}
Redirect::to(uri!(home))
}
#[post("/unsubscribe", data = "<peer>")]
fn unsubscribe_form(db: &State<Database>, peer: Form<Peer>) -> Redirect {
// validate the public key
match utils::validate_public_key(&peer.public_key) {
Ok(_) => {
debug!("public key {} is valid", &peer.public_key);
match db.remove_peer(&peer.public_key) {
Ok(_) => debug!("removed {} from peer tree in database", &peer.public_key),
Err(_e) => warn!(
"failed to remove {} from peer tree in database",
&peer.public_key
),
}
}
Err(e) => warn!("{} is invalid: {}", &peer.public_key, e),
}
Redirect::to(uri!(home))
}
struct WhoAmI {
public_key: String,
} }
#[launch] #[launch]

114
src/routes.rs Normal file
View File

@ -0,0 +1,114 @@
use async_std::channel::Sender;
use log::{debug, warn};
use rocket::{form::Form, get, post, response::Redirect, uri, FromForm, State};
use rocket_dyn_templates::{tera::Context, Template};
use crate::{db::Database, sbot, task_loop::Task, utils, WhoAmI};
#[derive(FromForm)]
pub struct Peer {
pub public_key: String,
}
#[get("/")]
pub async fn home(db: &State<Database>) -> Template {
let peers = db.get_peers();
let mut context = Context::new();
context.insert("peers", &peers);
Template::render("home", &context.into_json())
}
#[get("/posts/<public_key>")]
pub async fn posts(db: &State<Database>, public_key: &str) -> Template {
let peers = db.get_peers();
let posts = db.get_posts(public_key).unwrap();
let mut context = Context::new();
context.insert("selected_peer", &public_key);
context.insert("peers", &peers);
context.insert("posts", &posts);
Template::render("home", &context.into_json())
}
#[get("/posts/<public_key>/<msg_id>")]
pub async fn post(db: &State<Database>, public_key: &str, msg_id: &str) -> Template {
let peers = db.get_peers();
let posts = db.get_posts(public_key).unwrap();
let post = db.get_post(public_key, msg_id).unwrap();
let mut context = Context::new();
context.insert("selected_peer", &public_key);
context.insert("selected_post", &msg_id);
context.insert("peers", &peers);
context.insert("posts", &posts);
context.insert("post", &post);
Template::render("home", &context.into_json())
}
#[post("/subscribe", data = "<peer>")]
pub async fn subscribe_form(
db: &State<Database>,
whoami: &State<WhoAmI>,
tx: &State<Sender<Task>>,
peer: Form<Peer>,
) -> Redirect {
if utils::validate_public_key(&peer.public_key).is_ok() {
debug!("public key {} is valid", &peer.public_key);
// TODO: consider getting the peer name here so we can insert it
match db.add_peer(&peer.public_key) {
Ok(_) => {
debug!("added {} to peer tree in database", &peer.public_key);
// TODO: i don't think we actually want to follow...
// we might still have the data in our ssb db, even if we don't follow
match sbot::is_following(&whoami.public_key, &peer.public_key).await {
Ok(status) if status.as_str() == "false" => {
match sbot::follow_peer(&peer.public_key).await {
Ok(_) => debug!("followed {}", &peer.public_key),
Err(e) => warn!("failed to follow {}: {}", &peer.public_key, e),
}
}
Ok(status) if status.as_str() == "true" => {
debug!("we already follow {}", &peer.public_key)
}
_ => (),
}
let peer = peer.public_key.to_string();
if let Err(e) = tx.send(Task::FetchAll(peer)).await {
warn!("task loop error: {}", e)
}
}
Err(_e) => warn!(
"failed to add {} to peer tree in database",
&peer.public_key
),
}
} else {
warn!("{} is invalid", &peer.public_key);
}
Redirect::to(uri!(home))
}
#[post("/unsubscribe", data = "<peer>")]
pub fn unsubscribe_form(db: &State<Database>, peer: Form<Peer>) -> Redirect {
// validate the public key
match utils::validate_public_key(&peer.public_key) {
Ok(_) => {
debug!("public key {} is valid", &peer.public_key);
match db.remove_peer(&peer.public_key) {
Ok(_) => debug!("removed {} from peer tree in database", &peer.public_key),
Err(_e) => warn!(
"failed to remove {} from peer tree in database",
&peer.public_key
),
}
}
Err(e) => warn!("{} is invalid: {}", &peer.public_key, e),
}
Redirect::to(uri!(home))
}

View File

@ -1,7 +1,7 @@
// Scuttlebutt functionality. //! Scuttlebutt functionality.
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use chrono::{NaiveDate, NaiveDateTime, TimeZone, Utc}; use chrono::NaiveDateTime;
use golgi::{ use golgi::{
api::friends::RelationshipQuery, api::friends::RelationshipQuery,
messages::{SsbMessageContentType, SsbMessageKVT}, messages::{SsbMessageContentType, SsbMessageKVT},
@ -13,6 +13,7 @@ use serde_json::value::Value;
use crate::db::Post; use crate::db::Post;
/// Return the public key of the local sbot instance.
pub async fn whoami() -> Result<String, String> { pub async fn whoami() -> Result<String, String> {
let mut sbot = Sbot::init(Keystore::Patchwork, None, None) let mut sbot = Sbot::init(Keystore::Patchwork, None, None)
.await .await
@ -48,6 +49,9 @@ pub async fn is_following(public_key_a: &str, public_key_b: &str) -> Result<Stri
.map_err(|e| e.to_string()) .map_err(|e| e.to_string())
} }
/// Return a stream of messages authored by the given public key.
///
/// This returns all messages regardless of type.
pub async fn get_message_stream( pub async fn get_message_stream(
public_key: &str, public_key: &str,
) -> impl futures::Stream<Item = Result<SsbMessageKVT, GolgiError>> { ) -> impl futures::Stream<Item = Result<SsbMessageKVT, GolgiError>> {
@ -61,6 +65,11 @@ pub async fn get_message_stream(
.unwrap() .unwrap()
} }
/// Filter a stream of messages and return a vector of root posts.
///
/// Each returned vector element includes the key of the post, the content
/// text, the date the post was published, the sequence number of the post
/// and whether it is read or unread.
pub async fn get_root_posts( pub async fn get_root_posts(
history_stream: impl futures::Stream<Item = Result<SsbMessageKVT, GolgiError>>, history_stream: impl futures::Stream<Item = Result<SsbMessageKVT, GolgiError>>,
) -> Vec<Post> { ) -> Vec<Post> {
@ -73,14 +82,14 @@ pub async fn get_root_posts(
Ok(msg) => { Ok(msg) => {
if msg.value.is_message_type(SsbMessageContentType::Post) { if msg.value.is_message_type(SsbMessageContentType::Post) {
let content = msg.value.content.to_owned(); let content = msg.value.content.to_owned();
if let Value::Object(map) = content { if let Value::Object(content_map) = content {
if !map.contains_key("root") { if !content_map.contains_key("root") {
let text = map.get_key_value("text").unwrap();
let timestamp_int = msg.value.timestamp.round() as i64 / 1000; let timestamp_int = msg.value.timestamp.round() as i64 / 1000;
//let timestamp = Utc.timestamp(timestamp_int, 0);
let datetime = NaiveDateTime::from_timestamp(timestamp_int, 0); let datetime = NaiveDateTime::from_timestamp(timestamp_int, 0);
//let datetime = timestamp.to_rfc2822();
let date = datetime.format("%d %b %Y").to_string(); let date = datetime.format("%d %b %Y").to_string();
let text = content_map.get_key_value("text").unwrap();
posts.push(Post { posts.push(Post {
key: msg.key.to_owned(), key: msg.key.to_owned(),
text: text.1.to_string(), text: text.1.to_string(),

View File

@ -6,6 +6,7 @@ use crate::{db::Database, sbot};
pub enum Task { pub enum Task {
Cancel, Cancel,
FetchAll(String), FetchAll(String),
//FetchLatest(String),
} }
pub async fn spawn(rx: Receiver<Task>, db: Database) { pub async fn spawn(rx: Receiver<Task>, db: Database) {
@ -17,15 +18,23 @@ pub async fn spawn(rx: Receiver<Task>, db: Database) {
// database. // database.
Task::FetchAll(peer) => { Task::FetchAll(peer) => {
let peer_msgs = sbot::get_message_stream(&peer).await; let peer_msgs = sbot::get_message_stream(&peer).await;
// TODO: return a tuple from sbot::get_root_posts
//let (root_posts, latest_sequence) = sbot::get_root_posts(peer_msgs).await;
// TODO: update the sequence number if required
//if latest_sequence > db.get_latest_sequence(&peer) {
// db.update_sequence(&peer, latest_sequence)
//}
let root_posts = sbot::get_root_posts(peer_msgs).await; let root_posts = sbot::get_root_posts(peer_msgs).await;
match db.insert_post_batch(&peer, root_posts) { match db.insert_post_batch(&peer, root_posts) {
Ok(_) => debug!("inserted message batch into peer tree for {}", &peer), Ok(_) => debug!("inserted batch of posts into post tree for {}", &peer),
Err(e) => warn!( Err(e) => warn!(
"failed to insert message batch into peer tree for {}: {}", "failed to insert batch of posts into post tree for {}: {}",
&peer, e &peer, e
), ),
} }
} }
// TODO: fetch all msgs with sequence number > peer.latest_sequence for peer
//Task::FetchLatest(peer) => {
Task::Cancel => { Task::Cancel => {
info!("exiting task loop..."); info!("exiting task loop...");
break; break;

View File

@ -12,6 +12,7 @@
grid-area: nav; grid-area: nav;
border: 5px solid #19A974; border: 5px solid #19A974;
border-radius: 15px; border-radius: 15px;
padding: 1rem;
} }
.peers { .peers {
@ -31,6 +32,7 @@
border: 5px solid #FFD700; border: 5px solid #FFD700;
border-radius: 15px; border-radius: 15px;
grid-area: content; grid-area: content;
padding: 1.5rem;
} }
.container { .container {
@ -62,14 +64,14 @@
.grid-container > div { .grid-container > div {
background-color: rgba(255, 255, 255, 0.8); background-color: rgba(255, 255, 255, 0.8);
/* text-align: center; */ /* text-align: center; */
padding: 20px 0; /* padding: 20px 0; */
/* font-size: 30px; */ /* font-size: 30px; */
} }
.flex-container { .flex-container {
display: flex; display: flex;
align-items: center; align-items: center;
flex-wrap: wrap; flex-wrap: wrap;
} }
.flex-container > input { .flex-container > input {
@ -84,7 +86,7 @@
<div class="grid-container"> <div class="grid-container">
<div class="nav"> <div class="nav">
<div class="flex-container"> <div class="flex-container">
<a href="/" style="margin-left: 20px;"><img src="/icons/download.png" style="width: 55px;"></a> <a href="/"><img src="/icons/download.png" style="width: 55px;"></a>
<a href="/" style="margin-left: 20px;"><img src="/icons/read_post.png" style="width: 55px;"></a> <a href="/" style="margin-left: 20px;"><img src="/icons/read_post.png" style="width: 55px;"></a>
<a href="/" style="margin-left: 20px;"><img src="/icons/unread_post.png" style="width: 55px;"></a> <a href="/" style="margin-left: 20px;"><img src="/icons/unread_post.png" style="width: 55px;"></a>
<a href="/" style="margin-left: 20px;"><img src="/icons/delete_post.png" style="width: 55px;"></a> <a href="/" style="margin-left: 20px;"><img src="/icons/delete_post.png" style="width: 55px;"></a>
@ -99,7 +101,7 @@
<div class="peers" style="text-align: center;"> <div class="peers" style="text-align: center;">
<ul style="padding-left: 0;"> <ul style="padding-left: 0;">
{% for peer in peers -%} {% for peer in peers -%}
<li style="list-style: none; font-size: 12px;"> <li style="list-style: none; font-size: 12px;">
<a href="/posts/{{ peer | replace(from="/", to="%2F") }}"> <a href="/posts/{{ peer | replace(from="/", to="%2F") }}">
<code style="word-wrap: anywhere;{% if selected_peer and peer == selected_peer %} font-weight: bold;{% endif %}">{{ peer }}</code> <code style="word-wrap: anywhere;{% if selected_peer and peer == selected_peer %} font-weight: bold;{% endif %}">{{ peer }}</code>
</a> </a>
@ -115,7 +117,7 @@
<a href="/posts/{{ selected_peer | replace(from="/", to="%2F") }}/{{ post.key | replace(from="/", to="%2F") }}"> <a href="/posts/{{ selected_peer | replace(from="/", to="%2F") }}/{{ post.key | replace(from="/", to="%2F") }}">
<code style="word-wrap: anywhere;{% if selected_post and post.key == selected_post %} font-weight: bold;{% endif %}">{{ post.key }}</code> <code style="word-wrap: anywhere;{% if selected_post and post.key == selected_post %} font-weight: bold;{% endif %}">{{ post.key }}</code>
| {{ post.date }} | {{ post.date }}
</a> </a>
</li> </li>
{%- endfor %} {%- endfor %}
</ul> </ul>