Implement timeouts in UDP

This commit is contained in:
Jeremy Soller 2017-04-09 09:01:40 -06:00
parent fbbc6a3ba5
commit 1be3f08463
2 changed files with 127 additions and 36 deletions

View file

@ -173,9 +173,6 @@ impl Tcpd {
break; break;
} }
let mut time = TimeSpec::default();
syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?;
let a = packet.a; let a = packet.a;
self.handle(&mut packet); self.handle(&mut packet);
if packet.a == (-EWOULDBLOCK) as usize { if packet.a == (-EWOULDBLOCK) as usize {
@ -191,6 +188,9 @@ impl Tcpd {
let timeout = match handle.read_timeout { let timeout = match handle.read_timeout {
Some(read_timeout) => { Some(read_timeout) => {
let mut time = TimeSpec::default();
syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?;
let timeout = add_time(&time, &read_timeout); let timeout = add_time(&time, &read_timeout);
self.time_file.write(&timeout)?; self.time_file.write(&timeout)?;
Some(timeout) Some(timeout)
@ -205,6 +205,9 @@ impl Tcpd {
let timeout = match handle.write_timeout { let timeout = match handle.write_timeout {
Some(write_timeout) => { Some(write_timeout) => {
let mut time = TimeSpec::default();
syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?;
let timeout = add_time(&time, &write_timeout); let timeout = add_time(&time, &write_timeout);
self.time_file.write(&timeout)?; self.time_file.write(&timeout)?;
Some(timeout) Some(timeout)
@ -677,7 +680,7 @@ impl SchemeMut for Tcpd {
}; };
if let Handle::Tcp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? { if let Handle::Tcp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
let read_timeout = |timeout: &Option<TimeSpec>, buf: &mut [u8]| -> Result<usize> { let get_timeout = |timeout: &Option<TimeSpec>, buf: &mut [u8]| -> Result<usize> {
if let Some(ref timespec) = *timeout { if let Some(ref timespec) = *timeout {
timespec.deref().read(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO))) timespec.deref().read(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))
} else { } else {
@ -695,10 +698,10 @@ impl SchemeMut for Tcpd {
} }
}, },
SettingKind::ReadTimeout => { SettingKind::ReadTimeout => {
read_timeout(&handle.read_timeout, buf) get_timeout(&handle.read_timeout, buf)
}, },
SettingKind::WriteTimeout => { SettingKind::WriteTimeout => {
read_timeout(&handle.write_timeout, buf) get_timeout(&handle.write_timeout, buf)
} }
} }
} else { } else {
@ -734,7 +737,7 @@ impl SchemeMut for Tcpd {
}; };
if let Handle::Tcp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? { if let Handle::Tcp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
let write_timeout = |timeout: &mut Option<TimeSpec>, buf: &[u8]| -> Result<usize> { let set_timeout = |timeout: &mut Option<TimeSpec>, buf: &[u8]| -> Result<usize> {
if buf.len() >= mem::size_of::<TimeSpec>() { if buf.len() >= mem::size_of::<TimeSpec>() {
let mut timespec = TimeSpec::default(); let mut timespec = TimeSpec::default();
let count = timespec.deref_mut().write(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))?; let count = timespec.deref_mut().write(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))?;
@ -756,10 +759,10 @@ impl SchemeMut for Tcpd {
} }
}, },
SettingKind::ReadTimeout => { SettingKind::ReadTimeout => {
write_timeout(&mut handle.read_timeout, buf) set_timeout(&mut handle.read_timeout, buf)
}, },
SettingKind::WriteTimeout => { SettingKind::WriteTimeout => {
write_timeout(&mut handle.write_timeout, buf) set_timeout(&mut handle.write_timeout, buf)
} }
} }
} else { } else {

View file

@ -17,10 +17,26 @@ use event::EventQueue;
use netutils::{n16, Ipv4, Ipv4Addr, Ipv4Header, Checksum}; use netutils::{n16, Ipv4, Ipv4Addr, Ipv4Header, Checksum};
use netutils::udp::{Udp, UdpHeader}; use netutils::udp::{Udp, UdpHeader};
use syscall::data::{Packet, TimeSpec}; use syscall::data::{Packet, TimeSpec};
use syscall::error::{Error, Result, EACCES, EADDRINUSE, EBADF, EIO, EINVAL, EMSGSIZE, ENOTCONN, EWOULDBLOCK}; use syscall::error::{Error, Result, EACCES, EADDRINUSE, EBADF, EIO, EINVAL, EMSGSIZE, ENOTCONN, ETIMEDOUT, EWOULDBLOCK};
use syscall::flag::{EVENT_READ, F_GETFL, F_SETFL, O_ACCMODE, O_CREAT, O_RDWR, O_NONBLOCK}; use syscall::flag::{CLOCK_MONOTONIC, EVENT_READ, F_GETFL, F_SETFL, O_ACCMODE, O_CREAT, O_RDWR, O_NONBLOCK};
use syscall::number::{SYS_READ, SYS_WRITE};
use syscall::scheme::SchemeMut; use syscall::scheme::SchemeMut;
fn add_time(a: &TimeSpec, b: &TimeSpec) -> TimeSpec {
let mut secs = a.tv_sec + b.tv_sec;
let mut nsecs = a.tv_nsec + b.tv_nsec;
while nsecs >= 1000000000 {
nsecs -= 1000000000;
secs += 1;
}
TimeSpec {
tv_sec: secs,
tv_nsec: nsecs
}
}
fn parse_socket(socket: &str) -> (Ipv4Addr, u16) { fn parse_socket(socket: &str) -> (Ipv4Addr, u16) {
let mut socket_parts = socket.split(":"); let mut socket_parts = socket.split(":");
let host = Ipv4Addr::from_str(socket_parts.next().unwrap_or("")); let host = Ipv4Addr::from_str(socket_parts.next().unwrap_or(""));
@ -37,7 +53,7 @@ struct UdpHandle {
write_timeout: Option<TimeSpec>, write_timeout: Option<TimeSpec>,
ttl: u8, ttl: u8,
data: VecDeque<Vec<u8>>, data: VecDeque<Vec<u8>>,
todo: VecDeque<Packet>, todo: VecDeque<(Option<TimeSpec>, Packet)>,
} }
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
@ -55,6 +71,7 @@ enum Handle {
struct Udpd { struct Udpd {
scheme_file: File, scheme_file: File,
udp_file: File, udp_file: File,
time_file: File,
ports: BTreeMap<u16, usize>, ports: BTreeMap<u16, usize>,
next_id: usize, next_id: usize,
handles: BTreeMap<usize, Handle>, handles: BTreeMap<usize, Handle>,
@ -62,10 +79,11 @@ struct Udpd {
} }
impl Udpd { impl Udpd {
fn new(scheme_file: File, udp_file: File) -> Self { fn new(scheme_file: File, udp_file: File, time_file: File) -> Self {
Udpd { Udpd {
scheme_file: scheme_file, scheme_file: scheme_file,
udp_file: udp_file, udp_file: udp_file,
time_file: time_file,
ports: BTreeMap::new(), ports: BTreeMap::new(),
next_id: 1, next_id: 1,
handles: BTreeMap::new(), handles: BTreeMap::new(),
@ -86,7 +104,33 @@ impl Udpd {
packet.a = a; packet.a = a;
if let Some(mut handle) = self.handles.get_mut(&packet.b) { if let Some(mut handle) = self.handles.get_mut(&packet.b) {
if let Handle::Udp(ref mut handle) = *handle { if let Handle::Udp(ref mut handle) = *handle {
handle.todo.push_back(packet); let timeout = match packet.a {
SYS_READ => match handle.read_timeout {
Some(read_timeout) => {
let mut time = TimeSpec::default();
syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?;
let timeout = add_time(&time, &read_timeout);
self.time_file.write(&timeout)?;
Some(timeout)
},
None => None
},
SYS_WRITE => match handle.write_timeout {
Some(write_timeout) => {
let mut time = TimeSpec::default();
syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?;
let timeout = add_time(&time, &write_timeout);
self.time_file.write(&timeout)?;
Some(timeout)
},
None => None
},
_ => None
};
handle.todo.push_back((timeout, packet));
} }
} }
} else { } else {
@ -120,7 +164,7 @@ impl Udpd {
handle.data.push_back(udp.data.clone()); handle.data.push_back(udp.data.clone());
while ! handle.todo.is_empty() && ! handle.data.is_empty() { while ! handle.todo.is_empty() && ! handle.data.is_empty() {
let mut packet = handle.todo.pop_front().unwrap(); let (_timeout, mut packet) = handle.todo.pop_front().unwrap();
let buf = unsafe { slice::from_raw_parts_mut(packet.c as *mut u8, packet.d) }; let buf = unsafe { slice::from_raw_parts_mut(packet.c as *mut u8, packet.d) };
let data = handle.data.pop_front().unwrap(); let data = handle.data.pop_front().unwrap();
@ -157,6 +201,34 @@ impl Udpd {
Ok(()) Ok(())
} }
fn time_event(&mut self) -> io::Result<()> {
let mut time = TimeSpec::default();
if self.time_file.read(&mut time)? < mem::size_of::<TimeSpec>() {
return Err(io::Error::from_raw_os_error(EINVAL));
}
for (_id, handle) in self.handles.iter_mut() {
if let Handle::Udp(ref mut handle) = *handle {
let mut i = 0;
while i < handle.todo.len() {
if let Some(timeout) = handle.todo.get(i).map(|e| e.0.clone()).unwrap_or(None) {
if time.tv_sec > timeout.tv_sec || (time.tv_sec == timeout.tv_sec && time.tv_nsec >= timeout.tv_nsec) {
let (_timeout, mut packet) = handle.todo.remove(i).unwrap();
packet.a = (-ETIMEDOUT) as usize;
self.scheme_file.write(&packet)?;
} else {
i += 1;
}
} else {
i += 1;
}
}
}
}
Ok(())
}
} }
impl SchemeMut for Udpd { impl SchemeMut for Udpd {
@ -272,7 +344,7 @@ impl SchemeMut for Udpd {
}; };
if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? { if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
let read_timeout = |timeout: &Option<TimeSpec>, buf: &mut [u8]| -> Result<usize> { let get_timeout = |timeout: &Option<TimeSpec>, buf: &mut [u8]| -> Result<usize> {
if let Some(ref timespec) = *timeout { if let Some(ref timespec) = *timeout {
timespec.deref().read(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO))) timespec.deref().read(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))
} else { } else {
@ -290,10 +362,10 @@ impl SchemeMut for Udpd {
} }
}, },
SettingKind::ReadTimeout => { SettingKind::ReadTimeout => {
read_timeout(&handle.read_timeout, buf) get_timeout(&handle.read_timeout, buf)
}, },
SettingKind::WriteTimeout => { SettingKind::WriteTimeout => {
read_timeout(&handle.write_timeout, buf) get_timeout(&handle.write_timeout, buf)
} }
} }
} else { } else {
@ -349,7 +421,7 @@ impl SchemeMut for Udpd {
}; };
if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? { if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
let write_timeout = |timeout: &mut Option<TimeSpec>, buf: &[u8]| -> Result<usize> { let set_timeout = |timeout: &mut Option<TimeSpec>, buf: &[u8]| -> Result<usize> {
if buf.len() >= mem::size_of::<TimeSpec>() { if buf.len() >= mem::size_of::<TimeSpec>() {
let mut timespec = TimeSpec::default(); let mut timespec = TimeSpec::default();
let count = timespec.deref_mut().write(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))?; let count = timespec.deref_mut().write(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))?;
@ -371,10 +443,10 @@ impl SchemeMut for Udpd {
} }
}, },
SettingKind::ReadTimeout => { SettingKind::ReadTimeout => {
write_timeout(&mut handle.read_timeout, buf) set_timeout(&mut handle.read_timeout, buf)
}, },
SettingKind::WriteTimeout => { SettingKind::WriteTimeout => {
write_timeout(&mut handle.write_timeout, buf) set_timeout(&mut handle.write_timeout, buf)
} }
} }
} else { } else {
@ -450,14 +522,21 @@ impl SchemeMut for Udpd {
Ok(0) Ok(0)
} }
} }
fn daemon(udp_fd: usize, scheme_fd: usize) { fn daemon(scheme_fd: usize, udp_fd: usize, time_fd: usize) {
let udp_file = unsafe { File::from_raw_fd(udp_fd) };
let scheme_file = unsafe { File::from_raw_fd(scheme_fd) }; let scheme_file = unsafe { File::from_raw_fd(scheme_fd) };
let udp_file = unsafe { File::from_raw_fd(udp_fd) };
let time_file = unsafe { File::from_raw_fd(time_fd) };
let udpd = Rc::new(RefCell::new(Udpd::new(scheme_file, udp_file))); let udpd = Rc::new(RefCell::new(Udpd::new(scheme_file, udp_file, time_file)));
let mut event_queue = EventQueue::<()>::new().expect("udpd: failed to create event queue"); let mut event_queue = EventQueue::<()>::new().expect("udpd: failed to create event queue");
let time_udpd = udpd.clone();
event_queue.add(time_fd, move |_count: usize| -> io::Result<Option<()>> {
time_udpd.borrow_mut().time_event()?;
Ok(None)
}).expect("udpd: failed to listen to events on time:");
let udp_udpd = udpd.clone(); let udp_udpd = udpd.clone();
event_queue.add(udp_fd, move |_count: usize| -> io::Result<Option<()>> { event_queue.add(udp_fd, move |_count: usize| -> io::Result<Option<()>> {
udp_udpd.borrow_mut().udp_event()?; udp_udpd.borrow_mut().udp_event()?;
@ -475,13 +554,16 @@ fn daemon(udp_fd: usize, scheme_fd: usize) {
} }
fn main() { fn main() {
let time_path = format!("time:{}", CLOCK_MONOTONIC);
match syscall::open(&time_path, O_RDWR) {
Ok(time_fd) => {
match syscall::open("ip:11", O_RDWR | O_NONBLOCK) { match syscall::open("ip:11", O_RDWR | O_NONBLOCK) {
Ok(udp_fd) => { Ok(udp_fd) => {
// Daemonize // Daemonize
if unsafe { syscall::clone(0).unwrap() } == 0 { if unsafe { syscall::clone(0).unwrap() } == 0 {
match syscall::open(":udp", O_RDWR | O_CREAT | O_NONBLOCK) { match syscall::open(":udp", O_RDWR | O_CREAT | O_NONBLOCK) {
Ok(scheme_fd) => { Ok(scheme_fd) => {
daemon(udp_fd, scheme_fd); daemon(scheme_fd, udp_fd, time_fd);
}, },
Err(err) => { Err(err) => {
println!("udpd: failed to create udp scheme: {}", err); println!("udpd: failed to create udp scheme: {}", err);
@ -495,4 +577,10 @@ fn main() {
process::exit(1); process::exit(1);
} }
} }
},
Err(err) => {
println!("udpd: failed to open {}: {}", time_path, err);
process::exit(1);
}
}
} }