feat: queue infra

This commit is contained in:
nullishamy 2025-04-26 12:44:45 +01:00
parent 2270324711
commit 4b88100373
Signed by: amy
SSH key fingerprint: SHA256:WmV0uk6WgAQvDJlM8Ld4mFPHZo02CLXXP5VkwQ5xtyk
4 changed files with 77 additions and 3 deletions

View file

@ -12,6 +12,9 @@ pub use user::*;
mod post;
pub use post::*;
mod request_queue;
pub use request_queue::*;
pub const AS_CONTEXT: &str = "https://www.w3.org/ns/activitystreams";
pub fn new_id() -> String {

View file

@ -0,0 +1,56 @@
use std::sync::mpsc;
use std::thread;
use tracing::{debug, info, span, Level};
#[derive(Debug)]
pub enum QueueMessage {
Heartbeat
}
pub struct RequestQueue {
name: &'static str,
send: mpsc::Sender<QueueMessage>,
recv: mpsc::Receiver<QueueMessage>
}
#[derive(Clone)]
pub struct QueueHandle {
send: mpsc::Sender<QueueMessage>
}
impl QueueHandle {
pub fn send(&self, msg: QueueMessage) {
self.send.send(msg).unwrap();
}
}
impl RequestQueue {
pub fn new(name: &'static str) -> Self {
let (send, recv) = mpsc::channel();
Self {
name,
send,
recv
}
}
pub fn spawn(self) -> QueueHandle {
info!("starting up queue '{}'", self.name);
thread::spawn(move || {
info!("queue '{}' up", self.name);
let recv = self.recv;
while let Ok(req) = recv.recv() {
let s = span!(Level::INFO, "queue", queue_name = self.name);
let _enter = s.enter();
info!(?req, "got a message into the queue");
drop(_enter);
}
});
QueueHandle { send: self.send }
}
}

View file

@ -1,6 +1,8 @@
use main::ap::http::HttpClient;
use rocket::{State, get, response::status};
use rocket_db_pools::Connection;
use main::ap;
use crate::OutboundQueue;
use uuid::Uuid;
@ -84,7 +86,9 @@ pub async fn resolve_user(acct: &str, host: &str) -> types::Person {
}
#[get("/test")]
pub async fn test(http: &State<HttpClient>) -> &'static str {
pub async fn test(http: &State<HttpClient>, outbound: &State<OutboundQueue>) -> &'static str {
outbound.0.send(ap::QueueMessage::Heartbeat);
let user = resolve_user("amy@fedi.amy.mov", "fedi.amy.mov").await;
let post = activity::CreateActivity {

View file

@ -6,6 +6,8 @@ use endpoints::{
use tracing::Level;
use tracing_subscriber::fmt;
use main::ap;
use main::ap::http;
use main::config::Config;
use rocket::{
@ -88,7 +90,8 @@ impl<'a> FromRequest<'a> for AuthenticatedUser {
}
}
pub struct OutboundQueue(pub ap::QueueHandle);
pub struct InboundQueue(pub ap::QueueHandle);
pub fn launch(cfg: Config) -> Rocket<Build> {
let format = fmt::format()
@ -104,11 +107,19 @@ pub fn launch(cfg: Config) -> Rocket<Build> {
.event_format(format)
.with_writer(std::io::stdout)
.init();
let outbound = ap::RequestQueue::new("outbound");
let outbound_handle = outbound.spawn();
let inbound = ap::RequestQueue::new("inbound");
let inbound_handle = inbound.spawn();
let http_client = http::HttpClient::new();
build()
.manage(cfg)
.manage(http_client)
.manage(OutboundQueue(outbound_handle))
.manage(InboundQueue(inbound_handle))
.attach(Db::init())
.attach(cors::CORS)
.mount("/assets", rocket::fs::FileServer::from("./assets"))