systemd: fetch remotes via busctl --host, applet-open gating, error states (step 4)
This commit is contained in:
parent
6224d86965
commit
2ab2af8a23
4 changed files with 302 additions and 34 deletions
|
|
@ -5,12 +5,21 @@
|
|||
// for QVariantMap/QVariantList, and cxx-qt main on git regressed qt-build-utils
|
||||
// to require QuickControls2.prl files that nixpkgs strips. Switch to
|
||||
// QList<QVariantMap> when a release ships with both fixes.
|
||||
//
|
||||
// Local data uses native zbus over the system + session buses. Remote data is
|
||||
// fetched by spawning `busctl --host=<target>`, which tunnels sd-bus through
|
||||
// SSH using whatever the user already has set up in `~/.ssh/config`. We parse
|
||||
// busctl's JSON output with serde_json. Remote fetches only happen while the
|
||||
// applet is open (toggled via `setAppletOpen`).
|
||||
|
||||
use crate::modules_service;
|
||||
use core::pin::Pin;
|
||||
use cxx_qt::CxxQtType;
|
||||
use cxx_qt_lib::QString;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tokio::runtime::Runtime;
|
||||
use zbus::{Connection, proxy};
|
||||
|
||||
|
|
@ -28,16 +37,24 @@ pub mod qobject {
|
|||
#[qproperty(QString, hostname)]
|
||||
// Local failed unit count (drives the bar module label).
|
||||
#[qproperty(i32, failed_count, cxx_name = "failedCount")]
|
||||
// JSON array, local first then nspawn containers. Each entry:
|
||||
// JSON array, local first then nspawn containers then configured remotes.
|
||||
// Each entry:
|
||||
// { name, isLocal, marker, systemState, runningCount, totalCount,
|
||||
// failedUnits: [{name, description, subState, scope, machine}],
|
||||
// runningUnits: [...] }
|
||||
// runningUnits: [...],
|
||||
// errorKind: ""|"transient"|"permanent",
|
||||
// errorReason: string,
|
||||
// lastSeen: 0 | unix_seconds }
|
||||
#[qproperty(QString, machines_json, cxx_name = "machinesJson")]
|
||||
type SystemdService = super::SystemdServiceRust;
|
||||
|
||||
#[qinvokable]
|
||||
fn poll(self: Pin<&mut Self>);
|
||||
|
||||
#[qinvokable]
|
||||
#[cxx_name = "setAppletOpen"]
|
||||
fn set_applet_open(self: Pin<&mut Self>, open: bool);
|
||||
|
||||
#[qinvokable]
|
||||
#[cxx_name = "restartUnit"]
|
||||
fn restart_unit(self: Pin<&mut Self>, name: QString, scope: QString, machine: QString);
|
||||
|
|
@ -87,7 +104,7 @@ trait Machined {
|
|||
) -> zbus::Result<Vec<(String, String, String, zbus::zvariant::OwnedObjectPath)>>;
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[derive(Serialize, Clone)]
|
||||
struct UnitJson {
|
||||
name: String,
|
||||
description: String,
|
||||
|
|
@ -97,7 +114,7 @@ struct UnitJson {
|
|||
machine: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[derive(Serialize, Clone)]
|
||||
struct MachineJson {
|
||||
name: String,
|
||||
#[serde(rename = "isLocal")]
|
||||
|
|
@ -113,12 +130,41 @@ struct MachineJson {
|
|||
failed_units: Vec<UnitJson>,
|
||||
#[serde(rename = "runningUnits")]
|
||||
running_units: Vec<UnitJson>,
|
||||
#[serde(rename = "errorKind")]
|
||||
error_kind: String,
|
||||
#[serde(rename = "errorReason")]
|
||||
error_reason: String,
|
||||
#[serde(rename = "lastSeen")]
|
||||
last_seen: u64,
|
||||
}
|
||||
|
||||
impl MachineJson {
|
||||
fn placeholder(name: String, is_local: bool) -> Self {
|
||||
Self {
|
||||
name,
|
||||
is_local,
|
||||
marker: String::new(),
|
||||
system_state: "pending".into(),
|
||||
running_count: 0,
|
||||
total_count: 0,
|
||||
failed_units: Vec::new(),
|
||||
running_units: Vec::new(),
|
||||
error_kind: String::new(),
|
||||
error_reason: String::new(),
|
||||
last_seen: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SystemdServiceRust {
|
||||
hostname: QString,
|
||||
failed_count: i32,
|
||||
machines_json: QString,
|
||||
applet_open: bool,
|
||||
// Last successful or attempted remote fetch per target. Persists across
|
||||
// polls so the UI can show stale data with a "last seen" timestamp when
|
||||
// the host goes unreachable.
|
||||
remote_cache: HashMap<String, MachineJson>,
|
||||
}
|
||||
|
||||
impl Default for SystemdServiceRust {
|
||||
|
|
@ -127,6 +173,8 @@ impl Default for SystemdServiceRust {
|
|||
hostname: QString::from(read_hostname()),
|
||||
failed_count: 0,
|
||||
machines_json: QString::from("[]"),
|
||||
applet_open: false,
|
||||
remote_cache: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -137,6 +185,13 @@ fn read_hostname() -> String {
|
|||
.unwrap_or_else(|_| "localhost".to_string())
|
||||
}
|
||||
|
||||
fn now_unix() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_secs())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
fn rt() -> &'static Runtime {
|
||||
static RT: OnceLock<Runtime> = OnceLock::new();
|
||||
RT.get_or_init(|| {
|
||||
|
|
@ -161,7 +216,7 @@ async fn fetch_units(bus: &Connection) -> (String, Vec<UnitTuple>) {
|
|||
(state, units)
|
||||
}
|
||||
|
||||
// Partition a unit list by active_state. (failed, running)
|
||||
// Partition a unit list by active_state. (failed, running, total)
|
||||
fn partition_units(
|
||||
units: Vec<UnitTuple>,
|
||||
scope: &str,
|
||||
|
|
@ -187,7 +242,7 @@ fn partition_units(
|
|||
(failed, running, total)
|
||||
}
|
||||
|
||||
async fn poll_async() -> (
|
||||
async fn poll_local() -> (
|
||||
String,
|
||||
Vec<UnitTuple>,
|
||||
Vec<UnitTuple>,
|
||||
|
|
@ -220,6 +275,115 @@ async fn poll_async() -> (
|
|||
(sys_state, sys_units, user_units, machines)
|
||||
}
|
||||
|
||||
// Spawn `busctl --host=<target>` to fetch system_state + units. Returns the
|
||||
// MachineJson on success, or an (kind, reason) on failure.
|
||||
fn fetch_remote(target: &str) -> Result<MachineJson, (&'static str, String)> {
|
||||
let state_out = std::process::Command::new("busctl")
|
||||
.args([
|
||||
"--host",
|
||||
target,
|
||||
"--json=short",
|
||||
"get-property",
|
||||
"org.freedesktop.systemd1",
|
||||
"/org/freedesktop/systemd1",
|
||||
"org.freedesktop.systemd1.Manager",
|
||||
"SystemState",
|
||||
])
|
||||
.output()
|
||||
.map_err(|e| ("transient", format!("busctl spawn failed: {e}")))?;
|
||||
|
||||
if !state_out.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&state_out.stderr).into_owned();
|
||||
return Err((classify_error(&stderr), stderr));
|
||||
}
|
||||
let state_v: serde_json::Value = serde_json::from_slice(&state_out.stdout)
|
||||
.map_err(|e| ("transient", format!("invalid SystemState json: {e}")))?;
|
||||
let system_state = state_v
|
||||
.get("data")
|
||||
.and_then(|d| d.as_str())
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
let units_out = std::process::Command::new("busctl")
|
||||
.args([
|
||||
"--host",
|
||||
target,
|
||||
"--json=short",
|
||||
"call",
|
||||
"org.freedesktop.systemd1",
|
||||
"/org/freedesktop/systemd1",
|
||||
"org.freedesktop.systemd1.Manager",
|
||||
"ListUnits",
|
||||
])
|
||||
.output()
|
||||
.map_err(|e| ("transient", format!("busctl spawn failed: {e}")))?;
|
||||
|
||||
if !units_out.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&units_out.stderr).into_owned();
|
||||
return Err((classify_error(&stderr), stderr));
|
||||
}
|
||||
let v: serde_json::Value = serde_json::from_slice(&units_out.stdout)
|
||||
.map_err(|e| ("transient", format!("invalid ListUnits json: {e}")))?;
|
||||
let arr = v
|
||||
.get("data")
|
||||
.and_then(|d| d.get(0))
|
||||
.and_then(|d| d.as_array())
|
||||
.ok_or(("transient", "missing ListUnits data".to_string()))?;
|
||||
|
||||
let mut total = 0i32;
|
||||
let mut failed = Vec::new();
|
||||
let mut running = Vec::new();
|
||||
for u in arr {
|
||||
let arr = match u.as_array() {
|
||||
Some(a) if a.len() >= 5 => a,
|
||||
_ => continue,
|
||||
};
|
||||
total += 1;
|
||||
let entry = UnitJson {
|
||||
name: arr[0].as_str().unwrap_or("").to_string(),
|
||||
description: arr[1].as_str().unwrap_or("").to_string(),
|
||||
sub_state: arr[4].as_str().unwrap_or("").to_string(),
|
||||
scope: "system".to_string(),
|
||||
machine: target.to_string(),
|
||||
};
|
||||
match arr[3].as_str() {
|
||||
Some("failed") => failed.push(entry),
|
||||
Some("active") => running.push(entry),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(MachineJson {
|
||||
name: target.to_string(),
|
||||
is_local: false,
|
||||
marker: String::new(),
|
||||
system_state,
|
||||
running_count: running.len() as i32,
|
||||
total_count: total,
|
||||
failed_units: failed,
|
||||
running_units: running,
|
||||
error_kind: String::new(),
|
||||
error_reason: String::new(),
|
||||
last_seen: now_unix(),
|
||||
})
|
||||
}
|
||||
|
||||
// Classify an ssh/busctl stderr blob into transient vs permanent so the UI
|
||||
// can pick a color and decide whether to retry aggressively.
|
||||
fn classify_error(stderr: &str) -> &'static str {
|
||||
let s = stderr.to_lowercase();
|
||||
if s.contains("permission denied")
|
||||
|| s.contains("publickey")
|
||||
|| s.contains("authentication failed")
|
||||
|| s.contains("host key verification failed")
|
||||
|| s.contains("could not resolve hostname")
|
||||
{
|
||||
"permanent"
|
||||
} else {
|
||||
"transient"
|
||||
}
|
||||
}
|
||||
|
||||
impl cxx_qt::Initialize for qobject::SystemdService {
|
||||
fn initialize(self: Pin<&mut Self>) {
|
||||
self.poll();
|
||||
|
|
@ -228,7 +392,7 @@ impl cxx_qt::Initialize for qobject::SystemdService {
|
|||
|
||||
impl qobject::SystemdService {
|
||||
fn poll(mut self: Pin<&mut Self>) {
|
||||
let (sys_state, sys_units, user_units, machines) = rt().block_on(poll_async());
|
||||
let (sys_state, sys_units, user_units, containers) = rt().block_on(poll_local());
|
||||
|
||||
let (sys_failed, sys_running, sys_total) = partition_units(sys_units, "system", "");
|
||||
let (user_failed, user_running, user_total) = partition_units(user_units, "user", "");
|
||||
|
|
@ -253,44 +417,85 @@ impl qobject::SystemdService {
|
|||
total_count: local_total_count,
|
||||
failed_units: failed,
|
||||
running_units: running,
|
||||
error_kind: String::new(),
|
||||
error_reason: String::new(),
|
||||
last_seen: now_unix(),
|
||||
};
|
||||
|
||||
// Containers: enumerate only; unit fetching for containers comes in step 5.
|
||||
let mut all_machines = Vec::with_capacity(1 + machines.len());
|
||||
let mut all_machines = Vec::new();
|
||||
all_machines.push(local);
|
||||
for (name, _class, _service) in &machines {
|
||||
all_machines.push(MachineJson {
|
||||
name: name.clone(),
|
||||
is_local: false,
|
||||
marker: String::new(),
|
||||
system_state: "unknown".into(),
|
||||
running_count: 0,
|
||||
total_count: 0,
|
||||
failed_units: Vec::new(),
|
||||
running_units: Vec::new(),
|
||||
});
|
||||
|
||||
// Local nspawn containers (unit fetching for them lands in step 5).
|
||||
for (name, _class, _service) in &containers {
|
||||
all_machines.push(MachineJson::placeholder(name.clone(), false));
|
||||
}
|
||||
|
||||
// Configured remote machines (placeholders; transport lands in step 4).
|
||||
// Dedup local: drop entries matching the local hostname or `localhost`,
|
||||
// with or without a `user@` prefix.
|
||||
// Configured remote machines. Dedup local: drop entries matching the
|
||||
// local hostname or `localhost`, with or without a `user@` prefix.
|
||||
let host = read_hostname();
|
||||
let cfg_machines = modules_service::load_systemd_machines();
|
||||
let mut want_targets: Vec<String> = Vec::new();
|
||||
for target in cfg_machines {
|
||||
let host_part = target.rsplit_once('@').map_or(target.as_str(), |(_, h)| h);
|
||||
if host_part == host || host_part == "localhost" {
|
||||
continue;
|
||||
}
|
||||
all_machines.push(MachineJson {
|
||||
name: target.clone(),
|
||||
is_local: false,
|
||||
marker: String::new(),
|
||||
system_state: "pending".into(),
|
||||
running_count: 0,
|
||||
total_count: 0,
|
||||
failed_units: Vec::new(),
|
||||
running_units: Vec::new(),
|
||||
});
|
||||
want_targets.push(target);
|
||||
}
|
||||
|
||||
// Fetch only while the applet is open. Keep last-known data in the
|
||||
// cache so a re-open shows something instantly.
|
||||
let applet_open = self.as_ref().rust().applet_open;
|
||||
if applet_open {
|
||||
for target in &want_targets {
|
||||
let prev_last_seen = self
|
||||
.as_ref()
|
||||
.rust()
|
||||
.remote_cache
|
||||
.get(target)
|
||||
.map(|m| m.last_seen)
|
||||
.unwrap_or(0);
|
||||
let entry = match fetch_remote(target) {
|
||||
Ok(m) => m,
|
||||
Err((kind, reason)) => MachineJson {
|
||||
name: target.clone(),
|
||||
is_local: false,
|
||||
marker: String::new(),
|
||||
system_state: "unreachable".into(),
|
||||
running_count: 0,
|
||||
total_count: 0,
|
||||
failed_units: Vec::new(),
|
||||
running_units: Vec::new(),
|
||||
error_kind: kind.to_string(),
|
||||
error_reason: reason,
|
||||
last_seen: prev_last_seen,
|
||||
},
|
||||
};
|
||||
self.as_mut()
|
||||
.rust_mut()
|
||||
.remote_cache
|
||||
.insert(target.clone(), entry);
|
||||
}
|
||||
}
|
||||
|
||||
// Drop cache entries that are no longer configured.
|
||||
let want_set: std::collections::HashSet<&str> =
|
||||
want_targets.iter().map(String::as_str).collect();
|
||||
self.as_mut()
|
||||
.rust_mut()
|
||||
.remote_cache
|
||||
.retain(|k: &String, _: &mut MachineJson| want_set.contains(k.as_str()));
|
||||
|
||||
// Emit configured remotes from cache (or placeholder if never fetched).
|
||||
for target in &want_targets {
|
||||
let entry = self
|
||||
.as_ref()
|
||||
.rust()
|
||||
.remote_cache
|
||||
.get(target)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| MachineJson::placeholder(target.clone(), false));
|
||||
all_machines.push(entry);
|
||||
}
|
||||
|
||||
let machines_json = serde_json::to_string(&all_machines).unwrap_or_else(|_| "[]".into());
|
||||
|
|
@ -300,6 +505,14 @@ impl qobject::SystemdService {
|
|||
.set_machines_json(QString::from(machines_json));
|
||||
}
|
||||
|
||||
fn set_applet_open(mut self: Pin<&mut Self>, open: bool) {
|
||||
let was_open = self.as_ref().rust().applet_open;
|
||||
self.as_mut().rust_mut().applet_open = open;
|
||||
if open && !was_open {
|
||||
self.poll();
|
||||
}
|
||||
}
|
||||
|
||||
fn restart_unit(self: Pin<&mut Self>, name: QString, scope: QString, machine: QString) {
|
||||
let name = name.to_string();
|
||||
let scope = scope.to_string();
|
||||
|
|
@ -308,7 +521,7 @@ impl qobject::SystemdService {
|
|||
rt().block_on(async move {
|
||||
// Local-only restart for now. Container/remote restart comes later.
|
||||
if !machine.is_empty() {
|
||||
tracing::warn!(target: "nova_plugin", machine = %machine, "container restart not yet implemented");
|
||||
tracing::warn!(target: "nova_plugin", machine = %machine, "container/remote restart not yet implemented");
|
||||
return;
|
||||
}
|
||||
let conn = match scope.as_str() {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue