refactor: cleanup; add missing APIs; reorganise

This commit is contained in:
nullishamy 2025-05-06 22:25:15 +01:00
parent fafaf243c5
commit ab9836293e
Signed by: amy
SSH key fingerprint: SHA256:WmV0uk6WgAQvDJlM8Ld4mFPHZo02CLXXP5VkwQ5xtyk
22 changed files with 529 additions and 870 deletions

View file

@ -57,11 +57,13 @@ async fn main() {
acct: s("amy"),
remote: false,
url: s("https://ferri.amy.mov/@amy"),
created_at: main::ap::now(),
created_at: main::now(),
icon_url: s("https://ferri.amy.mov/assets/pfp.png"),
posts: db::UserPosts {
last_post_at: None
}
},
key_id: s("https://ferri.amy.mov/users/9b9d497b-2731-435f-a929-e609ca69dac9#main-key")
};
make::new_user(user, &mut *conn).await.unwrap();

View file

@ -1,141 +0,0 @@
use crate::ap::{Actor, User, http};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::Sqlite;
use std::fmt::Debug;
use tracing::{Level, event};
#[derive(Debug, Clone)]
pub enum ActivityType {
Follow,
Accept,
Create,
Unknown,
}
impl ActivityType {
fn to_raw(self) -> String {
match self {
ActivityType::Follow => "Follow".to_string(),
ActivityType::Accept => "Accept".to_string(),
ActivityType::Create => "Create".to_string(),
ActivityType::Unknown => "FIXME".to_string(),
}
}
}
#[derive(Debug, Clone)]
pub struct Activity<T: Serialize + Debug> {
pub id: String,
pub ty: ActivityType,
pub object: T,
pub published: DateTime<Utc>,
pub to: Vec<String>,
pub cc: Vec<String>,
}
impl<T: Serialize + Debug + Default> Default for Activity<T> {
fn default() -> Self {
Self {
id: Default::default(),
ty: ActivityType::Unknown,
object: Default::default(),
published: Utc::now(),
to: Default::default(),
cc: Default::default(),
}
}
}
pub type KeyId = String;
#[derive(Debug, Clone)]
pub struct OutgoingActivity<T: Serialize + Debug> {
pub signed_by: KeyId,
pub req: Activity<T>,
pub to: Actor,
}
impl<T: Serialize + Debug> OutgoingActivity<T> {
pub async fn save(&self, conn: impl sqlx::Executor<'_, Database = Sqlite>) {
let ty = self.req.ty.clone().to_raw();
let actor_id = self.to.id();
sqlx::query!(
r#"
INSERT INTO activity (id, ty, actor_id)
VALUES (?1, ?2, ?3)
"#,
self.req.id,
ty,
actor_id
)
.execute(conn)
.await
.unwrap();
}
}
#[derive(Serialize, Deserialize, Debug)]
struct RawActivity<T: Serialize + Debug> {
#[serde(rename = "@context")]
#[serde(skip_deserializing)]
context: String,
id: String,
#[serde(rename = "type")]
ty: String,
actor: String,
object: T,
published: String,
}
type OutboxTransport = http::HttpClient;
pub struct Outbox<'a> {
user: User,
transport: &'a OutboxTransport,
}
impl<'a> Outbox<'a> {
pub fn user(&self) -> &User {
&self.user
}
pub async fn post<T: Serialize + Debug>(&self, activity: OutgoingActivity<T>) {
event!(Level::INFO, ?activity, "activity in outbox");
let raw = RawActivity {
context: "https://www.w3.org/ns/activitystreams".to_string(),
id: activity.req.id.clone(),
ty: activity.req.ty.to_raw(),
actor: self.user.actor().id().to_string(),
object: activity.req.object,
published: activity.req.published.to_rfc3339(),
};
let outbox_res = self
.transport
.post(activity.to.inbox())
.activity()
.json(&raw)
.sign(&activity.signed_by)
.send()
.await
.unwrap()
.text()
.await
.unwrap();
event!(
Level::DEBUG,
outbox_res,
activity = activity.req.id,
"got response for outbox dispatch"
);
}
pub fn for_user(user: User, transport: &'a OutboxTransport) -> Outbox<'a> {
Outbox { user, transport }
}
}

View file

@ -1,199 +0,0 @@
use reqwest::{IntoUrl, Response};
use serde::Serialize;
use url::Url;
use rsa::{
RsaPrivateKey,
pkcs1v15::SigningKey,
pkcs8::DecodePrivateKey,
sha2::{Digest, Sha256},
signature::{RandomizedSigner, SignatureEncoding},
};
use base64::prelude::*;
use chrono::Utc;
use tracing::{Level, event};
pub struct HttpClient {
client: reqwest::Client,
}
#[derive(Debug)]
pub struct PostSignature {
date: String,
digest: String,
signature: String,
}
#[derive(Debug)]
struct GetSignature {
date: String,
signature: String,
}
enum RequestVerb {
GET,
POST,
}
pub struct RequestBuilder {
verb: RequestVerb,
url: Url,
body: String,
inner: reqwest::RequestBuilder,
}
impl RequestBuilder {
pub fn json(mut self, json: impl Serialize + Sized) -> RequestBuilder {
let body = serde_json::to_string(&json).unwrap();
self.inner = self.inner.body(body.clone());
self.body = body;
self
}
pub fn activity(mut self) -> RequestBuilder {
self.inner = self
.inner
.header("Content-Type", "application/activity+json")
.header("Accept", "application/activity+json");
self
}
pub async fn send(self) -> Result<Response, reqwest::Error> {
event!(Level::DEBUG, ?self.inner, "sending an http request");
self.inner.send().await
}
pub fn sign(mut self, key_id: &str) -> RequestBuilder {
match self.verb {
RequestVerb::GET => {
let sig = self.sign_get_request(key_id);
self.inner = self
.inner
.header("Date", sig.date)
.header("Signature", sig.signature);
self
}
RequestVerb::POST => {
let sig = self.sign_post_request(key_id);
self.inner = self
.inner
.header("Date", sig.date)
.header("Digest", sig.digest)
.header("Signature", sig.signature);
self
}
}
}
fn sign_get_request(&self, key_id: &str) -> GetSignature {
let url = &self.url;
let host = url.host_str().unwrap();
let path = url.path();
let private_key =
RsaPrivateKey::from_pkcs8_pem(include_str!("../../../private.pem")).unwrap();
let signing_key = SigningKey::<Sha256>::new(private_key);
// UTC=GMT for our purposes, use it
// RFC7231 is hardcoded to use GMT for.. some reason
let ts = Utc::now();
// RFC7231 string
let date = ts.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
let to_sign = format!(
"(request-target): get {}\nhost: {}\ndate: {}",
path, host, date
);
let signature = signing_key.sign_with_rng(&mut rand::rngs::OsRng, &to_sign.into_bytes());
let header = format!(
"keyId=\"{}\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date\",signature=\"{}\"",
key_id,
BASE64_STANDARD.encode(signature.to_bytes())
);
GetSignature {
date,
signature: header,
}
}
fn sign_post_request(&self, key_id: &str) -> PostSignature {
let body = &self.body;
let url = &self.url;
let host = url.host_str().unwrap();
let path = url.path();
let private_key =
RsaPrivateKey::from_pkcs8_pem(include_str!("../../../private.pem")).unwrap();
let signing_key = SigningKey::<Sha256>::new(private_key);
let mut hasher = Sha256::new();
hasher.update(body);
let sha256 = hasher.finalize();
let b64 = BASE64_STANDARD.encode(sha256);
let digest = format!("SHA-256={}", b64);
// UTC=GMT for our purposes, use it
// RFC7231 is hardcoded to use GMT for.. some reason
let ts = Utc::now();
// RFC7231 string
let date = ts.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
let to_sign = format!(
"(request-target): post {}\nhost: {}\ndate: {}\ndigest: {}",
path, host, date, digest
);
let signature = signing_key.sign_with_rng(&mut rand::rngs::OsRng, &to_sign.into_bytes());
let header = format!(
"keyId=\"{}\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date digest\",signature=\"{}\"",
key_id,
BASE64_STANDARD.encode(signature.to_bytes())
);
PostSignature {
date,
digest,
signature: header,
}
}
}
impl Default for HttpClient {
fn default() -> Self {
Self::new()
}
}
impl HttpClient {
pub fn new() -> Self {
Self {
client: reqwest::Client::new(),
}
}
pub fn get(&self, url: impl IntoUrl + Clone) -> RequestBuilder {
RequestBuilder {
verb: RequestVerb::GET,
url: url.clone().into_url().unwrap(),
body: String::new(),
inner: self.client.get(url),
}
}
pub fn post(&self, url: impl IntoUrl + Clone) -> RequestBuilder {
RequestBuilder {
verb: RequestVerb::POST,
url: url.clone().into_url().unwrap(),
body: String::new(),
inner: self.client.post(url),
}
}
}

View file

@ -1,30 +0,0 @@
use chrono::{DateTime, Utc};
use uuid::Uuid;
pub mod http;
mod activity;
pub use activity::*;
mod user;
pub use user::*;
mod post;
pub use post::*;
mod request_queue;
pub use request_queue::*;
pub const AS_CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
pub fn new_id() -> String {
Uuid::new_v4().to_string()
}
pub fn new_ts() -> String {
now().to_rfc3339()
}
pub fn now() -> DateTime<Utc> {
Utc::now()
}

View file

@ -1,113 +0,0 @@
use crate::ap;
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::Sqlite;
const POST_TYPE: &str = "Note";
#[derive(Clone)]
pub struct Post {
id: String,
from: ap::User,
ts: DateTime<Utc>,
content: String,
to: Vec<String>,
cc: Vec<String>,
}
impl Post {
pub fn from_parts(id: String, content: String, from: ap::User) -> Self {
Self {
id,
content,
from,
ts: ap::now(),
to: vec![],
cc: vec![],
}
}
pub fn id(&self) -> &str {
&self.id
}
pub fn content(&self) -> &str {
&self.content
}
pub fn created_at(&self) -> String {
self.ts.to_rfc3339()
}
pub fn uri(&self) -> String {
format!(
"https://ferri.amy.mov/users/{}/posts/{}",
self.from.id(),
self.id
)
}
pub async fn save(&self, conn: impl sqlx::Executor<'_, Database = Sqlite>) {
let ts = self.ts.to_rfc3339();
let user_id = self.from.id();
let post_id = self.id();
let uri = self.uri();
let content = self.content.clone();
sqlx::query!(
r#"
INSERT INTO post (id, uri, user_id, content, created_at)
VALUES (?1, ?2, ?3, ?4, ?5)
"#,
post_id,
uri,
user_id,
content,
ts
)
.execute(conn)
.await
.unwrap();
}
pub fn to(mut self, recipient: String) -> Self {
self.to.push(recipient);
self
}
pub fn cc(mut self, recipient: String) -> Self {
self.cc.push(recipient);
self
}
pub fn to_ap(self) -> APPost {
APPost {
context: ap::AS_CONTEXT.to_string(),
id: self.uri(),
ty: POST_TYPE.to_string(),
ts: self.ts.to_rfc3339(),
content: self.content,
to: self.to,
cc: self.cc,
}
}
}
#[derive(Serialize, Debug, Default)]
pub struct APPost {
#[serde(rename = "@context")]
#[serde(skip_deserializing)]
context: String,
id: String,
#[serde(rename = "type")]
ty: String,
#[serde(rename = "published")]
ts: String,
content: String,
to: Vec<String>,
cc: Vec<String>,
}

View file

@ -1,55 +0,0 @@
use std::sync::mpsc;
use std::thread;
use tracing::{Level, info, span};
#[derive(Debug)]
pub enum QueueMessage {
Heartbeat,
}
pub struct RequestQueue {
name: &'static str,
send: mpsc::Sender<QueueMessage>,
recv: mpsc::Receiver<QueueMessage>,
}
#[derive(Clone)]
pub struct QueueHandle {
send: mpsc::Sender<QueueMessage>,
}
impl QueueHandle {
pub fn send(&self, msg: QueueMessage) {
self.send.send(msg).unwrap();
}
}
impl RequestQueue {
pub fn new(name: &'static str) -> Self {
let (send, recv) = mpsc::channel();
Self { name, send, recv }
}
pub fn spawn(self) -> QueueHandle {
info!("starting up queue '{}'", self.name);
thread::spawn(move || {
info!("queue '{}' up", self.name);
let recv = self.recv;
while let Ok(req) = recv.recv() {
// FIXME: When we make this do async things we will need to add tokio and
// use proper async handled spans as the enter/drop won't work.
// See inbox.rs for how we handle that.
let s = span!(Level::INFO, "queue", queue_name = self.name);
let _enter = s.enter();
info!(?req, "got a message into the queue");
drop(_enter);
}
});
QueueHandle { send: self.send }
}
}

View file

@ -1,154 +0,0 @@
use sqlx::Sqlite;
use std::fmt::Debug;
use thiserror::Error;
#[derive(Debug, Clone)]
pub struct Actor {
id: String,
inbox: String,
outbox: String,
}
impl Actor {
pub fn from_raw(id: String, inbox: String, outbox: String) -> Self {
Self { id, inbox, outbox }
}
pub fn id(&self) -> &str {
&self.id
}
pub fn inbox(&self) -> &str {
&self.inbox
}
pub fn outbox(&self) -> &str {
&self.outbox
}
}
#[derive(Debug, Clone)]
pub struct User {
id: String,
username: String,
actor: Actor,
display_name: String,
}
#[derive(Error, Debug)]
pub enum UserError {
#[error("user `{0}` not found")]
NotFound(String),
}
impl User {
pub fn id(&self) -> &str {
&self.id
}
pub fn username(&self) -> &str {
&self.username
}
pub fn actor_id(&self) -> &str {
&self.actor.id
}
pub fn display_name(&self) -> &str {
&self.display_name
}
pub fn actor(&self) -> &Actor {
&self.actor
}
pub fn uri(&self) -> String {
format!("https://ferri.amy.mov/users/{}", self.id())
}
pub async fn from_id(
uuid: &str,
conn: impl sqlx::Executor<'_, Database = Sqlite>,
) -> Result<User, UserError> {
let user = sqlx::query!(
r#"
SELECT u.*, a.id as "actor_own_id", a.inbox, a.outbox
FROM user u
INNER JOIN actor a ON u.actor_id = a.id
WHERE u.id = ?1
"#,
uuid
)
.fetch_one(conn)
.await
.map_err(|_| UserError::NotFound(uuid.to_string()))?;
Ok(User {
id: user.id,
username: user.username,
actor: Actor {
id: user.actor_own_id,
inbox: user.inbox,
outbox: user.outbox,
},
display_name: user.display_name,
})
}
pub async fn from_username(
username: &str,
conn: impl sqlx::Executor<'_, Database = Sqlite>,
) -> User {
let user = sqlx::query!(
r#"
SELECT u.*, a.id as "actor_own_id", a.inbox, a.outbox
FROM user u
INNER JOIN actor a ON u.actor_id = a.id
WHERE username = ?1
"#,
username
)
.fetch_one(conn)
.await
.unwrap();
User {
id: user.id,
username: user.username,
actor: Actor {
id: user.actor_own_id,
inbox: user.inbox,
outbox: user.outbox,
},
display_name: user.display_name,
}
}
pub async fn from_actor_id(
actor_id: &str,
conn: impl sqlx::Executor<'_, Database = Sqlite>,
) -> User {
let user = sqlx::query!(
r#"
SELECT u.*, a.id as "actor_own_id", a.inbox, a.outbox
FROM user u
INNER JOIN actor a ON u.actor_id = a.id
WHERE actor_id = ?1
"#,
actor_id
)
.fetch_one(conn)
.await
.unwrap();
User {
id: user.id,
username: user.username,
actor: Actor {
id: user.actor_own_id,
inbox: user.inbox,
outbox: user.outbox,
},
display_name: user.display_name,
}
}
}

View file

@ -1,10 +1,23 @@
use crate::ap::http::HttpClient;
use crate::types::ap;
use std::fmt::Debug;
use serde::Serialize;
use thiserror::Error;
use tracing::{Level, error, event, info};
use reqwest::{IntoUrl, Response};
use url::Url;
use rsa::{
RsaPrivateKey,
pkcs1v15::SigningKey,
pkcs8::DecodePrivateKey,
sha2::{Digest, Sha256},
signature::{RandomizedSigner, SignatureEncoding},
};
use base64::prelude::*;
use chrono::Utc;
use super::outbox::PreparedActivity;
pub struct HttpWrapper<'a> {
@ -108,3 +121,187 @@ impl<'a> HttpWrapper<'a> {
Ok(raw_body.to_string())
}
}
pub struct HttpClient {
client: reqwest::Client,
}
#[derive(Debug)]
pub struct PostSignature {
date: String,
digest: String,
signature: String,
}
#[derive(Debug)]
struct GetSignature {
date: String,
signature: String,
}
enum RequestVerb {
GET,
POST,
}
pub struct RequestBuilder {
verb: RequestVerb,
url: Url,
body: String,
inner: reqwest::RequestBuilder,
}
impl RequestBuilder {
pub fn json(mut self, json: impl Serialize + Sized) -> RequestBuilder {
let body = serde_json::to_string(&json).unwrap();
self.inner = self.inner.body(body.clone());
self.body = body;
self
}
pub fn activity(mut self) -> RequestBuilder {
self.inner = self
.inner
.header("Content-Type", "application/activity+json")
.header("Accept", "application/activity+json");
self
}
pub async fn send(self) -> Result<Response, reqwest::Error> {
event!(Level::DEBUG, ?self.inner, "sending an http request");
self.inner.send().await
}
pub fn sign(mut self, key_id: &str) -> RequestBuilder {
match self.verb {
RequestVerb::GET => {
let sig = self.sign_get_request(key_id);
self.inner = self
.inner
.header("Date", sig.date)
.header("Signature", sig.signature);
self
}
RequestVerb::POST => {
let sig = self.sign_post_request(key_id);
self.inner = self
.inner
.header("Date", sig.date)
.header("Digest", sig.digest)
.header("Signature", sig.signature);
self
}
}
}
fn sign_get_request(&self, key_id: &str) -> GetSignature {
let url = &self.url;
let host = url.host_str().unwrap();
let path = url.path();
let private_key =
RsaPrivateKey::from_pkcs8_pem(include_str!("../../../private.pem")).unwrap();
let signing_key = SigningKey::<Sha256>::new(private_key);
// UTC=GMT for our purposes, use it
// RFC7231 is hardcoded to use GMT for.. some reason
let ts = Utc::now();
// RFC7231 string
let date = ts.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
let to_sign = format!(
"(request-target): get {}\nhost: {}\ndate: {}",
path, host, date
);
let signature = signing_key.sign_with_rng(&mut rand::rngs::OsRng, &to_sign.into_bytes());
let header = format!(
"keyId=\"{}\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date\",signature=\"{}\"",
key_id,
BASE64_STANDARD.encode(signature.to_bytes())
);
GetSignature {
date,
signature: header,
}
}
fn sign_post_request(&self, key_id: &str) -> PostSignature {
let body = &self.body;
let url = &self.url;
let host = url.host_str().unwrap();
let path = url.path();
let private_key =
RsaPrivateKey::from_pkcs8_pem(include_str!("../../../private.pem")).unwrap();
let signing_key = SigningKey::<Sha256>::new(private_key);
let mut hasher = Sha256::new();
hasher.update(body);
let sha256 = hasher.finalize();
let b64 = BASE64_STANDARD.encode(sha256);
let digest = format!("SHA-256={}", b64);
// UTC=GMT for our purposes, use it
// RFC7231 is hardcoded to use GMT for.. some reason
let ts = Utc::now();
// RFC7231 string
let date = ts.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
let to_sign = format!(
"(request-target): post {}\nhost: {}\ndate: {}\ndigest: {}",
path, host, date, digest
);
let signature = signing_key.sign_with_rng(&mut rand::rngs::OsRng, &to_sign.into_bytes());
let header = format!(
"keyId=\"{}\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date digest\",signature=\"{}\"",
key_id,
BASE64_STANDARD.encode(signature.to_bytes())
);
PostSignature {
date,
digest,
signature: header,
}
}
}
impl Default for HttpClient {
fn default() -> Self {
Self::new()
}
}
impl HttpClient {
pub fn new() -> Self {
Self {
client: reqwest::Client::new(),
}
}
pub fn get(&self, url: impl IntoUrl + Clone) -> RequestBuilder {
RequestBuilder {
verb: RequestVerb::GET,
url: url.clone().into_url().unwrap(),
body: String::new(),
inner: self.client.get(url),
}
}
pub fn post(&self, url: impl IntoUrl + Clone) -> RequestBuilder {
RequestBuilder {
verb: RequestVerb::POST,
url: url.clone().into_url().unwrap(),
body: String::new(),
inner: self.client.post(url),
}
}
}

