Transactional Kafka Producer per thread #750
Comments
Thanks for your contribution @pszymczyk. I've been looking into the issue you described and checking your PR. I think there's a simpler solution to the problem. First of all, Now, you could use @Prototype
@KafkaClient(id = "foo-${random.uuid}", transactionalId = "bar-${random.uuid}")
public interface DemoProducer {
...
}
Combining these two existing functionalities would let you use the same producer (different instances of it, really) from different threads without any errors. @sdelamo Given the above, I came to the conclusion that we shouldn't merge #751 unless it provides additional value that I failed to see. Since you tagged it as |
@guillermocalvo perhaps documentation is needed? |
There's a section in the guide that says:
@graemerocher Do you think we should maybe mention |
makes sense to me |
Hi
The ideal solution is to use some static identifiers or at least some predictable values, like: |
Agreed; that's exactly why I'd rather not merge #751 -- because it would make it thread local for everyone. I'd like to let users choose the config that makes more sense for their application.
If that were the case, using numeric sequences would probably make the problem worse. Suppose an application restarts continuously and starts from That's why I suggested using
The good thing about solving this via config is that everyone can choose the ID scheme that works best for them. |
It works a different way: the previously registered producer with the same transactional id will be fenced, and that is the correct behaviour. |
@pszymczyk Yes, you're right. The previously registered producer would be the one unable to commit the transaction. In any case, if an application is in a crash loop for days, unexpired transactional ids would be the least of the problems, don't you think? |
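To make the fencing behaviour discussed above concrete, here is a minimal toy model in plain Java. It is only a sketch: `FencingModel`, `register` and `tryCommit` are hypothetical names invented for illustration, and the real broker logic is far more involved. The one thing it mirrors is that registering again with the same transactional id bumps an epoch, so the older producer can no longer commit.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of transactional.id fencing; NOT Kafka's real implementation. */
final class FencingModel {
    // Latest epoch handed out for each transactional id (hypothetical bookkeeping).
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    /** Registers a producer: starts at epoch 0, or bumps the existing epoch by one. */
    synchronized int register(String transactionalId) {
        return latestEpoch.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    /** A commit succeeds only for the producer that holds the latest epoch. */
    synchronized boolean tryCommit(String transactionalId, int epoch) {
        Integer latest = latestEpoch.get(transactionalId);
        return latest != null && latest == epoch;
    }
}
```

In this model, two producers registered under `"bar"` get epochs 0 and 1, and only the second one can commit. Producers with distinct (for example random) ids never fence each other, which is the trade-off the thread is debating.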
Most of the time Kafka is a shared service, so it is not only a matter of the application in a crash loop. I don't know what resources the Kafka broker allocates for a new transactional id. Let's say the broker creates a new thread for each transactional producer; then random transactional ids could crash the broker with too many open files. I know Kafka very well and can check the exact potential side effects of thousands of registered transactional ids next week, if you like. |
So I dug into the Kafka code to be 100% sure how it works, and for that case the most important part is when the broker loads into memory a subset of
producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
kafkaProducer = new KafkaProducer<>(producerProperties);
kafkaProducer.initTransactions();
sends one Kafka record to |
Feature description
Hi
I would like to send messages to Kafka in transactions from multiple threads. A good example is injecting KafkaProducer into an HTTP controller and sending messages from the Netty NIO event loop. Right now this is not possible because KafkaProducer is a singleton. My proposal is to keep producers in a ThreadLocal<Map<ClientKey, Producer>> and to give each one a transactional.id suffix: -my-app-0, -my-app-1, and so on.
I've prepared an implementation for that feature.
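The suffix idea can be sketched in plain Java without any Kafka dependency. This is only an illustration of the proposal, not the code from the linked implementation: `PerThreadTransactionalId` and `current` are hypothetical names, and a real version would also have to create and cache one producer per thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hands out one transactional id per thread by appending a numeric suffix. */
final class PerThreadTransactionalId {
    // Shared counter: the next unused suffix across all threads.
    private final AtomicInteger nextSuffix = new AtomicInteger();
    // Each thread computes its id once, on first access, and keeps it afterwards.
    private final ThreadLocal<String> idForThread;

    PerThreadTransactionalId(String prefix) {
        this.idForThread = ThreadLocal.withInitial(
                () -> prefix + "-" + nextSuffix.getAndIncrement());
    }

    /** Returns the id bound to the calling thread, e.g. "my-app-0". */
    String current() {
        return idForThread.get();
    }
}
```

A thread that calls `current()` repeatedly always sees the same id, while a different thread gets the next suffix. Because suffixes restart from 0 when the application restarts, the ids stay predictable, which is the property that random UUIDs give up.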
Exception: