diff --git a/schemes/tcpd/src/main.rs b/schemes/tcpd/src/main.rs index 7eed94e..8d124d8 100644 --- a/schemes/tcpd/src/main.rs +++ b/schemes/tcpd/src/main.rs @@ -173,9 +173,6 @@ impl Tcpd { break; } - let mut time = TimeSpec::default(); - syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?; - let a = packet.a; self.handle(&mut packet); if packet.a == (-EWOULDBLOCK) as usize { @@ -191,6 +188,9 @@ impl Tcpd { let timeout = match handle.read_timeout { Some(read_timeout) => { + let mut time = TimeSpec::default(); + syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?; + let timeout = add_time(&time, &read_timeout); self.time_file.write(&timeout)?; Some(timeout) @@ -205,6 +205,9 @@ impl Tcpd { let timeout = match handle.write_timeout { Some(write_timeout) => { + let mut time = TimeSpec::default(); + syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?; + let timeout = add_time(&time, &write_timeout); self.time_file.write(&timeout)?; Some(timeout) @@ -677,7 +680,7 @@ impl SchemeMut for Tcpd { }; if let Handle::Tcp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? { - let read_timeout = |timeout: &Option, buf: &mut [u8]| -> Result { + let get_timeout = |timeout: &Option, buf: &mut [u8]| -> Result { if let Some(ref timespec) = *timeout { timespec.deref().read(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO))) } else { @@ -695,10 +698,10 @@ impl SchemeMut for Tcpd { } }, SettingKind::ReadTimeout => { - read_timeout(&handle.read_timeout, buf) + get_timeout(&handle.read_timeout, buf) }, SettingKind::WriteTimeout => { - read_timeout(&handle.write_timeout, buf) + get_timeout(&handle.write_timeout, buf) } } } else { @@ -734,7 +737,7 @@ impl SchemeMut for Tcpd { }; if let Handle::Tcp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? { - let write_timeout = |timeout: &mut Option, buf: &[u8]| -> Result { + let set_timeout = |timeout: &mut Option, buf: &[u8]| -> Result { if buf.len() >= mem::size_of::() { let mut timespec = TimeSpec::default(); let count = timespec.deref_mut().write(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))?; @@ -756,10 +759,10 @@ impl SchemeMut for Tcpd { } }, SettingKind::ReadTimeout => { - write_timeout(&mut handle.read_timeout, buf) + set_timeout(&mut handle.read_timeout, buf) }, SettingKind::WriteTimeout => { - write_timeout(&mut handle.write_timeout, buf) + set_timeout(&mut handle.write_timeout, buf) } } } else { diff --git a/schemes/udpd/src/main.rs b/schemes/udpd/src/main.rs index 0715105..23d6479 100644 --- a/schemes/udpd/src/main.rs +++ b/schemes/udpd/src/main.rs @@ -17,10 +17,26 @@ use event::EventQueue; use netutils::{n16, Ipv4, Ipv4Addr, Ipv4Header, Checksum}; use netutils::udp::{Udp, UdpHeader}; use syscall::data::{Packet, TimeSpec}; -use syscall::error::{Error, Result, EACCES, EADDRINUSE, EBADF, EIO, EINVAL, EMSGSIZE, ENOTCONN, EWOULDBLOCK}; -use syscall::flag::{EVENT_READ, F_GETFL, F_SETFL, O_ACCMODE, O_CREAT, O_RDWR, O_NONBLOCK}; +use syscall::error::{Error, Result, EACCES, EADDRINUSE, EBADF, EIO, EINVAL, EMSGSIZE, ENOTCONN, ETIMEDOUT, EWOULDBLOCK}; +use syscall::flag::{CLOCK_MONOTONIC, EVENT_READ, F_GETFL, F_SETFL, O_ACCMODE, O_CREAT, O_RDWR, O_NONBLOCK}; +use syscall::number::{SYS_READ, SYS_WRITE}; use syscall::scheme::SchemeMut; +fn add_time(a: &TimeSpec, b: &TimeSpec) -> TimeSpec { + let mut secs = a.tv_sec + b.tv_sec; + + let mut nsecs = a.tv_nsec + b.tv_nsec; + while nsecs >= 1000000000 { + nsecs -= 1000000000; + secs += 1; + } + + TimeSpec { + tv_sec: secs, + tv_nsec: nsecs + } +} + fn parse_socket(socket: &str) -> (Ipv4Addr, u16) { let mut socket_parts = socket.split(":"); let host = Ipv4Addr::from_str(socket_parts.next().unwrap_or("")); @@ -37,7 +53,7 @@ struct UdpHandle { write_timeout: Option, ttl: u8, data: VecDeque>, - todo: VecDeque, + todo: VecDeque<(Option, Packet)>, } #[derive(Copy, Clone)] @@ -55,6 +71,7 @@ enum Handle { struct Udpd { scheme_file: File, udp_file: File, + time_file: File, ports: BTreeMap, next_id: usize, handles: BTreeMap, @@ -62,10 +79,11 @@ struct Udpd { } impl Udpd { - fn new(scheme_file: File, udp_file: File) -> Self { + fn new(scheme_file: File, udp_file: File, time_file: File) -> Self { Udpd { scheme_file: scheme_file, udp_file: udp_file, + time_file: time_file, ports: BTreeMap::new(), next_id: 1, handles: BTreeMap::new(), @@ -86,7 +104,33 @@ impl Udpd { packet.a = a; if let Some(mut handle) = self.handles.get_mut(&packet.b) { if let Handle::Udp(ref mut handle) = *handle { - handle.todo.push_back(packet); + let timeout = match packet.a { + SYS_READ => match handle.read_timeout { + Some(read_timeout) => { + let mut time = TimeSpec::default(); + syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?; + + let timeout = add_time(&time, &read_timeout); + self.time_file.write(&timeout)?; + Some(timeout) + }, + None => None + }, + SYS_WRITE => match handle.write_timeout { + Some(write_timeout) => { + let mut time = TimeSpec::default(); + syscall::clock_gettime(CLOCK_MONOTONIC, &mut time).map_err(|err| io::Error::from_raw_os_error(err.errno))?; + + let timeout = add_time(&time, &write_timeout); + self.time_file.write(&timeout)?; + Some(timeout) + }, + None => None + }, + _ => None + }; + + handle.todo.push_back((timeout, packet)); } } } else { @@ -120,7 +164,7 @@ impl Udpd { handle.data.push_back(udp.data.clone()); while ! handle.todo.is_empty() && ! handle.data.is_empty() { - let mut packet = handle.todo.pop_front().unwrap(); + let (_timeout, mut packet) = handle.todo.pop_front().unwrap(); let buf = unsafe { slice::from_raw_parts_mut(packet.c as *mut u8, packet.d) }; let data = handle.data.pop_front().unwrap(); @@ -157,6 +201,34 @@ impl Udpd { Ok(()) } + + fn time_event(&mut self) -> io::Result<()> { + let mut time = TimeSpec::default(); + if self.time_file.read(&mut time)? < mem::size_of::() { + return Err(io::Error::from_raw_os_error(EINVAL)); + } + + for (_id, handle) in self.handles.iter_mut() { + if let Handle::Udp(ref mut handle) = *handle { + let mut i = 0; + while i < handle.todo.len() { + if let Some(timeout) = handle.todo.get(i).map(|e| e.0.clone()).unwrap_or(None) { + if time.tv_sec > timeout.tv_sec || (time.tv_sec == timeout.tv_sec && time.tv_nsec >= timeout.tv_nsec) { + let (_timeout, mut packet) = handle.todo.remove(i).unwrap(); + packet.a = (-ETIMEDOUT) as usize; + self.scheme_file.write(&packet)?; + } else { + i += 1; + } + } else { + i += 1; + } + } + } + } + + Ok(()) + } } impl SchemeMut for Udpd { @@ -272,7 +344,7 @@ impl SchemeMut for Udpd { }; if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? { - let read_timeout = |timeout: &Option, buf: &mut [u8]| -> Result { + let get_timeout = |timeout: &Option, buf: &mut [u8]| -> Result { if let Some(ref timespec) = *timeout { timespec.deref().read(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO))) } else { @@ -290,10 +362,10 @@ impl SchemeMut for Udpd { } }, SettingKind::ReadTimeout => { - read_timeout(&handle.read_timeout, buf) + get_timeout(&handle.read_timeout, buf) }, SettingKind::WriteTimeout => { - read_timeout(&handle.write_timeout, buf) + get_timeout(&handle.write_timeout, buf) } } } else { @@ -349,7 +421,7 @@ impl SchemeMut for Udpd { }; if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? { - let write_timeout = |timeout: &mut Option, buf: &[u8]| -> Result { + let set_timeout = |timeout: &mut Option, buf: &[u8]| -> Result { if buf.len() >= mem::size_of::() { let mut timespec = TimeSpec::default(); let count = timespec.deref_mut().write(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))?; @@ -371,10 +443,10 @@ impl SchemeMut for Udpd { } }, SettingKind::ReadTimeout => { - write_timeout(&mut handle.read_timeout, buf) + set_timeout(&mut handle.read_timeout, buf) }, SettingKind::WriteTimeout => { - write_timeout(&mut handle.write_timeout, buf) + set_timeout(&mut handle.write_timeout, buf) } } } else { @@ -450,14 +522,21 @@ impl SchemeMut for Udpd { Ok(0) } } -fn daemon(udp_fd: usize, scheme_fd: usize) { - let udp_file = unsafe { File::from_raw_fd(udp_fd) }; +fn daemon(scheme_fd: usize, udp_fd: usize, time_fd: usize) { let scheme_file = unsafe { File::from_raw_fd(scheme_fd) }; + let udp_file = unsafe { File::from_raw_fd(udp_fd) }; + let time_file = unsafe { File::from_raw_fd(time_fd) }; - let udpd = Rc::new(RefCell::new(Udpd::new(scheme_file, udp_file))); + let udpd = Rc::new(RefCell::new(Udpd::new(scheme_file, udp_file, time_file))); let mut event_queue = EventQueue::<()>::new().expect("udpd: failed to create event queue"); + let time_udpd = udpd.clone(); + event_queue.add(time_fd, move |_count: usize| -> io::Result> { + time_udpd.borrow_mut().time_event()?; + Ok(None) + }).expect("udpd: failed to listen to events on time:"); + let udp_udpd = udpd.clone(); event_queue.add(udp_fd, move |_count: usize| -> io::Result> { udp_udpd.borrow_mut().udp_event()?; @@ -475,23 +554,32 @@ fn daemon(udp_fd: usize, scheme_fd: usize) { } fn main() { - match syscall::open("ip:11", O_RDWR | O_NONBLOCK) { - Ok(udp_fd) => { - // Daemonize - if unsafe { syscall::clone(0).unwrap() } == 0 { - match syscall::open(":udp", O_RDWR | O_CREAT | O_NONBLOCK) { - Ok(scheme_fd) => { - daemon(udp_fd, scheme_fd); - }, - Err(err) => { - println!("udpd: failed to create udp scheme: {}", err); - process::exit(1); + let time_path = format!("time:{}", CLOCK_MONOTONIC); + match syscall::open(&time_path, O_RDWR) { + Ok(time_fd) => { + match syscall::open("ip:11", O_RDWR | O_NONBLOCK) { + Ok(udp_fd) => { + // Daemonize + if unsafe { syscall::clone(0).unwrap() } == 0 { + match syscall::open(":udp", O_RDWR | O_CREAT | O_NONBLOCK) { + Ok(scheme_fd) => { + daemon(scheme_fd, udp_fd, time_fd); + }, + Err(err) => { + println!("udpd: failed to create udp scheme: {}", err); + process::exit(1); + } + } } + }, + Err(err) => { + println!("udpd: failed to open ip:11: {}", err); + process::exit(1); } } }, Err(err) => { - println!("udpd: failed to open ip:11: {}", err); + println!("udpd: failed to open {}: {}", time_path, err); process::exit(1); } }