View file

@ -1,7 +1,6 @@
use crate::types::{ap, as_context, db, get, make, Object, ObjectUri, ObjectUuid};
use crate::ap::http::HttpClient;
use super::http::HttpWrapper;
use super::http::{HttpClient, HttpWrapper};
use super::outbox::OutboxRequest;
use super::QueueMessage;
@ -83,7 +82,7 @@ pub async fn handle_inbox_request(
let rmt = person.remote_info();
let post = activity.object;
let post_id = crate::ap::new_id();
let post_id = crate::new_id();
let created_at = DateTime::parse_from_rfc3339(&activity.ts)
.map(|dt| dt.to_utc())
@ -104,20 +103,25 @@ pub async fn handle_inbox_request(
let user = get::user_by_actor_uri(actor.id.clone(), &mut conn)
.await
.unwrap_or_else(|_| {
let id = crate::new_id();
db::User {
id: ObjectUuid(crate::new_id()),
id: ObjectUuid(id.clone()),
actor,
username: person.preferred_username,
display_name: person.name,
acct: rmt.acct,
remote: rmt.is_remote,
url: rmt.web_url,
created_at: crate::ap::now(),
created_at: crate::now(),
icon_url: person.icon.map(|ic| ic.url)
.unwrap_or("https//ferri.amy.mov/assets/pfp.png".to_string()),
posts: db::UserPosts {
last_post_at: None
}
},
key_id: format!(
"https://ferri.amy.mov/users/{}#main-key",
id
)
}
});
@ -199,8 +203,9 @@ pub async fn handle_inbox_request(
let user = get::user_by_actor_uri(actor.id.clone(), &mut conn)
.await
.unwrap_or_else(|_| {
let id = crate::new_id();
db::User {
id: ObjectUuid(crate::new_id()),
id: ObjectUuid(id.clone()),
actor,
username: boosted_author.preferred_username,
display_name: boosted_author.name,
@ -208,12 +213,16 @@ pub async fn handle_inbox_request(
remote: boosted_rmt.is_remote,
url: boosted_rmt.web_url,
// FIXME: Come from boosted_author
created_at: crate::ap::now(),
created_at: crate::now(),
icon_url: boosted_author.icon.map(|ic| ic.url)
.unwrap_or("https//ferri.amy.mov/assets/pfp.png".to_string()),
posts: db::UserPosts {
last_post_at: None
}
},
key_id: format!(
"https://ferri.amy.mov/users/{}#main-key",
id
)
}
});
@ -270,20 +279,25 @@ pub async fn handle_inbox_request(
let user = get::user_by_actor_uri(actor.id.clone(), &mut conn)
.await
.unwrap_or_else(|_| {
let id = crate::new_id();
db::User {
id: ObjectUuid(crate::new_id()),
id: ObjectUuid(id.clone()),
actor,
username: person.preferred_username,
display_name: person.name,
acct: rmt.acct,
remote: rmt.is_remote,
url: rmt.web_url,
created_at: crate::ap::now(),
created_at: crate::now(),
icon_url: person.icon.map(|ic| ic.url)
.unwrap_or("https//ferri.amy.mov/assets/pfp.png".to_string()),
posts: db::UserPosts {
last_post_at: None
}
},
key_id: format!(
"https://ferri.amy.mov/users/{}#main-key",
id
)
}
});

View file

@ -1,14 +1,22 @@
use serde::{Deserialize, Serialize};
use sqlx::SqliteConnection;
use tracing::info;
use std::fmt::Debug;
use crate::{ap::http::HttpClient, federation::http::HttpWrapper, types::{ap::{self, ActivityType}, as_context, db, Object, ObjectContext, ObjectUri}};
use crate::{federation::http::HttpWrapper, types::{ap::{self, ActivityType}, as_context, db, make, Object, ObjectContext, ObjectUri}};
use super::http::HttpClient;
#[derive(Debug)]
pub enum OutboxRequest {
// FIXME: Make the String (key_id) nicer
// Probably store it in the DB and pass a db::User here
Accept(ap::AcceptActivity, String, ap::Person),
Status(db::Post, String)
Status(db::Post, String),
Follow {
follower: db::User,
followed: db::User,
conn: SqliteConnection
}
}
#[derive(Serialize, Deserialize, Debug)]
@ -40,7 +48,7 @@ pub async fn handle_outbox_request(
ty: activity.ty,
actor: activity.actor,
object: activity.object,
published: crate::ap::new_ts()
published: crate::now_str(),
};
let res = http
@ -80,7 +88,7 @@ pub async fn handle_outbox_request(
attachment: vec![],
attributed_to: Some(post.user.actor.id.0)
},
published: crate::ap::new_ts()
published: crate::now_str(),
};
let res = http
@ -90,5 +98,40 @@ pub async fn handle_outbox_request(
info!("status res {}", res);
}
OutboxRequest::Follow { follower, followed, mut conn } => {
let follow = db::Follow {
id: ObjectUri(format!(
"https://ferri.amy.mov/activities/{}",
crate::new_id())
),
follower: follower.actor.id.clone(),
followed: followed.actor.id.clone(),
};
make::new_follow(follow, &mut conn)
.await
.unwrap();
let http = HttpWrapper::new(http, &follower.key_id);
let activity = PreparedActivity {
context: as_context(),
id: format!(
"https://ferri.amy.mov/activities/{}",
crate::new_id()
),
ty: ActivityType::Follow,
actor: follower.actor.id.0,
object: followed.actor.id.0,
published: crate::now_str(),
};
let res = http
.post_activity(&followed.actor.inbox, activity)
.await
.unwrap();
info!("follow res {}", res);
},
}
}

