diff --git a/src/dealer.rs b/src/dealer.rs index 04cfe74..161f612 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -5,7 +5,7 @@ use crate::transport::AcceptStopHandle; use crate::util::PeerIdentity; use crate::{ CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions, - SocketRecv, SocketSend, SocketType, ZmqMessage, ZmqResult, + SocketRecv, SocketSend, SocketType, ZmqError, ZmqMessage, ZmqResult, }; use async_trait::async_trait; @@ -66,8 +66,19 @@ impl SocketRecv for DealerSocket { Some((_peer_id, Ok(Message::Message(message)))) => { return Ok(message); } - Some((_peer_id, _)) => todo!(), - None => todo!(), + Some((_peer_id, Ok(_))) => { + // Ignore non-message frames + continue; + } + Some((_peer_id, Err(e))) => { + // Handle potential errors from the fair queue + return Err(e.into()); + } + None => { + // The fair queue is empty, which shouldn't happen in normal operation + // We could either wait for more messages or return an error + return Err(ZmqError::NoMessage); + } }; } } diff --git a/src/lib.rs b/src/lib.rs index e244d5f..fe97287 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -357,8 +357,8 @@ pub async fn proxy { - todo!() + Err(e) => { + return Err(e); } } }, @@ -370,8 +370,8 @@ pub async fn proxy { - todo!() + Err(e) => { + return Err(e); } } } diff --git a/src/pull.rs b/src/pull.rs index 0144869..2c19076 100644 --- a/src/pull.rs +++ b/src/pull.rs @@ -5,7 +5,7 @@ use crate::transport::AcceptStopHandle; use crate::util::PeerIdentity; use crate::{ Endpoint, MultiPeerBackend, Socket, SocketEvent, SocketOptions, SocketRecv, SocketType, - ZmqMessage, ZmqResult, + ZmqError, ZmqMessage, ZmqResult, }; use async_trait::async_trait; @@ -60,11 +60,19 @@ impl SocketRecv for PullSocket { Some((_peer_id, Ok(Message::Message(message)))) => { return Ok(message); } - Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg), - Some((peer_id, Err(_))) => { + Some((_peer_id, Ok(_msg))) => { + // Ignore non-message frames (Command, Greeting) as PULL sockets are designed to only receive actual messages, not internal protocol frames. + continue; + } + Some((peer_id, Err(e))) => { self.backend.peer_disconnected(&peer_id); + // Handle potential errors from the fair queue + return Err(e.into()); + } + None => { + // The fair queue is empty, which shouldn't happen in normal operation + return Err(ZmqError::NoMessage); } - None => todo!(), }; } } diff --git a/src/rep.rs b/src/rep.rs index 2243f69..4eae8e7 100644 --- a/src/rep.rs +++ b/src/rep.rs @@ -149,7 +149,9 @@ impl SocketRecv for RepSocket { match self.fair_queue.next().await { Some((peer_id, Ok(message))) => match message { Message::Message(mut m) => { - assert!(m.len() > 1); + if m.len() < 2 { + return Err(ZmqError::Other("Invalid message format")); + } let mut at = 1; for (index, frame) in m.iter().enumerate() { if frame.is_empty() { @@ -163,9 +165,15 @@ impl SocketRecv for RepSocket { self.current_request = Some(peer_id); return Ok(data); } - _ => todo!(), + Message::Greeting(_) | Message::Command(_) => { + // Ignore non-message frames. REP sockets should only process actual messages. + continue; + } }, - Some((_peer_id, _)) => todo!(), + Some((peer_id, Err(e))) => { + self.backend.peer_disconnected(&peer_id); + return Err(e.into()); + } None => return Err(ZmqError::NoMessage), }; } diff --git a/src/req.rs b/src/req.rs index 714f3de..ad9f2fe 100644 --- a/src/req.rs +++ b/src/req.rs @@ -78,14 +78,24 @@ impl SocketRecv for ReqSocket { match self.current_request.take() { Some(peer_id) => { if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) { - let message = peer.recv_queue.next().await; - match message { + match peer.recv_queue.next().await { Some(Ok(Message::Message(mut m))) => { - assert!(m.len() > 1); - assert!(m.pop_front().unwrap().is_empty()); // Ensure that we have delimeter as first part + if m.len() < 2 { + return Err(ZmqError::Other( + "Invalid message format: too few frames", + )); + } + if !m.pop_front().unwrap().is_empty() { + return Err(ZmqError::Other( + "Invalid message format: missing delimiter", + )); + } Ok(m) } - Some(Ok(_)) => todo!(), + Some(Ok(_)) => { + // Non-message frames should be ignored by the caller + Err(ZmqError::Other("Received non-message frame")) + } Some(Err(error)) => Err(error.into()), None => Err(ZmqError::NoMessage), } diff --git a/src/router.rs b/src/router.rs index 4ffb4b0..fda8a3f 100644 --- a/src/router.rs +++ b/src/router.rs @@ -68,11 +68,22 @@ impl SocketRecv for RouterSocket { message.push_front(peer_id.into()); return Ok(message); } - Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg), - Some((peer_id, Err(_))) => { + Some((_peer_id, Ok(_msg))) => { + // todo: Log or handle other message types if needed + // We could take an approach of using `tracing` and have that be an optional feature + // tracing::warn!("Received unimplemented message type: {:?}", msg); + continue; + } + Some((peer_id, Err(_e))) => { self.backend.peer_disconnected(&peer_id); + // We could take an approach of using `tracing` and have that be an optional feature + // tracing::error!("Error receiving message from peer {}: {:?}", peer_id, e); + continue; + } + None => { + // The fair queue is empty, which shouldn't happen in normal operation + return Err(ZmqError::NoMessage); } - None => todo!(), }; } } diff --git a/src/sub.rs b/src/sub.rs index 9e4818b..72217dd 100644 --- a/src/sub.rs +++ b/src/sub.rs @@ -1,7 +1,7 @@ use crate::backend::Peer; use crate::codec::{FramedIo, Message, ZmqFramedRead}; use crate::endpoint::Endpoint; -use crate::error::ZmqResult; +use crate::error::{ZmqError, ZmqResult}; use crate::fair_queue::FairQueue; use crate::fair_queue::QueueInner; use crate::message::ZmqMessage; @@ -191,11 +191,20 @@ impl SocketRecv for SubSocket { Some((_peer_id, Ok(Message::Message(message)))) => { return Ok(message); } - Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg), - Some((peer_id, Err(_))) => { + Some((_peer_id, Ok(_msg))) => { + // Ignore non-message frames. SUB sockets are designed to only receive actual messages, + // not internal protocol frames like commands or greetings. + continue; + } + Some((peer_id, Err(e))) => { self.backend.peer_disconnected(&peer_id); + // Handle potential errors from the fair queue + return Err(e.into()); + } + None => { + // The fair queue is empty, which shouldn't happen in normal operation + return Err(ZmqError::NoMessage); } - None => todo!(), } } }