From 4b88100373da71a1dd5cb22c5e371c0bab01515a Mon Sep 17 00:00:00 2001 From: nullishamy Date: Sat, 26 Apr 2025 12:44:45 +0100 Subject: [PATCH] feat: queue infra --- ferri-main/src/ap/mod.rs | 3 ++ ferri-main/src/ap/request_queue.rs | 56 ++++++++++++++++++++++++++++ ferri-server/src/endpoints/custom.rs | 6 ++- ferri-server/src/lib.rs | 15 +++++++- 4 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 ferri-main/src/ap/request_queue.rs diff --git a/ferri-main/src/ap/mod.rs b/ferri-main/src/ap/mod.rs index f86ba50..98d7330 100644 --- a/ferri-main/src/ap/mod.rs +++ b/ferri-main/src/ap/mod.rs @@ -12,6 +12,9 @@ pub use user::*; mod post; pub use post::*; +mod request_queue; +pub use request_queue::*; + pub const AS_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; pub fn new_id() -> String { diff --git a/ferri-main/src/ap/request_queue.rs b/ferri-main/src/ap/request_queue.rs new file mode 100644 index 0000000..65c7f07 --- /dev/null +++ b/ferri-main/src/ap/request_queue.rs @@ -0,0 +1,56 @@ +use std::sync::mpsc; +use std::thread; +use tracing::{debug, info, span, Level}; + +#[derive(Debug)] +pub enum QueueMessage { + Heartbeat +} + +pub struct RequestQueue { + name: &'static str, + send: mpsc::Sender, + recv: mpsc::Receiver +} + +#[derive(Clone)] +pub struct QueueHandle { + send: mpsc::Sender +} + +impl QueueHandle { + pub fn send(&self, msg: QueueMessage) { + self.send.send(msg).unwrap(); + } +} + +impl RequestQueue { + pub fn new(name: &'static str) -> Self { + let (send, recv) = mpsc::channel(); + Self { + name, + send, + recv + } + } + + pub fn spawn(self) -> QueueHandle { + info!("starting up queue '{}'", self.name); + + thread::spawn(move || { + info!("queue '{}' up", self.name); + let recv = self.recv; + + while let Ok(req) = recv.recv() { + let s = span!(Level::INFO, "queue", queue_name = self.name); + let _enter = s.enter(); + + info!(?req, "got a message into the queue"); + + drop(_enter); + } + }); + + QueueHandle { send: self.send } + } +} diff --git a/ferri-server/src/endpoints/custom.rs b/ferri-server/src/endpoints/custom.rs index 9bf268f..8256121 100644 --- a/ferri-server/src/endpoints/custom.rs +++ b/ferri-server/src/endpoints/custom.rs @@ -1,6 +1,8 @@ use main::ap::http::HttpClient; use rocket::{State, get, response::status}; use rocket_db_pools::Connection; +use main::ap; +use crate::OutboundQueue; use uuid::Uuid; @@ -84,7 +86,9 @@ pub async fn resolve_user(acct: &str, host: &str) -> types::Person { } #[get("/test")] -pub async fn test(http: &State) -> &'static str { +pub async fn test(http: &State, outbound: &State) -> &'static str { + outbound.0.send(ap::QueueMessage::Heartbeat); + let user = resolve_user("amy@fedi.amy.mov", "fedi.amy.mov").await; let post = activity::CreateActivity { diff --git a/ferri-server/src/lib.rs b/ferri-server/src/lib.rs index cced509..f66921b 100644 --- a/ferri-server/src/lib.rs +++ b/ferri-server/src/lib.rs @@ -6,6 +6,8 @@ use endpoints::{ use tracing::Level; use tracing_subscriber::fmt; +use main::ap; + use main::ap::http; use main::config::Config; use rocket::{ @@ -88,7 +90,8 @@ impl<'a> FromRequest<'a> for AuthenticatedUser { } } - +pub struct OutboundQueue(pub ap::QueueHandle); +pub struct InboundQueue(pub ap::QueueHandle); pub fn launch(cfg: Config) -> Rocket { let format = fmt::format() @@ -104,11 +107,19 @@ pub fn launch(cfg: Config) -> Rocket { .event_format(format) .with_writer(std::io::stdout) .init(); - + + let outbound = ap::RequestQueue::new("outbound"); + let outbound_handle = outbound.spawn(); + + let inbound = ap::RequestQueue::new("inbound"); + let inbound_handle = inbound.spawn(); + let http_client = http::HttpClient::new(); build() .manage(cfg) .manage(http_client) + .manage(OutboundQueue(outbound_handle)) + .manage(InboundQueue(inbound_handle)) .attach(Db::init()) .attach(cors::CORS) .mount("/assets", rocket::fs::FileServer::from("./assets"))