View file

@ -1,8 +1,8 @@
use tokio::sync::mpsc;
use tracing::{info, span, Instrument, Level};
use crate::ap::http::HttpClient;
use crate::config::Config;
use crate::federation::http::HttpClient;
use crate::federation::inbox::handle_inbox_request;
use crate::federation::outbox::handle_outbox_request;

View file

@ -1,8 +1,8 @@
pub mod ap;
pub mod config;
pub mod types;
pub mod federation;
use chrono::{DateTime, Utc};
use rand::{Rng, distributions::Alphanumeric};
pub fn gen_token(len: usize) -> String {
@ -16,3 +16,11 @@ pub fn gen_token(len: usize) -> String {
pub fn new_id() -> String {
uuid::Uuid::new_v4().to_string()
}
pub fn now() -> DateTime<Utc> {
Utc::now()
}
pub fn now_str() -> String {
now().to_rfc3339()
}

View file

@ -70,6 +70,24 @@ pub struct Status {
pub poll: Option<()>,
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct Relationship {
id: ObjectUuid,
following: bool,
showing_reblogs: bool,
notifying: bool,
followed_by: bool,
blocking: bool,
blocked_by: bool,
muting: bool,
muting_notifications: bool,
requested: bool,
requested_by: bool,
domain_blocking: bool,
endorsed: bool,
note: String
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct Account {
pub id: ObjectUuid,

View file

@ -34,6 +34,7 @@ pub struct User {
pub icon_url: String,
pub posts: UserPosts,
pub key_id: String
}
#[derive(Debug, Eq, PartialEq, Clone)]

View file

@ -25,7 +25,10 @@ fn parse_ts(ts: String) -> Option<DateTime<Utc>> {
Some(dt.unwrap())
}
pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result<db::User, DbError> {
pub async fn user_by_id(
id: ObjectUuid,
conn: &mut SqliteConnection
) -> Result<db::User, DbError> {
info!("fetching user by uuid '{:?}' from the database", id);
let record = sqlx::query!(
@ -89,7 +92,7 @@ pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result<d
info!("user {:?} last posted {:?}", id, last_post_at);
Ok(db::User {
id: ObjectUuid(record.user_id),
id: ObjectUuid(record.user_id.clone()),
actor: db::Actor {
id: ObjectUri(record.actor_id),
inbox: record.inbox,
@ -102,7 +105,99 @@ pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result<d
created_at: user_created,
url: record.url,
posts: db::UserPosts { last_post_at },
icon_url: record.icon_url
icon_url: record.icon_url,
key_id: format!(
"https://ferri.amy.mov/users/{}#main-key",
record.user_id
)
})
}
pub async fn user_by_username(
username: &str,
conn: &mut SqliteConnection
) -> Result<db::User, DbError> {
info!("fetching user by username '{}' from the database", username);
let record = sqlx::query!(
r#"
SELECT
u.id as "user_id",
u.username,
u.actor_id,
u.display_name,
a.inbox,
a.outbox,
u.url,
u.acct,
u.remote,
u.created_at,
u.icon_url
FROM "user" u
INNER JOIN "actor" a ON u.actor_id = a.id
WHERE u.username = ?1
"#,
username
)
.fetch_one(&mut *conn)
.await
.map_err(|e| DbError::FetchError(e.to_string()))?;
let follower_count = sqlx::query_scalar!(
r#"
SELECT COUNT(follower_id)
FROM "follow"
WHERE followed_id = ?1
"#,
record.actor_id
)
.fetch_one(&mut *conn)
.await
.map_err(|e| DbError::FetchError(e.to_string()))?;
let last_post_at = sqlx::query_scalar!(
r#"
SELECT datetime(p.created_at)
FROM post p
WHERE p.user_id = ?1
ORDER BY datetime(p.created_at) DESC
LIMIT 1
"#,
record.user_id
)
.fetch_optional(&mut *conn)
.await
.map_err(|e| DbError::FetchError(e.to_string()))?
.flatten()
.and_then(|ts| {
info!("parsing timestamp {}", ts);
parse_ts(ts)
});
let user_created = parse_ts(record.created_at).expect("no db corruption");
info!("user {} has {} followers", record.user_id, follower_count);
info!("user {} last posted {:?}", record.user_id, last_post_at);
Ok(db::User {
id: ObjectUuid(record.user_id.clone()),
actor: db::Actor {
id: ObjectUri(record.actor_id),
inbox: record.inbox,
outbox: record.outbox,
},
acct: record.acct,
remote: record.remote,
username: record.username,
display_name: record.display_name,
created_at: user_created,
url: record.url,
posts: db::UserPosts { last_post_at },
icon_url: record.icon_url,
key_id: format!(
"https://ferri.amy.mov/users/{}#main-key",
record.user_id
)
})
}
@ -170,7 +265,7 @@ pub async fn user_by_actor_uri(uri: ObjectUri, conn: &mut SqliteConnection) -> R
info!("user {:?} last posted {:?}", record.user_id, last_post_at);
Ok(db::User {
id: ObjectUuid(record.user_id),
id: ObjectUuid(record.user_id.clone()),
actor: db::Actor {
id: ObjectUri(record.actor_id),
inbox: record.inbox,
@ -183,7 +278,11 @@ pub async fn user_by_actor_uri(uri: ObjectUri, conn: &mut SqliteConnection) -> R
created_at: user_created,
url: record.url,
posts: db::UserPosts { last_post_at },
icon_url: record.icon_url
icon_url: record.icon_url,
key_id: format!(
"https://ferri.amy.mov/users/{}#main-key",
record.user_id
)
})
}
@ -248,7 +347,7 @@ pub async fn posts_for_user_id(
id: ObjectUuid(record.post_id),
uri: ObjectUri(record.post_uri),
user: db::User {
id: ObjectUuid(record.user_id),
id: ObjectUuid(record.user_id.clone()),
actor: db::Actor {
id: ObjectUri(record.actor_id),
inbox: record.inbox,
@ -263,7 +362,11 @@ pub async fn posts_for_user_id(
icon_url: record.icon_url,
posts: db::UserPosts {
last_post_at: None
}
},
key_id: format!(
"https://ferri.amy.mov/users/{}#main-key",
record.user_id
)
},
attachments,
content: record.content,
@ -305,7 +408,7 @@ pub async fn home_timeline(
id: ObjectUuid(p.post_id),
uri: ObjectUri(p.post_uri),
user: db::User {
id: ObjectUuid(p.user_id),
id: ObjectUuid(p.user_id.clone()),
actor: db::Actor {
id: ObjectUri(p.actor_id),
inbox: p.inbox,
@ -320,7 +423,11 @@ pub async fn home_timeline(
icon_url: p.icon_url,
posts: db::UserPosts {
last_post_at: None
}
},
key_id: format!(
"https://ferri.amy.mov/users/{}#main-key",
p.user_id
)
},
content: p.content,
created_at: parse_ts(p.post_created).unwrap(),
@ -391,3 +498,53 @@ pub async fn home_timeline(
Ok(out)
}
pub async fn followers_for_user(
user_id: ObjectUuid,
conn: &mut SqliteConnection
) -> Result<Vec<db::Follow>, DbError> {
let followers = sqlx::query!(
"SELECT * FROM follow WHERE followed_id = ?",
user_id.0
)
.fetch_all(&mut *conn)
.await
.unwrap();
let followers = followers.into_iter()
.map(|f| {
db::Follow {
id: ObjectUri(f.id),
follower: ObjectUri(f.follower_id),
followed: ObjectUri(f.followed_id)
}
})
.collect::<Vec<_>>();
Ok(followers)
}
pub async fn following_for_user(
user_id: ObjectUuid,
conn: &mut SqliteConnection
) -> Result<Vec<db::Follow>, DbError> {
let followers = sqlx::query!(
"SELECT * FROM follow WHERE follower_id = ?",
user_id.0
)
.fetch_all(&mut *conn)
.await
.unwrap();
let followers = followers.into_iter()
.map(|f| {
db::Follow {
id: ObjectUri(f.id),
follower: ObjectUri(f.follower_id),
followed: ObjectUri(f.followed_id)
}
})
.collect::<Vec<_>>();
Ok(followers)
}

View file

@ -1,12 +1,12 @@
use rocket::{
get, serde::json::Json, FromFormField, State,
};
use main::types::{api, get};
use main::{federation::http::HttpWrapper, types::{api, get}};
use rocket_db_pools::Connection;
use serde::{Deserialize, Serialize};
use tracing::{info, error};
use crate::{http_wrapper::HttpWrapper, AuthenticatedUser, Db};
use crate::{AuthenticatedUser, Db};
#[derive(Serialize, Deserialize, FromFormField, Debug)]
#[serde(rename_all = "lowercase")]

View file

@ -43,7 +43,7 @@ fn to_db_post(req: &CreateStatus, user: &AuthenticatedUser, config: &Config) ->
uri: ObjectUri(config.post_url(&user.id.0, &post_id)),
user: user.user.clone(),
content: req.status.clone(),
created_at: main::ap::now(),
created_at: main::now(),
boosted_post: None,
attachments: vec![]
}

View file

@ -1,4 +1,5 @@
use main::ap;
use main::federation::outbox::OutboxRequest;
use main::federation::QueueMessage;
use main::types::{api, get, ObjectUuid};
use rocket::response::status::NotFound;
use rocket::{
@ -6,10 +7,9 @@ use rocket::{
serde::{Deserialize, Serialize, json::Json},
};
use rocket_db_pools::Connection;
use uuid::Uuid;
use tracing::info;
use crate::{AuthenticatedUser, Db};
use crate::{AuthenticatedUser, Db, OutboundQueue};
#[derive(Debug, Serialize, Deserialize)]
#[serde(crate = "rocket::serde")]
@ -62,39 +62,38 @@ pub async fn verify_credentials(user: AuthenticatedUser) -> Json<CredentialAcoun
#[post("/accounts/<uuid>/follow")]
pub async fn new_follow(
mut db: Connection<Db>,
helpers: &State<crate::Helpers>,
outbound: &State<OutboundQueue>,
uuid: &str,
user: AuthenticatedUser,
) -> Result<(), NotFound<String>> {
let http = &helpers.http;
let follower = ap::User::from_actor_id(&user.actor_id.0, &mut **db).await;
let followed = ap::User::from_id(uuid, &mut **db)
let follower = user.user;
let followed = get::user_by_id(ObjectUuid(uuid.to_string()), &mut **db)
.await
.map_err(|e| NotFound(e.to_string()))?;
.unwrap();
let outbox = ap::Outbox::for_user(follower.clone(), http);
let conn = db.into_inner();
let conn = conn.detach();
let activity = ap::Activity {
id: format!("https://ferri.amy.mov/activities/{}", Uuid::new_v4()),
ty: ap::ActivityType::Follow,
object: followed.actor_id().to_string(),
..Default::default()
};
let msg = QueueMessage::Outbound(OutboxRequest::Follow {
follower,
followed,
conn
});
let req = ap::OutgoingActivity {
signed_by: format!("{}#main-key", follower.uri()),
req: activity,
to: followed.actor().clone(),
};
req.save(&mut **db).await;
outbox.post(req).await;
outbound.0.send(msg).await;
Ok(())
}
#[get("/accounts/relationships?<id>")]
pub async fn relationships(
id: Vec<String>,
user: AuthenticatedUser
) -> Result<Json<Vec<api::Relationship>>, ()> {
info!("{} looking up relationships for {:#?}", user.username, id);
Ok(Json(vec![]))
}
#[get("/accounts/<uuid>")]
pub async fn account(
mut db: Connection<Db>,

View file

@ -50,22 +50,13 @@ pub async fn followers(
mut db: Connection<Db>,
uuid: &str,
) -> Result<ActivityResponse<Json<OrderedCollection>>, NotFound<String>> {
let target = main::ap::User::from_id(uuid, &mut **db)
let user = get::user_by_id(ObjectUuid(uuid.to_string()), &mut **db)
.await
.map_err(|e| NotFound(e.to_string()))?;
.unwrap();
let actor_id = target.actor_id();
let followers = sqlx::query!(
r#"
SELECT follower_id FROM follow
WHERE followed_id = ?
"#,
actor_id
)
.fetch_all(&mut **db)
.await
.unwrap();
let followers = get::followers_for_user(user.id.clone(), &mut **db)
.await
.unwrap();
ap_ok(Json(OrderedCollection {
context: as_context(),
@ -74,8 +65,8 @@ pub async fn followers(
id: format!("https://ferri.amy.mov/users/{}/followers", uuid),
ordered_items: followers
.into_iter()
.map(|f| f.follower_id)
.collect::<Vec<_>>(),
.map(|f| f.follower.0)
.collect(),
}))
}
@ -84,32 +75,23 @@ pub async fn following(
mut db: Connection<Db>,
uuid: &str,
) -> Result<ActivityResponse<Json<OrderedCollection>>, NotFound<String>> {
let target = main::ap::User::from_id(uuid, &mut **db)
let user = get::user_by_id(ObjectUuid(uuid.to_string()), &mut **db)
.await
.map_err(|e| NotFound(e.to_string()))?;
.unwrap();
let actor_id = target.actor_id();
let following = sqlx::query!(
r#"
SELECT followed_id FROM follow
WHERE follower_id = ?
"#,
actor_id
)
.fetch_all(&mut **db)
.await
.unwrap();
let followers = get::following_for_user(user.id.clone(), &mut **db)
.await
.unwrap();
ap_ok(Json(OrderedCollection {
context: as_context(),
ty: "OrderedCollection".to_string(),
total_items: 1,
id: format!("https://ferri.amy.mov/users/{}/following", uuid),
ordered_items: following
ordered_items: followers
.into_iter()
.map(|f| f.followed_id)
.collect::<Vec<_>>(),
.map(|f| f.followed.0)
.collect(),
}))
}

View file

@ -1,7 +1,6 @@
use crate::Db;
use main::ap;
use main::types::api;
use rocket::{State, get, serde::json::Json};
use main::types::{api, get};
use rocket::{get, serde::json::Json, State};
use rocket_db_pools::Connection;
use tracing::info;
@ -27,24 +26,26 @@ pub async fn webfinger(
let acct = resource.strip_prefix("acct:").unwrap();
let (user, _) = acct.split_once("@").unwrap();
let user = ap::User::from_username(user, &mut **db).await;
let user = get::user_by_username(user, &mut **db)
.await
.unwrap();
Json(api::WebfingerHit {
subject: resource.to_string(),
aliases: vec![
config.user_url(user.id()),
config.user_web_url(user.username()),
config.user_url(&user.id.0),
config.user_web_url(&user.username),
],
links: vec![
api::WebfingerLink {
rel: "http://webfinger.net/rel/profile-page".to_string(),
ty: Some("text/html".to_string()),
href: Some(config.user_web_url(user.username())),
href: Some(config.user_web_url(&user.username)),
},
api::WebfingerLink {
rel: "self".to_string(),
ty: Some("application/activity+json".to_string()),
href: Some(config.user_url(user.id())),
href: Some(config.user_url(&user.id.0)),
},
],
})

View file

@ -1,70 +0,0 @@
use crate::http::HttpClient;
use main::types::ap;
use std::fmt::Debug;
use thiserror::Error;
use tracing::{Level, error, event, info};
pub struct HttpWrapper<'a> {
client: &'a HttpClient,
key_id: &'a str,
}
#[derive(Error, Debug)]
pub enum HttpError {
#[error("entity of type `{0}` @ URL `{1}` could not be loaded")]
LoadFailure(String, String),
#[error("entity of type `{0}` @ URL `{1}` could not be parsed ({2})")]
ParseFailure(String, String, String),
}
impl<'a> HttpWrapper<'a> {
pub fn new(client: &'a HttpClient, key_id: &'a str) -> HttpWrapper<'a> {
Self { client, key_id }
}
async fn get<T: serde::de::DeserializeOwned + Debug>(
&self,
ty: &str,
url: &str,
) -> Result<T, HttpError> {
let ty = ty.to_string();
event!(Level::INFO, url, "loading {}", ty);
let http_result = self
.client
.get(url)
.sign(self.key_id)
.activity()
.send()
.await;
if let Err(e) = http_result {
error!("could not load url {}: {:#?}", url, e);
return Err(HttpError::LoadFailure(ty, url.to_string()));
}
let raw_body = http_result.unwrap().text().await;
if let Err(e) = raw_body {
error!("could not get text for url {}: {:#?}", url, e);
return Err(HttpError::LoadFailure(ty, url.to_string()));
}
let raw_body = raw_body.unwrap();
info!("raw body {}", raw_body);
let decoded = serde_json::from_str::<T>(&raw_body);
if let Err(e) = decoded {
error!(
"could not parse {} for url {}: {:#?} {}",
ty, url, e, &raw_body
);
return Err(HttpError::ParseFailure(ty, url.to_string(), e.to_string()));
}
Ok(decoded.unwrap())
}
pub async fn get_person(&self, url: &str) -> Result<ap::Person, HttpError> {
self.get("Person", url).await
}
}

View file

@ -4,10 +4,8 @@ use endpoints::{
};
use tracing_subscriber::fmt;
use main::{federation::{self, http}, types::{db, get, ObjectUri, ObjectUuid}};
use main::{federation, types::{db, get, ObjectUri, ObjectUuid}};
use main::ap::http;
use main::config::Config;
use rocket::{
Build, Request, Rocket, build, get,
@ -170,6 +168,7 @@ pub fn launch(cfg: Config) -> Rocket<Build> {
api::user::new_follow,
api::user::statuses,
api::user::account,
api::user::relationships,
api::apps::new_app,
api::preferences::preferences,
api::user::verify_credentials,