From d59660da3738b78a2d990a0b2f2eac7f41aadacb Mon Sep 17 00:00:00 2001 From: nullishamy Date: Mon, 28 Apr 2025 00:26:59 +0100 Subject: [PATCH] feat: http wrapper, start type rewrite, add nextest --- Cargo.lock | 3 + Cargo.toml | 1 + ferri-main/Cargo.toml | 2 +- ferri-main/src/ap/post.rs | 2 +- ferri-main/src/ap/request_queue.rs | 3 + ferri-main/src/ap/user.rs | 7 +- ferri-main/src/lib.rs | 2 + ferri-main/src/types_rewrite/convert.rs | 28 ++++ ferri-main/src/types_rewrite/mod.rs | 87 +++++++++++ ferri-server/Cargo.toml | 5 +- ferri-server/src/endpoints/api/status.rs | 55 +++---- ferri-server/src/endpoints/api/timeline.rs | 3 +- ferri-server/src/endpoints/inbox.rs | 174 +++++++++++---------- ferri-server/src/endpoints/user.rs | 67 +++++--- ferri-server/src/http_wrapper.rs | 71 +++++++++ ferri-server/src/lib.rs | 1 + flake.nix | 1 + migrations/20250410112325_add_follow.sql | 1 + 18 files changed, 369 insertions(+), 144 deletions(-) create mode 100644 ferri-main/src/types_rewrite/convert.rs create mode 100644 ferri-main/src/types_rewrite/mod.rs create mode 100644 ferri-server/src/http_wrapper.rs diff --git a/Cargo.lock b/Cargo.lock index fc47954..fe3d61d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2162,7 +2162,10 @@ dependencies = [ "reqwest", "rocket", "rocket_db_pools", + "serde", + "serde_json", "sqlx", + "thiserror 2.0.12", "tracing", "tracing-subscriber", "url", diff --git a/Cargo.toml b/Cargo.toml index 0f7cb23..94415ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ uuid = { version = "1.16.0", features = ["v4"] } chrono = "0.4.40" rand = "0.8" thiserror = "2.0.12" +serde_json = "1.0.140" tracing = "0.1.40" tracing-appender = "0.2.3" diff --git a/ferri-main/Cargo.toml b/ferri-main/Cargo.toml index cca1ac5..77dc8d9 100644 --- a/ferri-main/Cargo.toml +++ b/ferri-main/Cargo.toml @@ -12,8 +12,8 @@ uuid = { workspace = true } rand = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } +serde_json = { workspace = true } base64 = "0.22.1" rsa = { version = "0.9.8", features = ["sha2"] } url = "2.5.4" -serde_json = "1.0.140" \ No newline at end of file diff --git a/ferri-main/src/ap/post.rs b/ferri-main/src/ap/post.rs index 53c33e0..a42a246 100644 --- a/ferri-main/src/ap/post.rs +++ b/ferri-main/src/ap/post.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc}; use serde::Serialize; use sqlx::Sqlite; -const POST_TYPE: &str = "Post"; +const POST_TYPE: &str = "Note"; #[derive(Clone)] pub struct Post { diff --git a/ferri-main/src/ap/request_queue.rs b/ferri-main/src/ap/request_queue.rs index 9088266..2a230df 100644 --- a/ferri-main/src/ap/request_queue.rs +++ b/ferri-main/src/ap/request_queue.rs @@ -42,6 +42,9 @@ impl RequestQueue { let recv = self.recv; while let Ok(req) = recv.recv() { + // FIXME: When we make this do async things we will need to add tokio and + // use proper async handled spans as the enter/drop won't work. + // See inbox.rs for how we handle that. let s = span!(Level::INFO, "queue", queue_name = self.name); let _enter = s.enter(); diff --git a/ferri-main/src/ap/user.rs b/ferri-main/src/ap/user.rs index b080181..361d0d8 100644 --- a/ferri-main/src/ap/user.rs +++ b/ferri-main/src/ap/user.rs @@ -106,9 +106,10 @@ impl User { "#, username ) - .fetch_one(conn) - .await - .unwrap(); + .fetch_one(conn) + .await + .unwrap(); + User { id: user.id, username: user.username, diff --git a/ferri-main/src/lib.rs b/ferri-main/src/lib.rs index 620300a..31cafd7 100644 --- a/ferri-main/src/lib.rs +++ b/ferri-main/src/lib.rs @@ -1,5 +1,7 @@ pub mod ap; pub mod config; +mod types_rewrite; + use rand::{Rng, distributions::Alphanumeric}; pub fn gen_token(len: usize) -> String { diff --git a/ferri-main/src/types_rewrite/convert.rs b/ferri-main/src/types_rewrite/convert.rs new file mode 100644 index 0000000..16ed63c --- /dev/null +++ b/ferri-main/src/types_rewrite/convert.rs @@ -0,0 +1,28 @@ +use crate::types_rewrite::api; +use crate::types_rewrite::ap; +use crate::types_rewrite::db; + +use crate::types_rewrite::{Object, as_context}; + +impl From for ap::Actor { + fn from(val: db::Actor) -> ap::Actor { + ap::Actor { + obj: Object { + context: as_context(), + id: val.id + }, + inbox: val.inbox, + outbox: val.outbox + } + } +} + +impl From for db::Actor { + fn from(val: ap::Actor) -> db::Actor { + db::Actor { + id: val.obj.id, + inbox: val.inbox, + outbox: val.outbox + } + } +} diff --git a/ferri-main/src/types_rewrite/mod.rs b/ferri-main/src/types_rewrite/mod.rs new file mode 100644 index 0000000..b1baa5a --- /dev/null +++ b/ferri-main/src/types_rewrite/mod.rs @@ -0,0 +1,87 @@ +use serde::{Serialize, Deserialize}; + +mod convert; +pub use convert::*; + +pub const AS_CONTEXT_RAW: &'static str = "https://www.w3.org/ns/activitystreams"; +pub fn as_context() -> ObjectContext { + ObjectContext::Str(AS_CONTEXT_RAW.to_string()) +} + +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] +#[serde(untagged)] +pub enum ObjectContext { + Str(String), + Vec(Vec), +} + +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct ObjectUri(String); + +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct Object { + #[serde(rename = "@context")] + context: ObjectContext, + id: ObjectUri, +} + +pub mod db { + use serde::{Serialize, Deserialize}; + use super::*; + + #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] + pub struct Actor { + pub id: ObjectUri, + pub inbox: String, + pub outbox: String, + } +} + +pub mod ap { + use serde::{Serialize, Deserialize}; + use super::*; + + #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] + pub struct Actor { + #[serde(flatten)] + pub obj: Object, + + pub inbox: String, + pub outbox: String, + } +} + +pub mod api { + use serde::{Serialize, Deserialize}; + use super::*; + + // API will not really use actors so treat them as DB actors + // until we require specificity + pub type Actor = db::Actor; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ap_actor_to_db() { + let domain = "https://example.com"; + let ap = ap::Actor { + obj: Object { + context: as_context(), + id: ObjectUri(format!("{}/users/sample", domain)), + }, + inbox: format!("{}/users/sample/inbox", domain), + outbox: format!("{}/users/sample/outbox", domain), + }; + + let db: db::Actor = ap.into(); + + assert_eq!(db, db::Actor { + id: ObjectUri("https://example.com/users/sample".to_string()), + inbox: "https://example.com/users/sample/inbox".to_string(), + outbox: "https://example.com/users/sample/outbox".to_string(), + }); + } +} diff --git a/ferri-server/Cargo.toml b/ferri-server/Cargo.toml index 91b72fe..a378685 100644 --- a/ferri-server/Cargo.toml +++ b/ferri-server/Cargo.toml @@ -15,4 +15,7 @@ uuid = { workspace = true } chrono = { workspace = true } rand = { workspace = true } tracing = { workspace = true } -tracing-subscriber = { workspace = true } \ No newline at end of file +tracing-subscriber = { workspace = true } +thiserror = { workspace = true } +serde_json = { workspace = true } +serde = { workspace = true } \ No newline at end of file diff --git a/ferri-server/src/endpoints/api/status.rs b/ferri-server/src/endpoints/api/status.rs index e716de4..ca74a07 100644 --- a/ferri-server/src/endpoints/api/status.rs +++ b/ferri-server/src/endpoints/api/status.rs @@ -56,44 +56,37 @@ async fn create_status( post.save(&mut **db).await; - let actors = sqlx::query!("SELECT * FROM actor") - .fetch_all(&mut **db) + let actor = sqlx::query!("SELECT * FROM actor WHERE id = ?1", "https://fedi.amy.mov/users/9zkygethkdw60001") + .fetch_one(&mut **db) .await .unwrap(); - for record in actors { - // Don't send to ourselves - if record.id == user.actor_id() { - continue; - } + let create_id = format!("https://ferri.amy.mov/activities/{}", Uuid::new_v4()); - let create_id = format!("https://ferri.amy.mov/activities/{}", Uuid::new_v4()); + let activity = ap::Activity { + id: create_id, + ty: ap::ActivityType::Create, + object: post.clone().to_ap(), + to: vec![format!("{}/followers", user.uri())], + published: now, + cc: vec!["https://www.w3.org/ns/activitystreams#Public".to_string()], + ..Default::default() + }; - let activity = ap::Activity { - id: create_id, - ty: ap::ActivityType::Create, - object: post.clone().to_ap(), - to: vec![format!("{}/followers", user.uri())], - published: now, - cc: vec!["https://www.w3.org/ns/activitystreams#Public".to_string()], - ..Default::default() - }; + let actor = ap::Actor::from_raw( + actor.id.clone(), + actor.inbox.clone(), + actor.outbox.clone(), + ); - let actor = ap::Actor::from_raw( - record.id.clone(), - record.inbox.clone(), - record.outbox.clone(), - ); + let req = ap::OutgoingActivity { + req: activity, + signed_by: format!("{}#main-key", user.uri()), + to: actor, + }; - let req = ap::OutgoingActivity { - req: activity, - signed_by: format!("{}#main-key", user.uri()), - to: actor, - }; - - req.save(&mut **db).await; - outbox.post(req).await; - } + req.save(&mut **db).await; + outbox.post(req).await; TimelineStatus { id: post.id().to_string(), diff --git a/ferri-server/src/endpoints/api/timeline.rs b/ferri-server/src/endpoints/api/timeline.rs index 884b82e..04920e1 100644 --- a/ferri-server/src/endpoints/api/timeline.rs +++ b/ferri-server/src/endpoints/api/timeline.rs @@ -33,11 +33,10 @@ pub struct TimelineStatus { pub account: TimelineAccount, } -#[get("/timelines/home?<_limit>")] +#[get("/timelines/home")] pub async fn home( mut db: Connection, config: &State, - _limit: i64, _user: AuthenticatedUser, ) -> Json> { let posts = sqlx::query!( diff --git a/ferri-server/src/endpoints/inbox.rs b/ferri-server/src/endpoints/inbox.rs index bb95091..924ae61 100644 --- a/ferri-server/src/endpoints/inbox.rs +++ b/ferri-server/src/endpoints/inbox.rs @@ -1,4 +1,5 @@ use chrono::Local; +use tracing::Instrument; use main::ap; use rocket::serde::json::serde_json; use rocket::{State, post}; @@ -6,7 +7,8 @@ use rocket_db_pools::Connection; use sqlx::Sqlite; use url::Url; use uuid::Uuid; -use tracing::{event, span, Level, debug, warn, info}; +use tracing::{event, span, Level, debug, warn, info, error}; +use crate::http_wrapper::HttpWrapper; use crate::{ Db, @@ -46,7 +48,9 @@ async fn create_user( // HACK: Allow us to formulate a `user@host` username by assuming the actor is on the same host as the user let url = Url::parse(&actor).unwrap(); let host = url.host_str().unwrap(); - let username = format!("{}@{}", user.name, host); + info!("creating user '{}'@'{}' ({:#?})", user.preferred_username, host, user); + + let username = format!("{}@{}", user.preferred_username, host); let uuid = Uuid::new_v4().to_string(); sqlx::query!( @@ -58,7 +62,7 @@ async fn create_user( uuid, username, actor, - user.preferred_username + user.name ) .execute(conn) .await @@ -84,29 +88,27 @@ async fn create_follow( .unwrap(); } -async fn handle_follow_activity( +async fn handle_follow_activity<'a>( followed_account: &str, activity: activity::FollowActivity, - http: &HttpClient, + http: HttpWrapper<'a>, mut db: Connection, ) { - let user = http - .get(&activity.actor) - .activity() - .send() - .await - .unwrap() - .json::() - .await - .unwrap(); + let user = http.get_person(&activity.actor).await; + if let Err(e) = user { + error!("could not load user {}: {}", activity.actor, e.to_string()); + return + } + + let user = user.unwrap(); create_actor(&user, &activity.actor, &mut **db).await; create_user(&user, &activity.actor, &mut **db).await; create_follow(&activity, &mut **db).await; let follower = ap::User::from_actor_id(&activity.actor, &mut **db).await; - let followed = ap::User::from_username(&followed_account, &mut **db).await; - let outbox = ap::Outbox::for_user(followed.clone(), http); + let followed = ap::User::from_id(&followed_account, &mut **db).await.unwrap(); + let outbox = ap::Outbox::for_user(followed.clone(), http.client()); let activity = ap::Activity { id: format!("https://ferri.amy.mov/activities/{}", Uuid::new_v4()), @@ -142,49 +144,51 @@ async fn handle_like_activity(activity: activity::LikeActivity, mut db: Connecti } } -async fn handle_boost_activity( +async fn handle_boost_activity<'a>( activity: activity::BoostActivity, - http: &HttpClient, + http: HttpWrapper<'a>, mut db: Connection, ) { let key_id = "https://ferri.amy.mov/users/amy#main-key"; dbg!(&activity); let post = http + .client() .get(&activity.object) .activity() .sign(&key_id) .send() .await .unwrap() - .json::() + .text() .await .unwrap(); + + info!("{}", post); + + let post = serde_json::from_str::(&post); + if let Err(e) = post { + error!(?e, "when decoding post"); + return + } + + let post = post.unwrap(); - dbg!(&post); + info!("{:#?}", post); let attribution = post.attributed_to.unwrap(); - let post_user = http - .get(&attribution) - .activity() - .sign(&key_id) - .send() - .await - .unwrap() - .json::() - .await - .unwrap(); + + let post_user = http.get_person(&attribution).await; + if let Err(e) = post_user { + error!("could not load post_user {}: {}", attribution, e.to_string()); + return + } + let post_user = post_user.unwrap(); - let user = http - .get(&activity.actor) - .activity() - .sign(&key_id) - .send() - .await - .unwrap() - .json::() - .await - .unwrap(); - - dbg!(&post_user); + let user = http.get_person(&activity.actor).await; + if let Err(e) = user { + error!("could not load actor {}: {}", activity.actor, e.to_string()); + return + } + let user = user.unwrap(); debug!("creating actor {}", activity.actor); create_actor(&user, &activity.actor, &mut **db).await; @@ -228,23 +232,21 @@ async fn handle_boost_activity( } -async fn handle_create_activity( +async fn handle_create_activity<'a>( activity: activity::CreateActivity, - http: &HttpClient, + http: HttpWrapper<'a>, mut db: Connection, ) { assert!(&activity.object.ty == "Note"); debug!("resolving user {}", activity.actor); - let user = http - .get(&activity.actor) - .activity() - .send() - .await - .unwrap() - .json::() - .await - .unwrap(); + let user = http.get_person(&activity.actor).await; + if let Err(e) = user { + error!("could not load user {}: {}", activity.actor, e.to_string()); + return + } + + let user = user.unwrap(); debug!("creating actor {}", activity.actor); create_actor(&user, &activity.actor, &mut **db).await; @@ -284,36 +286,42 @@ pub async fn inbox(db: Connection, http: &State, user: &str, bod let min = serde_json::from_str::(&body).unwrap(); let inbox_span = span!(Level::INFO, "inbox-post", user_id = user); - let _enter = inbox_span.enter(); - event!(Level::INFO, ?min, "received an activity"); - - match min.ty.as_str() { - "Delete" => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_delete_activity(activity); + async move { + event!(Level::INFO, ?min, "received an activity"); + + let key_id = "https://ferri.amy.mov/users/amy#main-key"; + let wrapper = HttpWrapper::new(http.inner(), key_id); + + match min.ty.as_str() { + "Delete" => { + let activity = serde_json::from_str::(&body).unwrap(); + handle_delete_activity(activity); + } + "Follow" => { + let activity = serde_json::from_str::(&body).unwrap(); + handle_follow_activity(user, activity, wrapper, db).await; + } + "Create" => { + let activity = serde_json::from_str::(&body).unwrap(); + handle_create_activity(activity, wrapper, db).await; + } + "Like" => { + let activity = serde_json::from_str::(&body).unwrap(); + handle_like_activity(activity, db).await; + } + "Announce" => { + let activity = serde_json::from_str::(&body).unwrap(); + handle_boost_activity(activity, wrapper, db).await; + } + + act => { + warn!(act, body, "unknown activity"); + } } - "Follow" => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_follow_activity(user, activity, http.inner(), db).await; - } - "Create" => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_create_activity(activity, http.inner(), db).await; - } - "Like" => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_like_activity(activity, db).await; - } - "Announce" => { - let activity = serde_json::from_str::(&body).unwrap(); - handle_boost_activity(activity, http.inner(), db).await; - } - - act => { - warn!(act, body, "unknown activity"); - } - } - debug!("body in inbox: {}", body); - drop(_enter) + debug!("body in inbox: {}", body); + } + // Allow the span to be used inside the async code + // https://docs.rs/tracing/latest/tracing/span/struct.EnteredSpan.html#deref-methods-Span + .instrument(inbox_span).await; } diff --git a/ferri-server/src/endpoints/user.rs b/ferri-server/src/endpoints/user.rs index 0464123..ce24ab5 100644 --- a/ferri-server/src/endpoints/user.rs +++ b/ferri-server/src/endpoints/user.rs @@ -1,6 +1,7 @@ use main::ap; -use rocket::{get, http::ContentType, serde::json::Json, State}; +use rocket::{get, http::ContentType, serde::json::Json, State, Responder}; use rocket_db_pools::Connection; +use rocket::response::Redirect; use rocket::response::status::NotFound; use crate::{ @@ -119,34 +120,56 @@ pub async fn post( ) } +#[derive(Debug, Responder)] +pub enum UserFetchError { + NotFound(NotFound), + Moved(Redirect), +} + +type ActivityResponse = (ContentType, T); +fn ap_response(t: T) -> ActivityResponse { + (activity_type(), t) +} + +fn ap_ok(t: T) -> Result, E> { + Ok(ap_response(t)) +} + #[get("/users/")] pub async fn user( mut db: Connection, config: &State, uuid: &str -) -> Result<(ContentType, Json), NotFound> { +) -> Result>, UserFetchError> { + if uuid == "amy" { + return Err( + UserFetchError::Moved( + Redirect::permanent("https://ferri.amy.mov/users/9b9d497b-2731-435f-a929-e609ca69dac9") + ) + ) + } + let user = ap::User::from_id(uuid, &mut **db) .await - .map_err(|e| NotFound(e.to_string()))?; + .map_err(|e| UserFetchError::NotFound(NotFound(e.to_string())))?; - Ok(( - activity_type(), - Json(Person { - context: "https://www.w3.org/ns/activitystreams".to_string(), - ty: "Person".to_string(), - id: config.user_url(user.id()), - name: user.username().to_string(), - preferred_username: user.display_name().to_string(), - followers: config.followers_url(user.id()), - following: config.following_url(user.id()), - summary: format!("ferri {}", user.username()), - inbox: config.inbox_url(user.id()), - outbox: config.outbox_url(user.id()), - public_key: Some(UserKey { - id: format!("https://ferri.amy.mov/users/{}#main-key", uuid), - owner: config.user_url(user.id()), - public_key: include_str!("../../../public.pem").to_string(), - }), + let person = Person { + context: "https://www.w3.org/ns/activitystreams".to_string(), + ty: "Person".to_string(), + id: config.user_url(user.id()), + name: user.username().to_string(), + preferred_username: user.display_name().to_string(), + followers: config.followers_url(user.id()), + following: config.following_url(user.id()), + summary: format!("ferri {}", user.username()), + inbox: config.inbox_url(user.id()), + outbox: config.outbox_url(user.id()), + public_key: Some(UserKey { + id: format!("https://ferri.amy.mov/users/{}#main-key", uuid), + owner: config.user_url(user.id()), + public_key: include_str!("../../../public.pem").to_string(), }), - )) + }; + + ap_ok(Json(person)) } diff --git a/ferri-server/src/http_wrapper.rs b/ferri-server/src/http_wrapper.rs new file mode 100644 index 0000000..0470b3f --- /dev/null +++ b/ferri-server/src/http_wrapper.rs @@ -0,0 +1,71 @@ +use thiserror::Error; +use tracing::{error, event, Level}; +use crate::http::HttpClient; +use crate::types::Person; +use std::fmt::Debug; + +pub struct HttpWrapper<'a> { + client: &'a HttpClient, + key_id: &'a str +} + +#[derive(Error, Debug)] +pub enum HttpError { + #[error("entity of type `{0}` @ URL `{1}` could not be loaded")] + LoadFailure(String, String), + #[error("entity of type `{0}` @ URL `{1}` could not be parsed ({2})")] + ParseFailure(String, String, String), +} + +impl <'a> HttpWrapper<'a> { + pub fn new(client: &'a HttpClient, key_id: &'a str) -> HttpWrapper<'a> { + Self { + client, + key_id + } + } + + pub fn client(&self) -> &'a HttpClient { + &self.client + } + + async fn get(&self, ty: &str, url: &str) -> Result { + let ty = ty.to_string(); + event!(Level::INFO, url, "loading {}", ty); + + let http_result = self.client + .get(url) + .sign(self.key_id) + .activity() + .send() + .await; + + if let Err(e) = http_result { + error!("could not load url {}: {:#?}", url, e); + return Err(HttpError::LoadFailure(ty, url.to_string())); + } + + let raw_body = http_result.unwrap().text().await; + if let Err(e) = raw_body { + error!("could not get text for url {}: {:#?}", url, e); + return Err(HttpError::LoadFailure(ty, url.to_string())); + } + + let decoded = serde_json::from_str::(&raw_body.unwrap()); + + if let Err(e) = decoded { + error!("could not parse {} for url {}: {:#?}", ty, url, e); + return Err(HttpError::ParseFailure( + ty, + url.to_string(), + e.to_string() + )); + } + + Ok(decoded.unwrap()) + } + + pub async fn get_person(&self, url: &str) -> Result { + self.get("Person", url).await + } +} diff --git a/ferri-server/src/lib.rs b/ferri-server/src/lib.rs index 495e902..520e213 100644 --- a/ferri-server/src/lib.rs +++ b/ferri-server/src/lib.rs @@ -20,6 +20,7 @@ use rocket_db_pools::{Connection, Database, sqlx}; mod cors; mod endpoints; mod types; +mod http_wrapper; #[derive(Database)] #[database("sqlite_ferri")] diff --git a/flake.nix b/flake.nix index 15cf319..5a00eb6 100644 --- a/flake.nix +++ b/flake.nix @@ -29,6 +29,7 @@ packages = with pkgs; [ sqlx-cli + cargo-nextest (rust-bin.stable.latest.default.override { extensions = [ "rust-src" "rust-analyzer" ]; targets = [ ]; diff --git a/migrations/20250410112325_add_follow.sql b/migrations/20250410112325_add_follow.sql index ce518cd..4e8922b 100644 --- a/migrations/20250410112325_add_follow.sql +++ b/migrations/20250410112325_add_follow.sql @@ -4,6 +4,7 @@ CREATE TABLE IF NOT EXISTS follow id TEXT PRIMARY KEY NOT NULL, follower_id TEXT NOT NULL, followed_id TEXT NOT NULL, + FOREIGN KEY(follower_id) REFERENCES actor(id), FOREIGN KEY(followed_id) REFERENCES actor(id) );