Skip to content
This repository has been archived by the owner on Sep 6, 2024. It is now read-only.

Commit

Permalink
use MessageWrapper to sync acknowledge and timeout extender
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephan.Praetsch committed Jun 10, 2024
1 parent c746611 commit 61fdc3a
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 150 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
<version>4.2.0</version>
<scope>test</scope>
</dependency>
<!-- remove ? -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.mercateo.sqs.utils.message.handling;

import com.mercateo.sqs.utils.queue.Queue;
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtender;
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory;

import java.time.Duration;
Expand Down Expand Up @@ -150,23 +149,24 @@ public Future<?> submit(Runnable task) {
* the message to be processed
*/
public void handleMessage(@NonNull Message<I> message) {
String messageId = message.getHeaders().get("MessageId", String.class);
MessageWrapper<I> messageWrapper = new MessageWrapper<>(message);
String messageId = messageWrapper.getMessageId();
if (messagesInProcessing.contains(messageId)) {
return;
}
messagesInProcessing.add(messageId);

ScheduledFuture<?> timeoutExtender;
try {
timeoutExtender = scheduleNewVisibilityTimeoutExtender(message);
timeoutExtender = scheduleNewVisibilityTimeoutExtender(messageWrapper);
} catch (RuntimeException rex) {
messagesInProcessing.remove(messageId);
log.error("error while trying to schedule timeout extender", rex);
throw new RuntimeException(rex);
}

try {
scheduleNewMessageTask(message, timeoutExtender);
scheduleNewMessageTask(messageWrapper, timeoutExtender);
} catch (RuntimeException rex) {
messagesInProcessing.remove(messageId);
timeoutExtender.cancel(true);
Expand All @@ -192,19 +192,20 @@ public int getFreeWorkerCapacity() {
return messagesInProcessing.free();
}

private void scheduleNewMessageTask(@NonNull Message<I> message,
private void scheduleNewMessageTask(@NonNull MessageWrapper<I> messageWrapper,
ScheduledFuture<?> visibilityTimeoutExtender) {
MessageHandlingRunnable<I, O> messageTask = messageHandlingRunnableFactory.get(worker,
message, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);
messageWrapper, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);

messageProcessingExecutor.submit(messageTask);
}

private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull Message<I> message) {
VisibilityTimeoutExtender timeoutExtender = timeoutExtenderFactory.get(message, queue, errorHandlingStrategy);
return timeoutExtensionExecutor.scheduleAtFixedRate(timeoutExtender,
timeUntilVisibilityTimeoutExtension.toMillis(), timeUntilVisibilityTimeoutExtension
.toMillis(), TimeUnit.MILLISECONDS);
private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull MessageWrapper<I> messageWrapper) {
return timeoutExtensionExecutor.scheduleAtFixedRate(
timeoutExtenderFactory.get(messageWrapper, queue, errorHandlingStrategy),
timeUntilVisibilityTimeoutExtension.toMillis(),
timeUntilVisibilityTimeoutExtension.toMillis(),
TimeUnit.MILLISECONDS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@

import java.util.concurrent.ScheduledFuture;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

@Slf4j
@RequiredArgsConstructor
public class MessageHandlingRunnable<I, O> implements Runnable {

private final MessageWorkerWithHeaders<I, O> worker;

private final Message<I> message;
private final MessageWrapper<I> messageWrapper;

private final FinishedMessageCallback<I, O> finishedMessageCallback;

Expand All @@ -41,58 +42,44 @@ public class MessageHandlingRunnable<I, O> implements Runnable {

private final ErrorHandlingStrategy<I> errorHandlingStrategy;

MessageHandlingRunnable(@NonNull MessageWorkerWithHeaders<I, O> worker,
@NonNull Message<I> message,
@NonNull FinishedMessageCallback<I, O> finishedMessageCallback,
@NonNull SetWithUpperBound<String> messages,
@NonNull ScheduledFuture<?> visibilityTimeoutExtender,
@NonNull ErrorHandlingStrategy<I> errorHandlingStrategy) {

this.worker = worker;
this.message = message;
this.finishedMessageCallback = finishedMessageCallback;
this.messages = messages;
this.visibilityTimeoutExtender = visibilityTimeoutExtender;
this.errorHandlingStrategy = errorHandlingStrategy;

}

@Override
public void run() {
String messageId = message.getHeaders().get("MessageId", String.class);
Acknowledgment acknowledgment = message.getHeaders().get("Acknowledgment",
Acknowledgment.class);
String messageId = messageWrapper.getMessageId();
try {
log.info("starting processing of message " + messageId);

O outcome = worker.work(message.getPayload(), message.getHeaders());
I payload = messageWrapper.getMessage().getPayload();
MessageHeaders headers = messageWrapper.getMessage().getHeaders();
O outcome = worker.work(payload, headers);

finishedMessageCallback.call(message.getPayload(), outcome);
acknowledge(messageId, acknowledgment);
log.info("message task successfully processed and message acknowledged: " + messageId);
finishedMessageCallback.call(payload, outcome);
acknowledge(messageWrapper);
log.info("message task successfully processed and message acknowledged: " + messageId); //
} catch (InterruptedException e) {
log.info("got interrupted, did not finish: " + messageId, e);
} catch (Exception e) {
errorHandlingStrategy.handleWorkerException(e, message);
acknowledge(messageId, acknowledgment);
errorHandlingStrategy.handleWorkerException(e, messageWrapper.getMessage());
acknowledge(messageWrapper);
} catch (Throwable t) {
errorHandlingStrategy.handleWorkerThrowable(t, message);
acknowledge(messageId, acknowledgment);
errorHandlingStrategy.handleWorkerThrowable(t, messageWrapper.getMessage());
acknowledge(messageWrapper);
} finally {
visibilityTimeoutExtender.cancel(false);
messages.remove(messageId);
}
}

private void acknowledge(String messageId, Acknowledgment acknowledgment) {
private void acknowledge(MessageWrapper<I> messageWrapper) {
Acknowledgment acknowledgment = messageWrapper.getAcknowledgment();
try {
try {
acknowledgment.acknowledge().get();
messageWrapper.acknowledge();
} catch (AmazonServiceException e) {
errorHandlingStrategy.handleAcknowledgeMessageException(e, message);
errorHandlingStrategy.handleAcknowledgeMessageException(e, messageWrapper.getMessage());
}
} catch (Exception e) {
log.error("failure during acknowledge " + messageId, e);
log.error("failure during acknowledge " + messageWrapper.getMessageId(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@

import lombok.NonNull;

import org.springframework.messaging.Message;

@Named
public class MessageHandlingRunnableFactory {

<I, O> MessageHandlingRunnable<I, O> get(@NonNull MessageWorkerWithHeaders<I, O> worker,
@NonNull Message<I> message,
@NonNull MessageWrapper<I> messageWrapper,
@NonNull FinishedMessageCallback<I, O> finishedMessageCallback,
@NonNull SetWithUpperBound<String> messageSet,
@NonNull ScheduledFuture<?> visibilityTimeoutExtender,
@NonNull ErrorHandlingStrategy<I> errorHandlingStrategy) {

return new MessageHandlingRunnable<>(worker, message, finishedMessageCallback, messageSet,
return new MessageHandlingRunnable<>(worker, messageWrapper, finishedMessageCallback, messageSet,
visibilityTimeoutExtender, errorHandlingStrategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.mercateo.sqs.utils.message.handling;

import io.awspring.cloud.messaging.listener.Acknowledgment;

import java.util.concurrent.atomic.AtomicBoolean;

import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import org.springframework.messaging.Message;

@RequiredArgsConstructor
public class MessageWrapper<I> {

@NonNull
@Getter
private final Message<I> message;

private final AtomicBoolean acknowledged = new AtomicBoolean(false);

public void acknowledge() {
acknowledged.set(true);
}

public boolean isAcknowledged() {
return acknowledged.get();
}

public String getMessageId() {
return message.getHeaders().get("MessageId", String.class);
}

public String getReceiptHandle() {
return message.getHeaders().get("ReceiptHandle", String.class);
}

public Acknowledgment getAcknowledgment() {
return message.getHeaders().get("Acknowledgment", Acknowledgment.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,33 @@
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;
import com.mercateo.sqs.utils.message.handling.MessageWrapper;

import java.net.UnknownHostException;
import java.time.Duration;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.springframework.messaging.Message;

@Slf4j
public class VisibilityTimeoutExtender implements Runnable {

private final AmazonSQS sqsClient;

private final ChangeMessageVisibilityRequest request;

private final Message<?> message;
private final MessageWrapper<?> messageWrapper;

private final ErrorHandlingStrategy<?> errorHandlingStrategy;

private final Retryer<ChangeMessageVisibilityResult> retryer;

VisibilityTimeoutExtender(@NonNull AmazonSQS sqsClient, @NonNull Duration newVisibilityTimeout,
@NonNull Message<?> message, @NonNull String queueUrl,
@NonNull ErrorHandlingStrategy<?> errorHandlingStrategy,
@NonNull RetryStrategy retryStrategy) {
@NonNull MessageWrapper<?> messageWrapper, @NonNull String queueUrl,
@NonNull ErrorHandlingStrategy<?> errorHandlingStrategy,
@NonNull RetryStrategy retryStrategy) {
this.sqsClient = sqsClient;
this.message = message;
this.messageWrapper = messageWrapper;
this.errorHandlingStrategy = errorHandlingStrategy;
this.retryer = RetryerBuilder
.<ChangeMessageVisibilityResult> newBuilder()
Expand All @@ -59,7 +58,7 @@ public class VisibilityTimeoutExtender implements Runnable {
.build();

request = new ChangeMessageVisibilityRequest().withQueueUrl(queueUrl).withReceiptHandle(
message.getHeaders().get("ReceiptHandle", String.class)).withVisibilityTimeout(
messageWrapper.getReceiptHandle()).withVisibilityTimeout(
timeoutInSeconds(newVisibilityTimeout));
}

Expand All @@ -70,14 +69,17 @@ private Integer timeoutInSeconds(Duration timeout) {
@Override
public void run() {
try {
log.trace("changing message visibility: " + request);
retryer.call(() -> sqsClient.changeMessageVisibility(request));
} catch (AmazonServiceException e) {
errorHandlingStrategy.handleExtendVisibilityTimeoutException(e, message);
} catch (Exception e) {
log.error("error while extending message visibility for " + message.getHeaders().get("MessageId",
String.class), e);
throw new RuntimeException(e);
if (messageWrapper.isAcknowledged()) {
return;
}
if (e.getCause() instanceof AmazonServiceException) {
errorHandlingStrategy.handleExtendVisibilityTimeoutException((AmazonServiceException) e.getCause(), messageWrapper.getMessage());
} else {
log.error("error while extending message visibility for " + messageWrapper.getMessageId(), e);
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;
import com.mercateo.sqs.utils.message.handling.MessageWrapper;
import com.mercateo.sqs.utils.queue.Queue;

import java.time.Duration;
Expand All @@ -29,8 +30,6 @@

import lombok.NonNull;

import org.springframework.messaging.Message;

@Named
public class VisibilityTimeoutExtenderFactory {

Expand All @@ -41,12 +40,11 @@ public VisibilityTimeoutExtenderFactory(@NonNull AmazonSQS amazonSQS) {
this.sqsClient = amazonSQS;
}

public VisibilityTimeoutExtender get(@NonNull Message<?> message, @NonNull Queue queue,
@NonNull ErrorHandlingStrategy<?> errorHandlingStrategy) {
public VisibilityTimeoutExtender get(@NonNull MessageWrapper messageWrapper, @NonNull Queue queue, @NonNull ErrorHandlingStrategy<?> errorHandlingStrategy) {

Duration defaultVisibilityTimeout = queue.getDefaultVisibilityTimeout();

return new VisibilityTimeoutExtender(sqsClient, defaultVisibilityTimeout, message, queue
return new VisibilityTimeoutExtender(sqsClient, defaultVisibilityTimeout, messageWrapper, queue
.getUrl(), errorHandlingStrategy,
new RetryStrategy(WaitStrategies.fixedWait(1000, TimeUnit.MILLISECONDS),
StopStrategies.stopAfterAttempt(5)));
Expand Down

This file was deleted.

Loading

0 comments on commit 61fdc3a

Please sign in to comment.