Skip to content

Commit

Permalink
feat: 메시지 발행을 카프카로 이전
Browse files Browse the repository at this point in the history
  • Loading branch information
hyxklee committed Feb 13, 2025
1 parent eb0cb5d commit b3871cf
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.gachtaxi.domain.chat.entity.enums.MessageType;
import com.gachtaxi.domain.chat.exception.ChattingParticipantNotFoundException;
import com.gachtaxi.domain.chat.exception.DuplicateSubscribeException;
import com.gachtaxi.domain.chat.kafka.KafkaChatPublisher;
import com.gachtaxi.domain.chat.redis.RedisChatPublisher;
import com.gachtaxi.domain.chat.repository.ChattingMessageMongoRepository;
import com.gachtaxi.domain.chat.repository.ChattingParticipantRepository;
Expand All @@ -27,6 +28,7 @@ public class ChattingParticipantService {
private final ChattingMessageMongoRepository chattingMessageMongoRepository;
private final ChattingRedisService chattingRedisService;
private final RedisChatPublisher redisChatPublisher;
private final KafkaChatPublisher kafkaChatPublisher;

@Value("${chat.topic}")
public String chatTopic;
Expand Down Expand Up @@ -83,6 +85,6 @@ private void reEnterEvent(long roomId, long senderId, String senderName, ReadMes
ChannelTopic topic = new ChannelTopic(chatTopic + roomId);
ChatMessage chatMessage = ChatMessage.of(roomId, senderId, senderName, range, MessageType.READ);

redisChatPublisher.publish(topic, chatMessage);
kafkaChatPublisher.publish(chatMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.gachtaxi.domain.chat.entity.enums.ChatStatus;
import com.gachtaxi.domain.chat.entity.enums.MessageType;
import com.gachtaxi.domain.chat.exception.ChattingRoomNotFoundException;
import com.gachtaxi.domain.chat.kafka.KafkaChatPublisher;
import com.gachtaxi.domain.chat.redis.RedisChatPublisher;
import com.gachtaxi.domain.chat.repository.ChattingMessageRepository;
import com.gachtaxi.domain.chat.repository.ChattingRoomRepository;
Expand Down Expand Up @@ -37,6 +38,7 @@ public class ChattingRoomService {
private final ChattingParticipantService chattingParticipantService;
private final MemberService memberService;
private final RedisChatPublisher redisChatPublisher;
private final KafkaChatPublisher kafkaChatPublisher;
private final ChattingRedisService chattingRedisService;

@Value("${chat.topic}")
Expand Down Expand Up @@ -112,6 +114,6 @@ private void publishMessage(long roomId, long senderId, String senderName, Strin
ChannelTopic topic = new ChannelTopic(chatTopic + roomId);
ChatMessage chatMessage = ChatMessage.from(chattingMessage);

redisChatPublisher.publish(topic, chatMessage);
kafkaChatPublisher.publish(chatMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.gachtaxi.domain.chat.entity.ChattingParticipant;
import com.gachtaxi.domain.chat.entity.ChattingRoom;
import com.gachtaxi.domain.chat.exception.WebSocketSessionException;
import com.gachtaxi.domain.chat.kafka.KafkaChatPublisher;
import com.gachtaxi.domain.chat.redis.RedisChatPublisher;
import com.gachtaxi.domain.chat.repository.ChattingMessageRepository;
import com.gachtaxi.domain.members.entity.Members;
Expand Down Expand Up @@ -38,6 +39,7 @@ public class ChattingService {

private final ChattingMessageRepository chattingMessageRepository;
private final RedisChatPublisher redisChatPublisher;
private final KafkaChatPublisher kafkaChatPublisher;
private final ChattingRoomService chattingRoomService;
private final ChattingParticipantService chattingParticipantService;
private final MemberService memberService;
Expand All @@ -62,7 +64,7 @@ public void chat(ChatMessageRequest request, SimpMessageHeaderAccessor accessor)
ChannelTopic topic = new ChannelTopic(chatTopic + roomId);
ChatMessage chatMessage = ChatMessage.from(chattingMessage);

redisChatPublisher.publish(topic, chatMessage);
kafkaChatPublisher.publish(chatMessage);
/*
todo 채팅에 알림이 도입되면 redis에 참여하지 않은 사람 리스트를 가져와서 푸시알림 보내기. 참여하고 있다면 X
*/
Expand Down

0 comments on commit b3871cf

Please sign in to comment.