SKYEDEN-3234 detect unused topics (#1922)
* SKYEDEN-3234 | Add marking and unmarking commands
* SKYEDEN-3234 | Change to batch upserting
* SKYEDEN-3234 | Implement zk unusted topics repo
* SKYEDEN-3234 | Implement unused topics service
* SKYEDEN-3234 | Add repo for last published message timestamp
* SKYEDEN-3234 | Rename from read to get
* SKYEDEN-3234 | Change last notified to list with timestamps
* SKYEDEN-3234 | Implement unused topics detection job
* SKYEDEN-3234 | Add tests for unused topics detection
* SKYEDEN-3234 | Add detection job test
* SKYEDEN-3234 | Refactor detection job test
* SKYEDEN-3234 | Make unused topics notifier bean optional
* SKYEDEN-3234 | Add scheduling
* SKYEDEN-3234 | Add properties with default values
* SKYEDEN-3234 | Add more logging and refactor
* SKYEDEN-3234 | Rename from unused to inactive
* SKYEDEN-3234 | Handle lack of last published message metrics
* SKYEDEN-3234 | Do not call notifier when no inactive topics
* SKYEDEN-3234 | Fix style
* SKYEDEN-3234 | Add log when detection starts
* SKYEDEN-3234 | Fix style
* SKYEDEN-3234 | Add more logging
* SKYEDEN-3234 | Move leader and config to separate classes
* SKYEDEN-3234 | Change log message
* SKYEDEN-3234 | Remove unnecessary annotation
* SKYEDEN-3234 | Do not update notification timestamps when notification skipped
* SKYEDEN-3234 | Add docs for inactive topics detection
* SKYEDEN-3234 | Add notification result as return type of notifier
* SKYEDEN-3234 | Add owner info to be used by notifier
* SKYEDEN-3234 | Limit number of notification timestamps in history
* SKYEDEN-3234 | Update docs
* SKYEDEN-3234 | Specify shorter names to be saved in json
* SKYEDEN-3271 |hermes management leader (#1934)
* SKYEDEN-3271 |hermes management leader
* SKYEDEN-3271 | cr changes
* SKYEDEN-3234 | Remove unused property
* SKYEDEN-3271 | fix leader path
* Revert "Merge branch 'master' into SKYEDEN-3234-detect-unused-topics"
  This reverts commit 9545569, reversing changes made to 8194321.
* Revert "Revert "Merge branch 'master' into SKYEDEN-3234-detect-unused-topics""
  This reverts commit cf17940.
* SKYEDEN-3234 | Add metrics for number of inactive topics
* SKYEDEN-3234 | Remove * import

---------

Co-authored-by: Marcin Bobiński <[email protected]>
Co-authored-by: Marcin Bobinski <[email protected]>
1 parent 7adcd41, commit ffc4e39
Showing 29 changed files with 1,367 additions and 3 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
# Inactive Topics Detection | ||
|
||
Hermes Management provides an optional feature to detect inactive topics and | ||
notify about them. This feature is **disabled by default**. You can enable it | ||
and configure other options in the Hermes Management configuration. | ||
|
||
Option | Description | Default value | ||
-------------------------------------------------------------|----------------------------------------------------------------------------|--------------- | ||
detection.inactive-topics.enabled | enable inactive topics detection | false | ||
detection.inactive-topics.inactivity-threshold | duration after which a topic is considered inactive and first notified | 60d | ||
detection.inactive-topics.next-notification-threshold | duration after previous notification after which a topic is notified again | 14d | ||
detection.inactive-topics.whitelisted-qualified-topic-names | list of qualified topic names excluded from notifications even if inactive | [] | ||
detection.inactive-topics.cron | cron expression for the detection job | 0 0 8 * * * | ||
detection.inactive-topics.notifications-history-limit | how many notification timestamps will be kept in history | 5 | ||
|
||
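The interplay of `inactivity-threshold` and `next-notification-threshold` can be illustrated with a minimal sketch. This is not the code from this commit: the real decision lives in `InactiveTopicsDetectionService`, which is not shown here. The class name, the method signature, and the inclusive comparison at the threshold boundary are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

public class NotificationThresholdSketch {

  // Hypothetical helper, not the Hermes implementation.
  // A topic qualifies once it has been silent for at least inactivityThreshold,
  // and is notified again only after nextNotificationThreshold has passed
  // since the previous notification (assumed inclusive at the boundary).
  static boolean shouldBeNotified(
      Instant now,
      Instant lastPublished,
      Optional<Instant> lastNotified,
      Duration inactivityThreshold,
      Duration nextNotificationThreshold) {
    boolean inactive =
        Duration.between(lastPublished, now).compareTo(inactivityThreshold) >= 0;
    if (!inactive) {
      return false;
    }
    // Never notified before: notify right away.
    return lastNotified
        .map(ts -> Duration.between(ts, now).compareTo(nextNotificationThreshold) >= 0)
        .orElse(true);
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2024-06-01T08:00:00Z");
    Duration inactivity = Duration.ofDays(60); // default from the table above
    Duration next = Duration.ofDays(14); // default from the table above

    // Silent for 61 days, never notified.
    System.out.println(
        shouldBeNotified(now, now.minus(Duration.ofDays(61)), Optional.empty(), inactivity, next));
    // Silent for 61 days, notified 3 days ago.
    System.out.println(
        shouldBeNotified(
            now,
            now.minus(Duration.ofDays(61)),
            Optional.of(now.minus(Duration.ofDays(3))),
            inactivity,
            next));
  }
}
```

With the default values, a topic in this sketch is first reported after 60 days of silence and then at most once every 14 days.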
The detection job runs on only one Hermes Management instance: the leader chosen | ||
through the Zookeeper instance used for leader election. | ||
|
||
Option | Description | Default Value | ||
------------------------------------|-----------------------------------------------------------------------------|--------------- | ||
management.leadership.zookeeper-dc | Specifies the datacenter of the Zookeeper instance used for leader election | dc | ||
|
||
To make notifying work, you need to provide an implementation of | ||
`pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsNotifier` |
25 changes: 25 additions & 0 deletions
...ava/pl/allegro/tech/hermes/management/config/detection/InactiveTopicsDetectionConfig.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package pl.allegro.tech.hermes.management.config.detection; | ||
|
||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | ||
import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.scheduling.annotation.EnableScheduling; | ||
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsDetectionJob; | ||
import pl.allegro.tech.hermes.management.infrastructure.detection.InactiveTopicsDetectionScheduler; | ||
import pl.allegro.tech.hermes.management.infrastructure.leader.ManagementLeadership; | ||
|
||
@Configuration | ||
@EnableConfigurationProperties(InactiveTopicsDetectionProperties.class) | ||
@EnableScheduling | ||
public class InactiveTopicsDetectionConfig { | ||
@ConditionalOnProperty( | ||
prefix = "detection.inactive-topics", | ||
value = "enabled", | ||
havingValue = "true") | ||
@Bean | ||
InactiveTopicsDetectionScheduler inactiveTopicsDetectionScheduler( | ||
InactiveTopicsDetectionJob job, ManagementLeadership leader) { | ||
return new InactiveTopicsDetectionScheduler(job, leader); | ||
} | ||
} |
12 changes: 12 additions & 0 deletions
...pl/allegro/tech/hermes/management/config/detection/InactiveTopicsDetectionProperties.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
package pl.allegro.tech.hermes.management.config.detection; | ||
|
||
import java.time.Duration; | ||
import java.util.Set; | ||
import org.springframework.boot.context.properties.ConfigurationProperties; | ||
|
||
@ConfigurationProperties(prefix = "detection.inactive-topics") | ||
public record InactiveTopicsDetectionProperties( | ||
Duration inactivityThreshold, | ||
Duration nextNotificationThreshold, | ||
Set<String> whitelistedQualifiedTopicNames, | ||
int notificationsHistoryLimit) {} |
36 changes: 36 additions & 0 deletions
...ement/src/main/java/pl/allegro/tech/hermes/management/domain/detection/InactiveTopic.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package pl.allegro.tech.hermes.management.domain.detection; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
public record InactiveTopic( | ||
@JsonProperty("topic") String qualifiedTopicName, | ||
@JsonProperty("lastPublishedTsMs") long lastPublishedMessageTimestampMs, | ||
@JsonProperty("notificationTsMs") List<Long> notificationTimestampsMs, | ||
@JsonProperty("whitelisted") boolean whitelisted) { | ||
|
||
InactiveTopic notificationSent(Instant timestamp) { | ||
List<Long> newNotificationTimestampsMs = new ArrayList<>(notificationTimestampsMs); | ||
newNotificationTimestampsMs.add(timestamp.toEpochMilli()); | ||
return new InactiveTopic( | ||
this.qualifiedTopicName, | ||
this.lastPublishedMessageTimestampMs, | ||
newNotificationTimestampsMs, | ||
this.whitelisted); | ||
} | ||
|
||
InactiveTopic limitNotificationsHistory(int limit) { | ||
List<Long> newNotificationTimestampsMs = | ||
notificationTimestampsMs.stream() | ||
.sorted((a, b) -> Long.compare(b, a)) | ||
.limit(limit) | ||
.toList(); | ||
return new InactiveTopic( | ||
this.qualifiedTopicName, | ||
this.lastPublishedMessageTimestampMs, | ||
newNotificationTimestampsMs, | ||
this.whitelisted); | ||
} | ||
} |
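To see how `notificationSent` and `limitNotificationsHistory` interact, the following sketch uses a trimmed copy of the record above with the Jackson annotations removed so it runs on the JDK alone. The wrapper class name, the topic name, and the timestamps are made up for illustration.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class InactiveTopicHistoryExample {

  // Trimmed copy of InactiveTopic from the diff, without @JsonProperty.
  record InactiveTopic(
      String qualifiedTopicName,
      long lastPublishedMessageTimestampMs,
      List<Long> notificationTimestampsMs,
      boolean whitelisted) {

    // Returns a copy with the given timestamp appended to the history.
    InactiveTopic notificationSent(Instant timestamp) {
      List<Long> updated = new ArrayList<>(notificationTimestampsMs);
      updated.add(timestamp.toEpochMilli());
      return new InactiveTopic(
          qualifiedTopicName, lastPublishedMessageTimestampMs, updated, whitelisted);
    }

    // Returns a copy that keeps only the `limit` most recent timestamps, newest first.
    InactiveTopic limitNotificationsHistory(int limit) {
      List<Long> limited =
          notificationTimestampsMs.stream()
              .sorted((a, b) -> Long.compare(b, a))
              .limit(limit)
              .toList();
      return new InactiveTopic(
          qualifiedTopicName, lastPublishedMessageTimestampMs, limited, whitelisted);
    }
  }

  // Records three notifications, then trims the history to the two newest.
  static InactiveTopic notifyThreeTimesKeepTwo() {
    InactiveTopic topic = new InactiveTopic("group.topic", 1_000L, List.of(), false);
    return topic
        .notificationSent(Instant.ofEpochMilli(2_000L))
        .notificationSent(Instant.ofEpochMilli(3_000L))
        .notificationSent(Instant.ofEpochMilli(4_000L))
        .limitNotificationsHistory(2);
  }

  public static void main(String[] args) {
    // prints [4000, 3000]
    System.out.println(notifyThreeTimesKeepTwo().notificationTimestampsMs());
  }
}
```

Because the history is sorted in descending order before it is cut, the oldest timestamps are the ones dropped, and the stored list ends up newest first.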
5 changes: 5 additions & 0 deletions
.../main/java/pl/allegro/tech/hermes/management/domain/detection/InactiveTopicWithOwner.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package pl.allegro.tech.hermes.management.domain.detection; | ||
|
||
import pl.allegro.tech.hermes.api.OwnerId; | ||
|
||
public record InactiveTopicWithOwner(InactiveTopic topic, OwnerId ownerId) {} |
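Pairing each inactive topic with its owner lets a notifier send one message per owner rather than one per topic. The sketch below shows one way such grouping might look. It is only an assumption about how a notifier could use the record: `OwnerId`, `InactiveTopic`, and `InactiveTopicWithOwner` here are simplified stand-ins declared locally, and the `source:id` key format is invented for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class OwnerGroupingSketch {

  // Simplified local stand-ins, not the Hermes classes.
  record OwnerId(String source, String id) {}

  record InactiveTopic(String qualifiedTopicName) {}

  record InactiveTopicWithOwner(InactiveTopic topic, OwnerId ownerId) {}

  // Groups qualified topic names under a "source:id" owner key (hypothetical format).
  // A TreeMap keeps owners in a stable, sorted order.
  static Map<String, List<String>> topicNamesByOwner(List<InactiveTopicWithOwner> topics) {
    return topics.stream()
        .collect(
            Collectors.groupingBy(
                t -> t.ownerId().source() + ":" + t.ownerId().id(),
                TreeMap::new,
                Collectors.mapping(t -> t.topic().qualifiedTopicName(), Collectors.toList())));
  }

  public static void main(String[] args) {
    OwnerId teamA = new OwnerId("Plaintext", "team-a");
    OwnerId teamB = new OwnerId("Plaintext", "team-b");
    List<InactiveTopicWithOwner> topics =
        List.of(
            new InactiveTopicWithOwner(new InactiveTopic("group.first"), teamA),
            new InactiveTopicWithOwner(new InactiveTopic("group.second"), teamB),
            new InactiveTopicWithOwner(new InactiveTopic("group.third"), teamA));
    topicNamesByOwner(topics)
        .forEach((owner, names) -> System.out.println(owner + " -> " + names));
  }
}
```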
153 changes: 153 additions & 0 deletions
...n/java/pl/allegro/tech/hermes/management/domain/detection/InactiveTopicsDetectionJob.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
package pl.allegro.tech.hermes.management.domain.detection; | ||
|
||
import static java.util.stream.Collectors.groupingBy; | ||
|
||
import io.micrometer.core.instrument.MeterRegistry; | ||
import io.micrometer.core.instrument.Tags; | ||
import java.time.Clock; | ||
import java.time.Instant; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.stereotype.Component; | ||
import pl.allegro.tech.hermes.api.OwnerId; | ||
import pl.allegro.tech.hermes.api.Topic; | ||
import pl.allegro.tech.hermes.api.TopicName; | ||
import pl.allegro.tech.hermes.management.config.detection.InactiveTopicsDetectionProperties; | ||
import pl.allegro.tech.hermes.management.domain.topic.TopicService; | ||
|
||
@Component | ||
public class InactiveTopicsDetectionJob { | ||
private final TopicService topicService; | ||
private final InactiveTopicsStorageService inactiveTopicsStorageService; | ||
private final InactiveTopicsDetectionService inactiveTopicsDetectionService; | ||
private final Optional<InactiveTopicsNotifier> notifier; | ||
private final InactiveTopicsDetectionProperties properties; | ||
private final Clock clock; | ||
private final MeterRegistry meterRegistry; | ||
|
||
private static final Logger logger = LoggerFactory.getLogger(InactiveTopicsDetectionJob.class); | ||
|
||
public InactiveTopicsDetectionJob( | ||
TopicService topicService, | ||
InactiveTopicsStorageService inactiveTopicsStorageService, | ||
InactiveTopicsDetectionService inactiveTopicsDetectionService, | ||
Optional<InactiveTopicsNotifier> notifier, | ||
InactiveTopicsDetectionProperties properties, | ||
Clock clock, | ||
MeterRegistry meterRegistry) { | ||
this.topicService = topicService; | ||
this.inactiveTopicsStorageService = inactiveTopicsStorageService; | ||
this.inactiveTopicsDetectionService = inactiveTopicsDetectionService; | ||
this.properties = properties; | ||
this.clock = clock; | ||
this.meterRegistry = meterRegistry; | ||
if (notifier.isEmpty()) { | ||
logger.info("Inactive topics notifier bean is absent"); | ||
} | ||
this.notifier = notifier; | ||
} | ||
|
||
public void detectAndNotify() { | ||
List<Topic> topics = topicService.getAllTopics(); | ||
List<String> qualifiedTopicNames = topics.stream().map(Topic::getQualifiedName).toList(); | ||
List<InactiveTopic> historicalInactiveTopics = inactiveTopicsStorageService.getInactiveTopics(); | ||
List<InactiveTopic> foundInactiveTopics = | ||
detectInactiveTopics(qualifiedTopicNames, historicalInactiveTopics); | ||
|
||
Map<Boolean, List<InactiveTopic>> groupedByNeedOfNotification = | ||
foundInactiveTopics.stream() | ||
.collect(groupingBy(inactiveTopicsDetectionService::shouldBeNotified)); | ||
|
||
List<InactiveTopic> topicsToNotify = groupedByNeedOfNotification.getOrDefault(true, List.of()); | ||
List<InactiveTopic> topicsToSkipNotification = | ||
groupedByNeedOfNotification.getOrDefault(false, List.of()); | ||
List<InactiveTopic> notifiedTopics = notify(enrichWithOwner(topicsToNotify, topics)); | ||
|
||
List<InactiveTopic> processedTopics = | ||
limitHistory( | ||
Stream.concat(notifiedTopics.stream(), topicsToSkipNotification.stream()).toList()); | ||
measureInactiveTopics(processedTopics); | ||
inactiveTopicsStorageService.markAsInactive(processedTopics); | ||
} | ||
|
||
private List<InactiveTopic> detectInactiveTopics( | ||
List<String> qualifiedTopicNames, List<InactiveTopic> historicalInactiveTopics) { | ||
Map<String, InactiveTopic> historicalInactiveTopicsByName = | ||
groupByName(historicalInactiveTopics); | ||
return qualifiedTopicNames.stream() | ||
.map( | ||
qualifiedTopicName -> | ||
inactiveTopicsDetectionService.detectInactiveTopic( | ||
TopicName.fromQualifiedName(qualifiedTopicName), | ||
Optional.ofNullable(historicalInactiveTopicsByName.get(qualifiedTopicName)))) | ||
.map(opt -> opt.orElse(null)) | ||
.filter(Objects::nonNull) | ||
.toList(); | ||
} | ||
|
||
private Map<String, InactiveTopic> groupByName(List<InactiveTopic> inactiveTopics) { | ||
return inactiveTopics.stream() | ||
.collect(Collectors.toMap(InactiveTopic::qualifiedTopicName, v -> v, (v1, v2) -> v1)); | ||
} | ||
|
||
private List<InactiveTopicWithOwner> enrichWithOwner( | ||
List<InactiveTopic> inactiveTopics, List<Topic> topics) { | ||
Map<String, OwnerId> ownerByTopicName = new HashMap<>(); | ||
topics.forEach(topic -> ownerByTopicName.put(topic.getQualifiedName(), topic.getOwner())); | ||
|
||
return inactiveTopics.stream() | ||
.map( | ||
inactiveTopic -> | ||
new InactiveTopicWithOwner( | ||
inactiveTopic, ownerByTopicName.get(inactiveTopic.qualifiedTopicName()))) | ||
.toList(); | ||
} | ||
|
||
private List<InactiveTopic> notify(List<InactiveTopicWithOwner> inactiveTopics) { | ||
if (inactiveTopics.isEmpty()) { | ||
logger.info("No inactive topics to notify"); | ||
return List.of(); | ||
} else if (notifier.isPresent()) { | ||
logger.info("Notifying {} inactive topics", inactiveTopics.size()); | ||
NotificationResult result = notifier.get().notify(inactiveTopics); | ||
Instant now = clock.instant(); | ||
|
||
return inactiveTopics.stream() | ||
.map(InactiveTopicWithOwner::topic) | ||
.map( | ||
topic -> | ||
result.isSuccess(topic.qualifiedTopicName()) | ||
? topic.notificationSent(now) | ||
: topic) | ||
.toList(); | ||
} else { | ||
logger.info("Skipping notification of {} inactive topics", inactiveTopics.size()); | ||
return inactiveTopics.stream().map(InactiveTopicWithOwner::topic).toList(); | ||
} | ||
} | ||
|
||
private List<InactiveTopic> limitHistory(List<InactiveTopic> inactiveTopics) { | ||
return inactiveTopics.stream() | ||
.map(topic -> topic.limitNotificationsHistory(properties.notificationsHistoryLimit())) | ||
.toList(); | ||
} | ||
|
||
private void measureInactiveTopics(List<InactiveTopic> processedTopics) { | ||
processedTopics.stream() | ||
.collect( | ||
Collectors.groupingBy( | ||
topic -> topic.notificationTimestampsMs().size(), Collectors.counting())) | ||
.forEach( | ||
(notificationsCount, topicsCount) -> { | ||
Tags tags = Tags.of("notifications", notificationsCount.toString()); | ||
meterRegistry.gauge("inactive-topics", tags, topicsCount); | ||
}); | ||
} | ||
} |
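The `notify` method above records a new timestamp only for topics whose notification succeeded. Failed topics keep their history unchanged, so a later run can retry them. The sketch below isolates that step. `NotificationResult` is not shown in this part of the diff, so the map-backed record here is a hypothetical stand-in that exposes only the `isSuccess` call the job uses; `InactiveTopic` is likewise reduced to the two fields this step needs.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class NotifyResultSketch {

  // Reduced stand-in for InactiveTopic with only the fields this step touches.
  record InactiveTopic(String qualifiedTopicName, List<Long> notificationTimestampsMs) {
    InactiveTopic notificationSent(Instant timestamp) {
      List<Long> updated = new ArrayList<>(notificationTimestampsMs);
      updated.add(timestamp.toEpochMilli());
      return new InactiveTopic(qualifiedTopicName, updated);
    }
  }

  // Hypothetical stand-in: the real NotificationResult is not visible here.
  // Topics missing from the map are treated as failed (an assumption).
  record NotificationResult(Map<String, Boolean> successByTopic) {
    boolean isSuccess(String qualifiedTopicName) {
      return successByTopic.getOrDefault(qualifiedTopicName, false);
    }
  }

  // Stamps `now` on successfully notified topics and leaves the others untouched.
  static List<InactiveTopic> applyResult(
      List<InactiveTopic> topics, NotificationResult result, Instant now) {
    return topics.stream()
        .map(t -> result.isSuccess(t.qualifiedTopicName()) ? t.notificationSent(now) : t)
        .toList();
  }

  public static void main(String[] args) {
    List<InactiveTopic> topics =
        List.of(
            new InactiveTopic("group.ok", List.of()),
            new InactiveTopic("group.failed", List.of(1_000L)));
    NotificationResult result =
        new NotificationResult(Map.of("group.ok", true, "group.failed", false));
    applyResult(topics, result, Instant.ofEpochMilli(5_000L)).forEach(System.out::println);
  }
}
```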