Sharing the message queue between instances of ip:127.0.0.1/*. (#20)

The initial version of the IP loopback had one message queue for each
connection. This doesn't make sense, we need to share the message
queue across all connections.
This commit is contained in:
David Teller 2016-10-24 23:02:57 +02:00 committed by Jeremy Soller
parent a5f59c8774
commit 9334cb0b78
3 changed files with 56 additions and 15 deletions

View file

@ -58,8 +58,7 @@ fn test() {
println!("* Test that we can read a simple packet from the same connection."); println!("* Test that we can read a simple packet from the same connection.");
let bytes = "TEST".as_bytes(); let bytes = "TEST".as_bytes();
let mut scheme = IpScheme::new(); let mut scheme = IpScheme::new();
let a = scheme.open("ip:127.0.0.1/11").unwrap(); let a = scheme.open(&"ip:127.0.0.1/11".as_bytes(), 0, 0, 0).unwrap();
let b = scheme.open("ip:127.0.0.1/11").unwrap();
let num_bytes_written = scheme.write(a, bytes).unwrap(); let num_bytes_written = scheme.write(a, bytes).unwrap();
assert_eq!(num_bytes_written, bytes.len()); assert_eq!(num_bytes_written, bytes.len());
@ -70,12 +69,41 @@ fn test() {
let bytes_read = &buf[0..num_bytes_read]; let bytes_read = &buf[0..num_bytes_read];
assert_eq!(bytes, bytes_read); assert_eq!(bytes, bytes_read);
println!("* Test that we cannot read the same packet from a different connection.");
println!("* Test that the loopback is now empty.");
let num_bytes_read = scheme.read(a, &mut buf).unwrap();
assert_eq!(num_bytes_read, 0);
println!("* Test that we can read the same packet from a different connection.");
let num_bytes_written = scheme.write(a, bytes).unwrap();
assert_eq!(num_bytes_written, bytes.len());
let b = scheme.open("ip:127.0.0.1/11".as_bytes(), 0, 0, 0).unwrap();
let num_bytes_read = scheme.read(b, &mut buf).unwrap();
assert_eq!(num_bytes_read, num_bytes_written);
let bytes_read = &buf[0..num_bytes_read];
assert_eq!(bytes, bytes_read);
println!("* Test that the loopback is now empty for both connections.");
let num_bytes_read = scheme.read(a, &mut buf).unwrap();
assert_eq!(num_bytes_read, 0);
let num_bytes_read = scheme.read(b, &mut buf).unwrap(); let num_bytes_read = scheme.read(b, &mut buf).unwrap();
assert_eq!(num_bytes_read, 0); assert_eq!(num_bytes_read, 0);
println!("* Push a number of packets, check that we get them in the right order."); println!("* Push a number of packets, check that we get them in the right order.");
let mut payloads : Vec<String> = 0..100.map(|i| format!("TEST {}", i)).collect(); let mut payloads : Vec<String> = (0..100).map(|i| format!("TEST {}", i)).collect();
for payload in &payloads { for payload in &payloads {
let bytes = payload.into_bytes(); let bytes = payload.into_bytes();
let num_bytes_written = scheme.write(a, &bytes).unwrap(); let num_bytes_written = scheme.write(a, &bytes).unwrap();

View file

@ -1,5 +1,8 @@
use std::cell::RefCell;
use std::{cmp, mem}; use std::{cmp, mem};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::rc::Rc;
use std::sync::Mutex;
use netutils::{n16, Ipv4Addr, Checksum, Ipv4Header, Ipv4}; use netutils::{n16, Ipv4Addr, Checksum, Ipv4Header, Ipv4};
use resource_scheme::Resource; use resource_scheme::Resource;
@ -43,11 +46,13 @@ pub enum Connection {
init_data: Vec<u8>, init_data: Vec<u8>,
}, },
Loopback { Loopback {
/// FIFO queue of packets written to the connection and waiting to be read. /// FIFO queue of packets written to the loopback and waiting to be read.
/// ///
/// The data stored contains the exact data that has been added by the client /// The data stored contains the exact data that has been added by the client
/// calling `write()`, without adding any headers. /// calling `write()`, without adding any headers.
packets: VecDeque<Vec<u8>> ///
/// This buffer is shared between all loopback connections.
packets: Rc<RefCell<VecDeque<Vec<u8>>>>
} }
} }
@ -63,9 +68,7 @@ impl Resource for IpResource {
fn dup(&self) -> Result<Box<Self>> { fn dup(&self) -> Result<Box<Self>> {
use self::Connection::*; use self::Connection::*;
let connection = match self.connection { let connection = match self.connection {
Loopback { ref packets } => Loopback { Loopback { ref packets }=> Loopback { packets: packets.clone() },
packets: packets.clone()
},
Device { link, ref init_data } => { Device { link, ref init_data } => {
let link = try!(syscall::dup(link)); let link = try!(syscall::dup(link));
let init_data = init_data.clone(); let init_data = init_data.clone();
@ -122,8 +125,8 @@ impl Resource for IpResource {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> { fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
use self::Connection::*; use self::Connection::*;
match self.connection { match self.connection {
Loopback { ref mut packets } => { Loopback { ref packets }=> {
match packets.pop_front() { match packets.borrow_mut().pop_front() {
None => Ok(0), None => Ok(0),
Some(data) => { Some(data) => {
for (b, d) in buf.iter_mut().zip(data.iter()) { for (b, d) in buf.iter_mut().zip(data.iter()) {
@ -176,7 +179,7 @@ impl Resource for IpResource {
let ip_data = Vec::from(buf); let ip_data = Vec::from(buf);
match self.connection { match self.connection {
Loopback { ref mut packets } => { Loopback { ref packets } => {
// Make sure that we're not going to store data that can't be read. // Make sure that we're not going to store data that can't be read.
let buf = let buf =
if buf.len() > MAX_PACKET_LENGTH { if buf.len() > MAX_PACKET_LENGTH {
@ -184,7 +187,7 @@ impl Resource for IpResource {
} else { } else {
buf buf
}; };
packets.push_back(buf.to_vec()); packets.borrow_mut().push_back(buf.to_vec());
return Ok(buf.len()) return Ok(buf.len())
} }
Device { link, .. } => { Device { link, .. } => {

View file

@ -1,6 +1,7 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::rand; use std::rand;
use std::rc::Rc;
use std::{str, u16}; use std::{str, u16};
use netutils::{getcfg, n16, MacAddr, Ipv4Addr, ArpHeader, Arp, Ipv4}; use netutils::{getcfg, n16, MacAddr, Ipv4Addr, ArpHeader, Arp, Ipv4};
@ -23,12 +24,21 @@ pub struct ArpEntry {
/// A IP scheme /// A IP scheme
pub struct IpScheme { pub struct IpScheme {
pub arp: RefCell<Vec<ArpEntry>>, pub arp: RefCell<Vec<ArpEntry>>,
/// FIFO queue of packets written to the loopback and waiting to be read.
///
/// The data stored contains the exact data that has been added by the client
/// calling `write()`, without adding any headers.
///
/// This buffer is shared between all loopback connections.
pub loopback_fifo: Rc<RefCell<VecDeque<Vec<u8>>>>,
} }
impl IpScheme { impl IpScheme {
pub fn new() -> IpScheme { pub fn new() -> IpScheme {
IpScheme { IpScheme {
arp: RefCell::new(Vec::new()) arp: RefCell::new(Vec::new()),
loopback_fifo: Rc::new(RefCell::new(VecDeque::new())),
} }
} }
} }
@ -55,7 +65,7 @@ impl ResourceScheme<IpResource> for IpScheme {
if peer_addr.equals(LOCALHOST) { if peer_addr.equals(LOCALHOST) {
return Ok(Box::new(IpResource { return Ok(Box::new(IpResource {
connection: Connection::Loopback { connection: Connection::Loopback {
packets: VecDeque::new() packets: self.loopback_fifo.clone()
}, },
host_addr: ip_addr, host_addr: ip_addr,
peer_addr: peer_addr, peer_addr: peer_addr,