reminder: add background scheduler loop - checks & delivers due reminders every 5s
This commit is contained in:
parent
4fc9c02934
commit
f38510930a
2 changed files with 50 additions and 0 deletions
|
|
@ -118,6 +118,10 @@ impl Coordinator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn list_agents(&self) -> Vec<String> {
|
||||||
|
self.agents.lock().unwrap().keys().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
/// Mark an agent as in-progress (only one state per agent for now).
|
/// Mark an agent as in-progress (only one state per agent for now).
|
||||||
pub fn set_transient(&self, name: &str, kind: TransientKind) {
|
pub fn set_transient(&self, name: &str, kind: TransientKind) {
|
||||||
self.transient.lock().unwrap().insert(
|
self.transient.lock().unwrap().insert(
|
||||||
|
|
|
||||||
|
|
@ -157,6 +157,52 @@ async fn main() -> Result<()> {
|
||||||
// when a previously-running container goes away without an
|
// when a previously-running container goes away without an
|
||||||
// operator-initiated transient state.
|
// operator-initiated transient state.
|
||||||
crash_watch::spawn(coord.clone());
|
crash_watch::spawn(coord.clone());
|
||||||
|
// Reminder scheduler: checks for due reminders every 5 seconds,
|
||||||
|
// delivers them as inbox messages from "reminder".
|
||||||
|
let reminder_coord = coord.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
use hive_sh4re::Message;
|
||||||
|
loop {
|
||||||
|
// Get all agents currently registered
|
||||||
|
let agents = reminder_coord.list_agents();
|
||||||
|
for agent in agents {
|
||||||
|
match reminder_coord.broker.get_due_reminders(&agent) {
|
||||||
|
Ok(reminders) => {
|
||||||
|
for (id, message, _file_path) in reminders {
|
||||||
|
// Deliver as inbox message from "reminder"
|
||||||
|
if let Err(e) = reminder_coord.broker.send(&Message {
|
||||||
|
from: "reminder".to_owned(),
|
||||||
|
to: agent.clone(),
|
||||||
|
body: message.clone(),
|
||||||
|
}) {
|
||||||
|
tracing::warn!(
|
||||||
|
reminder_id = id,
|
||||||
|
%agent,
|
||||||
|
error = ?e,
|
||||||
|
"failed to deliver reminder"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Mark as sent
|
||||||
|
if let Err(e) = reminder_coord.broker.mark_reminder_sent(id) {
|
||||||
|
tracing::warn!(
|
||||||
|
reminder_id = id,
|
||||||
|
error = ?e,
|
||||||
|
"failed to mark reminder sent"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => tracing::warn!(
|
||||||
|
%agent,
|
||||||
|
error = ?e,
|
||||||
|
"failed to query due reminders"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
let dash_coord = coord.clone();
|
let dash_coord = coord.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {
|
if let Err(e) = dashboard::serve(dashboard_port, dash_coord).await {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue