From 9334cb0b78eefba868e7abfcac69d3a06bb5026c Mon Sep 17 00:00:00 2001 From: David Teller Date: Mon, 24 Oct 2016 23:02:57 +0200 Subject: [PATCH] Sharing the message queue between instances of ip:127.0.0.1/*. (#20) The initial version of the IP loopback had one message queue for each connection. This doesn't make sense, we need to share the message queue across all connections. --- schemes/ipd/src/main.rs | 36 ++++++++++++++++++++++++++++++++---- schemes/ipd/src/resource.rs | 21 ++++++++++++--------- schemes/ipd/src/scheme.rs | 14 ++++++++++++-- 3 files changed, 56 insertions(+), 15 deletions(-) diff --git a/schemes/ipd/src/main.rs b/schemes/ipd/src/main.rs index 7c66cff..de36d06 100644 --- a/schemes/ipd/src/main.rs +++ b/schemes/ipd/src/main.rs @@ -58,8 +58,7 @@ fn test() { println!("* Test that we can read a simple packet from the same connection."); let bytes = "TEST".as_bytes(); let mut scheme = IpScheme::new(); - let a = scheme.open("ip:127.0.0.1/11").unwrap(); - let b = scheme.open("ip:127.0.0.1/11").unwrap(); + let a = scheme.open(&"ip:127.0.0.1/11".as_bytes(), 0, 0, 0).unwrap(); let num_bytes_written = scheme.write(a, bytes).unwrap(); assert_eq!(num_bytes_written, bytes.len()); @@ -70,12 +69,41 @@ fn test() { let bytes_read = &buf[0..num_bytes_read]; assert_eq!(bytes, bytes_read); - println!("* Test that we cannot read the same packet from a different connection."); + + + println!("* Test that the loopback is now empty."); + let num_bytes_read = scheme.read(a, &mut buf).unwrap(); + assert_eq!(num_bytes_read, 0); + + + + println!("* Test that we can read the same packet from a different connection."); + let num_bytes_written = scheme.write(a, bytes).unwrap(); + assert_eq!(num_bytes_written, bytes.len()); + + let b = scheme.open("ip:127.0.0.1/11".as_bytes(), 0, 0, 0).unwrap(); + + let num_bytes_read = scheme.read(b, &mut buf).unwrap(); + assert_eq!(num_bytes_read, num_bytes_written); + + let bytes_read = &buf[0..num_bytes_read]; + assert_eq!(bytes, bytes_read); + + + + println!("* Test that the loopback is now empty for both connections."); + + let num_bytes_read = scheme.read(a, &mut buf).unwrap(); + assert_eq!(num_bytes_read, 0); + let num_bytes_read = scheme.read(b, &mut buf).unwrap(); assert_eq!(num_bytes_read, 0); + + + println!("* Push a number of packets, check that we get them in the right order."); - let mut payloads : Vec = 0..100.map(|i| format!("TEST {}", i)).collect(); + let mut payloads : Vec = (0..100).map(|i| format!("TEST {}", i)).collect(); for payload in &payloads { let bytes = payload.into_bytes(); let num_bytes_written = scheme.write(a, &bytes).unwrap(); diff --git a/schemes/ipd/src/resource.rs b/schemes/ipd/src/resource.rs index aad343a..87ea11c 100644 --- a/schemes/ipd/src/resource.rs +++ b/schemes/ipd/src/resource.rs @@ -1,5 +1,8 @@ +use std::cell::RefCell; use std::{cmp, mem}; use std::collections::VecDeque; +use std::rc::Rc; +use std::sync::Mutex; use netutils::{n16, Ipv4Addr, Checksum, Ipv4Header, Ipv4}; use resource_scheme::Resource; @@ -43,11 +46,13 @@ pub enum Connection { init_data: Vec, }, Loopback { - /// FIFO queue of packets written to the connection and waiting to be read. + /// FIFO queue of packets written to the loopback and waiting to be read. /// /// The data stored contains the exact data that has been added by the client /// calling `write()`, without adding any headers. - packets: VecDeque> + /// + /// This buffer is shared between all loopback connections. + packets: Rc>>> } } @@ -63,9 +68,7 @@ impl Resource for IpResource { fn dup(&self) -> Result> { use self::Connection::*; let connection = match self.connection { - Loopback { ref packets } => Loopback { - packets: packets.clone() - }, + Loopback { ref packets }=> Loopback { packets: packets.clone() }, Device { link, ref init_data } => { let link = try!(syscall::dup(link)); let init_data = init_data.clone(); @@ -122,8 +125,8 @@ impl Resource for IpResource { fn read(&mut self, buf: &mut [u8]) -> Result { use self::Connection::*; match self.connection { - Loopback { ref mut packets } => { - match packets.pop_front() { + Loopback { ref packets }=> { + match packets.borrow_mut().pop_front() { None => Ok(0), Some(data) => { for (b, d) in buf.iter_mut().zip(data.iter()) { @@ -176,7 +179,7 @@ impl Resource for IpResource { let ip_data = Vec::from(buf); match self.connection { - Loopback { ref mut packets } => { + Loopback { ref packets } => { // Make sure that we're not going to store data that can't be read. let buf = if buf.len() > MAX_PACKET_LENGTH { @@ -184,7 +187,7 @@ impl Resource for IpResource { } else { buf }; - packets.push_back(buf.to_vec()); + packets.borrow_mut().push_back(buf.to_vec()); return Ok(buf.len()) } Device { link, .. } => { diff --git a/schemes/ipd/src/scheme.rs b/schemes/ipd/src/scheme.rs index 5eda001..8604e54 100644 --- a/schemes/ipd/src/scheme.rs +++ b/schemes/ipd/src/scheme.rs @@ -1,6 +1,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::rand; +use std::rc::Rc; use std::{str, u16}; use netutils::{getcfg, n16, MacAddr, Ipv4Addr, ArpHeader, Arp, Ipv4}; @@ -23,12 +24,21 @@ pub struct ArpEntry { /// A IP scheme pub struct IpScheme { pub arp: RefCell>, + + /// FIFO queue of packets written to the loopback and waiting to be read. + /// + /// The data stored contains the exact data that has been added by the client + /// calling `write()`, without adding any headers. + /// + /// This buffer is shared between all loopback connections. + pub loopback_fifo: Rc>>>, } impl IpScheme { pub fn new() -> IpScheme { IpScheme { - arp: RefCell::new(Vec::new()) + arp: RefCell::new(Vec::new()), + loopback_fifo: Rc::new(RefCell::new(VecDeque::new())), } } } @@ -55,7 +65,7 @@ impl ResourceScheme for IpScheme { if peer_addr.equals(LOCALHOST) { return Ok(Box::new(IpResource { connection: Connection::Loopback { - packets: VecDeque::new() + packets: self.loopback_fifo.clone() }, host_addr: ip_addr, peer_addr: peer_addr,