Skip to content

Commit

Permalink
support timed dequeues in coro queues
Browse files Browse the repository at this point in the history
Summary:
Adding `co_try_dequeue_for(duration)` to the queue API's.

Similar to `fibers::Semaphore::co_try_wait_for()` this helps prevent the mistake of wrapping calls to `dequeue` with `timeout()` instead of `timeoutNoDiscard()`

Reviewed By: iahs

Differential Revision: D69140972

fbshipit-source-id: 217037def69f07876c3bd4fc3dcea58d2cae6d15
  • Loading branch information
David Geraghty (SEA) authored and facebook-github-bot committed Feb 8, 2025
1 parent 4a8630b commit 7fd73ff
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 4 deletions.
20 changes: 20 additions & 0 deletions third-party/folly/src/folly/coro/BoundedQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ class BoundedQueue {
return true;
}

// Dequeue a value from the queue.
// Note that this operation can be safely cancelled by requesting cancellation
// on the awaiting coroutine's associated CancellationToken.
// If the operation is successfully cancelled then it will complete with
// an error of type folly::OperationCancelled.
// WARNING: It is not safe to wrap this with folly::coro::timeout(). Wrap with
// folly::coro::timeoutNoDiscard(), or use co_try_dequeue_for() instead.
folly::coro::Task<T> dequeue() {
co_await folly::coro::co_nothrow(dequeueSemaphore_.co_wait());
T item;
Expand All @@ -70,6 +77,19 @@ class BoundedQueue {
co_return item;
}

// Try to dequeue a value from the queue with a timeout. The operation will
// either successfully dequeue an item from the queue, or else be cancelled
// and complete with an error of type folly::OperationCancelled.
template <typename Duration>
folly::coro::Task<T> co_try_dequeue_for(Duration timeout) {
co_await folly::coro::co_nothrow(
dequeueSemaphore_.co_try_wait_for(timeout));
T item;
dequeueReady(item);
enqueueSemaphore_.signal();
co_return item;
}

folly::coro::Task<void> dequeue(T& item) {
co_await folly::coro::co_nothrow(dequeueSemaphore_.co_wait());
dequeueReady(item);
Expand Down
21 changes: 21 additions & 0 deletions third-party/folly/src/folly/coro/UnboundedQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ class UnboundedQueue {
sem_.signal();
}

// Dequeue a value from the queue.
// Note that this operation can be safely cancelled by requesting cancellation
// on the awaiting coroutine's associated CancellationToken.
// If the operation is successfully cancelled then it will complete with
// an error of type folly::OperationCancelled.
// WARNING: It is not safe to wrap this with folly::coro::timeout(). Wrap with
// folly::coro::timeoutNoDiscard(), or use co_try_dequeue_for() instead.
folly::coro::Task<T> dequeue() {
folly::Try<void> result = co_await folly::coro::co_awaitTry(sem_.co_wait());
if (result.hasException()) {
Expand All @@ -46,6 +53,20 @@ class UnboundedQueue {
co_return queue_.dequeue();
}

// Try to dequeue a value from the queue with a timeout. The operation will
// either successfully dequeue an item from the queue, or else be cancelled
// and complete with an error of type folly::OperationCancelled.
template <typename Duration>
folly::coro::Task<T> co_try_dequeue_for(Duration timeout) {
folly::Try<void> result =
co_await folly::coro::co_awaitTry(sem_.co_try_wait_for(timeout));
if (result.hasException()) {
co_yield co_error(std::move(result).exception());
}

co_return queue_.dequeue();
}

folly::coro::Task<void> dequeue(T& out) {
co_await sem_.co_wait();
queue_.dequeue(out);
Expand Down
39 changes: 39 additions & 0 deletions third-party/folly/src/folly/coro/test/BoundedQueueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include <folly/portability/GTest.h>
#if FOLLY_HAS_COROUTINES

using namespace std::chrono_literals;

namespace {
struct SlowMover {
explicit SlowMover(bool slow = false) : slow(slow) {}
Expand Down Expand Up @@ -305,4 +307,41 @@ TEST(BoundedQueueTest, UnorderedDequeueCompletion) {
}
}

TEST(BoundedQueueTest, TryDequeueFor) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::coro::BoundedQueue<int> queue(2);

EXPECT_THROW(
(co_await queue.co_try_dequeue_for(1ms)), folly::OperationCancelled);

co_await queue.enqueue(42);
auto val = co_await queue.co_try_dequeue_for(1ms);
EXPECT_EQ(val, 42);

co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<void> {
co_await folly::coro::co_reschedule_on_current_executor;
co_await queue.enqueue(43);
}(),
[&]() -> folly::coro::Task<void> {
EXPECT_TRUE(queue.empty());
val = co_await queue.co_try_dequeue_for(1h);
EXPECT_EQ(val, 43);
}());

folly::CancellationSource cancelSource;
co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<void> {
co_await folly::coro::co_reschedule_on_current_executor;
cancelSource.requestCancellation();
}(),
[&]() -> folly::coro::Task<void> {
EXPECT_THROW(
(co_await folly::coro::co_withCancellation(
cancelSource.getToken(), queue.co_try_dequeue_for(1h))),
folly::OperationCancelled);
}());
}());
}

#endif
44 changes: 40 additions & 4 deletions third-party/folly/src/folly/coro/test/UnboundedQueueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@

#include <folly/portability/GTest.h>

#include <chrono>
#include <string>
#include <thread>

#if FOLLY_HAS_COROUTINES

using namespace std::chrono_literals;

TEST(UnboundedQueueTest, EnqueueDeque) {
folly::coro::UnboundedQueue<std::string, true, true> queue;

Expand Down Expand Up @@ -159,8 +162,6 @@ TEST(UnboundedQueueTest, EnqueueDequeMPMC) {

TEST(UnboundedQueueTest, CancelledDequeueThrowsOperationCancelled) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
// Cancellation currently only supported on SingleConsumer variants of
// UnboundedQueue.
folly::coro::UnboundedQueue<int> queue;
folly::CancellationSource cancelSource;

Expand All @@ -181,8 +182,6 @@ TEST(UnboundedQueueTest, CancelledDequeueThrowsOperationCancelled) {

TEST(UnboundedQueueTest, CancelledDequeueCompletesNormallyIfAnItemIsAvailable) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
// Cancellation currently only supported on SingleConsumer variants of
// UnboundedQueue.
folly::coro::UnboundedQueue<int> queue;
folly::CancellationSource cancelSource;
cancelSource.requestCancellation();
Expand Down Expand Up @@ -236,4 +235,41 @@ TEST(UnboundedQueueTest, TryPeekSingleConsumer) {
EXPECT_EQ(nullptr, queue.try_peek());
}

TEST(UnboundedQueueTest, TryDequeueFor) {
folly::coro::blockingWait([]() -> folly::coro::Task<void> {
folly::coro::UnboundedQueue<int> queue;

EXPECT_THROW(
(co_await queue.co_try_dequeue_for(1ms)), folly::OperationCancelled);

queue.enqueue(42);
auto val = co_await queue.co_try_dequeue_for(1ms);
EXPECT_EQ(val, 42);

co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<void> {
co_await folly::coro::co_reschedule_on_current_executor;
queue.enqueue(43);
}(),
[&]() -> folly::coro::Task<void> {
EXPECT_TRUE(queue.empty());
val = co_await queue.co_try_dequeue_for(1h);
EXPECT_EQ(val, 43);
}());

folly::CancellationSource cancelSource;
co_await folly::coro::collectAll(
[&]() -> folly::coro::Task<void> {
co_await folly::coro::co_reschedule_on_current_executor;
cancelSource.requestCancellation();
}(),
[&]() -> folly::coro::Task<void> {
EXPECT_THROW(
(co_await folly::coro::co_withCancellation(
cancelSource.getToken(), queue.co_try_dequeue_for(1h))),
folly::OperationCancelled);
}());
}());
}

#endif

0 comments on commit 7fd73ff

Please sign in to comment.