From ecb706e93fcf50b99d48b91dd95d2898d5da4328 Mon Sep 17 00:00:00 2001 From: nullishamy Date: Sat, 3 May 2025 16:01:23 +0100 Subject: [PATCH] feat: start refactor of ap module to federation module --- Cargo.lock | 1 + ferri-main/Cargo.toml | 1 + ferri-main/src/config/mod.rs | 4 +- ferri-main/src/federation/http.rs | 106 +++++ ferri-main/src/federation/inbox.rs | 141 ++++++ ferri-main/src/federation/mod.rs | 15 + ferri-main/src/federation/outbox.rs | 53 +++ ferri-main/src/federation/request_queue.rs | 72 +++ ferri-main/src/lib.rs | 5 + ferri-main/src/types/make.rs | 53 +++ ferri-main/src/types/mod.rs | 36 ++ ferri-server/src/endpoints/api/search.rs | 4 +- ferri-server/src/endpoints/api/timeline.rs | 2 +- ferri-server/src/endpoints/custom.rs | 2 +- ferri-server/src/endpoints/inbox.rs | 485 +++------------------ ferri-server/src/lib.rs | 14 +- 16 files changed, 564 insertions(+), 430 deletions(-) create mode 100644 ferri-main/src/federation/http.rs create mode 100644 ferri-main/src/federation/inbox.rs create mode 100644 ferri-main/src/federation/mod.rs create mode 100644 ferri-main/src/federation/outbox.rs create mode 100644 ferri-main/src/federation/request_queue.rs diff --git a/Cargo.lock b/Cargo.lock index fd43ed1..5e3f38e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1316,6 +1316,7 @@ dependencies = [ "serde_json", "sqlx", "thiserror 2.0.12", + "tokio", "tracing", "url", "uuid", diff --git a/ferri-main/Cargo.toml b/ferri-main/Cargo.toml index 77dc8d9..b85a4da 100644 --- a/ferri-main/Cargo.toml +++ b/ferri-main/Cargo.toml @@ -17,3 +17,4 @@ serde_json = { workspace = true } base64 = "0.22.1" rsa = { version = "0.9.8", features = ["sha2"] } url = "2.5.4" +tokio = "1.44.2" diff --git a/ferri-main/src/config/mod.rs b/ferri-main/src/config/mod.rs index ebe2532..b41a952 100644 --- a/ferri-main/src/config/mod.rs +++ b/ferri-main/src/config/mod.rs @@ -1,11 +1,11 @@ use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ServerConfig { pub host: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Config { pub server: ServerConfig, } diff --git a/ferri-main/src/federation/http.rs b/ferri-main/src/federation/http.rs new file mode 100644 index 0000000..4d686e3 --- /dev/null +++ b/ferri-main/src/federation/http.rs @@ -0,0 +1,106 @@ +use crate::ap::http::HttpClient; +use crate::types::ap; +use std::fmt::Debug; +use serde::Serialize; +use thiserror::Error; +use tracing::{Level, error, event, info}; + +use super::outbox::PreparedActivity; + +pub struct HttpWrapper<'a> { + client: &'a HttpClient, + key_id: &'a str, +} + +#[derive(Error, Debug)] +pub enum HttpError { + #[error("entity of type `{0}` @ URL `{1}` could not be loaded")] + LoadFailure(String, String), + #[error("entity of type `{0}` @ URL `{1}` could not be parsed ({2})")] + ParseFailure(String, String, String), +} + +impl<'a> HttpWrapper<'a> { + pub fn new(client: &'a HttpClient, key_id: &'a str) -> HttpWrapper<'a> { + Self { client, key_id } + } + + pub fn client(&self) -> &'a HttpClient { + self.client + } + + async fn get( + &self, + ty: &str, + url: &str, + ) -> Result { + let ty = ty.to_string(); + event!(Level::INFO, url, "loading {}", ty); + + let http_result = self + .client + .get(url) + .sign(self.key_id) + .activity() + .send() + .await; + + if let Err(e) = http_result { + error!("could not load url {}: {:#?}", url, e); + return Err(HttpError::LoadFailure(ty, url.to_string())); + } + + let raw_body = http_result.unwrap().text().await; + if let Err(e) = raw_body { + error!("could not get text for url {}: {:#?}", url, e); + return Err(HttpError::LoadFailure(ty, url.to_string())); + } + + let raw_body = raw_body.unwrap(); + info!("raw body {}", raw_body); + let decoded = serde_json::from_str::(&raw_body); + + if let Err(e) = decoded { + error!( + "could not parse {} for url {}: {:#?} {}", + ty, url, e, &raw_body + ); + return Err(HttpError::ParseFailure(ty, url.to_string(), e.to_string())); + } + + Ok(decoded.unwrap()) + } + + pub async fn get_person(&self, url: &str) -> Result { + self.get("Person", url).await + } + + pub async fn post_activity( + &self, + inbox: &str, + activity: PreparedActivity + ) -> Result { + let http_result = self + .client + .post(inbox) + .sign(self.key_id) + .json(activity) + .activity() + .send() + .await; + + if let Err(e) = http_result { + error!("could not load url {}: {:#?}", inbox, e); + return Err(HttpError::LoadFailure("Activity".to_string(), inbox.to_string())); + } + + let raw_body = http_result.unwrap().text().await; + if let Err(e) = raw_body { + error!("could not get text for url {}: {:#?}", inbox, e); + return Err(HttpError::LoadFailure("Activity".to_string(), inbox.to_string())); + } + + let raw_body = raw_body.unwrap(); + Ok(raw_body.to_string()) + } +} diff --git a/ferri-main/src/federation/inbox.rs b/ferri-main/src/federation/inbox.rs new file mode 100644 index 0000000..f0e35fa --- /dev/null +++ b/ferri-main/src/federation/inbox.rs @@ -0,0 +1,141 @@ +use crate::types::{ap, as_context, db, get, make, Object, ObjectUri, ObjectUuid}; +use crate::ap::http::HttpClient; + +use super::http::HttpWrapper; +use super::outbox::OutboxRequest; +use super::QueueMessage; + +use chrono::DateTime; +use tracing::warn; + +#[derive(Debug)] +pub enum InboxRequest { + Delete(ap::DeleteActivity, db::User), + Follow { + activity: ap::FollowActivity, + followed: db::User, + conn: sqlx::SqliteConnection, + outbound: super::QueueHandle + }, + Create(ap::CreateActivity, db::User, sqlx::SqliteConnection), + Like(ap::LikeActivity, db::User), + Boost(ap::BoostActivity, db::User) +} + +fn key_id(user: &db::User) -> String { + format!("https://ferri.amy.mov/users/{}#main-key", user.id.0) +} + +pub async fn handle_inbox_request( + req: InboxRequest, + http: &HttpClient, +) { + match req { + InboxRequest::Delete(_, _) => { + todo!() + }, + InboxRequest::Follow { activity, followed, mut conn, outbound } => { + let kid = key_id(&followed); + let http = HttpWrapper::new(http, &kid); + + let follower = http.get_person(&activity.actor).await.unwrap(); + + let follow = db::Follow { + id: ObjectUri( + format!("https://ferri.amy.mov/activities/{}", crate::new_id()) + ), + follower: follower.obj.id.clone(), + followed: followed.actor.id.clone() + }; + + make::new_follow(follow, &mut conn).await.unwrap(); + + let activity = ap::AcceptActivity { + obj: Object { + context: as_context(), + id: ObjectUri( + format!("https://ferri.amy.mov/activities/{}", crate::new_id()) + ) + }, + ty: ap::ActivityType::Accept, + object: activity.obj.id.0.clone(), + actor: followed.actor.id.0.clone() + }; + + let msg = QueueMessage::Outbound( + OutboxRequest::Accept(activity, kid, follower) + ); + + outbound.send(msg).await; + }, + InboxRequest::Create(activity, user, mut conn) => { + let id = key_id(&user); + let http = HttpWrapper::new(http, &id); + let person = http.get_person(&activity.actor).await.unwrap(); + let rmt = person.remote_info(); + + let post = activity.object; + let post_id = crate::ap::new_id(); + + let created_at = DateTime::parse_from_rfc3339(&activity.ts) + .map(|dt| dt.to_utc()) + .unwrap(); + + let actor_uri = person.obj.id; + + let actor = db::Actor { + id: actor_uri, + inbox: person.inbox, + outbox: person.outbox + }; + + make::new_actor(actor.clone(), &mut conn) + .await + .unwrap(); + + let user = get::user_by_actor_uri(actor.id.clone(), &mut conn) + .await + .unwrap_or_else(|_| { + db::User { + id: ObjectUuid(crate::new_id()), + actor, + username: person.preferred_username, + display_name: person.name, + acct: rmt.acct, + remote: rmt.is_remote, + url: rmt.web_url, + created_at: crate::ap::now(), + icon_url: person.icon.map(|ic| ic.url) + .unwrap_or("https//ferri.amy.mov/assets/pfp.png".to_string()), + posts: db::UserPosts { + last_post_at: None + } + } + }); + + make::new_user(user.clone(), &mut conn) + .await + .unwrap(); + + let post = db::Post { + id: ObjectUuid(post_id), + uri: post.obj.id, + user, + content: post.content, + created_at, + boosted_post: None + }; + + + make::new_post(post, &mut conn) + .await + .unwrap(); + }, + InboxRequest::Like(_, _) => { + warn!("unimplemented Like in inbox"); + }, + InboxRequest::Boost(_, _) => { + warn!("unimplemented Boost in inbox"); + }, + } +} diff --git a/ferri-main/src/federation/mod.rs b/ferri-main/src/federation/mod.rs new file mode 100644 index 0000000..489ba51 --- /dev/null +++ b/ferri-main/src/federation/mod.rs @@ -0,0 +1,15 @@ +/* +What should we handle here: +- Inbound/Outbound queues +- Accepting events from the API to process + - Which entails the logic for all of that +- Remote actioning (webfinger, httpwrapper if possible) + */ + +mod request_queue; +pub use request_queue::*; + +pub mod inbox; +pub mod outbox; +pub mod http; + diff --git a/ferri-main/src/federation/outbox.rs b/ferri-main/src/federation/outbox.rs new file mode 100644 index 0000000..072e5af --- /dev/null +++ b/ferri-main/src/federation/outbox.rs @@ -0,0 +1,53 @@ +use serde::{Deserialize, Serialize}; +use tracing::info; +use std::fmt::Debug; +use crate::{ap::http::HttpClient, federation::http::HttpWrapper, types::{ap, ObjectContext}}; + +#[derive(Debug)] +pub enum OutboxRequest { + // FIXME: Make the String (key_id) nicer + // Probably store it in the DB and pass a db::User here + Accept(ap::AcceptActivity, String, ap::Person) +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct PreparedActivity { + #[serde(rename = "@context")] + context: ObjectContext, + + id: String, + #[serde(rename = "type")] + ty: ap::ActivityType, + + actor: String, + object: T, + published: String, +} + +pub async fn handle_outbox_request( + req: OutboxRequest, + http: &HttpClient +) { + match req { + OutboxRequest::Accept(activity, key_id, person) => { + let http = HttpWrapper::new(http, &key_id); + + info!("accepting {}", activity.object); + let activity = PreparedActivity { + context: activity.obj.context, + id: activity.obj.id.0, + ty: activity.ty, + actor: activity.actor, + object: activity.object, + published: crate::ap::new_ts() + }; + + let res = http + .post_activity(&person.inbox, activity) + .await + .unwrap(); + + info!("accept res {}", res); + }, + } +} diff --git a/ferri-main/src/federation/request_queue.rs b/ferri-main/src/federation/request_queue.rs new file mode 100644 index 0000000..4eef7ef --- /dev/null +++ b/ferri-main/src/federation/request_queue.rs @@ -0,0 +1,72 @@ +use tokio::sync::mpsc; +use tracing::{info, span, Instrument, Level}; + +use crate::ap::http::HttpClient; +use crate::config::Config; +use crate::federation::inbox::handle_inbox_request; +use crate::federation::outbox::handle_outbox_request; + +use super::inbox::InboxRequest; +use super::outbox::OutboxRequest; + +#[derive(Debug)] +pub enum QueueMessage { + Heartbeat, + Inbound(InboxRequest), + Outbound(OutboxRequest), +} + +pub struct RequestQueue { + name: &'static str, + send: mpsc::Sender, + recv: mpsc::Receiver, +} + +#[derive(Clone, Debug)] +pub struct QueueHandle { + send: mpsc::Sender, +} + +impl QueueHandle { + pub async fn send(&self, msg: QueueMessage) { + self.send.send(msg).await.unwrap(); + } +} + +impl RequestQueue { + pub fn new(name: &'static str) -> Self { + let (send, recv) = mpsc::channel(1024); + Self { name, send, recv } + } + + pub fn spawn(self, config: Config) -> QueueHandle { + info!("starting up queue '{}'", self.name); + let span = span!(Level::INFO, "queue", queue_name = self.name); + + let fut = async move { + info!("using config {:#?}, queue is up", config); + let mut recv = self.recv; + let http = HttpClient::new(); + + while let Some(req) = recv.recv().await { + info!(?req, "got a message into the queue"); + + match req { + QueueMessage::Heartbeat => { + info!("heartbeat on queue"); + }, + QueueMessage::Inbound(inbox_request) => { + handle_inbox_request(inbox_request, &http).await; + }, + QueueMessage::Outbound(outbox_request) => { + handle_outbox_request(outbox_request, &http).await; + }, + } + } + }.instrument(span); + + tokio::spawn(fut); + + QueueHandle { send: self.send } + } +} diff --git a/ferri-main/src/lib.rs b/ferri-main/src/lib.rs index ac5d47b..c9a8744 100644 --- a/ferri-main/src/lib.rs +++ b/ferri-main/src/lib.rs @@ -1,6 +1,7 @@ pub mod ap; pub mod config; pub mod types; +pub mod federation; use rand::{Rng, distributions::Alphanumeric}; @@ -11,3 +12,7 @@ pub fn gen_token(len: usize) -> String { .map(char::from) .collect() } + +pub fn new_id() -> String { + uuid::Uuid::new_v4().to_string() +} diff --git a/ferri-main/src/types/make.rs b/ferri-main/src/types/make.rs index 5383d98..d569f91 100644 --- a/ferri-main/src/types/make.rs +++ b/ferri-main/src/types/make.rs @@ -8,6 +8,7 @@ pub async fn new_user(user: db::User, conn: &mut SqliteConnection) -> Result Result { + sqlx::query!( + r#" + INSERT INTO follow (id, follower_id, followed_id) + VALUES (?1, ?2, ?3) + "#, + follow.id.0, + follow.follower.0, + follow.follower.0, + ) + .execute(conn) + .await + .map_err(|e| DbError::CreationError(e.to_string()))?; + + Ok(follow) +} + + +pub async fn new_post( + post: db::Post, + conn: &mut SqliteConnection, +) -> Result { + let ts = post.created_at.to_rfc3339(); + let boosted = post.boosted_post.as_ref().map(|b| &b.0); + + sqlx::query!( + r#" + INSERT INTO post (id, uri, user_id, content, created_at, boosted_post_id) + VALUES (?1, ?2, ?3, ?4, ?5, ?6) + ON CONFLICT(uri) DO NOTHING + "#, + post.id.0, + post.uri.0, + post.user.id.0, + post.content, + ts, + boosted + + ) + .execute(conn) + .await + .map_err(|e| DbError::CreationError(e.to_string()))?; + + Ok(post) +} + + diff --git a/ferri-main/src/types/mod.rs b/ferri-main/src/types/mod.rs index f827fbe..666b62e 100644 --- a/ferri-main/src/types/mod.rs +++ b/ferri-main/src/types/mod.rs @@ -63,6 +63,13 @@ pub mod db { use super::*; use chrono::{DateTime, Utc}; + #[derive(Debug, Eq, PartialEq, Clone)] + pub struct Follow { + pub id: ObjectUri, + pub follower: ObjectUri, + pub followed: ObjectUri, + } + #[derive(Debug, Eq, PartialEq, Clone)] pub struct Actor { pub id: ObjectUri, @@ -105,12 +112,14 @@ pub mod db { pub mod ap { use super::*; use serde::{Deserialize, Serialize}; + use url::Url; #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] pub enum ActivityType { Create, Note, Delete, + Undo, Accept, Announce, Like, @@ -261,6 +270,33 @@ pub mod ap { pub icon: Option } + pub struct RemoteInfo { + pub is_remote: bool, + pub web_url: String, + pub acct: String + } + + impl Person { + pub fn remote_info(&self) -> RemoteInfo { + let url = Url::parse(&self.obj.id.0).unwrap(); + let host = url.host_str().unwrap(); + + let (acct, remote) = if host != "ferri.amy.mov" { + (format!("{}@{}", self.preferred_username, host), true) + } else { + (self.preferred_username.clone(), false) + }; + + let url = format!("https://ferri.amy.mov/{}", acct); + + RemoteInfo { + acct: acct.to_string(), + web_url: url, + is_remote: remote, + } + } + } + #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] pub struct UserKey { pub id: String, diff --git a/ferri-server/src/endpoints/api/search.rs b/ferri-server/src/endpoints/api/search.rs index c009c0f..f332fc7 100644 --- a/ferri-server/src/endpoints/api/search.rs +++ b/ferri-server/src/endpoints/api/search.rs @@ -59,7 +59,7 @@ pub async fn search( } }; - let user = get::user_by_actor_uri(person.unwrap().obj.id, &mut **db) + let user = get::user_by_actor_uri(person.unwrap().obj.id, &mut db) .await .unwrap(); @@ -67,7 +67,7 @@ pub async fn search( }, SearchType::Statuses => { if q == "me" { - let st = get::posts_for_user_id(user.id, &mut **db) + let st = get::posts_for_user_id(user.id, &mut db) .await .unwrap(); diff --git a/ferri-server/src/endpoints/api/timeline.rs b/ferri-server/src/endpoints/api/timeline.rs index 57be00f..2ca3b03 100644 --- a/ferri-server/src/endpoints/api/timeline.rs +++ b/ferri-server/src/endpoints/api/timeline.rs @@ -1,6 +1,6 @@ use crate::{AuthenticatedUser, Db, endpoints::api::user::CredentialAcount}; use rocket::{ - State, get, + get, serde::{Deserialize, Serialize, json::Json}, }; use rocket_db_pools::Connection; diff --git a/ferri-server/src/endpoints/custom.rs b/ferri-server/src/endpoints/custom.rs index f730a24..f7ac81c 100644 --- a/ferri-server/src/endpoints/custom.rs +++ b/ferri-server/src/endpoints/custom.rs @@ -81,7 +81,7 @@ pub async fn resolve_user(acct: &str, host: &str) -> ap::Person { #[get("/test")] pub async fn test(outbound: &State, mut db: Connection) -> &'static str { use main::types::{ObjectUuid, api, get}; - outbound.0.send(main::ap::QueueMessage::Heartbeat); + outbound.0.send(main::federation::QueueMessage::Heartbeat).await; let id = ObjectUuid("9b9d497b-2731-435f-a929-e609ca69dac9".to_string()); let user = dbg!(get::user_by_id(id, &mut db).await.unwrap()); diff --git a/ferri-server/src/endpoints/inbox.rs b/ferri-server/src/endpoints/inbox.rs index 9750187..0c2670f 100644 --- a/ferri-server/src/endpoints/inbox.rs +++ b/ferri-server/src/endpoints/inbox.rs @@ -1,447 +1,98 @@ -use crate::http_wrapper::HttpWrapper; -use chrono::Local; -use rocket::serde::json::serde_json; -use rocket::{State, post}; +use main::{ + federation::{ + QueueMessage, + inbox::InboxRequest + }, + types::{ap, get, ObjectUuid} +}; +use rocket::{State, post, serde::json::serde_json}; use rocket_db_pools::Connection; -use sqlx::Sqlite; -use sqlx::SqliteConnection; -use tracing::Instrument; -use tracing::{Level, debug, error, event, info, span, warn}; -use url::Url; -use uuid::Uuid; +use serde::de::DeserializeOwned; +use tracing::{debug, event, span, warn, Instrument, Level}; -use crate::Db; -use main::types::{DbError, ObjectUri, ObjectUuid, ap, db, make}; +use crate::{Db, InboundQueue, OutboundQueue}; -fn handle_delete_activity(activity: ap::DeleteActivity) { - warn!(?activity, "unimplemented delete activity"); +fn deser(body: &str) -> T { + serde_json::from_str(body).unwrap() } -async fn create_actor( - user: &ap::Person, - actor: &str, - conn: impl sqlx::Executor<'_, Database = Sqlite>, -) { - sqlx::query!( - r#" - INSERT INTO actor (id, inbox, outbox) - VALUES ( ?1, ?2, ?3 ) - ON CONFLICT(id) DO NOTHING; - "#, - actor, - user.inbox, - user.outbox - ) - .execute(conn) - .await - .unwrap(); -} - -async fn create_user( - user: &ap::Person, - actor: &str, - conn: impl sqlx::Executor<'_, Database = Sqlite>, -) { - // HACK: Allow us to formulate a `user@host` username by assuming the actor is on the same host as the user - let url = Url::parse(actor).unwrap(); - let host = url.host_str().unwrap(); - info!( - "creating user '{}'@'{}' ({:#?})", - user.preferred_username, host, user - ); - - let (acct, remote) = if host != "ferri.amy.mov" { - (format!("{}@{}", user.preferred_username, host), true) - } else { - (user.preferred_username.clone(), false) - }; - - let url = format!("https://ferri.amy.mov/{}", acct); - let icon_url = user.icon.as_ref().map(|ic| ic.url.clone()).unwrap_or( - "https://ferri.amy.mov/assets/pfp.png".to_string() - ); - - let uuid = Uuid::new_v4().to_string(); - // FIXME: Pull from user - let ts = main::ap::new_ts(); - sqlx::query!( - r#" - INSERT INTO user ( - id, acct, url, remote, username, - actor_id, display_name, created_at, icon_url - ) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) - ON CONFLICT(actor_id) DO NOTHING; - "#, - uuid, - acct, - url, - remote, - user.preferred_username, - actor, - user.name, - ts, - icon_url - ) - .execute(conn) - .await - .unwrap(); -} - -async fn create_follow( - activity: &ap::FollowActivity, - conn: impl sqlx::Executor<'_, Database = Sqlite>, -) { - sqlx::query!( - r#" - INSERT INTO follow (id, follower_id, followed_id) - VALUES ( ?1, ?2, ?3 ) - ON CONFLICT(id) DO NOTHING; - "#, - activity.obj.id.0, - activity.actor, - activity.object - ) - .execute(conn) - .await - .unwrap(); -} - -struct RemoteInfo { - acct: String, - web_url: String, - is_remote: bool, -} - -fn get_remote_info(actor_url: &str, person: &ap::Person) -> RemoteInfo { - let url = Url::parse(actor_url).unwrap(); - let host = url.host_str().unwrap(); - - let (acct, remote) = if host != "ferri.amy.mov" { - (format!("{}@{}", person.preferred_username, host), true) - } else { - (person.preferred_username.clone(), false) - }; - - let url = format!("https://ferri.amy.mov/{}", acct); - - RemoteInfo { - acct: acct.to_string(), - web_url: url, - is_remote: remote, - } -} - -async fn resolve_actor<'a>( - actor_url: &str, - http: &HttpWrapper<'a>, - conn: &mut SqliteConnection, -) -> Result { - let person = { - let res = http.get_person(actor_url).await; - if let Err(e) = res { - error!("could not load user {}: {}", actor_url, e.to_string()); - return Err(DbError::FetchError(format!( - "could not load user {}: {}", - actor_url, e - ))); - } - - res.unwrap() - }; - - let user_id = ObjectUuid::new(); - let remote_info = get_remote_info(actor_url, &person); - - let actor = db::Actor { - id: ObjectUri(actor_url.to_string()), - inbox: person.inbox.clone(), - outbox: person.outbox.clone(), - }; - - info!("creating actor {}", actor_url); - - let actor = make::new_actor(actor.clone(), conn).await.unwrap_or(actor); - - info!("creating user {} ({:#?})", remote_info.acct, person); - - let user = db::User { - id: user_id, - actor, - username: person.name, - display_name: person.preferred_username, - acct: remote_info.acct, - remote: remote_info.is_remote, - url: remote_info.web_url, - created_at: main::ap::now(), - icon_url: person.icon.map(|ic| ic.url).unwrap_or( - "https://ferri.amy.mov/assets/pfp.png".to_string() - ), - - posts: db::UserPosts { last_post_at: None }, - }; - - Ok(make::new_user(user.clone(), conn).await.unwrap_or(user)) -} - -async fn handle_follow_activity<'a>( - followed_account: &str, - activity: ap::FollowActivity, - http: HttpWrapper<'a>, +#[post("/users//inbox", data = "")] +pub async fn inbox( mut db: Connection, + queue: &State, + outbound: &State, + user_uuid: &str, + body: String ) { - let actor = resolve_actor(&activity.actor, &http, &mut db) + let user = get::user_by_id( + ObjectUuid(user_uuid.to_string()), + &mut db + ) .await .unwrap(); - - info!("{:?} follows {}", actor, followed_account); - - create_follow(&activity, &mut **db).await; - - let follower = main::ap::User::from_actor_id(&activity.actor, &mut **db).await; - let followed = main::ap::User::from_id(followed_account, &mut **db) - .await - .unwrap(); - let outbox = main::ap::Outbox::for_user(followed.clone(), http.client()); - - let activity = main::ap::Activity { - id: format!("https://ferri.amy.mov/activities/{}", Uuid::new_v4()), - ty: main::ap::ActivityType::Accept, - object: activity.obj.id.0, - ..Default::default() - }; - - let req = main::ap::OutgoingActivity { - signed_by: format!( - "https://ferri.amy.mov/users/{}#main-key", - followed.username() - ), - req: activity, - to: follower.actor().clone(), - }; - - req.save(&mut **db).await; - outbox.post(req).await; -} - -async fn handle_like_activity(activity: ap::LikeActivity, mut db: Connection) { - warn!(?activity, "unimplemented like activity"); - - let target_post = sqlx::query!("SELECT * FROM post WHERE uri = ?1", activity.object) - .fetch_one(&mut **db) - .await; - - if let Ok(post) = target_post { - warn!(?post, "tried to like post"); - } else { - warn!(post = ?activity.object, "could not find post"); - } -} - -async fn handle_boost_activity<'a>( - activity: ap::BoostActivity, - http: HttpWrapper<'a>, - mut db: Connection, -) { - let key_id = "https://ferri.amy.mov/users/9b9d497b-2731-435f-a929-e609ca69dac9#main-key"; - dbg!(&activity); - let post = http - .client() - .get(&activity.object) - .activity() - .sign(key_id) - .send() - .await - .unwrap() - .text() - .await - .unwrap(); - - info!("{}", post); - - let post = serde_json::from_str::(&post); - if let Err(e) = post { - error!(?e, "when decoding post"); - return; - } - - let post = post.unwrap(); - - info!("{:#?}", post); - let attribution = post.attributed_to.unwrap(); - - let post_user = http.get_person(&attribution).await; - if let Err(e) = post_user { - error!( - "could not load post_user {}: {}", - attribution, - e.to_string() - ); - return; - } - let post_user = post_user.unwrap(); - - let user = http.get_person(&activity.actor).await; - if let Err(e) = user { - error!("could not load actor {}: {}", activity.actor, e.to_string()); - return; - } - let user = user.unwrap(); - - debug!("creating actor {}", activity.actor); - create_actor(&user, &activity.actor, &mut **db).await; - - debug!("creating user {}", activity.actor); - create_user(&user, &activity.actor, &mut **db).await; - - debug!("creating actor {}", attribution); - create_actor(&post_user, &attribution, &mut **db).await; - - debug!("creating user {}", attribution); - create_user(&post_user, &attribution, &mut **db).await; - - let attributed_user = main::ap::User::from_actor_id(&attribution, &mut **db).await; - let actor_user = main::ap::User::from_actor_id(&activity.actor, &mut **db).await; - - let base_id = main::ap::new_id(); - let now = main::ap::new_ts(); - - let reblog_id = main::ap::new_id(); - - let attr_id = attributed_user.id(); - // HACK: ON CONFLICT is to avoid duplicate remote posts coming in - // check this better in future - sqlx::query!( - " - INSERT INTO post (id, uri, user_id, content, created_at) - VALUES (?1, ?2, ?3, ?4, ?5) - ON CONFLICT(uri) DO NOTHING - ", - reblog_id, - post.obj.id.0, - attr_id, - post.content, - post.ts - ) - .execute(&mut **db) - .await - .unwrap(); - - let uri = format!( - "https://ferri.amy.mov/users/{}/posts/{}", - actor_user.id(), - base_id - ); - let user_id = actor_user.id(); - - info!("inserting post with id {} uri {}", base_id, uri); - - sqlx::query!( - " - INSERT INTO post (id, uri, user_id, content, created_at, boosted_post_id) - VALUES (?1, ?2, ?3, ?4, ?5, ?6) - ", - base_id, - uri, - user_id, - "", - now, - reblog_id - ) - .execute(&mut **db) - .await - .unwrap(); -} - -async fn handle_create_activity<'a>( - activity: ap::CreateActivity, - http: HttpWrapper<'a>, - mut db: Connection, -) { - assert!(activity.object.ty == ap::ActivityType::Note); - debug!("resolving user {}", activity.actor); - - let user = http.get_person(&activity.actor).await; - if let Err(e) = user { - error!("could not load user {}: {}", activity.actor, e.to_string()); - return; - } - - let user = user.unwrap(); - - debug!("creating actor {}", activity.actor); - create_actor(&user, &activity.actor, &mut **db).await; - - debug!("creating user {}", activity.actor); - create_user(&user, &activity.actor, &mut **db).await; - - let user = main::ap::User::from_actor_id(&activity.actor, &mut **db).await; - debug!("user created {:?}", user); - - let user_id = user.id(); - let now = Local::now().to_rfc3339(); - let content = activity.object.content.clone(); - let post_id = Uuid::new_v4().to_string(); - let uri = activity.obj.id.0; - - info!(post_id, "creating post"); - - sqlx::query!( - r#" - INSERT INTO post (id, uri, user_id, content, created_at) - VALUES (?1, ?2, ?3, ?4, ?5) - "#, - post_id, - uri, - user_id, - content, - now - ) - .execute(&mut **db) - .await - .unwrap(); -} - -#[post("/users//inbox", data = "")] -pub async fn inbox(db: Connection, helpers: &State, user: &str, body: String) { + debug!("body in inbox: {}", body); - let min = serde_json::from_str::(&body).unwrap(); - let inbox_span = span!(Level::INFO, "inbox-post", user_id = user); + let min = deser::(&body); + let span = span!(Level::INFO, "user-inbox", user_id = user_uuid); + + let conn = db.into_inner(); + let conn = conn.detach(); async move { event!(Level::INFO, ?min, "received an activity"); - let key_id = "https://ferri.amy.mov/users/9b9d497b-2731-435f-a929-e609ca69dac9#main-key"; - let wrapper = HttpWrapper::new(&helpers.http, key_id); - match min.ty { ap::ActivityType::Delete => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_delete_activity(activity); + let activity = deser::(&body); + let msg = QueueMessage::Inbound( + InboxRequest::Delete(activity, user) + ); + + queue.0.send(msg).await; } ap::ActivityType::Follow => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_follow_activity(user, activity, wrapper, db).await; + let activity = deser::(&body); + let msg = QueueMessage::Inbound( + InboxRequest::Follow { + activity, + followed: user, + conn, + outbound: outbound.0.clone() + } + ); + + queue.0.send(msg).await; } ap::ActivityType::Create => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_create_activity(activity, wrapper, db).await; + let activity = deser::(&body); + let msg = QueueMessage::Inbound( + InboxRequest::Create(activity, user, conn) + ); + + queue.0.send(msg).await; } ap::ActivityType::Like => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_like_activity(activity, db).await; + let activity = deser::(&body); + let msg = QueueMessage::Inbound( + InboxRequest::Like(activity, user) + ); + + queue.0.send(msg).await; } ap::ActivityType::Announce => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_boost_activity(activity, wrapper, db).await; + let activity = deser::(&body); + let msg = QueueMessage::Inbound( + InboxRequest::Boost(activity, user) + ); + + queue.0.send(msg).await; + }, + unimpl => { + warn!("unimplemented {:?}", unimpl); } - ap::ActivityType::Note => todo!(), - ap::ActivityType::Accept => todo!(), } } - // Allow the span to be used inside the async code - // https://docs.rs/tracing/latest/tracing/span/struct.EnteredSpan.html#deref-methods-Span - .instrument(inbox_span) + .instrument(span) .await; } diff --git a/ferri-server/src/lib.rs b/ferri-server/src/lib.rs index 04b1e4d..614061a 100644 --- a/ferri-server/src/lib.rs +++ b/ferri-server/src/lib.rs @@ -5,7 +5,7 @@ use endpoints::{ use tracing_subscriber::fmt; -use main::{ap, types::{ObjectUri, ObjectUuid}}; +use main::{federation, types::{ObjectUri, ObjectUuid}}; use main::ap::http; use main::config::Config; @@ -83,8 +83,8 @@ impl<'a> FromRequest<'a> for AuthenticatedUser { } } -pub struct OutboundQueue(pub ap::QueueHandle); -pub struct InboundQueue(pub ap::QueueHandle); +pub struct OutboundQueue(pub federation::QueueHandle); +pub struct InboundQueue(pub federation::QueueHandle); pub struct Helpers { http: http::HttpClient, @@ -106,11 +106,11 @@ pub fn launch(cfg: Config) -> Rocket { .with_writer(std::io::stdout) .init(); - let outbound = ap::RequestQueue::new("outbound"); - let outbound_handle = outbound.spawn(); + let outbound = federation::RequestQueue::new("outbound"); + let outbound_handle = outbound.spawn(cfg.clone()); - let inbound = ap::RequestQueue::new("inbound"); - let inbound_handle = inbound.spawn(); + let inbound = federation::RequestQueue::new("inbound"); + let inbound_handle = inbound.spawn(cfg.clone()); build() .manage(Helpers {