Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streams unit test #681

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conmon-rs/server/src/container_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub enum ContainerIOType {
}

/// A message to be sent through the ContainerIO.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Message {
Data(Vec<u8>),
Done,
Expand Down
78 changes: 68 additions & 10 deletions conmon-rs/server/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,32 @@ use crate::{
container_log::SharedContainerLog,
};
use anyhow::Result;
use getset::{Getters, MutGetters};
use getset::Getters;
use std::os::unix::io::AsRawFd;
use tokio::{
process::{ChildStderr, ChildStdin, ChildStdout},
sync::mpsc,
sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
task,
};
use tracing::{debug, debug_span, error, Instrument};

#[derive(Debug, Getters, MutGetters)]
#[getset(get)]
#[derive(Debug, Getters)]
pub struct Streams {
#[getset(get = "pub")]
logger: SharedContainerLog,

#[getset(get = "pub")]
attach: SharedContainerAttach,

#[getset(get = "pub")]
pub message_rx_stdout: mpsc::UnboundedReceiver<Message>,
pub message_rx_stdout: UnboundedReceiver<Message>,

#[getset(get = "pub")]
message_tx_stdout: mpsc::UnboundedSender<Message>,
message_tx_stdout: UnboundedSender<Message>,

#[getset(get = "pub")]
pub message_rx_stderr: mpsc::UnboundedReceiver<Message>,
pub message_rx_stderr: UnboundedReceiver<Message>,

#[getset(get = "pub")]
message_tx_stderr: mpsc::UnboundedSender<Message>,
message_tx_stderr: UnboundedSender<Message>,
}

impl Streams {
Expand Down Expand Up @@ -110,3 +107,64 @@ impl Streams {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{attach::SharedContainerAttach, container_log::ContainerLog};
use anyhow::{bail, Context};
use std::{process::Stdio, str::from_utf8};
use tokio::process::Command;

fn msg_string(message: Message) -> Result<String> {
match message {
Message::Data(v) => Ok(from_utf8(&v)?.into()),
_ => bail!("no data in message"),
}
}

#[tokio::test(flavor = "multi_thread")]
async fn new_success() -> Result<()> {
let logger = ContainerLog::new();
let attach = SharedContainerAttach::default();

let mut sut = Streams::new(logger, attach)?;

let expected = "hello world";
let mut child = Command::new("echo")
.arg("-n")
.arg(expected)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;

sut.handle_stdio_receive(child.stdin.take(), child.stdout.take(), child.stderr.take());

let msg = sut
.message_rx_stdout
.recv()
.await
.context("no message on stdout")?;

assert_eq!(msg_string(msg)?, expected);

let msg = sut
.message_rx_stdout
.recv()
.await
.context("no message on stdout")?;
assert_eq!(msg, Message::Done);
assert!(sut.message_rx_stdout.try_recv().is_err());

let msg = sut
.message_rx_stderr
.recv()
.await
.context("no message on stderr")?;
assert_eq!(msg, Message::Done);
assert!(sut.message_rx_stderr.try_recv().is_err());

Ok(())
}
}