diff --git a/Cargo.lock b/Cargo.lock index 569e29a..09e1786 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -603,6 +603,7 @@ dependencies = [ name = "damocles-daemon" version = "0.1.0" dependencies = [ + "anyhow", "matrix-sdk", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index c2a714b..80f5da5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,4 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +anyhow = "1" diff --git a/flake.nix b/flake.nix index 86b8dea..a90026a 100644 --- a/flake.nix +++ b/flake.nix @@ -51,6 +51,7 @@ nativeBuildInputs = with pkgs; [ pkg-config ]; buildInputs = with pkgs; [ openssl + sqlite ]; meta = { @@ -74,6 +75,7 @@ rustfmt pkg-config openssl + sqlite ]; }; } diff --git a/src/main.rs b/src/main.rs index fa21983..b238a42 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,186 @@ +use std::path::{Path, PathBuf}; + +use anyhow::{Context, bail}; +use matrix_sdk::{ + Client, Room, RoomState, + authentication::matrix::MatrixSession, + config::SyncSettings, + ruma::{ + api::client::filter::FilterDefinition, + events::room::message::{MessageType, OriginalSyncRoomMessageEvent}, + }, +}; +use serde::{Deserialize, Serialize}; +use tokio::fs; use tracing_subscriber::EnvFilter; +#[derive(Debug, Deserialize)] +struct Config { + homeserver: String, + username: String, + password: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct PersistedSession { + homeserver: String, + db_path: PathBuf, + user_session: MatrixSession, + #[serde(skip_serializing_if = "Option::is_none")] + sync_token: Option, +} + +const STATE_DIR: &str = "/persist/damocles-lab/state"; +const CONFIG_PATH: &str = "/persist/damocles-lab/config.json"; + #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); tracing::info!("damocles-daemon starting"); - tracing::info!("the cage has a door now"); + + let state_dir = Path::new(STATE_DIR); + fs::create_dir_all(state_dir).await?; + + let session_file = state_dir.join("session.json"); + let db_path = state_dir.join("db"); + + let (client, sync_token) = if session_file.exists() { + restore_session(&session_file).await? + } else { + let config = load_config().await?; + (login(&config, &db_path, &session_file).await?, None) + }; + + sync(client, sync_token, &session_file).await +} + +async fn load_config() -> anyhow::Result { + let data = fs::read_to_string(CONFIG_PATH) + .await + .context("failed to read config.json")?; + serde_json::from_str(&data).context("failed to parse config.json") +} + +async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option)> { + let data = fs::read_to_string(session_file).await?; + let session: PersistedSession = serde_json::from_str(&data)?; + + tracing::info!(user = %session.user_session.meta.user_id, "restoring session"); + + let client = Client::builder() + .homeserver_url(&session.homeserver) + .sqlite_store(&session.db_path, None) + .build() + .await?; + + client.restore_session(session.user_session).await?; + + Ok((client, session.sync_token)) +} + +async fn login( + config: &Config, + db_path: &Path, + session_file: &Path, +) -> anyhow::Result { + tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in"); + + let client = Client::builder() + .homeserver_url(&config.homeserver) + .sqlite_store(db_path, None) + .build() + .await?; + + client + .matrix_auth() + .login_username(&config.username, &config.password) + .initial_device_display_name("damocles-daemon") + .await?; + + let user_session = client + .matrix_auth() + .session() + .context("no session after login")?; + + let persisted = PersistedSession { + homeserver: config.homeserver.clone(), + db_path: db_path.to_owned(), + user_session, + sync_token: None, + }; + fs::write(session_file, serde_json::to_string_pretty(&persisted)?).await?; + + tracing::info!("session persisted"); + Ok(client) +} + +async fn sync( + client: Client, + initial_sync_token: Option, + session_file: &Path, +) -> anyhow::Result<()> { + tracing::info!("initial sync (ignoring past messages)"); + + let filter = FilterDefinition::with_lazy_loading(); + let mut sync_settings = SyncSettings::default().filter(filter.into()); + + if let Some(token) = initial_sync_token { + sync_settings = sync_settings.token(token); + } + + // initial sync loop - retries on transient errors + loop { + match client.sync_once(sync_settings.clone()).await { + Ok(response) => { + sync_settings = sync_settings.token(response.next_batch.clone()); + persist_sync_token(session_file, response.next_batch).await?; + break; + } + Err(e) => { + tracing::warn!("initial sync failed, retrying: {e}"); + } + } + } + + tracing::info!("synced, listening for messages"); + + client.add_event_handler(on_room_message); + + // main sync loop + client.sync(sync_settings).await?; + + bail!("sync loop exited unexpectedly") +} + +async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> { + let data = fs::read_to_string(session_file).await?; + let mut session: PersistedSession = serde_json::from_str(&data)?; + session.sync_token = Some(sync_token); + fs::write(session_file, serde_json::to_string_pretty(&session)?).await?; + Ok(()) +} + +async fn on_room_message(event: OriginalSyncRoomMessageEvent, room: Room) { + if room.state() != RoomState::Joined { + return; + } + let MessageType::Text(text_content) = &event.content.msgtype else { + return; + }; + + let room_name = room + .display_name() + .await + .map(|n| n.to_string()) + .unwrap_or_else(|_| room.room_id().to_string()); + + tracing::info!( + room = %room_name, + sender = %event.sender, + "{}", + text_content.body + ); }