From 4b58d55878db55372d1b09de49c6caf363fe3c06 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Tue, 6 Dec 2022 13:38:21 +0100 Subject: [PATCH 1/3] test: move the implementation of StaticContentsSock to .cpp Move the implementation (method definitions) from `test/util/net.h` to `test/util/net.cpp` to make the header easier to follow. --- src/test/util/net.cpp | 90 +++++++++++++++++++++++++++++++++++++++++++ src/test/util/net.h | 89 ++++++++---------------------------------- 2 files changed, 107 insertions(+), 72 deletions(-) diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index beefc32bee4..0861c2cc09b 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -137,3 +137,93 @@ std::vector GetRandomNodeEvictionCandidates(int n_candida } return candidates; } + +StaticContentsSock::StaticContentsSock(const std::string& contents) + : Sock{INVALID_SOCKET}, m_contents{contents} +{ +} + +StaticContentsSock::~StaticContentsSock() { m_socket = INVALID_SOCKET; } + +StaticContentsSock& StaticContentsSock::operator=(Sock&& other) +{ + assert(false && "Move of Sock into MockSock not allowed."); + return *this; +} + +ssize_t StaticContentsSock::Send(const void*, size_t len, int) const { return len; } + +ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const +{ + const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)}; + std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes); + if ((flags & MSG_PEEK) == 0) { + m_consumed += consume_bytes; + } + return consume_bytes; +} + +int StaticContentsSock::Connect(const sockaddr*, socklen_t) const { return 0; } + +int StaticContentsSock::Bind(const sockaddr*, socklen_t) const { return 0; } + +int StaticContentsSock::Listen(int) const { return 0; } + +std::unique_ptr StaticContentsSock::Accept(sockaddr* addr, socklen_t* addr_len) const +{ + if (addr != nullptr) { + // Pretend all connections come from 5.5.5.5:6789 + memset(addr, 0x00, *addr_len); + const socklen_t write_len = static_cast(sizeof(sockaddr_in)); + if (*addr_len >= write_len) { + *addr_len = write_len; + sockaddr_in* addr_in = reinterpret_cast(addr); + addr_in->sin_family = AF_INET; + memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr)); + addr_in->sin_port = htons(6789); + } + } + return std::make_unique(""); +}; + +int StaticContentsSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const +{ + std::memset(opt_val, 0x0, *opt_len); + return 0; +} + +int StaticContentsSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; } + +int StaticContentsSock::GetSockName(sockaddr* name, socklen_t* name_len) const +{ + std::memset(name, 0x0, *name_len); + return 0; +} + +bool StaticContentsSock::SetNonBlocking() const { return true; } + +bool StaticContentsSock::IsSelectable() const { return true; } + +bool StaticContentsSock::Wait(std::chrono::milliseconds timeout, + Event requested, + Event* occurred) const +{ + if (occurred != nullptr) { + *occurred = requested; + } + return true; +} + +bool StaticContentsSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const +{ + for (auto& [sock, events] : events_per_sock) { + (void)sock; + events.occurred = events.requested; + } + return true; +} + +bool StaticContentsSock::IsConnected(std::string&) const +{ + return true; +} diff --git a/src/test/util/net.h b/src/test/util/net.h index 043e317bf08..9397dbad3d7 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -141,96 +141,41 @@ constexpr auto ALL_NETWORKS = std::array{ class StaticContentsSock : public Sock { public: - explicit StaticContentsSock(const std::string& contents) - : Sock{INVALID_SOCKET}, - m_contents{contents} - { - } + explicit StaticContentsSock(const std::string& contents); - ~StaticContentsSock() override { m_socket = INVALID_SOCKET; } + ~StaticContentsSock() override; - StaticContentsSock& operator=(Sock&& other) override - { - assert(false && "Move of Sock into MockSock not allowed."); - return *this; - } + StaticContentsSock& operator=(Sock&& other) override; - ssize_t Send(const void*, size_t len, int) const override { return len; } + ssize_t Send(const void*, size_t len, int) const override; - ssize_t Recv(void* buf, size_t len, int flags) const override - { - const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)}; - std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes); - if ((flags & MSG_PEEK) == 0) { - m_consumed += consume_bytes; - } - return consume_bytes; - } + ssize_t Recv(void* buf, size_t len, int flags) const override; - int Connect(const sockaddr*, socklen_t) const override { return 0; } + int Connect(const sockaddr*, socklen_t) const override; - int Bind(const sockaddr*, socklen_t) const override { return 0; } + int Bind(const sockaddr*, socklen_t) const override; - int Listen(int) const override { return 0; } + int Listen(int) const override; - std::unique_ptr Accept(sockaddr* addr, socklen_t* addr_len) const override - { - if (addr != nullptr) { - // Pretend all connections come from 5.5.5.5:6789 - memset(addr, 0x00, *addr_len); - const socklen_t write_len = static_cast(sizeof(sockaddr_in)); - if (*addr_len >= write_len) { - *addr_len = write_len; - sockaddr_in* addr_in = reinterpret_cast(addr); - addr_in->sin_family = AF_INET; - memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr)); - addr_in->sin_port = htons(6789); - } - } - return std::make_unique(""); - }; + std::unique_ptr Accept(sockaddr* addr, socklen_t* addr_len) const override; - int GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const override - { - std::memset(opt_val, 0x0, *opt_len); - return 0; - } + int GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const override; - int SetSockOpt(int, int, const void*, socklen_t) const override { return 0; } + int SetSockOpt(int, int, const void*, socklen_t) const override; - int GetSockName(sockaddr* name, socklen_t* name_len) const override - { - std::memset(name, 0x0, *name_len); - return 0; - } + int GetSockName(sockaddr* name, socklen_t* name_len) const override; - bool SetNonBlocking() const override { return true; } + bool SetNonBlocking() const override; - bool IsSelectable() const override { return true; } + bool IsSelectable() const override; bool Wait(std::chrono::milliseconds timeout, Event requested, - Event* occurred = nullptr) const override - { - if (occurred != nullptr) { - *occurred = requested; - } - return true; - } + Event* occurred = nullptr) const override; - bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override - { - for (auto& [sock, events] : events_per_sock) { - (void)sock; - events.occurred = events.requested; - } - return true; - } + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override; - bool IsConnected(std::string&) const override - { - return true; - } + bool IsConnected(std::string&) const override; private: const std::string m_contents; From f1864148c4a091afd63be75bc1ff14ae93383523 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Fri, 6 Sep 2024 11:16:50 +0200 Subject: [PATCH 2/3] test: put the generic parts from StaticContentsSock into a separate class This allows reusing them in other mocked implementations. --- src/test/util/net.cpp | 83 +++++++++++++++++++++++++------------------ src/test/util/net.h | 40 ++++++++++++++++----- 2 files changed, 79 insertions(+), 44 deletions(-) diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index 0861c2cc09b..77ce3b7585d 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -138,38 +138,31 @@ std::vector GetRandomNodeEvictionCandidates(int n_candida return candidates; } -StaticContentsSock::StaticContentsSock(const std::string& contents) - : Sock{INVALID_SOCKET}, m_contents{contents} -{ -} +// Have different ZeroSock (or others that inherit from it) objects have different +// m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two +// different objects comparing as equal. +static std::atomic g_mocked_sock_fd{0}; -StaticContentsSock::~StaticContentsSock() { m_socket = INVALID_SOCKET; } +ZeroSock::ZeroSock() : Sock{g_mocked_sock_fd++} {} -StaticContentsSock& StaticContentsSock::operator=(Sock&& other) -{ - assert(false && "Move of Sock into MockSock not allowed."); - return *this; -} +// Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that. +ZeroSock::~ZeroSock() { m_socket = INVALID_SOCKET; } -ssize_t StaticContentsSock::Send(const void*, size_t len, int) const { return len; } +ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; } -ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const +ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const { - const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)}; - std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes); - if ((flags & MSG_PEEK) == 0) { - m_consumed += consume_bytes; - } - return consume_bytes; + memset(buf, 0x0, len); + return len; } -int StaticContentsSock::Connect(const sockaddr*, socklen_t) const { return 0; } +int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; } -int StaticContentsSock::Bind(const sockaddr*, socklen_t) const { return 0; } +int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; } -int StaticContentsSock::Listen(int) const { return 0; } +int ZeroSock::Listen(int) const { return 0; } -std::unique_ptr StaticContentsSock::Accept(sockaddr* addr, socklen_t* addr_len) const +std::unique_ptr ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const { if (addr != nullptr) { // Pretend all connections come from 5.5.5.5:6789 @@ -183,30 +176,28 @@ std::unique_ptr StaticContentsSock::Accept(sockaddr* addr, socklen_t* addr addr_in->sin_port = htons(6789); } } - return std::make_unique(""); -}; + return std::make_unique(); +} -int StaticContentsSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const +int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const { std::memset(opt_val, 0x0, *opt_len); return 0; } -int StaticContentsSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; } +int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; } -int StaticContentsSock::GetSockName(sockaddr* name, socklen_t* name_len) const +int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const { std::memset(name, 0x0, *name_len); return 0; } -bool StaticContentsSock::SetNonBlocking() const { return true; } +bool ZeroSock::SetNonBlocking() const { return true; } -bool StaticContentsSock::IsSelectable() const { return true; } +bool ZeroSock::IsSelectable() const { return true; } -bool StaticContentsSock::Wait(std::chrono::milliseconds timeout, - Event requested, - Event* occurred) const +bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const { if (occurred != nullptr) { *occurred = requested; @@ -214,7 +205,7 @@ bool StaticContentsSock::Wait(std::chrono::milliseconds timeout, return true; } -bool StaticContentsSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const +bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const { for (auto& [sock, events] : events_per_sock) { (void)sock; @@ -223,7 +214,29 @@ bool StaticContentsSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSo return true; } -bool StaticContentsSock::IsConnected(std::string&) const +ZeroSock& ZeroSock::operator=(Sock&& other) { - return true; + assert(false && "Move of Sock into ZeroSock not allowed."); + return *this; +} + +StaticContentsSock::StaticContentsSock(const std::string& contents) + : m_contents{contents} +{ +} + +ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const +{ + const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)}; + std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes); + if ((flags & MSG_PEEK) == 0) { + m_consumed += consume_bytes; + } + return consume_bytes; +} + +StaticContentsSock& StaticContentsSock::operator=(Sock&& other) +{ + assert(false && "Move of Sock into StaticContentsSock not allowed."); + return *this; } diff --git a/src/test/util/net.h b/src/test/util/net.h index 9397dbad3d7..20b70cc4544 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -134,18 +134,15 @@ constexpr auto ALL_NETWORKS = std::array{ }; /** - * A mocked Sock alternative that returns a statically contained data upon read and succeeds - * and ignores all writes. The data to be returned is given to the constructor and when it is - * exhausted an EOF is returned by further reads. + * A mocked Sock alternative that succeeds on all operations. + * Returns infinite amount of 0x0 bytes on reads. */ -class StaticContentsSock : public Sock +class ZeroSock : public Sock { public: - explicit StaticContentsSock(const std::string& contents); - - ~StaticContentsSock() override; + ZeroSock(); - StaticContentsSock& operator=(Sock&& other) override; + ~ZeroSock() override; ssize_t Send(const void*, size_t len, int) const override; @@ -175,9 +172,34 @@ class StaticContentsSock : public Sock bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override; - bool IsConnected(std::string&) const override; +private: + ZeroSock& operator=(Sock&& other) override; +}; + +/** + * A mocked Sock alternative that returns a statically contained data upon read and succeeds + * and ignores all writes. The data to be returned is given to the constructor and when it is + * exhausted an EOF is returned by further reads. + */ +class StaticContentsSock : public ZeroSock +{ +public: + explicit StaticContentsSock(const std::string& contents); + + /** + * Return parts of the contents that was provided at construction until it is exhausted + * and then return 0 (EOF). + */ + ssize_t Recv(void* buf, size_t len, int flags) const override; + + bool IsConnected(std::string&) const override + { + return true; + } private: + StaticContentsSock& operator=(Sock&& other) override; + const std::string m_contents; mutable size_t m_consumed{0}; }; From b448b014947093cd217dbde47c8fb9e6c2bc8ba3 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Tue, 6 Dec 2022 13:42:03 +0100 Subject: [PATCH 3/3] test: add a mocked Sock that allows inspecting what has been Send() to it And also allows gradually providing the data to be returned by `Recv()` and sending and receiving net messages (`CNetMessage`). --- src/test/util/net.cpp | 168 ++++++++++++++++++++++++++++++++++++++++++ src/test/util/net.h | 154 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 322 insertions(+) diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index 77ce3b7585d..ddd96a50640 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -14,7 +14,10 @@ #include #include #include +#include +#include +#include #include void ConnmanTestMsg::Handshake(CNode& node, @@ -240,3 +243,168 @@ StaticContentsSock& StaticContentsSock::operator=(Sock&& other) assert(false && "Move of Sock into StaticContentsSock not allowed."); return *this; } + +ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags) +{ + WAIT_LOCK(m_mutex, lock); + + if (m_data.empty()) { + if (m_eof) { + return 0; + } + errno = EAGAIN; // Same as recv(2) on a non-blocking socket. + return -1; + } + + const size_t read_bytes{std::min(len, m_data.size())}; + + std::memcpy(buf, m_data.data(), read_bytes); + if ((flags & MSG_PEEK) == 0) { + m_data.erase(m_data.begin(), m_data.begin() + read_bytes); + } + + return read_bytes; +} + +std::optional DynSock::Pipe::GetNetMsg() +{ + V1Transport transport{NodeId{0}}; + + { + WAIT_LOCK(m_mutex, lock); + + WaitForDataOrEof(lock); + if (m_eof && m_data.empty()) { + return std::nullopt; + } + + for (;;) { + Span s{m_data}; + if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s. + return std::nullopt; + } + m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size()); + if (transport.ReceivedMessageComplete()) { + break; + } + if (m_data.empty()) { + WaitForDataOrEof(lock); + if (m_eof && m_data.empty()) { + return std::nullopt; + } + } + } + } + + bool reject{false}; + CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)}; + if (reject) { + return std::nullopt; + } + return std::make_optional(std::move(msg)); +} + +void DynSock::Pipe::PushBytes(const void* buf, size_t len) +{ + LOCK(m_mutex); + const uint8_t* b = static_cast(buf); + m_data.insert(m_data.end(), b, b + len); + m_cond.notify_all(); +} + +void DynSock::Pipe::Eof() +{ + LOCK(m_mutex); + m_eof = true; + m_cond.notify_all(); +} + +void DynSock::Pipe::WaitForDataOrEof(UniqueLock& lock) +{ + Assert(lock.mutex() == &m_mutex); + + m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { + AssertLockHeld(m_mutex); + return !m_data.empty() || m_eof; + }); +} + +DynSock::DynSock(std::shared_ptr pipes, std::shared_ptr accept_sockets) + : m_pipes{pipes}, m_accept_sockets{accept_sockets} +{ +} + +DynSock::~DynSock() +{ + m_pipes->send.Eof(); +} + +ssize_t DynSock::Recv(void* buf, size_t len, int flags) const +{ + return m_pipes->recv.GetBytes(buf, len, flags); +} + +ssize_t DynSock::Send(const void* buf, size_t len, int) const +{ + m_pipes->send.PushBytes(buf, len); + return len; +} + +std::unique_ptr DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const +{ + ZeroSock::Accept(addr, addr_len); + return m_accept_sockets->Pop().value_or(nullptr); +} + +bool DynSock::Wait(std::chrono::milliseconds timeout, + Event requested, + Event* occurred) const +{ + EventsPerSock ev; + ev.emplace(this, Events{requested}); + const bool ret{WaitMany(timeout, ev)}; + if (occurred != nullptr) { + *occurred = ev.begin()->second.occurred; + } + return ret; +} + +bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const +{ + const auto deadline = std::chrono::steady_clock::now() + timeout; + bool at_least_one_event_occurred{false}; + + for (;;) { + // Check all sockets for readiness without waiting. + for (auto& [sock, events] : events_per_sock) { + if ((events.requested & Sock::SEND) != 0) { + // Always ready for Send(). + events.occurred |= Sock::SEND; + at_least_one_event_occurred = true; + } + + if ((events.requested & Sock::RECV) != 0) { + auto dyn_sock = reinterpret_cast(sock.get()); + uint8_t b; + if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) { + events.occurred |= Sock::RECV; + at_least_one_event_occurred = true; + } + } + } + + if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) { + break; + } + + std::this_thread::sleep_for(10ms); + } + + return true; +} + +DynSock& DynSock::operator=(Sock&&) +{ + assert(false && "Move of Sock into DynSock not allowed."); + return *this; +} diff --git a/src/test/util/net.h b/src/test/util/net.h index 20b70cc4544..acc135b0c13 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -6,6 +6,7 @@ #define BITCOIN_TEST_UTIL_NET_H #include +#include #include #include #include @@ -19,9 +20,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -204,6 +207,157 @@ class StaticContentsSock : public ZeroSock mutable size_t m_consumed{0}; }; +/** + * A mocked Sock alternative that allows providing the data to be returned by Recv() + * and inspecting the data that has been supplied to Send(). + */ +class DynSock : public ZeroSock +{ +public: + /** + * Unidirectional bytes or CNetMessage queue (FIFO). + */ + class Pipe + { + public: + /** + * Get bytes and remove them from the pipe. + * @param[in] buf Destination to write bytes to. + * @param[in] len Write up to this number of bytes. + * @param[in] flags Same as the flags of `recv(2)`. Just `MSG_PEEK` is honored. + * @return The number of bytes written to `buf`. `0` if `Eof()` has been called. + * If no bytes are available then `-1` is returned and `errno` is set to `EAGAIN`. + */ + ssize_t GetBytes(void* buf, size_t len, int flags = 0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Deserialize a `CNetMessage` and remove it from the pipe. + * If not enough bytes are available then the function will wait. If parsing fails + * or EOF is signaled to the pipe, then `std::nullopt` is returned. + */ + std::optional GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Push bytes to the pipe. + */ + void PushBytes(const void* buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Construct and push CNetMessage to the pipe. + */ + template + void PushNetMsg(const std::string& type, Args&&... payload) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Signal end-of-file on the receiving end (`GetBytes()` or `GetNetMsg()`). + */ + void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + private: + /** + * Return when there is some data to read or EOF has been signaled. + * @param[in,out] lock Unique lock that must have been derived from `m_mutex` by `WAIT_LOCK(m_mutex, lock)`. + */ + void WaitForDataOrEof(UniqueLock& lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex); + + Mutex m_mutex; + std::condition_variable m_cond; + std::vector m_data GUARDED_BY(m_mutex); + bool m_eof GUARDED_BY(m_mutex){false}; + }; + + struct Pipes { + Pipe recv; + Pipe send; + }; + + /** + * A basic thread-safe queue, used for queuing sockets to be returned by Accept(). + */ + class Queue + { + public: + using S = std::unique_ptr; + + void Push(S s) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + m_queue.push(std::move(s)); + } + + std::optional Pop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + if (m_queue.empty()) { + return std::nullopt; + } + S front{std::move(m_queue.front())}; + m_queue.pop(); + return front; + } + + bool Empty() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + LOCK(m_mutex); + return m_queue.empty(); + } + + private: + mutable Mutex m_mutex; + std::queue m_queue GUARDED_BY(m_mutex); + }; + + /** + * Create a new mocked sock. + * @param[in] pipes Send/recv pipes used by the Send() and Recv() methods. + * @param[in] accept_sockets Sockets to return by the Accept() method. + */ + explicit DynSock(std::shared_ptr pipes, std::shared_ptr accept_sockets); + + ~DynSock(); + + ssize_t Recv(void* buf, size_t len, int flags) const override; + + ssize_t Send(const void* buf, size_t len, int) const override; + + std::unique_ptr Accept(sockaddr* addr, socklen_t* addr_len) const override; + + bool Wait(std::chrono::milliseconds timeout, + Event requested, + Event* occurred = nullptr) const override; + + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override; + +private: + DynSock& operator=(Sock&&) override; + + std::shared_ptr m_pipes; + std::shared_ptr m_accept_sockets; +}; + +template +void DynSock::Pipe::PushNetMsg(const std::string& type, Args&&... payload) +{ + auto msg = NetMsg::Make(type, std::forward(payload)...); + V1Transport transport{NodeId{0}}; + + const bool queued{transport.SetMessageToSend(msg)}; + assert(queued); + + LOCK(m_mutex); + + for (;;) { + const auto& [bytes, _more, _msg_type] = transport.GetBytesToSend(/*have_next_message=*/true); + if (bytes.empty()) { + break; + } + m_data.insert(m_data.end(), bytes.begin(), bytes.end()); + transport.MarkBytesSent(bytes.size()); + } + + m_cond.notify_all(); +} + std::vector GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context); #endif // BITCOIN_TEST_UTIL_NET_H