-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat #124 채팅 카프카 이전 #124
base: dev
Are you sure you want to change the base?
The head ref may contain hidden characters: "feat/#72/\uCC44\uD305-\uCE74\uD504\uCE74"
Feat #124 채팅 카프카 이전 #124
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
레디스에서 카프카로 이전하여 메세지 유실에 대한 걱정이 크게 줄어들었네요!
고생하셨습니다!
private String topic; | ||
|
||
public void publish(ChatMessage chatMessage) { | ||
log.info("📤 Kafka 채팅 메시지 발행: {}", chatMessage); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
귀여운 이모지와 있는 해당 로그 기록은 운영 상에서도 사용하는 건가용??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
여기서 Kafka 채팅 메시지 발행시 로그를 남겨줘 서버단에서 확인할 수 있도록 설계하신 걸로 이해했는데,
저는 Kafka 브로커와의 연결 문제가 발생하거나 전송 과정에서 오류가 생길 경우(실패시에도) 이를 명확히 로그로 남겨주는 것도 도움이 될 것 같다는 개인적인 생각이 들기도 했습니다
혹시 이런 부분을 고려해 발행 실패 시 예외 처리를 추가할 계획이 있으신지, 아니면 현재 구조에서는 발행 상황에서만 로그가 필요하다고 보시는지 강혁님의 의견이 궁금합니당
@KafkaListener(topics = "${gachtaxi.kafka.topics.chat-room}", groupId = "${spring.kafka.consumer.chat-group-id}", containerFactory = "chatMessageListenerFactory") | ||
public void consumeChatMessage(@Payload ChatMessage chatMessage, Acknowledgment acknowledgment) { | ||
try { | ||
log.info("📤 Kafka 채팅 메시지 읽기 성공: {}", chatMessage); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
여기도 일부러 사용하시는 건가요??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
고생하셨습니다 !!
import static com.gachtaxi.global.auth.jwt.util.kafka.KafkaBeanSuffix.KAFKA_TEMPLATE_SUFFIX; | ||
import static com.gachtaxi.global.auth.jwt.util.kafka.KafkaBeanSuffix.NEW_TOPIC_SUFFIX; | ||
import static com.gachtaxi.global.auth.jwt.util.kafka.KafkaBeanSuffix.PRODUCER_FACTORY_SUFFIX; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
해당 부분들을 나눠서 KAFKA_TEMPLATE_SUFFIX, NEW_TOPIC_SUFFIX 등을 각각 import 해주다가,
아래에서 import static com.gachtaxi.global.auth.jwt.util.kafka.KafkaBeanSuffix.*; 으로 통합해서 하나의 경로로 관리해주는 방식으로 변경하신 이유가 있으셨는지 궁금합니다 !
private String topic; | ||
|
||
public void publish(ChatMessage chatMessage) { | ||
log.info("📤 Kafka 채팅 메시지 발행: {}", chatMessage); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
여기서 Kafka 채팅 메시지 발행시 로그를 남겨줘 서버단에서 확인할 수 있도록 설계하신 걸로 이해했는데,
저는 Kafka 브로커와의 연결 문제가 발생하거나 전송 과정에서 오류가 생길 경우(실패시에도) 이를 명확히 로그로 남겨주는 것도 도움이 될 것 같다는 개인적인 생각이 들기도 했습니다
혹시 이런 부분을 고려해 발행 실패 시 예외 처리를 추가할 계획이 있으신지, 아니면 현재 구조에서는 발행 상황에서만 로그가 필요하다고 보시는지 강혁님의 의견이 궁금합니당
📌 관련 이슈
관련 이슈 번호 #74
Close #74
🚀 작업 내용
이전 redis로 메시지 발행을 하던 것을 카프카로 이전했습니다.
이전한 이유는 무중단 배포가 돌아갈 때 발행된 메시지의 유실을 막기 위해 이전하였습니다.
카프카에 chat-group을 추가하였고, 채팅을 위한 topic을 추가했습니다.
ChatMessage만 다루는 Producer, Consumer 설정을 추가했습니다. ACK는 All로 설정해 메시지 유실을 방지했고, 재시도 횟수는 3회로 설정했습니다.
KafkaChatSubscriber에서 메시지를 받을 시 ack를 읽음 처리해 메시지 중복 처리를 막았습니다.
jmeter를 이용한 부하테스트시 무중단 테스트까지는 진행하지 못했지만, 서버가 꺼진 상태에서 카프카에 메시지가 전송된 경우 서버가 다시 켜졌을 때 처리되지 않은 메시지부터 Consumer에게 정상적으로 발행되는 것을 테스트 완료했습니다
📸 스크린샷
📢 리뷰 요구사항