diff --git a/ferri-cli/src/main.rs b/ferri-cli/src/main.rs index afbe020..e785c88 100644 --- a/ferri-cli/src/main.rs +++ b/ferri-cli/src/main.rs @@ -57,11 +57,13 @@ async fn main() { acct: s("amy"), remote: false, url: s("https://ferri.amy.mov/@amy"), - created_at: main::ap::now(), + created_at: main::now(), icon_url: s("https://ferri.amy.mov/assets/pfp.png"), posts: db::UserPosts { last_post_at: None - } + }, + key_id: s("https://ferri.amy.mov/users/9b9d497b-2731-435f-a929-e609ca69dac9#main-key") + }; make::new_user(user, &mut *conn).await.unwrap(); diff --git a/ferri-main/src/ap/activity.rs b/ferri-main/src/ap/activity.rs deleted file mode 100644 index 80f45e7..0000000 --- a/ferri-main/src/ap/activity.rs +++ /dev/null @@ -1,141 +0,0 @@ -use crate::ap::{Actor, User, http}; -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use sqlx::Sqlite; -use std::fmt::Debug; -use tracing::{Level, event}; - -#[derive(Debug, Clone)] -pub enum ActivityType { - Follow, - Accept, - Create, - Unknown, -} - -impl ActivityType { - fn to_raw(self) -> String { - match self { - ActivityType::Follow => "Follow".to_string(), - ActivityType::Accept => "Accept".to_string(), - ActivityType::Create => "Create".to_string(), - ActivityType::Unknown => "FIXME".to_string(), - } - } -} - -#[derive(Debug, Clone)] -pub struct Activity { - pub id: String, - pub ty: ActivityType, - pub object: T, - pub published: DateTime, - pub to: Vec, - pub cc: Vec, -} - -impl Default for Activity { - fn default() -> Self { - Self { - id: Default::default(), - ty: ActivityType::Unknown, - object: Default::default(), - published: Utc::now(), - to: Default::default(), - cc: Default::default(), - } - } -} - -pub type KeyId = String; - -#[derive(Debug, Clone)] -pub struct OutgoingActivity { - pub signed_by: KeyId, - pub req: Activity, - pub to: Actor, -} - -impl OutgoingActivity { - pub async fn save(&self, conn: impl sqlx::Executor<'_, Database = Sqlite>) { - let ty = self.req.ty.clone().to_raw(); - let actor_id = self.to.id(); - - sqlx::query!( - r#" - INSERT INTO activity (id, ty, actor_id) - VALUES (?1, ?2, ?3) - "#, - self.req.id, - ty, - actor_id - ) - .execute(conn) - .await - .unwrap(); - } -} - -#[derive(Serialize, Deserialize, Debug)] -struct RawActivity { - #[serde(rename = "@context")] - #[serde(skip_deserializing)] - context: String, - - id: String, - #[serde(rename = "type")] - ty: String, - - actor: String, - object: T, - published: String, -} - -type OutboxTransport = http::HttpClient; -pub struct Outbox<'a> { - user: User, - transport: &'a OutboxTransport, -} - -impl<'a> Outbox<'a> { - pub fn user(&self) -> &User { - &self.user - } - - pub async fn post(&self, activity: OutgoingActivity) { - event!(Level::INFO, ?activity, "activity in outbox"); - - let raw = RawActivity { - context: "https://www.w3.org/ns/activitystreams".to_string(), - id: activity.req.id.clone(), - ty: activity.req.ty.to_raw(), - actor: self.user.actor().id().to_string(), - object: activity.req.object, - published: activity.req.published.to_rfc3339(), - }; - - let outbox_res = self - .transport - .post(activity.to.inbox()) - .activity() - .json(&raw) - .sign(&activity.signed_by) - .send() - .await - .unwrap() - .text() - .await - .unwrap(); - - event!( - Level::DEBUG, - outbox_res, - activity = activity.req.id, - "got response for outbox dispatch" - ); - } - - pub fn for_user(user: User, transport: &'a OutboxTransport) -> Outbox<'a> { - Outbox { user, transport } - } -} diff --git a/ferri-main/src/ap/http.rs b/ferri-main/src/ap/http.rs deleted file mode 100644 index 5b8f20b..0000000 --- a/ferri-main/src/ap/http.rs +++ /dev/null @@ -1,199 +0,0 @@ -use reqwest::{IntoUrl, Response}; -use serde::Serialize; -use url::Url; - -use rsa::{ - RsaPrivateKey, - pkcs1v15::SigningKey, - pkcs8::DecodePrivateKey, - sha2::{Digest, Sha256}, - signature::{RandomizedSigner, SignatureEncoding}, -}; - -use base64::prelude::*; -use chrono::Utc; -use tracing::{Level, event}; - -pub struct HttpClient { - client: reqwest::Client, -} - -#[derive(Debug)] -pub struct PostSignature { - date: String, - digest: String, - signature: String, -} - -#[derive(Debug)] -struct GetSignature { - date: String, - signature: String, -} - -enum RequestVerb { - GET, - POST, -} - -pub struct RequestBuilder { - verb: RequestVerb, - url: Url, - body: String, - inner: reqwest::RequestBuilder, -} - -impl RequestBuilder { - pub fn json(mut self, json: impl Serialize + Sized) -> RequestBuilder { - let body = serde_json::to_string(&json).unwrap(); - self.inner = self.inner.body(body.clone()); - self.body = body; - self - } - - pub fn activity(mut self) -> RequestBuilder { - self.inner = self - .inner - .header("Content-Type", "application/activity+json") - .header("Accept", "application/activity+json"); - self - } - - pub async fn send(self) -> Result { - event!(Level::DEBUG, ?self.inner, "sending an http request"); - - self.inner.send().await - } - - pub fn sign(mut self, key_id: &str) -> RequestBuilder { - match self.verb { - RequestVerb::GET => { - let sig = self.sign_get_request(key_id); - self.inner = self - .inner - .header("Date", sig.date) - .header("Signature", sig.signature); - self - } - RequestVerb::POST => { - let sig = self.sign_post_request(key_id); - self.inner = self - .inner - .header("Date", sig.date) - .header("Digest", sig.digest) - .header("Signature", sig.signature); - self - } - } - } - - fn sign_get_request(&self, key_id: &str) -> GetSignature { - let url = &self.url; - let host = url.host_str().unwrap(); - let path = url.path(); - - let private_key = - RsaPrivateKey::from_pkcs8_pem(include_str!("../../../private.pem")).unwrap(); - let signing_key = SigningKey::::new(private_key); - - // UTC=GMT for our purposes, use it - // RFC7231 is hardcoded to use GMT for.. some reason - let ts = Utc::now(); - - // RFC7231 string - let date = ts.format("%a, %d %b %Y %H:%M:%S GMT").to_string(); - - let to_sign = format!( - "(request-target): get {}\nhost: {}\ndate: {}", - path, host, date - ); - - let signature = signing_key.sign_with_rng(&mut rand::rngs::OsRng, &to_sign.into_bytes()); - let header = format!( - "keyId=\"{}\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date\",signature=\"{}\"", - key_id, - BASE64_STANDARD.encode(signature.to_bytes()) - ); - - GetSignature { - date, - signature: header, - } - } - - fn sign_post_request(&self, key_id: &str) -> PostSignature { - let body = &self.body; - let url = &self.url; - - let host = url.host_str().unwrap(); - let path = url.path(); - - let private_key = - RsaPrivateKey::from_pkcs8_pem(include_str!("../../../private.pem")).unwrap(); - let signing_key = SigningKey::::new(private_key); - - let mut hasher = Sha256::new(); - hasher.update(body); - let sha256 = hasher.finalize(); - - let b64 = BASE64_STANDARD.encode(sha256); - let digest = format!("SHA-256={}", b64); - - // UTC=GMT for our purposes, use it - // RFC7231 is hardcoded to use GMT for.. some reason - let ts = Utc::now(); - - // RFC7231 string - let date = ts.format("%a, %d %b %Y %H:%M:%S GMT").to_string(); - - let to_sign = format!( - "(request-target): post {}\nhost: {}\ndate: {}\ndigest: {}", - path, host, date, digest - ); - - let signature = signing_key.sign_with_rng(&mut rand::rngs::OsRng, &to_sign.into_bytes()); - let header = format!( - "keyId=\"{}\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date digest\",signature=\"{}\"", - key_id, - BASE64_STANDARD.encode(signature.to_bytes()) - ); - - PostSignature { - date, - digest, - signature: header, - } - } -} - -impl Default for HttpClient { - fn default() -> Self { - Self::new() - } -} - -impl HttpClient { - pub fn new() -> Self { - Self { - client: reqwest::Client::new(), - } - } - - pub fn get(&self, url: impl IntoUrl + Clone) -> RequestBuilder { - RequestBuilder { - verb: RequestVerb::GET, - url: url.clone().into_url().unwrap(), - body: String::new(), - inner: self.client.get(url), - } - } - - pub fn post(&self, url: impl IntoUrl + Clone) -> RequestBuilder { - RequestBuilder { - verb: RequestVerb::POST, - url: url.clone().into_url().unwrap(), - body: String::new(), - inner: self.client.post(url), - } - } -} diff --git a/ferri-main/src/ap/mod.rs b/ferri-main/src/ap/mod.rs deleted file mode 100644 index 98d7330..0000000 --- a/ferri-main/src/ap/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -use chrono::{DateTime, Utc}; -use uuid::Uuid; - -pub mod http; - -mod activity; -pub use activity::*; - -mod user; -pub use user::*; - -mod post; -pub use post::*; - -mod request_queue; -pub use request_queue::*; - -pub const AS_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; - -pub fn new_id() -> String { - Uuid::new_v4().to_string() -} - -pub fn new_ts() -> String { - now().to_rfc3339() -} - -pub fn now() -> DateTime { - Utc::now() -} diff --git a/ferri-main/src/ap/post.rs b/ferri-main/src/ap/post.rs deleted file mode 100644 index a42a246..0000000 --- a/ferri-main/src/ap/post.rs +++ /dev/null @@ -1,113 +0,0 @@ -use crate::ap; -use chrono::{DateTime, Utc}; -use serde::Serialize; -use sqlx::Sqlite; - -const POST_TYPE: &str = "Note"; - -#[derive(Clone)] -pub struct Post { - id: String, - from: ap::User, - ts: DateTime, - content: String, - - to: Vec, - cc: Vec, -} - -impl Post { - pub fn from_parts(id: String, content: String, from: ap::User) -> Self { - Self { - id, - content, - from, - ts: ap::now(), - to: vec![], - cc: vec![], - } - } - - pub fn id(&self) -> &str { - &self.id - } - - pub fn content(&self) -> &str { - &self.content - } - - pub fn created_at(&self) -> String { - self.ts.to_rfc3339() - } - - pub fn uri(&self) -> String { - format!( - "https://ferri.amy.mov/users/{}/posts/{}", - self.from.id(), - self.id - ) - } - - pub async fn save(&self, conn: impl sqlx::Executor<'_, Database = Sqlite>) { - let ts = self.ts.to_rfc3339(); - let user_id = self.from.id(); - let post_id = self.id(); - let uri = self.uri(); - let content = self.content.clone(); - - sqlx::query!( - r#" - INSERT INTO post (id, uri, user_id, content, created_at) - VALUES (?1, ?2, ?3, ?4, ?5) - "#, - post_id, - uri, - user_id, - content, - ts - ) - .execute(conn) - .await - .unwrap(); - } - - pub fn to(mut self, recipient: String) -> Self { - self.to.push(recipient); - self - } - - pub fn cc(mut self, recipient: String) -> Self { - self.cc.push(recipient); - self - } - - pub fn to_ap(self) -> APPost { - APPost { - context: ap::AS_CONTEXT.to_string(), - id: self.uri(), - ty: POST_TYPE.to_string(), - ts: self.ts.to_rfc3339(), - content: self.content, - to: self.to, - cc: self.cc, - } - } -} - -#[derive(Serialize, Debug, Default)] -pub struct APPost { - #[serde(rename = "@context")] - #[serde(skip_deserializing)] - context: String, - id: String, - - #[serde(rename = "type")] - ty: String, - - #[serde(rename = "published")] - ts: String, - - content: String, - to: Vec, - cc: Vec, -} diff --git a/ferri-main/src/ap/request_queue.rs b/ferri-main/src/ap/request_queue.rs deleted file mode 100644 index fc5ebeb..0000000 --- a/ferri-main/src/ap/request_queue.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::sync::mpsc; -use std::thread; -use tracing::{Level, info, span}; - -#[derive(Debug)] -pub enum QueueMessage { - Heartbeat, -} - -pub struct RequestQueue { - name: &'static str, - send: mpsc::Sender, - recv: mpsc::Receiver, -} - -#[derive(Clone)] -pub struct QueueHandle { - send: mpsc::Sender, -} - -impl QueueHandle { - pub fn send(&self, msg: QueueMessage) { - self.send.send(msg).unwrap(); - } -} - -impl RequestQueue { - pub fn new(name: &'static str) -> Self { - let (send, recv) = mpsc::channel(); - Self { name, send, recv } - } - - pub fn spawn(self) -> QueueHandle { - info!("starting up queue '{}'", self.name); - - thread::spawn(move || { - info!("queue '{}' up", self.name); - let recv = self.recv; - - while let Ok(req) = recv.recv() { - // FIXME: When we make this do async things we will need to add tokio and - // use proper async handled spans as the enter/drop won't work. - // See inbox.rs for how we handle that. - let s = span!(Level::INFO, "queue", queue_name = self.name); - let _enter = s.enter(); - - info!(?req, "got a message into the queue"); - - drop(_enter); - } - }); - - QueueHandle { send: self.send } - } -} diff --git a/ferri-main/src/ap/user.rs b/ferri-main/src/ap/user.rs deleted file mode 100644 index 3edd614..0000000 --- a/ferri-main/src/ap/user.rs +++ /dev/null @@ -1,154 +0,0 @@ -use sqlx::Sqlite; -use std::fmt::Debug; -use thiserror::Error; - -#[derive(Debug, Clone)] -pub struct Actor { - id: String, - inbox: String, - outbox: String, -} - -impl Actor { - pub fn from_raw(id: String, inbox: String, outbox: String) -> Self { - Self { id, inbox, outbox } - } - - pub fn id(&self) -> &str { - &self.id - } - - pub fn inbox(&self) -> &str { - &self.inbox - } - - pub fn outbox(&self) -> &str { - &self.outbox - } -} - -#[derive(Debug, Clone)] -pub struct User { - id: String, - username: String, - actor: Actor, - display_name: String, -} - -#[derive(Error, Debug)] -pub enum UserError { - #[error("user `{0}` not found")] - NotFound(String), -} - -impl User { - pub fn id(&self) -> &str { - &self.id - } - - pub fn username(&self) -> &str { - &self.username - } - - pub fn actor_id(&self) -> &str { - &self.actor.id - } - - pub fn display_name(&self) -> &str { - &self.display_name - } - - pub fn actor(&self) -> &Actor { - &self.actor - } - - pub fn uri(&self) -> String { - format!("https://ferri.amy.mov/users/{}", self.id()) - } - - pub async fn from_id( - uuid: &str, - conn: impl sqlx::Executor<'_, Database = Sqlite>, - ) -> Result { - let user = sqlx::query!( - r#" - SELECT u.*, a.id as "actor_own_id", a.inbox, a.outbox - FROM user u - INNER JOIN actor a ON u.actor_id = a.id - WHERE u.id = ?1 - "#, - uuid - ) - .fetch_one(conn) - .await - .map_err(|_| UserError::NotFound(uuid.to_string()))?; - - Ok(User { - id: user.id, - username: user.username, - actor: Actor { - id: user.actor_own_id, - inbox: user.inbox, - outbox: user.outbox, - }, - display_name: user.display_name, - }) - } - - pub async fn from_username( - username: &str, - conn: impl sqlx::Executor<'_, Database = Sqlite>, - ) -> User { - let user = sqlx::query!( - r#" - SELECT u.*, a.id as "actor_own_id", a.inbox, a.outbox - FROM user u - INNER JOIN actor a ON u.actor_id = a.id - WHERE username = ?1 - "#, - username - ) - .fetch_one(conn) - .await - .unwrap(); - - User { - id: user.id, - username: user.username, - actor: Actor { - id: user.actor_own_id, - inbox: user.inbox, - outbox: user.outbox, - }, - display_name: user.display_name, - } - } - - pub async fn from_actor_id( - actor_id: &str, - conn: impl sqlx::Executor<'_, Database = Sqlite>, - ) -> User { - let user = sqlx::query!( - r#" - SELECT u.*, a.id as "actor_own_id", a.inbox, a.outbox - FROM user u - INNER JOIN actor a ON u.actor_id = a.id - WHERE actor_id = ?1 - "#, - actor_id - ) - .fetch_one(conn) - .await - .unwrap(); - User { - id: user.id, - username: user.username, - actor: Actor { - id: user.actor_own_id, - inbox: user.inbox, - outbox: user.outbox, - }, - display_name: user.display_name, - } - } -} diff --git a/ferri-main/src/federation/http.rs b/ferri-main/src/federation/http.rs index d6e592d..65fcc52 100644 --- a/ferri-main/src/federation/http.rs +++ b/ferri-main/src/federation/http.rs @@ -1,10 +1,23 @@ -use crate::ap::http::HttpClient; use crate::types::ap; use std::fmt::Debug; use serde::Serialize; use thiserror::Error; use tracing::{Level, error, event, info}; +use reqwest::{IntoUrl, Response}; +use url::Url; + +use rsa::{ + RsaPrivateKey, + pkcs1v15::SigningKey, + pkcs8::DecodePrivateKey, + sha2::{Digest, Sha256}, + signature::{RandomizedSigner, SignatureEncoding}, +}; + +use base64::prelude::*; +use chrono::Utc; + use super::outbox::PreparedActivity; pub struct HttpWrapper<'a> { @@ -108,3 +121,187 @@ impl<'a> HttpWrapper<'a> { Ok(raw_body.to_string()) } } + +pub struct HttpClient { + client: reqwest::Client, +} + +#[derive(Debug)] +pub struct PostSignature { + date: String, + digest: String, + signature: String, +} + +#[derive(Debug)] +struct GetSignature { + date: String, + signature: String, +} + +enum RequestVerb { + GET, + POST, +} + +pub struct RequestBuilder { + verb: RequestVerb, + url: Url, + body: String, + inner: reqwest::RequestBuilder, +} + +impl RequestBuilder { + pub fn json(mut self, json: impl Serialize + Sized) -> RequestBuilder { + let body = serde_json::to_string(&json).unwrap(); + self.inner = self.inner.body(body.clone()); + self.body = body; + self + } + + pub fn activity(mut self) -> RequestBuilder { + self.inner = self + .inner + .header("Content-Type", "application/activity+json") + .header("Accept", "application/activity+json"); + self + } + + pub async fn send(self) -> Result { + event!(Level::DEBUG, ?self.inner, "sending an http request"); + + self.inner.send().await + } + + pub fn sign(mut self, key_id: &str) -> RequestBuilder { + match self.verb { + RequestVerb::GET => { + let sig = self.sign_get_request(key_id); + self.inner = self + .inner + .header("Date", sig.date) + .header("Signature", sig.signature); + self + } + RequestVerb::POST => { + let sig = self.sign_post_request(key_id); + self.inner = self + .inner + .header("Date", sig.date) + .header("Digest", sig.digest) + .header("Signature", sig.signature); + self + } + } + } + + fn sign_get_request(&self, key_id: &str) -> GetSignature { + let url = &self.url; + let host = url.host_str().unwrap(); + let path = url.path(); + + let private_key = + RsaPrivateKey::from_pkcs8_pem(include_str!("../../../private.pem")).unwrap(); + let signing_key = SigningKey::::new(private_key); + + // UTC=GMT for our purposes, use it + // RFC7231 is hardcoded to use GMT for.. some reason + let ts = Utc::now(); + + // RFC7231 string + let date = ts.format("%a, %d %b %Y %H:%M:%S GMT").to_string(); + + let to_sign = format!( + "(request-target): get {}\nhost: {}\ndate: {}", + path, host, date + ); + + let signature = signing_key.sign_with_rng(&mut rand::rngs::OsRng, &to_sign.into_bytes()); + let header = format!( + "keyId=\"{}\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date\",signature=\"{}\"", + key_id, + BASE64_STANDARD.encode(signature.to_bytes()) + ); + + GetSignature { + date, + signature: header, + } + } + + fn sign_post_request(&self, key_id: &str) -> PostSignature { + let body = &self.body; + let url = &self.url; + + let host = url.host_str().unwrap(); + let path = url.path(); + + let private_key = + RsaPrivateKey::from_pkcs8_pem(include_str!("../../../private.pem")).unwrap(); + let signing_key = SigningKey::::new(private_key); + + let mut hasher = Sha256::new(); + hasher.update(body); + let sha256 = hasher.finalize(); + + let b64 = BASE64_STANDARD.encode(sha256); + let digest = format!("SHA-256={}", b64); + + // UTC=GMT for our purposes, use it + // RFC7231 is hardcoded to use GMT for.. some reason + let ts = Utc::now(); + + // RFC7231 string + let date = ts.format("%a, %d %b %Y %H:%M:%S GMT").to_string(); + + let to_sign = format!( + "(request-target): post {}\nhost: {}\ndate: {}\ndigest: {}", + path, host, date, digest + ); + + let signature = signing_key.sign_with_rng(&mut rand::rngs::OsRng, &to_sign.into_bytes()); + let header = format!( + "keyId=\"{}\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date digest\",signature=\"{}\"", + key_id, + BASE64_STANDARD.encode(signature.to_bytes()) + ); + + PostSignature { + date, + digest, + signature: header, + } + } +} + +impl Default for HttpClient { + fn default() -> Self { + Self::new() + } +} + +impl HttpClient { + pub fn new() -> Self { + Self { + client: reqwest::Client::new(), + } + } + + pub fn get(&self, url: impl IntoUrl + Clone) -> RequestBuilder { + RequestBuilder { + verb: RequestVerb::GET, + url: url.clone().into_url().unwrap(), + body: String::new(), + inner: self.client.get(url), + } + } + + pub fn post(&self, url: impl IntoUrl + Clone) -> RequestBuilder { + RequestBuilder { + verb: RequestVerb::POST, + url: url.clone().into_url().unwrap(), + body: String::new(), + inner: self.client.post(url), + } + } +} diff --git a/ferri-main/src/federation/inbox.rs b/ferri-main/src/federation/inbox.rs index 68dd742..23d6fd1 100644 --- a/ferri-main/src/federation/inbox.rs +++ b/ferri-main/src/federation/inbox.rs @@ -1,7 +1,6 @@ use crate::types::{ap, as_context, db, get, make, Object, ObjectUri, ObjectUuid}; -use crate::ap::http::HttpClient; -use super::http::HttpWrapper; +use super::http::{HttpClient, HttpWrapper}; use super::outbox::OutboxRequest; use super::QueueMessage; @@ -83,7 +82,7 @@ pub async fn handle_inbox_request( let rmt = person.remote_info(); let post = activity.object; - let post_id = crate::ap::new_id(); + let post_id = crate::new_id(); let created_at = DateTime::parse_from_rfc3339(&activity.ts) .map(|dt| dt.to_utc()) @@ -104,20 +103,25 @@ pub async fn handle_inbox_request( let user = get::user_by_actor_uri(actor.id.clone(), &mut conn) .await .unwrap_or_else(|_| { + let id = crate::new_id(); db::User { - id: ObjectUuid(crate::new_id()), + id: ObjectUuid(id.clone()), actor, username: person.preferred_username, display_name: person.name, acct: rmt.acct, remote: rmt.is_remote, url: rmt.web_url, - created_at: crate::ap::now(), + created_at: crate::now(), icon_url: person.icon.map(|ic| ic.url) .unwrap_or("https//ferri.amy.mov/assets/pfp.png".to_string()), posts: db::UserPosts { last_post_at: None - } + }, + key_id: format!( + "https://ferri.amy.mov/users/{}#main-key", + id + ) } }); @@ -199,8 +203,9 @@ pub async fn handle_inbox_request( let user = get::user_by_actor_uri(actor.id.clone(), &mut conn) .await .unwrap_or_else(|_| { + let id = crate::new_id(); db::User { - id: ObjectUuid(crate::new_id()), + id: ObjectUuid(id.clone()), actor, username: boosted_author.preferred_username, display_name: boosted_author.name, @@ -208,12 +213,16 @@ pub async fn handle_inbox_request( remote: boosted_rmt.is_remote, url: boosted_rmt.web_url, // FIXME: Come from boosted_author - created_at: crate::ap::now(), + created_at: crate::now(), icon_url: boosted_author.icon.map(|ic| ic.url) .unwrap_or("https//ferri.amy.mov/assets/pfp.png".to_string()), posts: db::UserPosts { last_post_at: None - } + }, + key_id: format!( + "https://ferri.amy.mov/users/{}#main-key", + id + ) } }); @@ -270,20 +279,25 @@ pub async fn handle_inbox_request( let user = get::user_by_actor_uri(actor.id.clone(), &mut conn) .await .unwrap_or_else(|_| { + let id = crate::new_id(); db::User { - id: ObjectUuid(crate::new_id()), + id: ObjectUuid(id.clone()), actor, username: person.preferred_username, display_name: person.name, acct: rmt.acct, remote: rmt.is_remote, url: rmt.web_url, - created_at: crate::ap::now(), + created_at: crate::now(), icon_url: person.icon.map(|ic| ic.url) .unwrap_or("https//ferri.amy.mov/assets/pfp.png".to_string()), posts: db::UserPosts { last_post_at: None - } + }, + key_id: format!( + "https://ferri.amy.mov/users/{}#main-key", + id + ) } }); diff --git a/ferri-main/src/federation/outbox.rs b/ferri-main/src/federation/outbox.rs index 0ffa2ad..9a08af0 100644 --- a/ferri-main/src/federation/outbox.rs +++ b/ferri-main/src/federation/outbox.rs @@ -1,14 +1,22 @@ use serde::{Deserialize, Serialize}; +use sqlx::SqliteConnection; use tracing::info; use std::fmt::Debug; -use crate::{ap::http::HttpClient, federation::http::HttpWrapper, types::{ap::{self, ActivityType}, as_context, db, Object, ObjectContext, ObjectUri}}; +use crate::{federation::http::HttpWrapper, types::{ap::{self, ActivityType}, as_context, db, make, Object, ObjectContext, ObjectUri}}; + +use super::http::HttpClient; #[derive(Debug)] pub enum OutboxRequest { // FIXME: Make the String (key_id) nicer // Probably store it in the DB and pass a db::User here Accept(ap::AcceptActivity, String, ap::Person), - Status(db::Post, String) + Status(db::Post, String), + Follow { + follower: db::User, + followed: db::User, + conn: SqliteConnection + } } #[derive(Serialize, Deserialize, Debug)] @@ -40,7 +48,7 @@ pub async fn handle_outbox_request( ty: activity.ty, actor: activity.actor, object: activity.object, - published: crate::ap::new_ts() + published: crate::now_str(), }; let res = http @@ -80,7 +88,7 @@ pub async fn handle_outbox_request( attachment: vec![], attributed_to: Some(post.user.actor.id.0) }, - published: crate::ap::new_ts() + published: crate::now_str(), }; let res = http @@ -90,5 +98,40 @@ pub async fn handle_outbox_request( info!("status res {}", res); } + OutboxRequest::Follow { follower, followed, mut conn } => { + let follow = db::Follow { + id: ObjectUri(format!( + "https://ferri.amy.mov/activities/{}", + crate::new_id()) + ), + follower: follower.actor.id.clone(), + followed: followed.actor.id.clone(), + }; + + make::new_follow(follow, &mut conn) + .await + .unwrap(); + + let http = HttpWrapper::new(http, &follower.key_id); + + let activity = PreparedActivity { + context: as_context(), + id: format!( + "https://ferri.amy.mov/activities/{}", + crate::new_id() + ), + ty: ActivityType::Follow, + actor: follower.actor.id.0, + object: followed.actor.id.0, + published: crate::now_str(), + }; + + let res = http + .post_activity(&followed.actor.inbox, activity) + .await + .unwrap(); + + info!("follow res {}", res); + }, } } diff --git a/ferri-main/src/federation/request_queue.rs b/ferri-main/src/federation/request_queue.rs index facf294..014af49 100644 --- a/ferri-main/src/federation/request_queue.rs +++ b/ferri-main/src/federation/request_queue.rs @@ -1,8 +1,8 @@ use tokio::sync::mpsc; use tracing::{info, span, Instrument, Level}; -use crate::ap::http::HttpClient; use crate::config::Config; +use crate::federation::http::HttpClient; use crate::federation::inbox::handle_inbox_request; use crate::federation::outbox::handle_outbox_request; diff --git a/ferri-main/src/lib.rs b/ferri-main/src/lib.rs index c9a8744..d18d33e 100644 --- a/ferri-main/src/lib.rs +++ b/ferri-main/src/lib.rs @@ -1,8 +1,8 @@ -pub mod ap; pub mod config; pub mod types; pub mod federation; +use chrono::{DateTime, Utc}; use rand::{Rng, distributions::Alphanumeric}; pub fn gen_token(len: usize) -> String { @@ -16,3 +16,11 @@ pub fn gen_token(len: usize) -> String { pub fn new_id() -> String { uuid::Uuid::new_v4().to_string() } + +pub fn now() -> DateTime { + Utc::now() +} + +pub fn now_str() -> String { + now().to_rfc3339() +} diff --git a/ferri-main/src/types/api.rs b/ferri-main/src/types/api.rs index 81594a5..ab71ead 100644 --- a/ferri-main/src/types/api.rs +++ b/ferri-main/src/types/api.rs @@ -70,6 +70,24 @@ pub struct Status { pub poll: Option<()>, } +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct Relationship { + id: ObjectUuid, + following: bool, + showing_reblogs: bool, + notifying: bool, + followed_by: bool, + blocking: bool, + blocked_by: bool, + muting: bool, + muting_notifications: bool, + requested: bool, + requested_by: bool, + domain_blocking: bool, + endorsed: bool, + note: String +} + #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] pub struct Account { pub id: ObjectUuid, diff --git a/ferri-main/src/types/db.rs b/ferri-main/src/types/db.rs index 2c8ce0e..4b9dece 100644 --- a/ferri-main/src/types/db.rs +++ b/ferri-main/src/types/db.rs @@ -34,6 +34,7 @@ pub struct User { pub icon_url: String, pub posts: UserPosts, + pub key_id: String } #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/ferri-main/src/types/get.rs b/ferri-main/src/types/get.rs index 0db4500..acab381 100644 --- a/ferri-main/src/types/get.rs +++ b/ferri-main/src/types/get.rs @@ -25,7 +25,10 @@ fn parse_ts(ts: String) -> Option> { Some(dt.unwrap()) } -pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result { +pub async fn user_by_id( + id: ObjectUuid, + conn: &mut SqliteConnection +) -> Result { info!("fetching user by uuid '{:?}' from the database", id); let record = sqlx::query!( @@ -89,7 +92,7 @@ pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result Result Result { + info!("fetching user by username '{}' from the database", username); + + let record = sqlx::query!( + r#" + SELECT + u.id as "user_id", + u.username, + u.actor_id, + u.display_name, + a.inbox, + a.outbox, + u.url, + u.acct, + u.remote, + u.created_at, + u.icon_url + FROM "user" u + INNER JOIN "actor" a ON u.actor_id = a.id + WHERE u.username = ?1 + "#, + username + ) + .fetch_one(&mut *conn) + .await + .map_err(|e| DbError::FetchError(e.to_string()))?; + + let follower_count = sqlx::query_scalar!( + r#" + SELECT COUNT(follower_id) + FROM "follow" + WHERE followed_id = ?1 + "#, + record.actor_id + ) + .fetch_one(&mut *conn) + .await + .map_err(|e| DbError::FetchError(e.to_string()))?; + + let last_post_at = sqlx::query_scalar!( + r#" + SELECT datetime(p.created_at) + FROM post p + WHERE p.user_id = ?1 + ORDER BY datetime(p.created_at) DESC + LIMIT 1 + "#, + record.user_id + ) + .fetch_optional(&mut *conn) + .await + .map_err(|e| DbError::FetchError(e.to_string()))? + .flatten() + .and_then(|ts| { + info!("parsing timestamp {}", ts); + parse_ts(ts) + }); + + let user_created = parse_ts(record.created_at).expect("no db corruption"); + + info!("user {} has {} followers", record.user_id, follower_count); + info!("user {} last posted {:?}", record.user_id, last_post_at); + + Ok(db::User { + id: ObjectUuid(record.user_id.clone()), + actor: db::Actor { + id: ObjectUri(record.actor_id), + inbox: record.inbox, + outbox: record.outbox, + }, + acct: record.acct, + remote: record.remote, + username: record.username, + display_name: record.display_name, + created_at: user_created, + url: record.url, + posts: db::UserPosts { last_post_at }, + icon_url: record.icon_url, + key_id: format!( + "https://ferri.amy.mov/users/{}#main-key", + record.user_id + ) }) } @@ -170,7 +265,7 @@ pub async fn user_by_actor_uri(uri: ObjectUri, conn: &mut SqliteConnection) -> R info!("user {:?} last posted {:?}", record.user_id, last_post_at); Ok(db::User { - id: ObjectUuid(record.user_id), + id: ObjectUuid(record.user_id.clone()), actor: db::Actor { id: ObjectUri(record.actor_id), inbox: record.inbox, @@ -183,7 +278,11 @@ pub async fn user_by_actor_uri(uri: ObjectUri, conn: &mut SqliteConnection) -> R created_at: user_created, url: record.url, posts: db::UserPosts { last_post_at }, - icon_url: record.icon_url + icon_url: record.icon_url, + key_id: format!( + "https://ferri.amy.mov/users/{}#main-key", + record.user_id + ) }) } @@ -248,7 +347,7 @@ pub async fn posts_for_user_id( id: ObjectUuid(record.post_id), uri: ObjectUri(record.post_uri), user: db::User { - id: ObjectUuid(record.user_id), + id: ObjectUuid(record.user_id.clone()), actor: db::Actor { id: ObjectUri(record.actor_id), inbox: record.inbox, @@ -263,7 +362,11 @@ pub async fn posts_for_user_id( icon_url: record.icon_url, posts: db::UserPosts { last_post_at: None - } + }, + key_id: format!( + "https://ferri.amy.mov/users/{}#main-key", + record.user_id + ) }, attachments, content: record.content, @@ -305,7 +408,7 @@ pub async fn home_timeline( id: ObjectUuid(p.post_id), uri: ObjectUri(p.post_uri), user: db::User { - id: ObjectUuid(p.user_id), + id: ObjectUuid(p.user_id.clone()), actor: db::Actor { id: ObjectUri(p.actor_id), inbox: p.inbox, @@ -320,7 +423,11 @@ pub async fn home_timeline( icon_url: p.icon_url, posts: db::UserPosts { last_post_at: None - } + }, + key_id: format!( + "https://ferri.amy.mov/users/{}#main-key", + p.user_id + ) }, content: p.content, created_at: parse_ts(p.post_created).unwrap(), @@ -391,3 +498,53 @@ pub async fn home_timeline( Ok(out) } + +pub async fn followers_for_user( + user_id: ObjectUuid, + conn: &mut SqliteConnection +) -> Result, DbError> { + let followers = sqlx::query!( + "SELECT * FROM follow WHERE followed_id = ?", + user_id.0 + ) + .fetch_all(&mut *conn) + .await + .unwrap(); + + let followers = followers.into_iter() + .map(|f| { + db::Follow { + id: ObjectUri(f.id), + follower: ObjectUri(f.follower_id), + followed: ObjectUri(f.followed_id) + } + }) + .collect::>(); + + Ok(followers) +} + +pub async fn following_for_user( + user_id: ObjectUuid, + conn: &mut SqliteConnection +) -> Result, DbError> { + let followers = sqlx::query!( + "SELECT * FROM follow WHERE follower_id = ?", + user_id.0 + ) + .fetch_all(&mut *conn) + .await + .unwrap(); + + let followers = followers.into_iter() + .map(|f| { + db::Follow { + id: ObjectUri(f.id), + follower: ObjectUri(f.follower_id), + followed: ObjectUri(f.followed_id) + } + }) + .collect::>(); + + Ok(followers) +} diff --git a/ferri-server/src/endpoints/api/search.rs b/ferri-server/src/endpoints/api/search.rs index f332fc7..8750f2d 100644 --- a/ferri-server/src/endpoints/api/search.rs +++ b/ferri-server/src/endpoints/api/search.rs @@ -1,12 +1,12 @@ use rocket::{ get, serde::json::Json, FromFormField, State, }; -use main::types::{api, get}; +use main::{federation::http::HttpWrapper, types::{api, get}}; use rocket_db_pools::Connection; use serde::{Deserialize, Serialize}; use tracing::{info, error}; -use crate::{http_wrapper::HttpWrapper, AuthenticatedUser, Db}; +use crate::{AuthenticatedUser, Db}; #[derive(Serialize, Deserialize, FromFormField, Debug)] #[serde(rename_all = "lowercase")] diff --git a/ferri-server/src/endpoints/api/status.rs b/ferri-server/src/endpoints/api/status.rs index 2ce38be..e884812 100644 --- a/ferri-server/src/endpoints/api/status.rs +++ b/ferri-server/src/endpoints/api/status.rs @@ -43,7 +43,7 @@ fn to_db_post(req: &CreateStatus, user: &AuthenticatedUser, config: &Config) -> uri: ObjectUri(config.post_url(&user.id.0, &post_id)), user: user.user.clone(), content: req.status.clone(), - created_at: main::ap::now(), + created_at: main::now(), boosted_post: None, attachments: vec![] } diff --git a/ferri-server/src/endpoints/api/user.rs b/ferri-server/src/endpoints/api/user.rs index 6cb86a0..63d824f 100644 --- a/ferri-server/src/endpoints/api/user.rs +++ b/ferri-server/src/endpoints/api/user.rs @@ -1,4 +1,5 @@ -use main::ap; +use main::federation::outbox::OutboxRequest; +use main::federation::QueueMessage; use main::types::{api, get, ObjectUuid}; use rocket::response::status::NotFound; use rocket::{ @@ -6,10 +7,9 @@ use rocket::{ serde::{Deserialize, Serialize, json::Json}, }; use rocket_db_pools::Connection; -use uuid::Uuid; use tracing::info; -use crate::{AuthenticatedUser, Db}; +use crate::{AuthenticatedUser, Db, OutboundQueue}; #[derive(Debug, Serialize, Deserialize)] #[serde(crate = "rocket::serde")] @@ -62,39 +62,38 @@ pub async fn verify_credentials(user: AuthenticatedUser) -> Json/follow")] pub async fn new_follow( mut db: Connection, - helpers: &State, + outbound: &State, uuid: &str, user: AuthenticatedUser, ) -> Result<(), NotFound> { - let http = &helpers.http; - - let follower = ap::User::from_actor_id(&user.actor_id.0, &mut **db).await; - - let followed = ap::User::from_id(uuid, &mut **db) + let follower = user.user; + let followed = get::user_by_id(ObjectUuid(uuid.to_string()), &mut **db) .await - .map_err(|e| NotFound(e.to_string()))?; + .unwrap(); - let outbox = ap::Outbox::for_user(follower.clone(), http); - - let activity = ap::Activity { - id: format!("https://ferri.amy.mov/activities/{}", Uuid::new_v4()), - ty: ap::ActivityType::Follow, - object: followed.actor_id().to_string(), - ..Default::default() - }; - - let req = ap::OutgoingActivity { - signed_by: format!("{}#main-key", follower.uri()), - req: activity, - to: followed.actor().clone(), - }; - - req.save(&mut **db).await; - outbox.post(req).await; + let conn = db.into_inner(); + let conn = conn.detach(); + let msg = QueueMessage::Outbound(OutboxRequest::Follow { + follower, + followed, + conn + }); + + outbound.0.send(msg).await; + Ok(()) } +#[get("/accounts/relationships?")] +pub async fn relationships( + id: Vec, + user: AuthenticatedUser +) -> Result>, ()> { + info!("{} looking up relationships for {:#?}", user.username, id); + Ok(Json(vec![])) +} + #[get("/accounts/")] pub async fn account( mut db: Connection, diff --git a/ferri-server/src/endpoints/user.rs b/ferri-server/src/endpoints/user.rs index 61ff21d..6dcf5c3 100644 --- a/ferri-server/src/endpoints/user.rs +++ b/ferri-server/src/endpoints/user.rs @@ -50,23 +50,14 @@ pub async fn followers( mut db: Connection, uuid: &str, ) -> Result>, NotFound> { - let target = main::ap::User::from_id(uuid, &mut **db) + let user = get::user_by_id(ObjectUuid(uuid.to_string()), &mut **db) .await - .map_err(|e| NotFound(e.to_string()))?; - - let actor_id = target.actor_id(); - - let followers = sqlx::query!( - r#" - SELECT follower_id FROM follow - WHERE followed_id = ? - "#, - actor_id - ) - .fetch_all(&mut **db) - .await - .unwrap(); - + .unwrap(); + + let followers = get::followers_for_user(user.id.clone(), &mut **db) + .await + .unwrap(); + ap_ok(Json(OrderedCollection { context: as_context(), ty: "OrderedCollection".to_string(), @@ -74,8 +65,8 @@ pub async fn followers( id: format!("https://ferri.amy.mov/users/{}/followers", uuid), ordered_items: followers .into_iter() - .map(|f| f.follower_id) - .collect::>(), + .map(|f| f.follower.0) + .collect(), })) } @@ -84,32 +75,23 @@ pub async fn following( mut db: Connection, uuid: &str, ) -> Result>, NotFound> { - let target = main::ap::User::from_id(uuid, &mut **db) + let user = get::user_by_id(ObjectUuid(uuid.to_string()), &mut **db) .await - .map_err(|e| NotFound(e.to_string()))?; - - let actor_id = target.actor_id(); - - let following = sqlx::query!( - r#" - SELECT followed_id FROM follow - WHERE follower_id = ? - "#, - actor_id - ) - .fetch_all(&mut **db) - .await - .unwrap(); - + .unwrap(); + + let followers = get::following_for_user(user.id.clone(), &mut **db) + .await + .unwrap(); + ap_ok(Json(OrderedCollection { context: as_context(), ty: "OrderedCollection".to_string(), total_items: 1, id: format!("https://ferri.amy.mov/users/{}/following", uuid), - ordered_items: following + ordered_items: followers .into_iter() - .map(|f| f.followed_id) - .collect::>(), + .map(|f| f.followed.0) + .collect(), })) } diff --git a/ferri-server/src/endpoints/well_known.rs b/ferri-server/src/endpoints/well_known.rs index 42d0373..d73c99c 100644 --- a/ferri-server/src/endpoints/well_known.rs +++ b/ferri-server/src/endpoints/well_known.rs @@ -1,7 +1,6 @@ use crate::Db; -use main::ap; -use main::types::api; -use rocket::{State, get, serde::json::Json}; +use main::types::{api, get}; +use rocket::{get, serde::json::Json, State}; use rocket_db_pools::Connection; use tracing::info; @@ -27,24 +26,26 @@ pub async fn webfinger( let acct = resource.strip_prefix("acct:").unwrap(); let (user, _) = acct.split_once("@").unwrap(); - let user = ap::User::from_username(user, &mut **db).await; + let user = get::user_by_username(user, &mut **db) + .await + .unwrap(); Json(api::WebfingerHit { subject: resource.to_string(), aliases: vec![ - config.user_url(user.id()), - config.user_web_url(user.username()), + config.user_url(&user.id.0), + config.user_web_url(&user.username), ], links: vec![ api::WebfingerLink { rel: "http://webfinger.net/rel/profile-page".to_string(), ty: Some("text/html".to_string()), - href: Some(config.user_web_url(user.username())), + href: Some(config.user_web_url(&user.username)), }, api::WebfingerLink { rel: "self".to_string(), ty: Some("application/activity+json".to_string()), - href: Some(config.user_url(user.id())), + href: Some(config.user_url(&user.id.0)), }, ], }) diff --git a/ferri-server/src/http_wrapper.rs b/ferri-server/src/http_wrapper.rs index e065010..e69de29 100644 --- a/ferri-server/src/http_wrapper.rs +++ b/ferri-server/src/http_wrapper.rs @@ -1,70 +0,0 @@ -use crate::http::HttpClient; -use main::types::ap; -use std::fmt::Debug; -use thiserror::Error; -use tracing::{Level, error, event, info}; - -pub struct HttpWrapper<'a> { - client: &'a HttpClient, - key_id: &'a str, -} - -#[derive(Error, Debug)] -pub enum HttpError { - #[error("entity of type `{0}` @ URL `{1}` could not be loaded")] - LoadFailure(String, String), - #[error("entity of type `{0}` @ URL `{1}` could not be parsed ({2})")] - ParseFailure(String, String, String), -} - -impl<'a> HttpWrapper<'a> { - pub fn new(client: &'a HttpClient, key_id: &'a str) -> HttpWrapper<'a> { - Self { client, key_id } - } - - async fn get( - &self, - ty: &str, - url: &str, - ) -> Result { - let ty = ty.to_string(); - event!(Level::INFO, url, "loading {}", ty); - - let http_result = self - .client - .get(url) - .sign(self.key_id) - .activity() - .send() - .await; - - if let Err(e) = http_result { - error!("could not load url {}: {:#?}", url, e); - return Err(HttpError::LoadFailure(ty, url.to_string())); - } - - let raw_body = http_result.unwrap().text().await; - if let Err(e) = raw_body { - error!("could not get text for url {}: {:#?}", url, e); - return Err(HttpError::LoadFailure(ty, url.to_string())); - } - - let raw_body = raw_body.unwrap(); - info!("raw body {}", raw_body); - let decoded = serde_json::from_str::(&raw_body); - - if let Err(e) = decoded { - error!( - "could not parse {} for url {}: {:#?} {}", - ty, url, e, &raw_body - ); - return Err(HttpError::ParseFailure(ty, url.to_string(), e.to_string())); - } - - Ok(decoded.unwrap()) - } - - pub async fn get_person(&self, url: &str) -> Result { - self.get("Person", url).await - } -} diff --git a/ferri-server/src/lib.rs b/ferri-server/src/lib.rs index 0154ded..420d05a 100644 --- a/ferri-server/src/lib.rs +++ b/ferri-server/src/lib.rs @@ -4,10 +4,8 @@ use endpoints::{ }; use tracing_subscriber::fmt; +use main::{federation::{self, http}, types::{db, get, ObjectUri, ObjectUuid}}; -use main::{federation, types::{db, get, ObjectUri, ObjectUuid}}; - -use main::ap::http; use main::config::Config; use rocket::{ Build, Request, Rocket, build, get, @@ -170,6 +168,7 @@ pub fn launch(cfg: Config) -> Rocket { api::user::new_follow, api::user::statuses, api::user::account, + api::user::relationships, api::apps::new_app, api::preferences::preferences, api::user::verify_credentials,