Fixed read, when data was exactly same as payload with a read_timeout. Not the best solution, but it is ok for the first version.

This commit is contained in:
Martin Slot 2023-11-28 12:19:00 +01:00
parent 155b25f69e
commit 66ddc37a39
4 changed files with 184 additions and 9 deletions

View File

@ -1,5 +1,6 @@
use prost::Message;
use std::sync::Mutex;
use std::time::Duration;
use std::{
io::{Cursor, Write},
net::{TcpListener, TcpStream},
@ -64,8 +65,8 @@ impl Host {
}
let mut reader = StreamReader::new(StreamReaderOptions {
limit: 512,
read_size: 512,
limit: 1024,
read_size: 128,
});
let mut stream = match stream {
@ -73,6 +74,11 @@ impl Host {
Err(e) => return Err(HostError::ReadError),
};
match stream.set_read_timeout(Some(Duration::new(1, 0))) {
Ok(_) => {}
Err(_) => return Err(HostError::ReadError),
};
let bytes = reader.read(&mut stream);
let bytes = match bytes {

View File

@ -26,11 +26,12 @@ impl StreamReader {
pub fn read<T: Read>(&mut self, stream: &mut T) -> Result<Vec<u8>, StreamReaderError> {
let mut total_size = 0;
loop {
let mut buffer = vec![0u8; self.read_size];
let read_bytes_len = match stream.read(&mut buffer) {
Ok(len) => len,
Err(e) => return Err(StreamReaderError::ReadError),
Err(_) => break, // we return the buffer read - the read_timeout is set to 1 sec
};
total_size += read_bytes_len;
@ -39,7 +40,7 @@ impl StreamReader {
return Err(StreamReaderError::LimitReached);
}
// we are done reading, because client closed connection,
// we are done reading, because client closed connection (on Linux),
// according to https://doc.rust-lang.org/stable/std/io/trait.Read.html#tymethod.read
if read_bytes_len == 0 {
break;

View File

@ -32,7 +32,7 @@ impl StreamClient {
)) {
Ok(mut stream) => match stream.write(&bytes) {
Ok(size) => {
let mut buf = Vec::with_capacity(size); // TODO: this needs to be more dynamic
let mut buf = Vec::with_capacity(size);
match stream.read_to_end(&mut buf) {
// TODO: what happens if there is nothing to read?
Ok(_) => Ok(buf.to_vec()),

View File

@ -13,7 +13,7 @@ use prost::Message;
#[test]
fn does_not_start_when_calling_stop_before_start() {
let mut host = Host::new(ConnectOptions::new(String::from("127.0.0.1"), 6664));
let mut host = Host::new(ConnectOptions::new(String::from("127.0.0.1"), 6661));
match host.stop() {
Ok(_) => println!("host stopped"),
@ -30,7 +30,7 @@ fn does_not_start_when_calling_stop_before_start() {
#[test]
fn can_start_and_stop_gracefully_after_entering_listening() {
let mut host = Host::new(ConnectOptions::new(String::from("127.0.0.1"), 6665));
let mut host = Host::new(ConnectOptions::new(String::from("127.0.0.1"), 6662));
match host.start() {
Ok(_) => println!("host started"),
@ -52,7 +52,7 @@ fn can_start_and_stop_gracefully_after_entering_listening() {
#[test]
fn can_add_one_action_and_start() {
let mut host = Host::new(ConnectOptions::new(String::from("127.0.0.1"), 6667));
let mut host = Host::new(ConnectOptions::new(String::from("127.0.0.1"), 6663));
let test_action = ServerTestAction;
host.add_action("route/one", Box::new(test_action));
@ -76,7 +76,7 @@ fn can_add_one_action_and_start() {
#[test]
fn can_add_one_action_and_start_and_call_it() {
let connect_options = ConnectOptions::new(String::from("127.0.0.1"), 6668);
let connect_options = ConnectOptions::new(String::from("127.0.0.1"), 6664);
let mut host = Host::new(connect_options.clone());
let test_action = ServerTestAction;
host.add_action("route/one", Box::new(test_action));
@ -127,6 +127,174 @@ fn can_add_one_action_and_start_and_call_it() {
println!("stop called");
}
#[test]
fn can_add_one_action_and_start_and_call_it_with_exactly_512_bytes() {
let connect_options = ConnectOptions::new(String::from("127.0.0.1"), 6665);
let mut host = Host::new(connect_options.clone());
let test_action = ServerTestAction;
host.add_action("route/one", Box::new(test_action));
match host.start() {
Ok(_) => println!("host started"),
Err(_e) => {
panic!("didn't start host")
}
}
thread::sleep(time::Duration::from_millis(10)); // we need a bit of time for the server to start
let client = StreamClient::new(connect_options);
let payload = Payload {
route: String::from("route/one"),
body: "hello from clientaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".as_bytes().to_vec(),
};
let encoded_bytes = payload.encode_to_vec();
assert_eq!(512, encoded_bytes.len());
let received_data = client.send(encoded_bytes);
let received_data = match received_data {
Ok(data) => data,
Err(_) => {
panic!("something went wrong unwrapping")
}
};
let received_data = match str::from_utf8(received_data.as_slice()) {
Ok(converted) => converted,
Err(_) => {
panic!("Invalid UTF8 bytes")
}
};
assert_eq!(
"Hello from ServerTestAction. Got: hello from clientaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
received_data
);
println!("calling stop");
match host.stop() {
Ok(_) => println!("host stopped"),
Err(_e) => panic!("couldn't stop"),
}
println!("stop called");
}
#[test]
fn can_add_one_action_and_start_and_call_it_with_exactly_511_bytes() {
let connect_options = ConnectOptions::new(String::from("127.0.0.1"), 6666);
let mut host = Host::new(connect_options.clone());
let test_action = ServerTestAction;
host.add_action("route/one", Box::new(test_action));
match host.start() {
Ok(_) => println!("host started"),
Err(_e) => {
panic!("didn't start host")
}
}
thread::sleep(time::Duration::from_millis(10)); // we need a bit of time for the server to start
let client = StreamClient::new(connect_options);
let payload = Payload {
route: String::from("route/one"),
body: "hello from clientaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".as_bytes().to_vec(),
};
let encoded_bytes = payload.encode_to_vec();
assert_eq!(511, encoded_bytes.len());
let received_data = client.send(encoded_bytes);
let received_data = match received_data {
Ok(data) => data,
Err(_) => {
panic!("something went wrong unwrapping")
}
};
let received_data = match str::from_utf8(received_data.as_slice()) {
Ok(converted) => converted,
Err(_) => {
panic!("Invalid UTF8 bytes")
}
};
assert_eq!(
"Hello from ServerTestAction. Got: hello from clientaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
received_data
);
println!("calling stop");
match host.stop() {
Ok(_) => println!("host stopped"),
Err(_e) => panic!("couldn't stop"),
}
println!("stop called");
}
#[test]
fn can_add_one_action_and_start_and_call_it_with_exactly_513_bytes() {
let connect_options = ConnectOptions::new(String::from("127.0.0.1"), 6617);
let mut host = Host::new(connect_options.clone());
let test_action = ServerTestAction;
host.add_action("route/one", Box::new(test_action));
match host.start() {
Ok(_) => println!("host started"),
Err(_e) => {
panic!("didn't start host")
}
}
thread::sleep(time::Duration::from_millis(10)); // we need a bit of time for the server to start
let client = StreamClient::new(connect_options);
let payload = Payload {
route: String::from("route/one"),
body: "hello from clientaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".as_bytes().to_vec(),
};
let encoded_bytes = payload.encode_to_vec();
assert_eq!(513, encoded_bytes.len());
let received_data = client.send(encoded_bytes);
let received_data = match received_data {
Ok(data) => data,
Err(_) => {
panic!("something went wrong unwrapping")
}
};
let received_data = match str::from_utf8(received_data.as_slice()) {
Ok(converted) => converted,
Err(_) => {
panic!("Invalid UTF8 bytes")
}
};
assert_eq!(
"Hello from ServerTestAction. Got: hello from clientaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
received_data
);
println!("calling stop");
match host.stop() {
Ok(_) => println!("host stopped"),
Err(_e) => panic!("couldn't stop"),
}
println!("stop called");
}
struct ServerTestAction;
impl Action for ServerTestAction {
fn work(&self, data: &Vec<u8>) -> Vec<u8> {