Skip to content

Commit

Permalink
Adds DA committee broadcast TransmitType (EspressoSystems#2616)
Browse files Browse the repository at this point in the history
* Adds DA committee broadcast TransmitType

* adds TransmitType::DACommitteeBroadcast

* memory and web impl just forward to broadcast_message

* libp2p sends direct messages to DA Committee members

* Send direct messages to DA committee members in parallel

* Use into_iter on BTreeSet in da_broadcast_message
  • Loading branch information
lukaszrzasik authored Feb 22, 2024
1 parent b8040a6 commit a98df52
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 1 deletion.
8 changes: 8 additions & 0 deletions hotshot/src/traits/networking/combined_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ impl<TYPES: NodeType> ConnectedNetwork<Message<TYPES>, TYPES::SignatureKey>
.await
}

async fn da_broadcast_message(
&self,
message: Message<TYPES>,
recipients: BTreeSet<TYPES::SignatureKey>,
) -> Result<(), NetworkError> {
self.broadcast_message(message, recipients).await
}

async fn direct_message(
&self,
message: Message<TYPES>,
Expand Down
33 changes: 33 additions & 0 deletions hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use snafu::ResultExt;
#[cfg(feature = "hotshot-testing")]
use std::{collections::HashSet, num::NonZeroUsize, str::FromStr};

use futures::future::join_all;
use std::{
collections::BTreeSet,
fmt::Debug,
Expand Down Expand Up @@ -738,6 +739,32 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> ConnectedNetwork<M, K> for Libp2p
}
}

#[instrument(name = "Libp2pNetwork::da_broadcast_message", skip_all)]
async fn da_broadcast_message(
&self,
message: M,
recipients: BTreeSet<K>,
) -> Result<(), NetworkError> {
let future_results = recipients
.into_iter()
.map(|r| self.direct_message(message.clone(), r));
let results = join_all(future_results).await;

let errors: Vec<_> = results
.into_iter()
.filter_map(|r| match r {
Err(NetworkError::Libp2p { source }) => Some(source),
_ => None,
})
.collect();

if errors.is_empty() {
Ok(())
} else {
Err(NetworkError::Libp2pMulti { sources: errors })
}
}

#[instrument(name = "Libp2pNetwork::direct_message", skip_all)]
async fn direct_message(&self, message: M, recipient: K) -> Result<(), NetworkError> {
if self.inner.handle.is_killed() {
Expand Down Expand Up @@ -852,6 +879,12 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> ConnectedNetwork<M, K> for Libp2p
.add(result.len());
Ok(result)
}
TransmitType::DACommitteeBroadcast => {
error!("Received DACommitteeBroadcast, it should have not happened.");
Err(NetworkError::Libp2p {
source: NetworkNodeHandleError::Killed,
})
}
}
}
};
Expand Down
16 changes: 16 additions & 0 deletions hotshot/src/traits/networking/memory_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use async_trait::async_trait;
use bincode::Options;
use dashmap::DashMap;
use futures::StreamExt;
use hotshot_types::traits::network::MemoryNetworkError;
use hotshot_types::{
boxed_sync,
message::Message,
Expand Down Expand Up @@ -352,6 +353,15 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> ConnectedNetwork<M, K> for Memory
Ok(())
}

#[instrument(name = "MemoryNetwork::da_broadcast_message")]
async fn da_broadcast_message(
&self,
message: M,
recipients: BTreeSet<K>,
) -> Result<(), NetworkError> {
self.broadcast_message(message, recipients).await
}

#[instrument(name = "MemoryNetwork::direct_message")]
async fn direct_message(&self, message: M, recipient: K) -> Result<(), NetworkError> {
// debug!(?message, ?recipient, "Sending direct message");
Expand Down Expand Up @@ -450,6 +460,12 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> ConnectedNetwork<M, K> for Memory
.add(ret.len());
Ok(ret)
}
TransmitType::DACommitteeBroadcast => {
error!("Received DACommitteeBroadcast, it should have not happened.");
Err(NetworkError::MemoryNetwork {
source: MemoryNetworkError::Stub,
})
}
}
};
boxed_sync(closure)
Expand Down
16 changes: 16 additions & 0 deletions hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,16 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
}
}

/// broadcast a message only to a DA committee
/// blocking
async fn da_broadcast_message(
&self,
message: Message<TYPES>,
recipients: BTreeSet<TYPES::SignatureKey>,
) -> Result<(), NetworkError> {
self.broadcast_message(message, recipients).await
}

/// Sends a direct message to a specific node
/// blocking
async fn direct_message(
Expand Down Expand Up @@ -882,6 +892,12 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
.map(|x| x.get_message().unwrap())
.collect())
}
TransmitType::DACommitteeBroadcast => {
error!("Received DACommitteeBroadcast, it should have not happened.");
Err(NetworkError::WebServer {
source: WebServerNetworkError::ClientDisconnected,
})
}
}
};
boxed_sync(closure)
Expand Down
5 changes: 4 additions & 1 deletion task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl<TYPES: NodeType, COMMCHANNEL: ConnectedNetwork<Message<TYPES>, TYPES::Signa
MessageKind::<TYPES>::from_consensus_message(SequencingMessage(Right(
CommitteeConsensusMessage::DAProposal(proposal),
))),
TransmitType::Broadcast,
TransmitType::DACommitteeBroadcast,
None,
),
HotShotEvent::DAVoteSend(vote) => (
Expand Down Expand Up @@ -376,6 +376,9 @@ impl<TYPES: NodeType, COMMCHANNEL: ConnectedNetwork<Message<TYPES>, TYPES::Signa
let transmit_result = match transmit_type {
TransmitType::Direct => net.direct_message(message, recipient.unwrap()).await,
TransmitType::Broadcast => net.broadcast_message(message, committee).await,
TransmitType::DACommitteeBroadcast => {
net.da_broadcast_message(message, committee).await
}
};

match transmit_result {
Expand Down
15 changes: 15 additions & 0 deletions types/src/traits/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub enum TransmitType {
Direct,
/// broadcast the message to all
Broadcast,
/// broadcast to DA committee
DACommitteeBroadcast,
}

/// Error type for networking
Expand All @@ -87,6 +89,11 @@ pub enum NetworkError {
/// source of error
source: NetworkNodeHandleError,
},
/// collection of libp2p secific errors
Libp2pMulti {
/// sources of errors
sources: Vec<NetworkNodeHandleError>,
},
/// memory network specific errors
MemoryNetwork {
/// source of error
Expand Down Expand Up @@ -258,6 +265,14 @@ pub trait ConnectedNetwork<M: NetworkMsg, K: SignatureKey + 'static>:
recipients: BTreeSet<K>,
) -> Result<(), NetworkError>;

/// broadcast a message only to a DA committee
/// blocking
async fn da_broadcast_message(
&self,
message: M,
recipients: BTreeSet<K>,
) -> Result<(), NetworkError>;

/// Sends a direct message to a specific node
/// blocking
async fn direct_message(&self, message: M, recipient: K) -> Result<(), NetworkError>;
Expand Down

0 comments on commit a98df52

Please sign in to comment.