Skip to content
This repository has been archived by the owner on Sep 6, 2024. It is now read-only.

Commit

Permalink
use MessageWrapper to sync acknowledge and timeout extender (#23)
Browse files Browse the repository at this point in the history
use MessageWrapper to sync acknowledge and timeout extender
  • Loading branch information
StephanPraetsch authored Jun 13, 2024
1 parent c746611 commit 36ee1eb
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.mercateo.sqs.utils.message.handling;

import com.mercateo.sqs.utils.queue.Queue;
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtender;
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory;

import java.time.Duration;
Expand Down Expand Up @@ -150,23 +149,24 @@ public Future<?> submit(Runnable task) {
* the message to be processed
*/
public void handleMessage(@NonNull Message<I> message) {
String messageId = message.getHeaders().get("MessageId", String.class);
MessageWrapper<I> messageWrapper = new MessageWrapper<>(message);
String messageId = messageWrapper.getMessageId();
if (messagesInProcessing.contains(messageId)) {
return;
}
messagesInProcessing.add(messageId);

ScheduledFuture<?> timeoutExtender;
try {
timeoutExtender = scheduleNewVisibilityTimeoutExtender(message);
timeoutExtender = scheduleNewVisibilityTimeoutExtender(messageWrapper);
} catch (RuntimeException rex) {
messagesInProcessing.remove(messageId);
log.error("error while trying to schedule timeout extender", rex);
throw new RuntimeException(rex);
}

try {
scheduleNewMessageTask(message, timeoutExtender);
scheduleNewMessageTask(messageWrapper, timeoutExtender);
} catch (RuntimeException rex) {
messagesInProcessing.remove(messageId);
timeoutExtender.cancel(true);
Expand All @@ -192,19 +192,20 @@ public int getFreeWorkerCapacity() {
return messagesInProcessing.free();
}

private void scheduleNewMessageTask(@NonNull Message<I> message,
private void scheduleNewMessageTask(@NonNull MessageWrapper<I> messageWrapper,
ScheduledFuture<?> visibilityTimeoutExtender) {
MessageHandlingRunnable<I, O> messageTask = messageHandlingRunnableFactory.get(worker,
message, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);
messageWrapper, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);

messageProcessingExecutor.submit(messageTask);
}

private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull Message<I> message) {
VisibilityTimeoutExtender timeoutExtender = timeoutExtenderFactory.get(message, queue, errorHandlingStrategy);
return timeoutExtensionExecutor.scheduleAtFixedRate(timeoutExtender,
timeUntilVisibilityTimeoutExtension.toMillis(), timeUntilVisibilityTimeoutExtension
.toMillis(), TimeUnit.MILLISECONDS);
private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull MessageWrapper<I> messageWrapper) {
return timeoutExtensionExecutor.scheduleAtFixedRate(
timeoutExtenderFactory.get(messageWrapper, queue, errorHandlingStrategy),
timeUntilVisibilityTimeoutExtension.toMillis(),
timeUntilVisibilityTimeoutExtension.toMillis(),
TimeUnit.MILLISECONDS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
/**
*
* Copyright © 2017 Mercateo AG (http://www.mercateo.com)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,82 +16,70 @@

import com.amazonaws.AmazonServiceException;

import io.awspring.cloud.messaging.listener.Acknowledgment;

import java.util.concurrent.ScheduledFuture;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

@Slf4j
@RequiredArgsConstructor
public class MessageHandlingRunnable<I, O> implements Runnable {

@NonNull
private final MessageWorkerWithHeaders<I, O> worker;

private final Message<I> message;
@NonNull
private final MessageWrapper<I> messageWrapper;

@NonNull
private final FinishedMessageCallback<I, O> finishedMessageCallback;

@NonNull
private final SetWithUpperBound<String> messages;

@NonNull
private final ScheduledFuture<?> visibilityTimeoutExtender;

@NonNull
private final ErrorHandlingStrategy<I> errorHandlingStrategy;

MessageHandlingRunnable(@NonNull MessageWorkerWithHeaders<I, O> worker,
@NonNull Message<I> message,
@NonNull FinishedMessageCallback<I, O> finishedMessageCallback,
@NonNull SetWithUpperBound<String> messages,
@NonNull ScheduledFuture<?> visibilityTimeoutExtender,
@NonNull ErrorHandlingStrategy<I> errorHandlingStrategy) {

this.worker = worker;
this.message = message;
this.finishedMessageCallback = finishedMessageCallback;
this.messages = messages;
this.visibilityTimeoutExtender = visibilityTimeoutExtender;
this.errorHandlingStrategy = errorHandlingStrategy;

}

@Override
public void run() {
String messageId = message.getHeaders().get("MessageId", String.class);
Acknowledgment acknowledgment = message.getHeaders().get("Acknowledgment",
Acknowledgment.class);
String messageId = messageWrapper.getMessageId();
try {
log.info("starting processing of message " + messageId);

O outcome = worker.work(message.getPayload(), message.getHeaders());
I payload = messageWrapper.getMessage().getPayload();
MessageHeaders headers = messageWrapper.getMessage().getHeaders();
O outcome = worker.work(payload, headers);

finishedMessageCallback.call(message.getPayload(), outcome);
acknowledge(messageId, acknowledgment);
log.info("message task successfully processed and message acknowledged: " + messageId);
finishedMessageCallback.call(payload, outcome);
acknowledge();
log.info("message task successfully processed and message acknowledged: " + messageId); //
} catch (InterruptedException e) {
log.info("got interrupted, did not finish: " + messageId, e);
} catch (Exception e) {
errorHandlingStrategy.handleWorkerException(e, message);
acknowledge(messageId, acknowledgment);
errorHandlingStrategy.handleWorkerException(e, messageWrapper.getMessage());
acknowledge();
} catch (Throwable t) {
errorHandlingStrategy.handleWorkerThrowable(t, message);
acknowledge(messageId, acknowledgment);
errorHandlingStrategy.handleWorkerThrowable(t, messageWrapper.getMessage());
acknowledge();
} finally {
visibilityTimeoutExtender.cancel(false);
messages.remove(messageId);
}
}

private void acknowledge(String messageId, Acknowledgment acknowledgment) {
private void acknowledge() {
try {
try {
acknowledgment.acknowledge().get();
} catch (AmazonServiceException e) {
errorHandlingStrategy.handleAcknowledgeMessageException(e, message);
}
messageWrapper.acknowledge();
} catch (AmazonServiceException e) {
errorHandlingStrategy.handleAcknowledgeMessageException(e, messageWrapper.getMessage());
} catch (Exception e) {
log.error("failure during acknowledge " + messageId, e);
log.error("failure during acknowledge " + messageWrapper.getMessageId(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@

import lombok.NonNull;

import org.springframework.messaging.Message;

@Named
public class MessageHandlingRunnableFactory {

<I, O> MessageHandlingRunnable<I, O> get(@NonNull MessageWorkerWithHeaders<I, O> worker,
@NonNull Message<I> message,
@NonNull MessageWrapper<I> messageWrapper,
@NonNull FinishedMessageCallback<I, O> finishedMessageCallback,
@NonNull SetWithUpperBound<String> messageSet,
@NonNull ScheduledFuture<?> visibilityTimeoutExtender,
@NonNull ErrorHandlingStrategy<I> errorHandlingStrategy) {

return new MessageHandlingRunnable<>(worker, message, finishedMessageCallback, messageSet,
return new MessageHandlingRunnable<>(worker, messageWrapper, finishedMessageCallback, messageSet,
visibilityTimeoutExtender, errorHandlingStrategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.mercateo.sqs.utils.message.handling;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;

import io.awspring.cloud.messaging.listener.Acknowledgment;

import java.util.concurrent.TimeUnit;

import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;

import org.springframework.messaging.Message;

@RequiredArgsConstructor
public class MessageWrapper<I> {

@NonNull
@Getter
private final Message<I> message;

private boolean acknowledged = false;

public String getMessageId() {
return message.getHeaders().get("MessageId", String.class);
}

public String getReceiptHandle() {
return message.getHeaders().get("ReceiptHandle", String.class);
}

@SneakyThrows
public synchronized void acknowledge() {
Acknowledgment acknowledgment = message.getHeaders().get("Acknowledgment", Acknowledgment.class);
if (acknowledgment == null) {
throw new NullPointerException("there is no \"Acknowledgment\" in the message headers");
}
acknowledgment.acknowledge().get(2, TimeUnit.MINUTES);
acknowledged = true;
}

public synchronized void changeMessageVisibility(AmazonSQS sqsClient, ChangeMessageVisibilityRequest request) {
if (acknowledged) {
return;
}
sqsClient.changeMessageVisibility(request);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright © 2017 Mercateo AG (http://www.mercateo.com)
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,49 +18,47 @@
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityResult;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;
import com.mercateo.sqs.utils.message.handling.MessageWrapper;

import java.net.UnknownHostException;
import java.time.Duration;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.springframework.messaging.Message;

@Slf4j
public class VisibilityTimeoutExtender implements Runnable {

private final AmazonSQS sqsClient;

private final ChangeMessageVisibilityRequest request;

private final Message<?> message;
private final MessageWrapper<?> messageWrapper;

private final ErrorHandlingStrategy<?> errorHandlingStrategy;

private final Retryer<ChangeMessageVisibilityResult> retryer;
private final Retryer<Void> retryer;

VisibilityTimeoutExtender(@NonNull AmazonSQS sqsClient, @NonNull Duration newVisibilityTimeout,
@NonNull Message<?> message, @NonNull String queueUrl,
@NonNull ErrorHandlingStrategy<?> errorHandlingStrategy,
@NonNull RetryStrategy retryStrategy) {
@NonNull MessageWrapper<?> messageWrapper, @NonNull String queueUrl,
@NonNull ErrorHandlingStrategy<?> errorHandlingStrategy,
@NonNull RetryStrategy retryStrategy) {
this.sqsClient = sqsClient;
this.message = message;
this.messageWrapper = messageWrapper;
this.errorHandlingStrategy = errorHandlingStrategy;
this.retryer = RetryerBuilder
.<ChangeMessageVisibilityResult> newBuilder()
.<Void>newBuilder()
.retryIfException(t -> (t.getCause() instanceof UnknownHostException))
.withWaitStrategy(retryStrategy.getRetryWaitStrategy())
.withStopStrategy(retryStrategy.getRetryStopStrategy())
.build();

request = new ChangeMessageVisibilityRequest().withQueueUrl(queueUrl).withReceiptHandle(
message.getHeaders().get("ReceiptHandle", String.class)).withVisibilityTimeout(
timeoutInSeconds(newVisibilityTimeout));
messageWrapper.getReceiptHandle()).withVisibilityTimeout(
timeoutInSeconds(newVisibilityTimeout));
}

private Integer timeoutInSeconds(Duration timeout) {
Expand All @@ -70,14 +68,17 @@ private Integer timeoutInSeconds(Duration timeout) {
@Override
public void run() {
try {
log.trace("changing message visibility: " + request);
retryer.call(() -> sqsClient.changeMessageVisibility(request));
} catch (AmazonServiceException e) {
errorHandlingStrategy.handleExtendVisibilityTimeoutException(e, message);
retryer.call(() -> {
messageWrapper.changeMessageVisibility(sqsClient, request);
return null;
});
} catch (Exception e) {
log.error("error while extending message visibility for " + message.getHeaders().get("MessageId",
String.class), e);
throw new RuntimeException(e);
if (e.getCause() instanceof AmazonServiceException) {
errorHandlingStrategy.handleExtendVisibilityTimeoutException((AmazonServiceException) e.getCause(), messageWrapper.getMessage());
} else {
log.error("error while extending message visibility for " + messageWrapper.getMessageId(), e);
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;
import com.mercateo.sqs.utils.message.handling.MessageWrapper;
import com.mercateo.sqs.utils.queue.Queue;

import java.time.Duration;
Expand All @@ -29,8 +30,6 @@

import lombok.NonNull;

import org.springframework.messaging.Message;

@Named
public class VisibilityTimeoutExtenderFactory {

Expand All @@ -41,12 +40,11 @@ public VisibilityTimeoutExtenderFactory(@NonNull AmazonSQS amazonSQS) {
this.sqsClient = amazonSQS;
}

public VisibilityTimeoutExtender get(@NonNull Message<?> message, @NonNull Queue queue,
@NonNull ErrorHandlingStrategy<?> errorHandlingStrategy) {
public VisibilityTimeoutExtender get(@NonNull MessageWrapper messageWrapper, @NonNull Queue queue, @NonNull ErrorHandlingStrategy<?> errorHandlingStrategy) {

Duration defaultVisibilityTimeout = queue.getDefaultVisibilityTimeout();

return new VisibilityTimeoutExtender(sqsClient, defaultVisibilityTimeout, message, queue
return new VisibilityTimeoutExtender(sqsClient, defaultVisibilityTimeout, messageWrapper, queue
.getUrl(), errorHandlingStrategy,
new RetryStrategy(WaitStrategies.fixedWait(1000, TimeUnit.MILLISECONDS),
StopStrategies.stopAfterAttempt(5)));
Expand Down
Loading

0 comments on commit 36ee1eb

Please sign in to comment.