mirror of
https://github.com/nullishamy/ferri.git
synced 2025-04-29 20:29:23 +00:00
feat: start on CRUD ops for new types
This commit is contained in:
parent
d252131e0d
commit
e6b654a0b3
6 changed files with 149 additions and 32 deletions
|
@ -1,24 +1,18 @@
|
|||
use crate::types_rewrite::{ObjectUuid, ObjectUri, db};
|
||||
use sqlx::SqliteConnection;
|
||||
use thiserror::Error;
|
||||
use tracing::info;
|
||||
use chrono::{NaiveDateTime, DateTime, Utc};
|
||||
use crate::types_rewrite::DbError;
|
||||
|
||||
const SQLITE_TIME_FMT: &'static str = "%Y-%m-%d %H:%M:%S";
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum FetchError {
|
||||
#[error("an unknown error occured when fetching: {0}")]
|
||||
Unknown(String)
|
||||
}
|
||||
|
||||
fn parse_ts(ts: String) -> Option<DateTime<Utc>> {
|
||||
NaiveDateTime::parse_from_str(&ts, SQLITE_TIME_FMT)
|
||||
.ok()
|
||||
.map(|nt| nt.and_utc())
|
||||
}
|
||||
|
||||
pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result<db::User, FetchError> {
|
||||
pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result<db::User, DbError> {
|
||||
info!("fetching user by uuid '{:?}' from the database", id);
|
||||
|
||||
let record = sqlx::query!(r#"
|
||||
|
@ -39,7 +33,7 @@ pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result<d
|
|||
"#, id.0)
|
||||
.fetch_one(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| FetchError::Unknown(e.to_string()))?;
|
||||
.map_err(|e| DbError::FetchError(e.to_string()))?;
|
||||
|
||||
let follower_count = sqlx::query_scalar!(r#"
|
||||
SELECT COUNT(follower_id)
|
||||
|
@ -48,7 +42,7 @@ pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result<d
|
|||
"#, record.actor_id)
|
||||
.fetch_one(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| FetchError::Unknown(e.to_string()))?;
|
||||
.map_err(|e| DbError::FetchError(e.to_string()))?;
|
||||
|
||||
let last_post_at = sqlx::query_scalar!(r#"
|
||||
SELECT datetime(p.created_at)
|
||||
|
@ -59,7 +53,7 @@ pub async fn user_by_id(id: ObjectUuid, conn: &mut SqliteConnection) -> Result<d
|
|||
"#, record.user_id)
|
||||
.fetch_one(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| FetchError::Unknown(e.to_string()))?
|
||||
.map_err(|e| DbError::FetchError(e.to_string()))?
|
||||
.and_then(|ts| {
|
||||
info!("parsing timestamp {}", ts);
|
||||
parse_ts(ts)
|
30
ferri-main/src/types_rewrite/make.rs
Normal file
30
ferri-main/src/types_rewrite/make.rs
Normal file
|
@ -0,0 +1,30 @@
|
|||
use crate::types_rewrite::db;
|
||||
use sqlx::SqliteConnection;
|
||||
use crate::types_rewrite::DbError;
|
||||
|
||||
pub async fn new_user(user: db::User, conn: &mut SqliteConnection) -> Result<db::User, DbError> {
|
||||
let ts = user.created_at.to_rfc3339();
|
||||
sqlx::query!(r#"
|
||||
INSERT INTO user (id, acct, url, created_at, remote, username, actor_id, display_name)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
|
||||
"#, user.id.0, user.acct, user.url, ts,
|
||||
user.remote, user.username, user.actor.id.0, user.display_name
|
||||
)
|
||||
.execute(conn)
|
||||
.await
|
||||
.map_err(|e| DbError::CreationError(e.to_string()))?;
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
pub async fn new_actor(actor: db::Actor, conn: &mut SqliteConnection) -> Result<db::Actor, DbError> {
|
||||
sqlx::query!(r#"
|
||||
INSERT INTO actor (id, inbox, outbox)
|
||||
VALUES (?1, ?2, ?3)
|
||||
"#, actor.id.0, actor.inbox, actor.outbox)
|
||||
.execute(conn)
|
||||
.await
|
||||
.map_err(|e| DbError::CreationError(e.to_string()))?;
|
||||
|
||||
Ok(actor)
|
||||
}
|
|
@ -1,7 +1,19 @@
|
|||
use serde::{Serialize, Deserialize};
|
||||
use thiserror::Error;
|
||||
use std::fmt::Debug;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub mod convert;
|
||||
pub mod fetch;
|
||||
pub mod get;
|
||||
pub mod make;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum DbError {
|
||||
#[error("an unknown error occured when creating: {0}")]
|
||||
CreationError(String),
|
||||
#[error("an unknown error occured when fetching: {0}")]
|
||||
FetchError(String)
|
||||
}
|
||||
|
||||
pub const AS_CONTEXT_RAW: &'static str = "https://www.w3.org/ns/activitystreams";
|
||||
pub fn as_context() -> ObjectContext {
|
||||
|
@ -15,12 +27,18 @@ pub enum ObjectContext {
|
|||
Vec(Vec<serde_json::Value>),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
|
||||
pub struct ObjectUri(pub String);
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
|
||||
pub struct ObjectUuid(pub String);
|
||||
|
||||
impl ObjectUuid {
|
||||
pub fn new() -> Self {
|
||||
Self(Uuid::new_v4().to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
|
||||
pub struct Object {
|
||||
#[serde(rename = "@context")]
|
||||
|
@ -32,20 +50,20 @@ pub mod db {
|
|||
use chrono::{DateTime, Utc};
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct Actor {
|
||||
pub id: ObjectUri,
|
||||
pub inbox: String,
|
||||
pub outbox: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct UserPosts {
|
||||
// User may have no posts
|
||||
pub last_post_at: Option<DateTime<Utc>>
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct User {
|
||||
pub id: ObjectUuid,
|
||||
pub actor: Actor,
|
||||
|
|
|
@ -51,7 +51,8 @@ pub async fn home(
|
|||
display_name: String,
|
||||
username: String
|
||||
}
|
||||
|
||||
|
||||
// FIXME: query! can't cope with this. returns a type error
|
||||
let posts = sqlx::query_as::<_, Post>(
|
||||
r#"
|
||||
WITH RECURSIVE get_home_timeline_with_boosts(
|
||||
|
@ -84,8 +85,6 @@ pub async fn home(
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
dbg!(&posts);
|
||||
|
||||
let mut out = Vec::<TimelineStatus>::new();
|
||||
for record in posts.iter() {
|
||||
let mut boost: Option<Box<TimelineStatus>> = None;
|
||||
|
|
|
@ -89,11 +89,11 @@ pub async fn test(
|
|||
outbound: &State<OutboundQueue>,
|
||||
mut db: Connection<Db>
|
||||
) -> &'static str {
|
||||
use main::types_rewrite::{ObjectUuid, fetch, api};
|
||||
use main::types_rewrite::{ObjectUuid, get, api};
|
||||
outbound.0.send(ap::QueueMessage::Heartbeat);
|
||||
|
||||
let id = ObjectUuid("9b9d497b-2731-435f-a929-e609ca69dac9".to_string());
|
||||
let user= dbg!(fetch::user_by_id(id, &mut **db).await.unwrap());
|
||||
let user= dbg!(get::user_by_id(id, &mut **db).await.unwrap());
|
||||
let apu: api::Account = user.into();
|
||||
dbg!(apu);
|
||||
|
||||
|
|
|
@ -4,12 +4,15 @@ use main::ap;
|
|||
use rocket::serde::json::serde_json;
|
||||
use rocket::{State, post};
|
||||
use rocket_db_pools::Connection;
|
||||
use sqlx::SqliteConnection;
|
||||
use sqlx::Sqlite;
|
||||
use url::Url;
|
||||
use uuid::Uuid;
|
||||
use tracing::{event, span, Level, debug, warn, info, error};
|
||||
use crate::http_wrapper::HttpWrapper;
|
||||
|
||||
use main::types_rewrite::{make, db, ObjectUuid, ObjectUri, self};
|
||||
|
||||
use crate::{
|
||||
Db,
|
||||
http::HttpClient,
|
||||
|
@ -103,22 +106,92 @@ async fn create_follow(
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
struct RemoteInfo {
|
||||
acct: String,
|
||||
web_url: String,
|
||||
is_remote: bool,
|
||||
}
|
||||
|
||||
fn get_remote_info(actor_url: &str, person: &Person) -> RemoteInfo {
|
||||
let url = Url::parse(&actor_url).unwrap();
|
||||
let host = url.host_str().unwrap();
|
||||
|
||||
let (acct, remote) = if host != "ferri.amy.mov" {
|
||||
(format!("{}@{}", person.preferred_username, host), true)
|
||||
} else {
|
||||
(person.preferred_username.clone(), false)
|
||||
};
|
||||
|
||||
let url = format!("https://ferri.amy.mov/{}", acct);
|
||||
|
||||
RemoteInfo {
|
||||
acct: acct.to_string(),
|
||||
web_url: url,
|
||||
is_remote: remote
|
||||
}
|
||||
}
|
||||
|
||||
async fn resolve_actor<'a>(
|
||||
actor_url: &str,
|
||||
http: &HttpWrapper<'a>,
|
||||
conn: &mut SqliteConnection
|
||||
) -> Result<db::User, types_rewrite::DbError> {
|
||||
let person = {
|
||||
let res = http.get_person(&actor_url).await;
|
||||
if let Err(e) = res {
|
||||
error!("could not load user {}: {}", actor_url, e.to_string());
|
||||
return Err(types_rewrite::DbError::FetchError(
|
||||
format!("could not load user {}: {}", actor_url, e.to_string())
|
||||
))
|
||||
}
|
||||
|
||||
res.unwrap()
|
||||
};
|
||||
|
||||
let user_id = ObjectUuid::new();
|
||||
let remote_info = get_remote_info(actor_url, &person);
|
||||
|
||||
let actor = db::Actor {
|
||||
id: ObjectUri(actor_url.to_string()),
|
||||
inbox: person.inbox.clone(),
|
||||
outbox: person.outbox.clone(),
|
||||
};
|
||||
|
||||
info!("creating actor {}", actor_url);
|
||||
|
||||
let actor = make::new_actor(actor.clone(), conn).await.unwrap_or(actor);
|
||||
|
||||
info!("creating user {} ({:#?})", remote_info.acct, person);
|
||||
|
||||
let user = db::User {
|
||||
id: user_id,
|
||||
actor,
|
||||
username: person.name,
|
||||
display_name: person.preferred_username,
|
||||
acct: remote_info.acct,
|
||||
remote: remote_info.is_remote,
|
||||
url: remote_info.web_url,
|
||||
created_at: main::ap::now(),
|
||||
|
||||
posts: db::UserPosts {
|
||||
last_post_at: None
|
||||
}
|
||||
};
|
||||
|
||||
Ok(make::new_user(user.clone(), conn).await.unwrap_or(user))
|
||||
}
|
||||
|
||||
async fn handle_follow_activity<'a>(
|
||||
followed_account: &str,
|
||||
activity: activity::FollowActivity,
|
||||
http: HttpWrapper<'a>,
|
||||
mut db: Connection<Db>,
|
||||
) {
|
||||
let user = http.get_person(&activity.actor).await;
|
||||
if let Err(e) = user {
|
||||
error!("could not load user {}: {}", activity.actor, e.to_string());
|
||||
return
|
||||
}
|
||||
|
||||
let user = user.unwrap();
|
||||
let actor = resolve_actor(&activity.actor, &http, &mut **db)
|
||||
.await.unwrap();
|
||||
|
||||
create_actor(&user, &activity.actor, &mut **db).await;
|
||||
create_user(&user, &activity.actor, &mut **db).await;
|
||||
info!("{:?} follows {}", actor, followed_account);
|
||||
|
||||
create_follow(&activity, &mut **db).await;
|
||||
|
||||
let follower = ap::User::from_actor_id(&activity.actor, &mut **db).await;
|
||||
|
@ -226,15 +299,18 @@ async fn handle_boost_activity<'a>(
|
|||
let reblog_id = ap::new_id();
|
||||
|
||||
let attr_id = attributed_user.id();
|
||||
// HACK: ON CONFLICT is to avoid duplicate remote posts coming in
|
||||
// check this better in future
|
||||
sqlx::query!("
|
||||
INSERT INTO post (id, uri, user_id, content, created_at)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5)
|
||||
ON CONFLICT(uri) DO NOTHING
|
||||
", reblog_id, post.id, attr_id, post.content, post.ts)
|
||||
.execute(&mut **db)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let uri = format!("https://ferri.amy.mov/users/{}/posts/{}", actor_user.id(), post.id);
|
||||
let uri = format!("https://ferri.amy.mov/users/{}/posts/{}", actor_user.id(), base_id);
|
||||
let user_id = actor_user.id();
|
||||
|
||||
sqlx::query!("
|
||||
|
|
Loading…
Add table
Reference in a new issue