Skip to content

Commit

Permalink
JsonLogger Implementation with unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: wasup-yash <[email protected]>
  • Loading branch information
wasup-yash committed Oct 11, 2023
1 parent 69479b9 commit b6694f4
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions conmon-rs/common/proto/conmon.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ interface Conmon {
enum Type {
# The CRI logger, requires `path` to be set.
containerRuntimeInterface @0;
# The JSON logger, requires `path` to be set.
json @1;
}
}

Expand Down
1 change: 1 addition & 0 deletions conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ prctl = "1.0.0"
regex = "1.9.6"
sendfd = { version = "0.4.3", features = ["tokio"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.107"
shadow-rs = "0.24.1"
signal-hook = "0.3.17"
strum = { version = "0.25.0", features = ["derive"] }
Expand Down
61 changes: 8 additions & 53 deletions conmon-rs/server/src/container_log.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{container_io::Pipe, cri_logger::CriLogger};
use crate::{container_io::Pipe, cri_logger::CriLogger, json_logger::JsonLogger};
use anyhow::Result;
use capnp::struct_list::Reader;
use conmon_common::conmon_capnp::conmon::log_driver::{Owned, Type};
Expand All @@ -12,37 +12,33 @@ pub type SharedContainerLog = Arc<RwLock<ContainerLog>>;
pub struct ContainerLog {
drivers: Vec<LogDriver>,
}

#[derive(Debug)]
enum LogDriver {
ContainerRuntimeInterface(CriLogger),
Json(JsonLogger),
}

impl ContainerLog {
/// Create a new default SharedContainerLog.
//Create a new default SharedContainerLog
pub fn new() -> SharedContainerLog {
Arc::new(RwLock::new(Self::default()))
}

/// Create a new SharedContainerLog from an capnp owned reader.
pub fn from(reader: Reader<Owned>) -> Result<SharedContainerLog> {
let drivers = reader
.iter()
.flat_map(|x| -> Result<_> {
Ok(match x.get_type()? {
.map(|x| -> Result<_> {
match x.get_type()? {
Type::ContainerRuntimeInterface => {
LogDriver::ContainerRuntimeInterface(CriLogger::new(
Ok(LogDriver::ContainerRuntimeInterface(CriLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)
)?))
}
<<<<<<< HEAD
})
=======
Type::Json => Ok(LogDriver::Json(JsonLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Expand All @@ -52,30 +48,20 @@ impl ContainerLog {
},
)?)),
}
>>>>>>> d9216df (updated cargo fmt)
})
.collect();
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(RwLock::new(Self { drivers })))
}

/// Asynchronously initialize all loggers.
pub async fn init(&mut self) -> Result<()> {
join_all(
self.drivers
.iter_mut()
.map(|x| match x {
<<<<<<< HEAD
<<<<<<< HEAD
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.init(),
=======
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.init().boxed(),
=======
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.init().boxed()
}
>>>>>>> d9216df (updated cargo fmt)
LogDriver::Json(ref mut json_logger) => json_logger.init().boxed(),
>>>>>>> 217ede1 (updated logger)
})
.collect::<Vec<_>>(),
)
Expand All @@ -84,28 +70,16 @@ impl ContainerLog {
.collect::<Result<Vec<_>>>()?;
Ok(())
}
<<<<<<< HEAD

=======
>>>>>>> d9216df (updated cargo fmt)
/// Reopen the container logs.
pub async fn reopen(&mut self) -> Result<()> {
join_all(
self.drivers
.iter_mut()
.map(|x| match x {
<<<<<<< HEAD
<<<<<<< HEAD
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.reopen(),
=======
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.reopen().boxed(),
=======
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.reopen().boxed()
}
>>>>>>> d9216df (updated cargo fmt)
LogDriver::Json(ref mut json_logger) => json_logger.reopen().boxed(),
>>>>>>> 42ff85b (updated jsonlogger)
})
.collect::<Vec<_>>(),
)
Expand All @@ -115,26 +89,8 @@ impl ContainerLog {
Ok(())
}

/// Write the contents of the provided reader into all loggers.
pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
where
<<<<<<< HEAD
T: AsyncBufRead + Unpin + Copy,
{
join_all(
self.drivers
.iter_mut()
.map(|x| match x {
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.write(pipe, bytes)
}
})
.collect::<Vec<_>>(),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
=======
T: AsyncBufRead + Unpin + Clone,
{
let futures = self
Expand Down Expand Up @@ -162,7 +118,6 @@ impl ContainerLog {
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
>>>>>>> d9216df (updated cargo fmt)
Ok(())
}
}

0 comments on commit b6694f4

Please sign in to comment.