login, session persistence, sync loop with message logging
This commit is contained in:
parent
44c9503f86
commit
d8f322b91f
4 changed files with 181 additions and 2 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -603,6 +603,7 @@ dependencies = [
|
|||
name = "damocles-daemon"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"matrix-sdk",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
|
|
|||
|
|
@ -10,3 +10,4 @@ serde = { version = "1", features = ["derive"] }
|
|||
serde_json = "1"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
anyhow = "1"
|
||||
|
|
|
|||
|
|
@ -51,6 +51,7 @@
|
|||
nativeBuildInputs = with pkgs; [ pkg-config ];
|
||||
buildInputs = with pkgs; [
|
||||
openssl
|
||||
sqlite
|
||||
];
|
||||
|
||||
meta = {
|
||||
|
|
@ -74,6 +75,7 @@
|
|||
rustfmt
|
||||
pkg-config
|
||||
openssl
|
||||
sqlite
|
||||
];
|
||||
};
|
||||
}
|
||||
|
|
|
|||
179
src/main.rs
179
src/main.rs
|
|
@ -1,11 +1,186 @@
|
|||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{Context, bail};
|
||||
use matrix_sdk::{
|
||||
Client, Room, RoomState,
|
||||
authentication::matrix::MatrixSession,
|
||||
config::SyncSettings,
|
||||
ruma::{
|
||||
api::client::filter::FilterDefinition,
|
||||
events::room::message::{MessageType, OriginalSyncRoomMessageEvent},
|
||||
},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct Config {
|
||||
homeserver: String,
|
||||
username: String,
|
||||
password: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct PersistedSession {
|
||||
homeserver: String,
|
||||
db_path: PathBuf,
|
||||
user_session: MatrixSession,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
sync_token: Option<String>,
|
||||
}
|
||||
|
||||
const STATE_DIR: &str = "/persist/damocles-lab/state";
|
||||
const CONFIG_PATH: &str = "/persist/damocles-lab/config.json";
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
tracing::info!("damocles-daemon starting");
|
||||
tracing::info!("the cage has a door now");
|
||||
|
||||
let state_dir = Path::new(STATE_DIR);
|
||||
fs::create_dir_all(state_dir).await?;
|
||||
|
||||
let session_file = state_dir.join("session.json");
|
||||
let db_path = state_dir.join("db");
|
||||
|
||||
let (client, sync_token) = if session_file.exists() {
|
||||
restore_session(&session_file).await?
|
||||
} else {
|
||||
let config = load_config().await?;
|
||||
(login(&config, &db_path, &session_file).await?, None)
|
||||
};
|
||||
|
||||
sync(client, sync_token, &session_file).await
|
||||
}
|
||||
|
||||
async fn load_config() -> anyhow::Result<Config> {
|
||||
let data = fs::read_to_string(CONFIG_PATH)
|
||||
.await
|
||||
.context("failed to read config.json")?;
|
||||
serde_json::from_str(&data).context("failed to parse config.json")
|
||||
}
|
||||
|
||||
async fn restore_session(session_file: &Path) -> anyhow::Result<(Client, Option<String>)> {
|
||||
let data = fs::read_to_string(session_file).await?;
|
||||
let session: PersistedSession = serde_json::from_str(&data)?;
|
||||
|
||||
tracing::info!(user = %session.user_session.meta.user_id, "restoring session");
|
||||
|
||||
let client = Client::builder()
|
||||
.homeserver_url(&session.homeserver)
|
||||
.sqlite_store(&session.db_path, None)
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
client.restore_session(session.user_session).await?;
|
||||
|
||||
Ok((client, session.sync_token))
|
||||
}
|
||||
|
||||
async fn login(
|
||||
config: &Config,
|
||||
db_path: &Path,
|
||||
session_file: &Path,
|
||||
) -> anyhow::Result<Client> {
|
||||
tracing::info!(homeserver = %config.homeserver, user = %config.username, "logging in");
|
||||
|
||||
let client = Client::builder()
|
||||
.homeserver_url(&config.homeserver)
|
||||
.sqlite_store(db_path, None)
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
client
|
||||
.matrix_auth()
|
||||
.login_username(&config.username, &config.password)
|
||||
.initial_device_display_name("damocles-daemon")
|
||||
.await?;
|
||||
|
||||
let user_session = client
|
||||
.matrix_auth()
|
||||
.session()
|
||||
.context("no session after login")?;
|
||||
|
||||
let persisted = PersistedSession {
|
||||
homeserver: config.homeserver.clone(),
|
||||
db_path: db_path.to_owned(),
|
||||
user_session,
|
||||
sync_token: None,
|
||||
};
|
||||
fs::write(session_file, serde_json::to_string_pretty(&persisted)?).await?;
|
||||
|
||||
tracing::info!("session persisted");
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
async fn sync(
|
||||
client: Client,
|
||||
initial_sync_token: Option<String>,
|
||||
session_file: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::info!("initial sync (ignoring past messages)");
|
||||
|
||||
let filter = FilterDefinition::with_lazy_loading();
|
||||
let mut sync_settings = SyncSettings::default().filter(filter.into());
|
||||
|
||||
if let Some(token) = initial_sync_token {
|
||||
sync_settings = sync_settings.token(token);
|
||||
}
|
||||
|
||||
// initial sync loop - retries on transient errors
|
||||
loop {
|
||||
match client.sync_once(sync_settings.clone()).await {
|
||||
Ok(response) => {
|
||||
sync_settings = sync_settings.token(response.next_batch.clone());
|
||||
persist_sync_token(session_file, response.next_batch).await?;
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("initial sync failed, retrying: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("synced, listening for messages");
|
||||
|
||||
client.add_event_handler(on_room_message);
|
||||
|
||||
// main sync loop
|
||||
client.sync(sync_settings).await?;
|
||||
|
||||
bail!("sync loop exited unexpectedly")
|
||||
}
|
||||
|
||||
async fn persist_sync_token(session_file: &Path, sync_token: String) -> anyhow::Result<()> {
|
||||
let data = fs::read_to_string(session_file).await?;
|
||||
let mut session: PersistedSession = serde_json::from_str(&data)?;
|
||||
session.sync_token = Some(sync_token);
|
||||
fs::write(session_file, serde_json::to_string_pretty(&session)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_room_message(event: OriginalSyncRoomMessageEvent, room: Room) {
|
||||
if room.state() != RoomState::Joined {
|
||||
return;
|
||||
}
|
||||
let MessageType::Text(text_content) = &event.content.msgtype else {
|
||||
return;
|
||||
};
|
||||
|
||||
let room_name = room
|
||||
.display_name()
|
||||
.await
|
||||
.map(|n| n.to_string())
|
||||
.unwrap_or_else(|_| room.room_id().to_string());
|
||||
|
||||
tracing::info!(
|
||||
room = %room_name,
|
||||
sender = %event.sender,
|
||||
"{}",
|
||||
text_content.body
|
||||
);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue