Transactional Kafka Producer per thread #750

Closed
pszymczyk opened this issue Jun 27, 2023 · 10 comments · Fixed by #882
Labels: status: under consideration · type: enhancement

@pszymczyk

Feature description

Hi,

I would like to send messages to Kafka in transactions from multiple threads. A good example is injecting a KafkaProducer into an HTTP controller and sending messages from the Netty NIO event loop. Right now this is not possible because the KafkaProducer is a singleton. My proposal is to:

  • maintain a producer per thread - ThreadLocal<Map<ClientKey, Producer>>
  • add an incremental integer as a transactional.id suffix - my-app-0, my-app-1, and so on

I've prepared an implementation for this feature.
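
To illustrate the idea, here is a minimal sketch of such a per-thread registry (this is not the actual PR code; a plain String stands in for the proposed ClientKey type, and the property wiring is an assumption):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

// One transactional producer per (thread, client) pair, with an incremental
// integer suffix on transactional.id: my-app-0, my-app-1, and so on.
final class PerThreadProducerRegistry {

    private static final AtomicInteger TRANSACTIONAL_ID_SUFFIX = new AtomicInteger();

    // Each thread gets its own map of client key -> producer.
    private static final ThreadLocal<Map<String, Producer<String, String>>> PRODUCERS =
            ThreadLocal.withInitial(HashMap::new);

    static Producer<String, String> producerFor(String clientKey, Properties baseConfig) {
        return PRODUCERS.get().computeIfAbsent(clientKey, key -> {
            Properties config = new Properties();
            config.putAll(baseConfig);
            config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                    "my-app-" + TRANSACTIONAL_ID_SUFFIX.getAndIncrement());
            Producer<String, String> producer = new KafkaProducer<>(config);
            producer.initTransactions();
            return producer;
        });
    }
}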

import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;

import static io.micronaut.http.HttpResponse.accepted;

@Controller("/demo")
public class DemoController {

    private final DemoProducer demoProducer;

    public DemoController(DemoProducer demoProducer) {
        this.demoProducer = demoProducer;
    }

    @Post(uri = "/")
    public HttpResponse<String> publishSomeMessages() {
        demoProducer.send("asd", "asd");
        demoProducer.send("abc", "abc");
        return accepted();
    }
}
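
For context, the producer behind this controller presumably looks something like the following reconstruction (the transactional ID "trololo" and the send(String, String) signature come from the exception below; the topic name is a guess):

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient(transactionalId = "trololo")
public interface DemoProducer {

    @Topic("demo-topic") // hypothetical topic name
    void send(@KafkaKey String key, String value);
}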

Exception:

10:55:19.587 [default-nioEventLoopGroup-1-16] ERROR i.m.http.server.RouteExecutor - Unexpected error occurred: Exception sending producer record for method [void send(String key,String value)]: TransactionalId trololo: Invalid transition attempted from state ABORTING_TRANSACTION to state IN_TRANSACTION
io.micronaut.messaging.exceptions.MessagingClientException: Exception sending producer record for method [void send(String key,String value)]: TransactionalId trololo: Invalid transition attempted from state ABORTING_TRANSACTION to state IN_TRANSACTION
	at io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.wrapException(KafkaClientIntroductionAdvice.java:472)
	at io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.returnSynchronous(KafkaClientIntroductionAdvice.java:246)
	at io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.intercept(KafkaClientIntroductionAdvice.java:144)
	at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
	at io.micronaut.retry.intercept.RecoveryInterceptor.intercept(RecoveryInterceptor.java:98)
	at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
	at com.pszymczyk.DemoProducer$Intercepted.send(Unknown Source)
	at com.pszymczyk.DemoController.publishSomeMessages(DemoController.java:26)
	at com.pszymczyk.$DemoController$Definition$Exec.dispatch(Unknown Source)
	at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:371)
	at io.micronaut.context.DefaultBeanContext$4.invoke(DefaultBeanContext.java:594)
	at io.micronaut.web.router.AbstractRouteMatch.execute(AbstractRouteMatch.java:246)
	at io.micronaut.web.router.RouteMatch.execute(RouteMatch.java:111)
	at io.micronaut.http.context.ServerRequestContext.with(ServerRequestContext.java:103)
	at io.micronaut.http.server.RouteExecutor.lambda$executeRoute$14(RouteExecutor.java:659)
	at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:49)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8660)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2508)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
	at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onSubscribe(ReactorSubscriber.java:50)
	at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8660)
	at io.micronaut.http.server.netty.RoutingInBoundHandler.handleRouteMatch(RoutingInBoundHandler.java:601)
	at io.micronaut.http.server.netty.RoutingInBoundHandler.channelRead0(RoutingInBoundHandler.java:457)
	at io.micronaut.http.server.netty.RoutingInBoundHandler.channelRead0(RoutingInBoundHandler.java:147)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.micronaut.http.netty.stream.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:225)
	at io.micronaut.http.netty.stream.HttpStreamsServerHandler.channelRead(HttpStreamsServerHandler.java:134)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.onHttpRequestChannelRead(WebSocketServerExtensionHandler.java:160)
	at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:83)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	at io.netty.handler.codec.http.HttpServerKeepAliveHandler.channelRead(HttpServerKeepAliveHandler.java:64)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.KafkaException: TransactionalId trololo: Invalid transition attempted from state ABORTING_TRANSACTION to state IN_TRANSACTION
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1078)
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1071)
	at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:357)
	at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:619)
	at io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.returnSynchronous(KafkaClientIntroductionAdvice.java:180)
	... 91 common frames omitted
@sdelamo sdelamo added type: enhancement New feature or request status: under consideration The issue is being considered, but has not been accepted yet labels Aug 4, 2023
@guillermocalvo guillermocalvo self-assigned this Sep 25, 2023
@guillermocalvo
Contributor

Thanks for your contribution @pszymczyk.

I've been looking into the issue you described and checking your PR. I think there's a simpler solution to the problem.

First of all, KafkaProducer beans are not necessarily tied to @Singleton scope. For example, you can annotate your Kafka clients with @ThreadLocal to store beans in thread local storage, or you can even use @Prototype to create a new bean for every injection point.

Now, you could use random values to your advantage so that each instance of your producer gets a different transactional ID.

@Prototype
@KafkaClient(id = "foo-${random.uuid}", transactionalId = "bar-${random.uuid}")
public interface DemoProducer {
  ...
}

Combining these two existing functionalities would let you use the same producer (different instances of it, really) from different threads without any errors.
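
For example, a complete thread-local variant might look like this (a sketch; the topic and method names are illustrative):

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.runtime.context.scope.ThreadLocal;

// One bean instance (and therefore one transactional ID) per thread,
// instead of one per injection point as with @Prototype.
@ThreadLocal
@KafkaClient(id = "foo-${random.uuid}", transactionalId = "bar-${random.uuid}")
public interface DemoProducer {

    @Topic("demo-topic") // illustrative topic name
    void send(@KafkaKey String key, String value);
}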

@sdelamo Given the above, I came to the conclusion that we shouldn't merge #751 unless it provides additional value I failed to see. Since you tagged it as under consideration, what do you think we should do?

@graemerocher
Contributor

@guillermocalvo perhaps documentation is needed?

@guillermocalvo
Contributor

perhaps documentation is needed?

There's a section in the guide that says:

@KafkaListener beans are by default singleton. When using multiple threads you must either synchronize access to local state or declare the bean as @Prototype.

@graemerocher Do you think we should maybe mention random values too?

@graemerocher
Contributor

makes sense to me

@guillermocalvo guillermocalvo linked a pull request Sep 28, 2023 that will close this issue
@pszymczyk
Author

Hi,

@ThreadLocal sounds really good to me, but I am not sure it is good for everyone, or that it is legitimate to make it part of the official documentation. The problem with random values is that every time we create a new KafkaProducer instance we register a new transactional.id with the Kafka transaction coordinator, and according to the documentation the Kafka broker waits 7 days until it removes previous transactional IDs. In most cases this wouldn't matter, but sometimes, when for example an application restarts continuously for a few days, that configuration will produce a lot of rubbish data on the Kafka broker.

The ideal solution is to use some static identifiers or at least some predictable values, like:
{applicationName}-{instance identifier}-{integer identifier}, for example: mySuperApplication-blue-3
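
A minimal sketch of such a scheme (the class and method names are made up for illustration; this is not an existing micronaut-kafka feature):

import java.util.concurrent.atomic.AtomicInteger;

// Builds IDs like mySuperApplication-blue-0, mySuperApplication-blue-1, ...
final class PredictableTransactionalIds {

    private static final AtomicInteger SEQUENCE = new AtomicInteger();

    static String next(String applicationName, String instanceIdentifier) {
        return applicationName + "-" + instanceIdentifier + "-" + SEQUENCE.getAndIncrement();
    }
}

// Usage: PredictableTransactionalIds.next("mySuperApplication", "blue") -> "mySuperApplication-blue-0"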

@guillermocalvo
Contributor

@ThreadLocal sounds really good to me, but I am not sure it is good for everyone

Agreed; that's exactly why I'd rather not merge #751 -- it would make the producer thread-local for everyone. I'd like to let users choose the config that makes the most sense for their application.

The problem with random values is that every time we create a new KafkaProducer instance we register a new transactional.id with the Kafka transaction coordinator, and according to the documentation the Kafka broker waits 7 days until it removes previous transactional IDs. In most cases this wouldn't matter, but sometimes, when for example an application restarts continuously for a few days, that configuration will produce a lot of rubbish data on the Kafka broker.

If that were the case, using numeric sequences would probably make the problem worse. Suppose an application restarts continuously and starts from 1 every time. Eventually it will pick up a transactional.id that was previously abandoned; then the producer won't even be able to begin the transaction.

That's why I suggested using random.uuid: if you need to make up transaction IDs along the way, you are going to want to avoid collisions as much as possible. There are "shorter" alternatives like random.shortuuid or random.int though.
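
For instance, the earlier example with a shorter random placeholder (same assumed interface shape):

@Prototype
@KafkaClient(id = "foo-${random.shortuuid}", transactionalId = "bar-${random.shortuuid}")
public interface DemoProducer {
  ...
}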

The ideal solution is to use some static identifiers or at least some predictable values, like: {applicationName}-{instance identifier}-{integer identifier}, for example: mySuperApplication-blue-3

The good thing about solving this via config is that everyone can choose the ID scheme that works best for them.

@pszymczyk
Author

If that were the case, using numeric sequences would probably make the problem worse. Suppose an application restarts continuously and starts from 1 every time. Eventually it will pick up a transactional.id that was previously abandoned; then the producer won't even be able to begin the transaction.

It works a different way: the previously registered producer with the same transactional ID will be fenced, and that is the correct behaviour.
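
For reference, this is how fencing surfaces in the standard Kafka producer API (a minimal sketch; the record contents are placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

// A fenced producer cannot continue: it may not commit or abort further
// transactions and must be closed.
static void sendInTransaction(KafkaProducer<String, String> producer) {
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // placeholder record
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        // A newer producer registered the same transactional.id and fenced this one.
        producer.close();
    }
}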

@guillermocalvo
Contributor

It works a different way: the previously registered producer with the same transactional ID will be fenced, and that is the correct behaviour.

@pszymczyk Yes, you're right. The previously registered producer would be the one unable to commit its transaction. In any case, if an application is in a crash loop for days, unexpired transactional IDs would be the least of its problems, don't you think?

@pszymczyk
Author

Most of the time Kafka is a shared service, so it is not only a matter of the application being in a crash loop. I don't know exactly what resources the Kafka broker allocates for each new transactional ID; if, say, the broker created a new thread or file handle per transactional producer, then random transactional IDs could crash the broker with too many open files. I know Kafka very well, and I can check the exact potential side effects of thousands of registered transactional IDs next week if you like.

@pszymczyk
Author

So I have dug into the Kafka code to be 100% sure how it works. For this case the most important part is that the broker loads a subset of the __transaction_state topic partitions into memory, so each invocation of:

producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
kafkaProducer = new KafkaProducer<>(producerProperties);
kafkaProducer.initTransactions();

sends one Kafka record to the __transaction_state topic, and the record is cached in broker memory. I performed a simple test where I registered 10443 transactional producers with random transactional.id values on a single Kafka broker; broker memory usage was 320MB before the test and 670MB after.
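
A rough reconstruction of that test loop (the producer count matches the comment; the broker address and serializers are assumptions):

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Registers thousands of transactional producers with random IDs; each one
// writes a record to __transaction_state that the broker caches in memory.
public class TransactionalIdLoadTest {

    public static void main(String[] args) {
        for (int i = 0; i < 10_443; i++) {
            Properties producerProperties = new Properties();
            producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
            producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
            try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProperties)) {
                kafkaProducer.initTransactions();
            }
        }
    }
}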

@sdelamo sdelamo removed this from 4.1.5 Release Oct 18, 2023
@sdelamo sdelamo added this to 4.1.6 Oct 18, 2023
@sdelamo sdelamo moved this to Done in 4.1.6 Nov 1, 2023