diff --git a/pom.xml b/pom.xml
index 54e50ad6..d9d80c75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@
4.2.0
test
+
com.google.guava
guava-testlib
diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java
index 49a18b5b..834e77f8 100644
--- a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java
+++ b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java
@@ -16,7 +16,6 @@
package com.mercateo.sqs.utils.message.handling;
import com.mercateo.sqs.utils.queue.Queue;
-import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtender;
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory;
import java.time.Duration;
@@ -150,7 +149,8 @@ public Future> submit(Runnable task) {
* the message to be processed
*/
public void handleMessage(@NonNull Message message) {
- String messageId = message.getHeaders().get("MessageId", String.class);
+ MessageWrapper messageWrapper = new MessageWrapper<>(message);
+ String messageId = messageWrapper.getMessageId();
if (messagesInProcessing.contains(messageId)) {
return;
}
@@ -158,7 +158,7 @@ public void handleMessage(@NonNull Message message) {
ScheduledFuture> timeoutExtender;
try {
- timeoutExtender = scheduleNewVisibilityTimeoutExtender(message);
+ timeoutExtender = scheduleNewVisibilityTimeoutExtender(messageWrapper);
} catch (RuntimeException rex) {
messagesInProcessing.remove(messageId);
log.error("error while trying to schedule timeout extender", rex);
@@ -166,7 +166,7 @@ public void handleMessage(@NonNull Message message) {
}
try {
- scheduleNewMessageTask(message, timeoutExtender);
+ scheduleNewMessageTask(messageWrapper, timeoutExtender);
} catch (RuntimeException rex) {
messagesInProcessing.remove(messageId);
timeoutExtender.cancel(true);
@@ -192,19 +192,20 @@ public int getFreeWorkerCapacity() {
return messagesInProcessing.free();
}
- private void scheduleNewMessageTask(@NonNull Message message,
+ private void scheduleNewMessageTask(@NonNull MessageWrapper messageWrapper,
ScheduledFuture> visibilityTimeoutExtender) {
MessageHandlingRunnable messageTask = messageHandlingRunnableFactory.get(worker,
- message, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);
+ messageWrapper, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);
messageProcessingExecutor.submit(messageTask);
}
- private ScheduledFuture> scheduleNewVisibilityTimeoutExtender(@NonNull Message message) {
- VisibilityTimeoutExtender timeoutExtender = timeoutExtenderFactory.get(message, queue, errorHandlingStrategy);
- return timeoutExtensionExecutor.scheduleAtFixedRate(timeoutExtender,
- timeUntilVisibilityTimeoutExtension.toMillis(), timeUntilVisibilityTimeoutExtension
- .toMillis(), TimeUnit.MILLISECONDS);
+ private ScheduledFuture> scheduleNewVisibilityTimeoutExtender(@NonNull MessageWrapper messageWrapper) {
+ return timeoutExtensionExecutor.scheduleAtFixedRate(
+ timeoutExtenderFactory.get(messageWrapper, queue, errorHandlingStrategy),
+ timeUntilVisibilityTimeoutExtension.toMillis(),
+ timeUntilVisibilityTimeoutExtension.toMillis(),
+ TimeUnit.MILLISECONDS);
}
/**
diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java
index 4de32067..4f91b165 100644
--- a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java
+++ b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java
@@ -21,17 +21,18 @@
import java.util.concurrent.ScheduledFuture;
-import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
@Slf4j
+@RequiredArgsConstructor
public class MessageHandlingRunnable implements Runnable {
private final MessageWorkerWithHeaders worker;
- private final Message message;
+ private final MessageWrapper messageWrapper;
private final FinishedMessageCallback finishedMessageCallback;
@@ -41,58 +42,44 @@ public class MessageHandlingRunnable implements Runnable {
private final ErrorHandlingStrategy errorHandlingStrategy;
- MessageHandlingRunnable(@NonNull MessageWorkerWithHeaders worker,
- @NonNull Message message,
- @NonNull FinishedMessageCallback finishedMessageCallback,
- @NonNull SetWithUpperBound messages,
- @NonNull ScheduledFuture> visibilityTimeoutExtender,
- @NonNull ErrorHandlingStrategy errorHandlingStrategy) {
-
- this.worker = worker;
- this.message = message;
- this.finishedMessageCallback = finishedMessageCallback;
- this.messages = messages;
- this.visibilityTimeoutExtender = visibilityTimeoutExtender;
- this.errorHandlingStrategy = errorHandlingStrategy;
-
- }
-
@Override
public void run() {
- String messageId = message.getHeaders().get("MessageId", String.class);
- Acknowledgment acknowledgment = message.getHeaders().get("Acknowledgment",
- Acknowledgment.class);
+ String messageId = messageWrapper.getMessageId();
try {
log.info("starting processing of message " + messageId);
- O outcome = worker.work(message.getPayload(), message.getHeaders());
+ I payload = messageWrapper.getMessage().getPayload();
+ MessageHeaders headers = messageWrapper.getMessage().getHeaders();
+ O outcome = worker.work(payload, headers);
- finishedMessageCallback.call(message.getPayload(), outcome);
- acknowledge(messageId, acknowledgment);
- log.info("message task successfully processed and message acknowledged: " + messageId);
+ finishedMessageCallback.call(payload, outcome);
+ acknowledge(messageWrapper);
+ log.info("message task successfully processed and message acknowledged: " + messageId); //
} catch (InterruptedException e) {
log.info("got interrupted, did not finish: " + messageId, e);
} catch (Exception e) {
- errorHandlingStrategy.handleWorkerException(e, message);
- acknowledge(messageId, acknowledgment);
+ errorHandlingStrategy.handleWorkerException(e, messageWrapper.getMessage());
+ acknowledge(messageWrapper);
} catch (Throwable t) {
- errorHandlingStrategy.handleWorkerThrowable(t, message);
- acknowledge(messageId, acknowledgment);
+ errorHandlingStrategy.handleWorkerThrowable(t, messageWrapper.getMessage());
+ acknowledge(messageWrapper);
} finally {
visibilityTimeoutExtender.cancel(false);
messages.remove(messageId);
}
}
- private void acknowledge(String messageId, Acknowledgment acknowledgment) {
+ private void acknowledge(MessageWrapper messageWrapper) {
+ Acknowledgment acknowledgment = messageWrapper.getAcknowledgment();
try {
try {
acknowledgment.acknowledge().get();
+ messageWrapper.acknowledge();
} catch (AmazonServiceException e) {
- errorHandlingStrategy.handleAcknowledgeMessageException(e, message);
+ errorHandlingStrategy.handleAcknowledgeMessageException(e, messageWrapper.getMessage());
}
} catch (Exception e) {
- log.error("failure during acknowledge " + messageId, e);
+ log.error("failure during acknowledge " + messageWrapper.getMessageId(), e);
}
}
}
\ No newline at end of file
diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactory.java b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactory.java
index 8cd524af..c7cd77b2 100644
--- a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactory.java
+++ b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactory.java
@@ -21,19 +21,17 @@
import lombok.NonNull;
-import org.springframework.messaging.Message;
-
@Named
public class MessageHandlingRunnableFactory {
MessageHandlingRunnable get(@NonNull MessageWorkerWithHeaders worker,
- @NonNull Message message,
+ @NonNull MessageWrapper messageWrapper,
@NonNull FinishedMessageCallback finishedMessageCallback,
@NonNull SetWithUpperBound messageSet,
@NonNull ScheduledFuture> visibilityTimeoutExtender,
@NonNull ErrorHandlingStrategy errorHandlingStrategy) {
- return new MessageHandlingRunnable<>(worker, message, finishedMessageCallback, messageSet,
+ return new MessageHandlingRunnable<>(worker, messageWrapper, finishedMessageCallback, messageSet,
visibilityTimeoutExtender, errorHandlingStrategy);
}
}
\ No newline at end of file
diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java
new file mode 100644
index 00000000..517c81fb
--- /dev/null
+++ b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java
@@ -0,0 +1,42 @@
+package com.mercateo.sqs.utils.message.handling;
+
+import io.awspring.cloud.messaging.listener.Acknowledgment;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import org.springframework.messaging.Message;
+
+@RequiredArgsConstructor
+public class MessageWrapper {
+
+ @NonNull
+ @Getter
+ private final Message message;
+
+ private final AtomicBoolean acknowledged = new AtomicBoolean(false);
+
+ public void acknowledge() {
+ acknowledged.set(true);
+ }
+
+ public boolean isAcknowledged() {
+ return acknowledged.get();
+ }
+
+ public String getMessageId() {
+ return message.getHeaders().get("MessageId", String.class);
+ }
+
+ public String getReceiptHandle() {
+ return message.getHeaders().get("ReceiptHandle", String.class);
+ }
+
+ public Acknowledgment getAcknowledgment() {
+ return message.getHeaders().get("Acknowledgment", Acknowledgment.class);
+ }
+
+}
diff --git a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java
index 066fbdcf..e33f8320 100644
--- a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java
+++ b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java
@@ -22,6 +22,7 @@
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;
+import com.mercateo.sqs.utils.message.handling.MessageWrapper;
import java.net.UnknownHostException;
import java.time.Duration;
@@ -29,8 +30,6 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.messaging.Message;
-
@Slf4j
public class VisibilityTimeoutExtender implements Runnable {
@@ -38,18 +37,18 @@ public class VisibilityTimeoutExtender implements Runnable {
private final ChangeMessageVisibilityRequest request;
- private final Message> message;
+ private final MessageWrapper> messageWrapper;
private final ErrorHandlingStrategy> errorHandlingStrategy;
private final Retryer retryer;
VisibilityTimeoutExtender(@NonNull AmazonSQS sqsClient, @NonNull Duration newVisibilityTimeout,
- @NonNull Message> message, @NonNull String queueUrl,
- @NonNull ErrorHandlingStrategy> errorHandlingStrategy,
- @NonNull RetryStrategy retryStrategy) {
+ @NonNull MessageWrapper> messageWrapper, @NonNull String queueUrl,
+ @NonNull ErrorHandlingStrategy> errorHandlingStrategy,
+ @NonNull RetryStrategy retryStrategy) {
this.sqsClient = sqsClient;
- this.message = message;
+ this.messageWrapper = messageWrapper;
this.errorHandlingStrategy = errorHandlingStrategy;
this.retryer = RetryerBuilder
. newBuilder()
@@ -59,7 +58,7 @@ public class VisibilityTimeoutExtender implements Runnable {
.build();
request = new ChangeMessageVisibilityRequest().withQueueUrl(queueUrl).withReceiptHandle(
- message.getHeaders().get("ReceiptHandle", String.class)).withVisibilityTimeout(
+ messageWrapper.getReceiptHandle()).withVisibilityTimeout(
timeoutInSeconds(newVisibilityTimeout));
}
@@ -70,14 +69,17 @@ private Integer timeoutInSeconds(Duration timeout) {
@Override
public void run() {
try {
- log.trace("changing message visibility: " + request);
retryer.call(() -> sqsClient.changeMessageVisibility(request));
- } catch (AmazonServiceException e) {
- errorHandlingStrategy.handleExtendVisibilityTimeoutException(e, message);
} catch (Exception e) {
- log.error("error while extending message visibility for " + message.getHeaders().get("MessageId",
- String.class), e);
- throw new RuntimeException(e);
+ if (messageWrapper.isAcknowledged()) {
+ return;
+ }
+ if (e.getCause() instanceof AmazonServiceException) {
+ errorHandlingStrategy.handleExtendVisibilityTimeoutException((AmazonServiceException) e.getCause(), messageWrapper.getMessage());
+ } else {
+ log.error("error while extending message visibility for " + messageWrapper.getMessageId(), e);
+ throw new RuntimeException(e);
+ }
}
}
}
\ No newline at end of file
diff --git a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java
index 797b8f13..2c9d447c 100644
--- a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java
+++ b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java
@@ -19,6 +19,7 @@
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;
+import com.mercateo.sqs.utils.message.handling.MessageWrapper;
import com.mercateo.sqs.utils.queue.Queue;
import java.time.Duration;
@@ -29,8 +30,6 @@
import lombok.NonNull;
-import org.springframework.messaging.Message;
-
@Named
public class VisibilityTimeoutExtenderFactory {
@@ -41,12 +40,11 @@ public VisibilityTimeoutExtenderFactory(@NonNull AmazonSQS amazonSQS) {
this.sqsClient = amazonSQS;
}
- public VisibilityTimeoutExtender get(@NonNull Message> message, @NonNull Queue queue,
- @NonNull ErrorHandlingStrategy> errorHandlingStrategy) {
+ public VisibilityTimeoutExtender get(@NonNull MessageWrapper messageWrapper, @NonNull Queue queue, @NonNull ErrorHandlingStrategy> errorHandlingStrategy) {
Duration defaultVisibilityTimeout = queue.getDefaultVisibilityTimeout();
- return new VisibilityTimeoutExtender(sqsClient, defaultVisibilityTimeout, message, queue
+ return new VisibilityTimeoutExtender(sqsClient, defaultVisibilityTimeout, messageWrapper, queue
.getUrl(), errorHandlingStrategy,
new RetryStrategy(WaitStrategies.fixedWait(1000, TimeUnit.MILLISECONDS),
StopStrategies.stopAfterAttempt(5)));
diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactoryTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactoryTest.java
deleted file mode 100644
index f0a20f03..00000000
--- a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactoryTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.mercateo.sqs.utils.message.handling;
-
-import com.google.common.testing.NullPointerTester;
-import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtender;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-public class MessageHandlingRunnableFactoryTest {
-
- private MessageHandlingRunnableFactory uut;
-
- @BeforeEach
- public void setUp() throws Exception {
- uut = new MessageHandlingRunnableFactory();
- }
-
- @Test
- public void testNullContracts() throws Exception {
- // given
- NullPointerTester nullPointerTester = new NullPointerTester();
- nullPointerTester.setDefault(SetWithUpperBound.class, new SetWithUpperBound(5));
- nullPointerTester.setDefault(VisibilityTimeoutExtender.class, Mockito.mock(
- VisibilityTimeoutExtender.class));
-
- // when
- nullPointerTester.testInstanceMethods(uut, NullPointerTester.Visibility.PACKAGE);
- nullPointerTester.testAllPublicConstructors(uut.getClass());
- }
-}
\ No newline at end of file
diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java
index 32a807ed..a2d0a9b2 100644
--- a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java
+++ b/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java
@@ -8,8 +8,6 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
-import com.google.common.testing.NullPointerTester;
-
import io.awspring.cloud.messaging.listener.Acknowledgment;
import java.util.HashMap;
@@ -56,20 +54,10 @@ public void setUp() throws Exception {
headerMap.put("MessageId", "mid");
headerMap.put("Acknowledgment", acknowledgment);
message = new GenericMessage<>(3, new MessageHeaders(headerMap));
- uut = new MessageHandlingRunnable<>(worker, message, finishedMessageCallback, messages,
+ uut = new MessageHandlingRunnable<>(worker, new MessageWrapper<>(message), finishedMessageCallback, messages,
visibilityTimeoutExtender, errorHandlingStrategy);
}
- @Test
- void testNullContracts() throws Exception {
- // given
- NullPointerTester nullPointerTester = new NullPointerTester();
-
- // when
- nullPointerTester.testInstanceMethods(uut, NullPointerTester.Visibility.PACKAGE);
- nullPointerTester.testAllPublicConstructors(uut.getClass());
- }
-
@SuppressWarnings("unchecked")
@Test
void testRun() throws Throwable {
diff --git a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactoryTest.java b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactoryTest.java
deleted file mode 100644
index a2cb9e58..00000000
--- a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactoryTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.mercateo.sqs.utils.visibility;
-
-import com.amazonaws.services.sqs.AmazonSQS;
-import com.google.common.testing.NullPointerTester;
-import com.mercateo.sqs.utils.queue.Queue;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-public class VisibilityTimeoutExtenderFactoryTest {
-
- @Mock
- private AmazonSQS amazonSQS;
-
- private VisibilityTimeoutExtenderFactory uut;
-
- @BeforeEach
- public void setUp() throws Exception {
- MockitoAnnotations.openMocks(this);
- uut = new VisibilityTimeoutExtenderFactory(amazonSQS);
- }
-
- @Test
- public void testNullContracts() throws Exception {
- // given
- NullPointerTester nullPointerTester = new NullPointerTester();
- nullPointerTester.setDefault(Queue.class, Mockito.mock(Queue.class));
-
- // when
- nullPointerTester.testInstanceMethods(uut, NullPointerTester.Visibility.PACKAGE);
- nullPointerTester.testAllPublicConstructors(uut.getClass());
- }
-}
\ No newline at end of file
diff --git a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java
index 6afd6b0a..e56eb973 100644
--- a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java
+++ b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java
@@ -15,6 +15,7 @@
import com.github.rholder.retry.WaitStrategies;
import com.google.common.testing.NullPointerTester;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;
+import com.mercateo.sqs.utils.message.handling.MessageWrapper;
import java.net.UnknownHostException;
import java.time.Duration;
@@ -44,11 +45,11 @@ public void setUp() throws Exception {
MockitoAnnotations.openMocks(this);
HashMap headerMap = new HashMap<>();
headerMap.put("ReceiptHandle", "rhd");
- GenericMessage