Add support for setting timeouts

This commit is contained in:
Jeremy Soller 2016-12-30 10:11:57 -07:00
parent 762b95e777
commit 3813999a6b

View file

@ -9,13 +9,14 @@ use std::cell::RefCell;
use std::fs::File; use std::fs::File;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::{mem, slice, str}; use std::{mem, slice, str};
use std::ops::{Deref, DerefMut};
use std::os::unix::io::FromRawFd; use std::os::unix::io::FromRawFd;
use std::rc::Rc; use std::rc::Rc;
use event::EventQueue; use event::EventQueue;
use netutils::{n16, Ipv4, Ipv4Addr, Ipv4Header, Checksum}; use netutils::{n16, Ipv4, Ipv4Addr, Ipv4Header, Checksum};
use netutils::udp::{Udp, UdpHeader}; use netutils::udp::{Udp, UdpHeader};
use syscall::data::Packet; use syscall::data::{Packet, TimeSpec};
use syscall::error::{Error, Result, EACCES, EADDRINUSE, EBADF, EIO, EINVAL, EMSGSIZE, ENOTCONN, EWOULDBLOCK}; use syscall::error::{Error, Result, EACCES, EADDRINUSE, EBADF, EIO, EINVAL, EMSGSIZE, ENOTCONN, EWOULDBLOCK};
use syscall::flag::{EVENT_READ, F_GETFL, F_SETFL, O_ACCMODE, O_CREAT, O_RDWR, O_NONBLOCK}; use syscall::flag::{EVENT_READ, F_GETFL, F_SETFL, O_ACCMODE, O_CREAT, O_RDWR, O_NONBLOCK};
use syscall::scheme::SchemeMut; use syscall::scheme::SchemeMut;
@ -27,15 +28,29 @@ fn parse_socket(socket: &str) -> (Ipv4Addr, u16) {
(host, port) (host, port)
} }
struct Handle { struct UdpHandle {
local: (Ipv4Addr, u16), local: (Ipv4Addr, u16),
remote: (Ipv4Addr, u16), remote: (Ipv4Addr, u16),
flags: usize, flags: usize,
events: usize, events: usize,
read_timeout: Option<TimeSpec>,
write_timeout: Option<TimeSpec>,
ttl: u8,
data: VecDeque<Vec<u8>>, data: VecDeque<Vec<u8>>,
todo: VecDeque<Packet>, todo: VecDeque<Packet>,
} }
#[derive(Copy, Clone)]
enum SettingKind {
Read,
Write
}
enum Handle {
Udp(UdpHandle),
Setting(usize, SettingKind),
}
struct Udpd { struct Udpd {
scheme_file: File, scheme_file: File,
udp_file: File, udp_file: File,
@ -69,7 +84,9 @@ impl Udpd {
if packet.a == (-EWOULDBLOCK) as usize { if packet.a == (-EWOULDBLOCK) as usize {
packet.a = a; packet.a = a;
if let Some(mut handle) = self.handles.get_mut(&packet.b) { if let Some(mut handle) = self.handles.get_mut(&packet.b) {
handle.todo.push_back(packet); if let Handle::Udp(ref mut handle) = *handle {
handle.todo.push_back(packet);
}
} }
} else { } else {
self.scheme_file.write(&packet)?; self.scheme_file.write(&packet)?;
@ -89,44 +106,46 @@ impl Udpd {
if let Some(ip) = Ipv4::from_bytes(&bytes[.. count]) { if let Some(ip) = Ipv4::from_bytes(&bytes[.. count]) {
if let Some(udp) = Udp::from_bytes(&ip.data) { if let Some(udp) = Udp::from_bytes(&ip.data) {
for (id, handle) in self.handles.iter_mut() { for (id, handle) in self.handles.iter_mut() {
// Local address not set or IP dst matches or is broadcast if let Handle::Udp(ref mut handle) = *handle {
if (handle.local.0 == Ipv4Addr::NULL || ip.header.dst == handle.local.0 || ip.header.dst == Ipv4Addr::BROADCAST) // Local address not set or IP dst matches or is broadcast
// Local port matches UDP dst if (handle.local.0 == Ipv4Addr::NULL || ip.header.dst == handle.local.0 || ip.header.dst == Ipv4Addr::BROADCAST)
&& udp.header.dst.get() == handle.local.1 // Local port matches UDP dst
// Remote address not set or is broadcast, or IP src matches && udp.header.dst.get() == handle.local.1
&& (handle.remote.0 == Ipv4Addr::NULL || handle.remote.0 == Ipv4Addr::BROADCAST || ip.header.src == handle.remote.0) // Remote address not set or is broadcast, or IP src matches
// Remote port not set or UDP src matches && (handle.remote.0 == Ipv4Addr::NULL || handle.remote.0 == Ipv4Addr::BROADCAST || ip.header.src == handle.remote.0)
&& (handle.remote.1 == 0 || udp.header.src.get() == handle.remote.1) // Remote port not set or UDP src matches
{ && (handle.remote.1 == 0 || udp.header.src.get() == handle.remote.1)
handle.data.push_back(udp.data.clone()); {
handle.data.push_back(udp.data.clone());
while ! handle.todo.is_empty() && ! handle.data.is_empty() { while ! handle.todo.is_empty() && ! handle.data.is_empty() {
let mut packet = handle.todo.pop_front().unwrap(); let mut packet = handle.todo.pop_front().unwrap();
let buf = unsafe { slice::from_raw_parts_mut(packet.c as *mut u8, packet.d) }; let buf = unsafe { slice::from_raw_parts_mut(packet.c as *mut u8, packet.d) };
let data = handle.data.pop_front().unwrap(); let data = handle.data.pop_front().unwrap();
let mut i = 0; let mut i = 0;
while i < buf.len() && i < data.len() { while i < buf.len() && i < data.len() {
buf[i] = data[i]; buf[i] = data[i];
i += 1; i += 1;
}
packet.a = i;
self.scheme_file.write(&packet)?;
} }
packet.a = i;
self.scheme_file.write(&packet)?; if handle.events & EVENT_READ == EVENT_READ {
} if let Some(data) = handle.data.get(0) {
self.scheme_file.write(&Packet {
if handle.events & EVENT_READ == EVENT_READ { id: 0,
if let Some(data) = handle.data.get(0) { pid: 0,
self.scheme_file.write(&Packet { uid: 0,
id: 0, gid: 0,
pid: 0, a: syscall::number::SYS_FEVENT,
uid: 0, b: *id,
gid: 0, c: EVENT_READ,
a: syscall::number::SYS_FEVENT, d: data.len()
b: *id, })?;
c: EVENT_READ, }
d: data.len()
})?;
} }
} }
} }
@ -164,41 +183,59 @@ impl SchemeMut for Udpd {
let id = self.next_id; let id = self.next_id;
self.next_id += 1; self.next_id += 1;
self.handles.insert(id, Handle { self.handles.insert(id, Handle::Udp(UdpHandle {
local: local, local: local,
remote: remote, remote: remote,
flags: flags, flags: flags,
events: 0, events: 0,
ttl: 64,
read_timeout: None,
write_timeout: None,
data: VecDeque::new(), data: VecDeque::new(),
todo: VecDeque::new(), todo: VecDeque::new(),
}); }));
Ok(id) Ok(id)
} }
fn dup(&mut self, file: usize, buf: &[u8]) -> Result<usize> { fn dup(&mut self, file: usize, buf: &[u8]) -> Result<usize> {
let mut handle = { let handle = match *self.handles.get(&file).ok_or(Error::new(EBADF))? {
let handle = self.handles.get(&file).ok_or(Error::new(EBADF))?; Handle::Udp(ref handle) => {
Handle { let mut handle = UdpHandle {
local: handle.local, local: handle.local,
remote: handle.remote, remote: handle.remote,
flags: handle.flags, flags: handle.flags,
events: 0, events: 0,
data: handle.data.clone(), ttl: handle.ttl,
todo: VecDeque::new(), read_timeout: handle.read_timeout,
write_timeout: handle.write_timeout,
data: handle.data.clone(),
todo: VecDeque::new(),
};
let path = str::from_utf8(buf).or(Err(Error::new(EINVAL)))?;
if path == "read_timeout" {
Handle::Setting(file, SettingKind::Read)
} else if path == "write_timeout" {
Handle::Setting(file, SettingKind::Write)
} else {
if handle.remote.0 == Ipv4Addr::NULL || handle.remote.1 == 0 {
handle.remote = parse_socket(path);
}
if let Some(mut port) = self.ports.get_mut(&handle.local.1) {
*port = *port + 1;
}
Handle::Udp(handle)
}
},
Handle::Setting(file, kind) => {
Handle::Setting(file, kind)
} }
}; };
let path = str::from_utf8(buf).or(Err(Error::new(EINVAL)))?;
if handle.remote.0 == Ipv4Addr::NULL || handle.remote.1 == 0 {
handle.remote = parse_socket(path);
}
if let Some(mut port) = self.ports.get_mut(&handle.local.1) {
*port = *port + 1;
}
let id = self.next_id; let id = self.next_id;
self.next_id += 1; self.next_id += 1;
@ -208,102 +245,153 @@ impl SchemeMut for Udpd {
} }
fn read(&mut self, file: usize, buf: &mut [u8]) -> Result<usize> { fn read(&mut self, file: usize, buf: &mut [u8]) -> Result<usize> {
let mut handle = self.handles.get_mut(&file).ok_or(Error::new(EBADF))?; let (file, kind) = match *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
Handle::Udp(ref mut handle) => {
if handle.remote.0 == Ipv4Addr::NULL || handle.remote.1 == 0 {
return Err(Error::new(ENOTCONN));
} else if let Some(data) = handle.data.pop_front() {
let mut i = 0;
while i < buf.len() && i < data.len() {
buf[i] = data[i];
i += 1;
}
if handle.remote.0 == Ipv4Addr::NULL || handle.remote.1 == 0 { return Ok(i);
Err(Error::new(ENOTCONN)) } else if handle.flags & O_NONBLOCK == O_NONBLOCK {
} else if let Some(data) = handle.data.pop_front() { return Ok(0);
let mut i = 0; } else {
while i < buf.len() && i < data.len() { return Err(Error::new(EWOULDBLOCK));
buf[i] = data[i]; }
i += 1; },
Handle::Setting(file, kind) => {
(file, kind)
} }
};
Ok(i) if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
} else if handle.flags & O_NONBLOCK == O_NONBLOCK { let timeout = match kind {
Ok(0) SettingKind::Read => handle.read_timeout,
SettingKind::Write => handle.write_timeout
};
let count = if let Some(timespec) = timeout {
timespec.deref().read(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))?
} else {
0
};
Ok(count)
} else { } else {
Err(Error::new(EWOULDBLOCK)) Err(Error::new(EBADF))
} }
} }
fn write(&mut self, file: usize, buf: &[u8]) -> Result<usize> { fn write(&mut self, file: usize, buf: &[u8]) -> Result<usize> {
let handle = self.handles.get(&file).ok_or(Error::new(EBADF))?; let (file, kind) = match *self.handles.get(&file).ok_or(Error::new(EBADF))? {
Handle::Udp(ref handle) => {
if handle.remote.0 == Ipv4Addr::NULL || handle.remote.1 == 0 {
return Err(Error::new(ENOTCONN));
} else if buf.len() >= 65507 {
return Err(Error::new(EMSGSIZE));
} else {
let udp_data = buf.to_vec();
if handle.remote.0 == Ipv4Addr::NULL || handle.remote.1 == 0 { let udp = Udp {
Err(Error::new(ENOTCONN)) header: UdpHeader {
} else if buf.len() >= 65507 { src: n16::new(handle.local.1),
Err(Error::new(EMSGSIZE)) dst: n16::new(handle.remote.1),
len: n16::new((udp_data.len() + mem::size_of::<UdpHeader>()) as u16),
checksum: Checksum { data: 0 }
},
data: udp_data
};
let ip_data = udp.to_bytes();
let ip = Ipv4 {
header: Ipv4Header {
ver_hlen: 0x45,
services: 0,
len: n16::new((ip_data.len() + mem::size_of::<Ipv4Header>()) as u16),
id: n16::new(self.rng.gen()),
flags_fragment: n16::new(0),
ttl: handle.ttl,
proto: 0x11,
checksum: Checksum { data: 0 },
src: handle.local.0,
dst: handle.remote.0
},
options: Vec::new(),
data: ip_data
};
return self.udp_file.write(&ip.to_bytes()).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO))).and(Ok(buf.len()));
}
},
Handle::Setting(file, kind) => {
(file, kind)
}
};
if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
let (count, timeout) = if buf.len() >= mem::size_of::<TimeSpec>() {
let mut timespec = TimeSpec::default();
let count = timespec.deref_mut().write(buf).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO)))?;
(count, Some(timespec))
} else {
(0, None)
};
match kind {
SettingKind::Read => handle.read_timeout = timeout,
SettingKind::Write => handle.write_timeout = timeout
}
Ok(count)
} else { } else {
let udp_data = buf.to_vec(); Err(Error::new(EBADF))
let udp = Udp {
header: UdpHeader {
src: n16::new(handle.local.1),
dst: n16::new(handle.remote.1),
len: n16::new((udp_data.len() + mem::size_of::<UdpHeader>()) as u16),
checksum: Checksum { data: 0 }
},
data: udp_data
};
let ip_data = udp.to_bytes();
let ip = Ipv4 {
header: Ipv4Header {
ver_hlen: 0x45,
services: 0,
len: n16::new((ip_data.len() + mem::size_of::<Ipv4Header>()) as u16),
id: n16::new(self.rng.gen()),
flags_fragment: n16::new(0),
ttl: 127,
proto: 0x11,
checksum: Checksum { data: 0 },
src: handle.local.0,
dst: handle.remote.0
},
options: Vec::new(),
data: ip_data
};
self.udp_file.write(&ip.to_bytes()).map_err(|err| Error::new(err.raw_os_error().unwrap_or(EIO))).and(Ok(buf.len()))
} }
} }
fn fcntl(&mut self, file: usize, cmd: usize, arg: usize) -> Result<usize> { fn fcntl(&mut self, file: usize, cmd: usize, arg: usize) -> Result<usize> {
let mut handle = self.handles.get_mut(&file).ok_or(Error::new(EBADF))?; if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
match cmd {
match cmd { F_GETFL => Ok(handle.flags),
F_GETFL => Ok(handle.flags), F_SETFL => {
F_SETFL => { handle.flags = arg & ! O_ACCMODE;
handle.flags = arg & ! O_ACCMODE; Ok(0)
Ok(0) },
}, _ => Err(Error::new(EINVAL))
_ => Err(Error::new(EINVAL)) }
} else {
Err(Error::new(EBADF))
} }
} }
fn fevent(&mut self, file: usize, flags: usize) -> Result<usize> { fn fevent(&mut self, file: usize, flags: usize) -> Result<usize> {
let mut handle = self.handles.get_mut(&file).ok_or(Error::new(EBADF))?; if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
handle.events = flags;
handle.events = flags; Ok(file)
} else {
Ok(file) Err(Error::new(EBADF))
}
} }
fn fpath(&mut self, id: usize, buf: &mut [u8]) -> Result<usize> { fn fpath(&mut self, file: usize, buf: &mut [u8]) -> Result<usize> {
let handle = self.handles.get(&id).ok_or(Error::new(EBADF))?; if let Handle::Udp(ref mut handle) = *self.handles.get_mut(&file).ok_or(Error::new(EBADF))? {
let path_string = format!("udp:{}:{}/{}:{}", handle.remote.0.to_string(), handle.remote.1, handle.local.0.to_string(), handle.local.1);
let path = path_string.as_bytes();
let path_string = format!("udp:{}:{}/{}:{}", handle.remote.0.to_string(), handle.remote.1, handle.local.0.to_string(), handle.local.1); let mut i = 0;
let path = path_string.as_bytes(); while i < buf.len() && i < path.len() {
buf[i] = path[i];
i += 1;
}
let mut i = 0; Ok(i)
while i < buf.len() && i < path.len() { } else {
buf[i] = path[i]; Err(Error::new(EBADF))
i += 1;
} }
Ok(i)
} }
fn fsync(&mut self, file: usize) -> Result<usize> { fn fsync(&mut self, file: usize) -> Result<usize> {
@ -315,15 +403,17 @@ impl SchemeMut for Udpd {
fn close(&mut self, file: usize) -> Result<usize> { fn close(&mut self, file: usize) -> Result<usize> {
let handle = self.handles.remove(&file).ok_or(Error::new(EBADF))?; let handle = self.handles.remove(&file).ok_or(Error::new(EBADF))?;
let remove = if let Some(mut port) = self.ports.get_mut(&handle.local.1) { if let Handle::Udp(ref handle) = handle {
*port = *port + 1; let remove = if let Some(mut port) = self.ports.get_mut(&handle.local.1) {
*port == 0 *port = *port + 1;
} else { *port == 0
false } else {
}; false
};
if remove { if remove {
drop(self.ports.remove(&handle.local.1)); drop(self.ports.remove(&handle.local.1));
}
} }
drop(handle); drop(handle);