process unread messages on restart by registering handlers before initial sync
This commit is contained in:
parent
3f5208cab1
commit
e538be2c3a
1 changed files with 23 additions and 5 deletions
28
src/main.rs
28
src/main.rs
|
|
@ -114,7 +114,12 @@ async fn sync(
|
||||||
session_file: &std::path::Path,
|
session_file: &std::path::Path,
|
||||||
state: Arc<Mutex<DaemonState>>,
|
state: Arc<Mutex<DaemonState>>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
tracing::info!("initial sync (ignoring past messages)");
|
let has_token = initial_sync_token.is_some();
|
||||||
|
if has_token {
|
||||||
|
tracing::info!("restoring with persisted token, will process unread messages");
|
||||||
|
} else {
|
||||||
|
tracing::info!("first start, ignoring historical messages");
|
||||||
|
}
|
||||||
|
|
||||||
let filter = FilterDefinition::with_lazy_loading();
|
let filter = FilterDefinition::with_lazy_loading();
|
||||||
let mut sync_settings = SyncSettings::default().filter(filter.into());
|
let mut sync_settings = SyncSettings::default().filter(filter.into());
|
||||||
|
|
@ -123,6 +128,13 @@ async fn sync(
|
||||||
sync_settings = sync_settings.token(token);
|
sync_settings = sync_settings.token(token);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register handlers BEFORE sync_once iff we have a token, so messages
|
||||||
|
// received while we were down trigger the queue. On first start we skip
|
||||||
|
// this to avoid backlogging every historical message.
|
||||||
|
if has_token {
|
||||||
|
register_event_handlers(&client, state.clone());
|
||||||
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match client.sync_once(sync_settings.clone()).await {
|
match client.sync_once(sync_settings.clone()).await {
|
||||||
Ok(response) => {
|
Ok(response) => {
|
||||||
|
|
@ -136,8 +148,18 @@ async fn sync(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !has_token {
|
||||||
|
register_event_handlers(&client, state.clone());
|
||||||
|
}
|
||||||
|
|
||||||
tracing::info!("synced, listening for messages");
|
tracing::info!("synced, listening for messages");
|
||||||
|
|
||||||
|
client.sync(sync_settings).await?;
|
||||||
|
|
||||||
|
bail!("sync loop exited unexpectedly")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_event_handlers(client: &Client, state: Arc<Mutex<DaemonState>>) {
|
||||||
let msg_state = state.clone();
|
let msg_state = state.clone();
|
||||||
client.add_event_handler(
|
client.add_event_handler(
|
||||||
move |event: matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent,
|
move |event: matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent,
|
||||||
|
|
@ -160,10 +182,6 @@ async fn sync(
|
||||||
);
|
);
|
||||||
|
|
||||||
client.add_event_handler(handlers::on_stripped_state_member);
|
client.add_event_handler(handlers::on_stripped_state_member);
|
||||||
|
|
||||||
client.sync(sync_settings).await?;
|
|
||||||
|
|
||||||
bail!("sync loop exited unexpectedly")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client, socket_path: &PathBuf) {
|
async fn process_loop(state: Arc<Mutex<DaemonState>>, client: Client, socket_path: &PathBuf) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue