Skip to content

Commit

Permalink
Fix handling of interrupts (#1727)
Browse files Browse the repository at this point in the history
* Fix handling of interrupts

Co-authored-by: Piotr Rżysko <[email protected]>

* lint

Co-authored-by: Piotr Rżysko <[email protected]>

---------

Co-authored-by: Piotr Rżysko <[email protected]>
  • Loading branch information
moscicky and Piotr Rżysko authored Sep 22, 2023
1 parent f4c58bb commit 9c8ad5b
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ private Retryer<MessageSendingResult> createRetryer(final MessageBatch batch,
.retryIfRuntimeException()
.retryIfResult(result -> consuming && !result.succeeded() && shouldRetryOnClientError(retryClientErrors, result))
.withWaitStrategy(fixedWait(messageBackoff, MILLISECONDS))
.withStopStrategy(attempt -> attempt.getDelaySinceFirstAttempt() > messageTtlMillis)
.withStopStrategy(attempt -> attempt.getDelaySinceFirstAttempt() > messageTtlMillis
|| Thread.currentThread().isInterrupted())
.withRetryListener(getRetryListener(result -> {
batch.incrementRetryCounter();
markSendingResult(batch, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@

public interface Consumer {

/**
* Consume **must** make sure that interrupted status is restored as it is needed for stopping unhealthy consumers.
* Swallowing the interrupt by consume or any of its dependencies will result in consumer being marked
* as unhealthy and will prevent commits despite messages being sent to subscribers.
*/
void consume(Runnable signalsInterrupt);

void initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ public void consume(Runnable signalsInterrupt) {
} else {
inflightSemaphore.release();
}
} catch (InterruptedException e) {
logger.info("Restoring interrupted status {}", subscription.getQualifiedName(), e);
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Consumer loop failed for {}", subscription.getQualifiedName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.apache.http.protocol.HTTP;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.EndpointAddress;
import pl.allegro.tech.hermes.api.EndpointAddressResolverMetadata;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch;
Expand All @@ -29,6 +31,8 @@

public class JettyMessageBatchSender implements MessageBatchSender {

private static final Logger logger = LoggerFactory.getLogger(JettyMessageBatchSender.class);

private final BatchHttpRequestFactory requestFactory;
private final EndpointAddressResolver resolver;
private final SendingResultHandlers resultHandlers;
Expand Down Expand Up @@ -64,6 +68,10 @@ private MessageSendingResult send(MessageBatch batch, URI address, int requestTi
ContentResponse response = request.send();
return resultHandlers.handleSendingResultForBatch(response);
} catch (TimeoutException | ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
logger.info("Restoring interrupted status", e);
}
throw new HttpBatchSenderException("Failed to send message batch", e);
}
}
Expand Down

0 comments on commit 9c8ad5b

Please sign in to comment.