Added first read/write functional test. I have found a read bug that i need to replicate and fix.

This commit is contained in:
Martin Slot 2023-11-28 10:32:19 +01:00
parent 055fb6e489
commit bed7194cf7
7 changed files with 61 additions and 14 deletions

1
Cargo.lock generated
View File

@ -8,6 +8,7 @@ version = "0.0.1"
dependencies = [
"commander_host",
"kernel",
"prost",
]
[[package]]

View File

@ -43,10 +43,10 @@ impl Host {
}
pub fn start(&mut self) -> Result<(), HostError> {
let alive = self.should_stop.clone();
let should_stop = self.should_stop.clone();
let options = self.connect_options.clone();
if !alive.load(Ordering::Relaxed) {
if should_stop.load(Ordering::Relaxed) {
return Ok(());
}
@ -59,7 +59,7 @@ impl Host {
for mut stream in listener.incoming() {
println!("received data");
if alive.load(Ordering::Relaxed) {
if should_stop.load(Ordering::Relaxed) {
break;
}
@ -87,7 +87,7 @@ impl Host {
};
let router = router.lock().unwrap(); // TODO: do we want to handle a potential deadlock? Is this even the correct rust pattern for locking?
let return_data = router.call(payload.route, &bytes);
let return_data = router.call(payload.route, &payload.body.to_vec());
match stream.write(&return_data) {
Ok(_) => {} // do nothing for now

View File

@ -28,7 +28,9 @@ impl Router {
}
pub trait Action: Send + Sync {
fn work(&self, data: &Vec<u8>) -> Vec<u8>; // TODO: this always needs to return, which isn't always the case: something could go wrong. Make it more "errorable"
// TODO: this always needs to return, which isn't always the case: something could go wrong. Make it more "errorable"
// TODO: the return type should maybe just be an array?
fn work(&self, data: &Vec<u8>) -> Vec<u8>;
}
mod tests {
use super::{Action, Router};

View File

@ -46,6 +46,12 @@ impl StreamReader {
}
self.dynamic_buffer.add(buffer, read_bytes_len);
// no more reading to be done
// TODO: this doesn't handle the edge case where read_size == read_bytes_len: stream.read(...) will still block :(
if read_bytes_len < self.read_size {
break;
}
}
Ok(self.dynamic_buffer.merge())

View File

@ -14,7 +14,7 @@ impl ConnectOptions {
}
}
struct StreamClient {
pub struct StreamClient {
connect_options: ConnectOptions,
}
@ -32,8 +32,8 @@ impl StreamClient {
)) {
Ok(mut stream) => match stream.write(&bytes) {
Ok(size) => {
let mut buf = [0u8; 128]; // TODO: this needs to be more dynamic
match stream.read(&mut buf) {
let mut buf = Vec::with_capacity(size); // TODO: this needs to be more dynamic
match stream.read_to_end(&mut buf) {
// TODO: what happens if there is nothing to read?
Ok(_) => Ok(buf.to_vec()),
Err(_) => Err(StreamClientError::Read),
@ -41,7 +41,10 @@ impl StreamClient {
}
Err(_) => Err(StreamClientError::Write),
},
Err(_) => Err(StreamClientError::Connect),
Err(e) => {
eprintln!("{e}");
Err(StreamClientError::Connect)
}
}
}
}

View File

@ -7,7 +7,9 @@ authors = [ "Martin Slot <slot@descore.dk>" ]
kernel = { path = "../kernel" }
commander_host = { path = "../commander_host" }
[[test]]
name = "server_test"
path = "server_test.rs"
[dependencies]
prost = { version = "0.11.9", features = [] }

View File

@ -5,7 +5,11 @@ use std::{str, thread, time};
use commander_host::Host;
use kernel::router::Action;
use kernel::tcp::ConnectOptions;
use kernel::tcp::{ConnectOptions, StreamClient};
use kernel::Payload;
extern crate prost;
use prost::Message;
#[test]
fn does_not_start_when_calling_stop_before_start() {
@ -72,7 +76,8 @@ fn can_add_one_action_and_start() {
#[test]
fn can_add_one_action_and_start_and_call_it() {
let mut host = Host::new(ConnectOptions::new(String::from("127.0.0.1"), 6666));
let connect_options = ConnectOptions::new(String::from("127.0.0.1"), 6666);
let mut host = Host::new(connect_options.clone());
let test_action = ServerTestAction;
host.add_action("route/one", Box::new(test_action));
@ -83,8 +88,36 @@ fn can_add_one_action_and_start_and_call_it() {
}
}
thread::sleep(time::Duration::from_millis(1000)); // we need a bit of time for the server to start
let client = StreamClient::new(connect_options);
let payload = Payload {
route: String::from("route/one"),
body: "hello from client".as_bytes().to_vec(),
};
let encoded_bytes = payload.encode_to_vec();
let received_data = client.send(encoded_bytes);
let received_data = match received_data {
Ok(data) => data,
Err(_) => {
panic!("something went wrong unwrapping")
}
};
let received_data = match str::from_utf8(received_data.as_slice()) {
Ok(converted) => converted,
Err(_) => {
panic!("Invalid UTF8 bytes")
}
};
assert_eq!(
"Hello from ServerTestAction. Got: hello from client",
received_data
);
println!("calling stop");
thread::sleep(time::Duration::from_millis(10)); // we need a bit of time for the server to start
match host.stop() {
Ok(_) => println!("host stopped"),
@ -100,7 +133,7 @@ impl Action for ServerTestAction {
let got = match str::from_utf8(data) {
Ok(from_raw) => from_raw,
Err(_) => {
panic!("Could not read input. Invalid utf8 :(")
panic!("Could not read input. Invalid utf8 :(") // this shouldn't actually panic, but be handled more gracefully, but it is a test, so we live with it
}
};
let return_string = format!("Hello from ServerTestAction. Got: {}", got);