agent_web_port: collision-aware sticky allocation
operator hit 'coder' and 'test' colliding on the same hashed port — fnv-1a mod 900 has ~0.1% collision probability per pair and clearly that's not enough. agent_web_port goes stateful: - per-agent port persisted to /var/lib/hyperhive/agents/<name>/port - on first call, look up the file; if absent, hash, then probe forward through the allocated range skipping any port other agents already claim, then write the chosen value back - subsequent calls return the persisted port (sticky) other agents' ports come from their port file if present, else the fallback is the hashed value — that handles existing deployments without forcing a rebuild-all just to migrate. rebuilding the colliding agent re-runs agent_web_port, sees its peer's implicit hash port as taken, picks the next free slot, persists. range exhaustion (very unlikely — 900 slots) logs a warning and returns the hash; the bind-with-retry on the harness will surface the failure honestly rather than silently looping.
This commit is contained in:
parent
754db7830e
commit
79a46f359a
1 changed files with 82 additions and 3 deletions
|
|
@ -45,14 +45,51 @@ const WEB_PORT_RANGE: u16 = 900;
|
||||||
const DEFAULT_MEMORY_MAX: &str = "2G";
|
const DEFAULT_MEMORY_MAX: &str = "2G";
|
||||||
const DEFAULT_CPU_QUOTA: &str = "50%";
|
const DEFAULT_CPU_QUOTA: &str = "50%";
|
||||||
|
|
||||||
/// Returns the per-agent web UI port. Same hash on both sides — manager,
|
/// Returns the per-agent web UI port. Manager is fixed at `MANAGER_PORT`.
|
||||||
/// dashboard, and agent harness all agree. Manager is fixed at
|
/// For sub-agents the port is sticky once chosen: looked up from
|
||||||
/// `MANAGER_PORT`.
|
/// `agent_state_root(name)/port` if present, otherwise derived from
|
||||||
|
/// the FNV-1a hash of the name and *probed forward* through the
|
||||||
|
/// allocated range to skip any port another sub-agent has already
|
||||||
|
/// claimed (birthday-paradox collisions are real even at 2–3
|
||||||
|
/// agents). The chosen port is written back so subsequent calls
|
||||||
|
/// resolve to the same value without re-probing.
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn agent_web_port(name: &str) -> u16 {
|
pub fn agent_web_port(name: &str) -> u16 {
|
||||||
if name == MANAGER_NAME {
|
if name == MANAGER_NAME {
|
||||||
return MANAGER_PORT;
|
return MANAGER_PORT;
|
||||||
}
|
}
|
||||||
|
let state_root = crate::coordinator::Coordinator::agent_state_root(name);
|
||||||
|
let port_file = state_root.join("port");
|
||||||
|
if let Ok(s) = std::fs::read_to_string(&port_file)
|
||||||
|
&& let Ok(port) = s.trim().parse::<u16>()
|
||||||
|
&& (WEB_PORT_BASE..WEB_PORT_BASE + WEB_PORT_RANGE).contains(&port)
|
||||||
|
{
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
let taken = scan_taken_ports(name);
|
||||||
|
let start = port_hash(name);
|
||||||
|
let mut port = start;
|
||||||
|
for _ in 0..WEB_PORT_RANGE {
|
||||||
|
if !taken.contains(&port) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
port = next_port(port);
|
||||||
|
if port == start {
|
||||||
|
// Range fully exhausted (very unlikely — 900 slots) —
|
||||||
|
// give up and just use the hashed value; collisions are
|
||||||
|
// surfaced as bind errors by the harness retry loop.
|
||||||
|
tracing::warn!(%name, "agent_web_port: range exhausted, returning hash");
|
||||||
|
return start;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let _ = std::fs::create_dir_all(&state_root);
|
||||||
|
if let Err(e) = std::fs::write(&port_file, format!("{port}\n")) {
|
||||||
|
tracing::warn!(error = ?e, file = %port_file.display(), "persisting agent port failed");
|
||||||
|
}
|
||||||
|
port
|
||||||
|
}
|
||||||
|
|
||||||
|
fn port_hash(name: &str) -> u16 {
|
||||||
let mut hash: u32 = 2_166_136_261;
|
let mut hash: u32 = 2_166_136_261;
|
||||||
for b in name.bytes() {
|
for b in name.bytes() {
|
||||||
hash ^= u32::from(b);
|
hash ^= u32::from(b);
|
||||||
|
|
@ -62,6 +99,48 @@ pub fn agent_web_port(name: &str) -> u16 {
|
||||||
WEB_PORT_BASE + u16::try_from(hash % u32::from(WEB_PORT_RANGE)).unwrap_or(0)
|
WEB_PORT_BASE + u16::try_from(hash % u32::from(WEB_PORT_RANGE)).unwrap_or(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn next_port(port: u16) -> u16 {
|
||||||
|
let p = port + 1;
|
||||||
|
if p >= WEB_PORT_BASE + WEB_PORT_RANGE {
|
||||||
|
WEB_PORT_BASE
|
||||||
|
} else {
|
||||||
|
p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Scan every other agent's effective web UI port: prefer the
|
||||||
|
/// persisted `port` file when present, fall back to the hashed
|
||||||
|
/// value for legacy agents that pre-date the port-file scheme. The
|
||||||
|
/// latter is important on existing deployments — without it, a new
|
||||||
|
/// agent's collision check wouldn't see incumbents that haven't
|
||||||
|
/// written their port file yet, and we'd re-emit the same
|
||||||
|
/// collision the operator just hit.
|
||||||
|
fn scan_taken_ports(name: &str) -> std::collections::HashSet<u16> {
|
||||||
|
let mut out = std::collections::HashSet::new();
|
||||||
|
let Ok(rd) = std::fs::read_dir("/var/lib/hyperhive/agents") else {
|
||||||
|
return out;
|
||||||
|
};
|
||||||
|
for entry in rd.flatten() {
|
||||||
|
let Ok(file_name) = entry.file_name().into_string() else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
if file_name == name || file_name == MANAGER_NAME {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let pf = entry.path().join("port");
|
||||||
|
if let Ok(s) = std::fs::read_to_string(&pf)
|
||||||
|
&& let Ok(port) = s.trim().parse::<u16>()
|
||||||
|
{
|
||||||
|
out.insert(port);
|
||||||
|
} else {
|
||||||
|
// Legacy: no port file yet → its effective port is the
|
||||||
|
// bare hash. Treat as taken so we don't collide with it.
|
||||||
|
out.insert(port_hash(&file_name));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn container_name(name: &str) -> String {
|
pub fn container_name(name: &str) -> String {
|
||||||
if name == MANAGER_NAME {
|
if name == MANAGER_NAME {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue