From e5a485787e72f48a7466ec154e0998b268f7929c Mon Sep 17 00:00:00 2001 From: christosarvanitis Date: Thu, 2 Jan 2025 14:15:15 +0200 Subject: [PATCH] feat(orca-front50): Adding scheduling agent to Disable unused Pipelines on Front50 --- .../persistence/DualExecutionRepository.kt | 10 + .../persistence/ExecutionRepository.java | 3 + .../InMemoryExecutionRepository.kt | 10 + orca-front50/orca-front50.gradle | 3 + ...pelineDisablePollingNotificationAgent.java | 169 ++++++++++++++ ...neDisablePollingNotificationAgentTest.java | 216 ++++++++++++++++++ .../jedis/RedisExecutionRepository.java | 9 + .../persistence/SqlExecutionRepository.kt | 32 +++ 8 files changed, 452 insertions(+) create mode 100644 orca-front50/src/main/java/com/netflix/spinnaker/orca/front50/scheduling/UnusedPipelineDisablePollingNotificationAgent.java create mode 100644 orca-front50/src/test/java/com/netflix/spinnaker/orca/front50/scheduling/UnusedPipelineDisablePollingNotificationAgentTest.java diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index cc32aa958b..d7c5232918 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -230,6 +230,16 @@ class DualExecutionRepository( ).distinctBy { it.id } } + override fun retrievePipelineConfigIdsForApplicationWithCriteria( + @Nonnull application: String, + @Nonnull criteria: ExecutionCriteria + ): List { + return ( + primary.retrievePipelineConfigIdsForApplicationWithCriteria(application, criteria) + + previous.retrievePipelineConfigIdsForApplicationWithCriteria(application, criteria) + ) + } + override fun retrievePipelinesForPipelineConfigIdsBetweenBuildTimeBoundary( pipelineConfigIds: MutableList, buildTimeStartBoundary: Long, diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 4cfe657aef..393872e9f2 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -107,6 +107,9 @@ Collection retrievePipelineExecutionDetailsForApplication( @Nonnull List pipelineConfigIds, int queryTimeoutSeconds); + List retrievePipelineConfigIdsForApplicationWithCriteria( + @Nonnull String application, @Nonnull ExecutionCriteria criteria); + /** * Returns executions in the time boundary. Redis impl does not respect pageSize or offset params, * and returns all executions. Sql impl respects these params. diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 96ee40242d..d8a35103e7 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -305,6 +305,16 @@ class InMemoryExecutionRepository : ExecutionRepository { .distinctBy { it.id } } + override fun retrievePipelineConfigIdsForApplicationWithCriteria( + @Nonnull application: String, + @Nonnull criteria: ExecutionCriteria + ): List { + return pipelines.values + .filter { it.application == application } + .applyCriteria(criteria) + .map { it.id } + } + override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution { return retrieveByCorrelationId(ORCHESTRATION, correlationId) } diff --git a/orca-front50/orca-front50.gradle b/orca-front50/orca-front50.gradle index b208f0e922..9d429c9443 100644 --- a/orca-front50/orca-front50.gradle +++ b/orca-front50/orca-front50.gradle @@ -40,6 +40,9 @@ dependencies { testImplementation("com.github.ben-manes.caffeine:guava") testImplementation("org.apache.groovy:groovy-json") testRuntimeOnly("net.bytebuddy:byte-buddy") + testImplementation("org.junit.jupiter:junit-jupiter-api") + testImplementation("org.assertj:assertj-core") + testImplementation("org.mockito:mockito-junit-jupiter") } sourceSets { diff --git a/orca-front50/src/main/java/com/netflix/spinnaker/orca/front50/scheduling/UnusedPipelineDisablePollingNotificationAgent.java b/orca-front50/src/main/java/com/netflix/spinnaker/orca/front50/scheduling/UnusedPipelineDisablePollingNotificationAgent.java new file mode 100644 index 0000000000..6bc0b4643b --- /dev/null +++ b/orca-front50/src/main/java/com/netflix/spinnaker/orca/front50/scheduling/UnusedPipelineDisablePollingNotificationAgent.java @@ -0,0 +1,169 @@ +/* + * Copyright 2024 Harness, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.orca.front50.scheduling; + +import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE; + +import com.netflix.spectator.api.Id; +import com.netflix.spectator.api.LongTaskTimer; +import com.netflix.spectator.api.Registry; +import com.netflix.spinnaker.kork.retrofit.exceptions.SpinnakerHttpException; +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus; +import com.netflix.spinnaker.orca.front50.Front50Service; +import com.netflix.spinnaker.orca.notifications.AbstractPollingNotificationAgent; +import com.netflix.spinnaker.orca.notifications.NotificationClusterLock; +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; +import java.time.Clock; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +@Component +@ConditionalOnExpression( + "${pollers.unused-pipelines-disable.enabled:false} && ${execution-repository.sql.enabled:false}") +public class UnusedPipelineDisablePollingNotificationAgent + extends AbstractPollingNotificationAgent { + + Front50Service front50service; + + private static final List COMPLETED_STATUSES = + ExecutionStatus.COMPLETED.stream().map(Enum::toString).collect(Collectors.toList()); + + private final Logger log = + LoggerFactory.getLogger(UnusedPipelineDisablePollingNotificationAgent.class); + + private final Clock clock; + private final ExecutionRepository executionRepository; + private final Registry registry; + + private final long pollingIntervalSec; + private final int thresholdDays; + private final boolean dryRun; + + private final Id timerId; + + @Autowired + public UnusedPipelineDisablePollingNotificationAgent( + NotificationClusterLock clusterLock, + ExecutionRepository executionRepository, + Front50Service front50Service, + Clock clock, + Registry registry, + @Value("${pollers.unused-pipelines-disable.interval-sec:3600}") long pollingIntervalSec, + @Value("${pollers.unused-pipelines-disable.threshold-days:365}") int thresholdDays, + @Value("${pollers.unused-pipelines-disable.dry-run:true}") boolean dryRun) { + super(clusterLock); + this.executionRepository = executionRepository; + this.clock = clock; + this.registry = registry; + this.pollingIntervalSec = pollingIntervalSec; + this.thresholdDays = thresholdDays; + this.dryRun = dryRun; + this.front50service = front50Service; + + timerId = registry.createId("pollers.unusedPipelineDisable.timing"); + } + + @Override + protected long getPollingInterval() { + return pollingIntervalSec * 1000; + } + + @Override + protected String getNotificationType() { + return UnusedPipelineDisablePollingNotificationAgent.class.getSimpleName(); + } + + @Override + protected void tick() { + LongTaskTimer timer = registry.longTaskTimer(timerId); + long timerId = timer.start(); + try { + executionRepository + .retrieveAllApplicationNames(PIPELINE) + .forEach( + app -> { + log.info("Evaluating " + app + " for unused pipelines"); + List pipelineConfigIds = + front50service.getPipelines(app, false, "enabled").stream() + .map(p -> (String) p.get("id")) + .collect(Collectors.toList()); + + ExecutionRepository.ExecutionCriteria criteria = + new ExecutionRepository.ExecutionCriteria(); + criteria.setStatuses(COMPLETED_STATUSES); + criteria.setStartTimeCutoff( + clock.instant().atZone(ZoneOffset.UTC).minusDays(thresholdDays).toInstant()); + + List orcaExecutionsCount = + executionRepository.retrievePipelineConfigIdsForApplicationWithCriteria( + app, criteria); + + disableAppPipelines(app, orcaExecutionsCount, pipelineConfigIds); + }); + } catch (Exception e) { + log.error("Disabling pipelines failed", e); + } finally { + timer.stop(timerId); + } + } + + public void disableAppPipelines( + String app, List orcaExecutionsCount, List front50PipelineConfigIds) { + + List front50PipelineConfigIdsNotExecuted = + front50PipelineConfigIds.stream() + .filter(p -> !orcaExecutionsCount.contains(p)) + .collect(Collectors.toList()); + + log.info( + "Found " + + front50PipelineConfigIdsNotExecuted.size() + + " pipelines to disable for Application " + + app); + front50PipelineConfigIdsNotExecuted.forEach( + p -> { + log.info("Disabling pipeline execution " + p); + if (!dryRun) { + disableFront50PipelineConfigId(p); + } + }); + } + + public void disableFront50PipelineConfigId(String pipelineConfigId) { + Map pipeline = front50service.getPipeline(pipelineConfigId); + if (pipeline.get("disabled") == null || !(boolean) pipeline.get("disabled")) { + pipeline.put("disabled", true); + try { + front50service.updatePipeline(pipelineConfigId, pipeline); + } catch (SpinnakerHttpException e) { + if (Arrays.asList(404, 403).contains(e.getResponseCode())) { + log.warn("Failed to disable pipeline " + pipelineConfigId + " due to " + e.getMessage()); + } else { + throw e; + } + } + } + } +} diff --git a/orca-front50/src/test/java/com/netflix/spinnaker/orca/front50/scheduling/UnusedPipelineDisablePollingNotificationAgentTest.java b/orca-front50/src/test/java/com/netflix/spinnaker/orca/front50/scheduling/UnusedPipelineDisablePollingNotificationAgentTest.java new file mode 100644 index 0000000000..e568578324 --- /dev/null +++ b/orca-front50/src/test/java/com/netflix/spinnaker/orca/front50/scheduling/UnusedPipelineDisablePollingNotificationAgentTest.java @@ -0,0 +1,216 @@ +/* + * Copyright 2024 Harness, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.front50.scheduling; + +import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.*; + +import com.netflix.spectator.api.Id; +import com.netflix.spectator.api.LongTaskTimer; +import com.netflix.spectator.api.Registry; +import com.netflix.spinnaker.kork.retrofit.exceptions.SpinnakerHttpException; +import com.netflix.spinnaker.orca.front50.Front50Service; +import com.netflix.spinnaker.orca.notifications.NotificationClusterLock; +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockitoAnnotations; +import retrofit.RetrofitError; +import retrofit.client.Response; + +class UnusedPipelineDisablePollingNotificationAgentTest { + + NotificationClusterLock clusterLock = mock(NotificationClusterLock.class); + ExecutionRepository executionRepository = mock(ExecutionRepository.class); + Front50Service front50Service = mock(Front50Service.class); + Registry registry = mock(Registry.class); + LongTaskTimer timer = mock(LongTaskTimer.class); + + Id timerId = mock(Id.class); + Id disabledId = mock(Id.class); + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + when(registry.createId("pollers.unusedPipelineDisable.timing")).thenReturn(timerId); + when(registry.createId("pollers.unusedPipelineDisable.disabled")).thenReturn(disabledId); + when(registry.longTaskTimer(timerId)).thenReturn(timer); + when(timer.start()).thenReturn(1L); + } + + @Test + void disableAppPipelines_shouldDisableUnusedPipelines() { + Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC); + UnusedPipelineDisablePollingNotificationAgent agent = + new UnusedPipelineDisablePollingNotificationAgent( + clusterLock, executionRepository, front50Service, clock, registry, 3600000, 30, false); + List orcaExecutionsCount = List.of("pipeline1"); + List front50PipelineConfigIds = List.of("pipeline2"); + + Map pipeline = new HashMap<>(); + pipeline.put("name", "pipeline2"); + when(front50Service.getPipeline("pipeline2")).thenReturn(pipeline); + + agent.disableAppPipelines("app1", orcaExecutionsCount, front50PipelineConfigIds); + + verify(front50Service, times(1)).getPipeline("pipeline2"); + verify(front50Service, times(1)) + .updatePipeline("pipeline2", Map.of("name", "pipeline2", "disabled", true)); + } + + @Test + void disableFront50PipelineConfigId_shouldDisablePipeline() { + Map pipeline = new HashMap<>(); + pipeline.put("disabled", false); + Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC); + UnusedPipelineDisablePollingNotificationAgent agent = + new UnusedPipelineDisablePollingNotificationAgent( + clusterLock, executionRepository, front50Service, clock, registry, 3600000, 30, false); + when(front50Service.getPipeline("pipeline1")).thenReturn(pipeline); + + agent.disableFront50PipelineConfigId("pipeline1"); + + verify(front50Service, times(1)).getPipeline("pipeline1"); + verify(front50Service, times(1)).updatePipeline(eq("pipeline1"), any()); + } + + @Test + void disableAppPipelines_shouldDisableUnusedPipelines_dryRunMode() { + Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC); + UnusedPipelineDisablePollingNotificationAgent agent = + new UnusedPipelineDisablePollingNotificationAgent( + clusterLock, executionRepository, front50Service, clock, registry, 3600000, 30, true); + List orcaExecutionsCount = List.of("pipeline1"); + List front50PipelineConfigIds = List.of("pipeline2"); + + Map pipeline = new HashMap<>(); + pipeline.put("name", "pipeline2"); + when(front50Service.getPipeline("pipeline2")).thenReturn(pipeline); + + agent.disableAppPipelines("app1", orcaExecutionsCount, front50PipelineConfigIds); + + verify(front50Service, never()).getPipeline("pipeline2"); + verify(front50Service, never()) + .updatePipeline("pipeline2", Map.of("name", "pipeline2", "disabled", true)); + } + + @Test + void disableFront50PipelineConfigId_shouldNotDisableAlreadyDisabledPipeline() { + Map pipeline = new HashMap<>(); + pipeline.put("disabled", true); + when(front50Service.getPipeline("pipeline1")).thenReturn(pipeline); + Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC); + UnusedPipelineDisablePollingNotificationAgent agent = + new UnusedPipelineDisablePollingNotificationAgent( + clusterLock, executionRepository, front50Service, clock, registry, 3600000, 30, false); + agent.disableFront50PipelineConfigId("pipeline1"); + + verify(front50Service, times(1)).getPipeline("pipeline1"); + verify(front50Service, never()).updatePipeline(eq("pipeline1"), any()); + } + + @Test + void tick_shouldEvaluateAllApplications() { + when(executionRepository.retrieveAllApplicationNames(PIPELINE)) + .thenReturn(List.of("app1", "app2")); + when(front50Service.getPipelines(anyString(), eq(false), eq("enabled"))) + .thenReturn(List.of(Map.of("id", "pipeline1"))); + when(disabledId.withTag(any())).thenReturn(disabledId); + + Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC); + UnusedPipelineDisablePollingNotificationAgent agent = + new UnusedPipelineDisablePollingNotificationAgent( + clusterLock, executionRepository, front50Service, clock, registry, 3600000, 30, false); + agent.tick(); + + verify(executionRepository, times(1)).retrieveAllApplicationNames(PIPELINE); + verify(front50Service, times(2)).getPipelines(anyString(), eq(false), eq("enabled")); + } + + @Test + void tick_shouldHandleNoApplications() { + when(executionRepository.retrieveAllApplicationNames(PIPELINE)) + .thenReturn(Collections.emptyList()); + Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC); + UnusedPipelineDisablePollingNotificationAgent agent = + new UnusedPipelineDisablePollingNotificationAgent( + clusterLock, executionRepository, front50Service, clock, registry, 3600000, 30, false); + agent.tick(); + + verify(executionRepository, times(1)).retrieveAllApplicationNames(PIPELINE); + verify(front50Service, never()).getPipelines(anyString(), eq(false), eq("enabled")); + } + + @Test + void disableFront50PipelineConfigId_shouldLogWarningFor404() { + Map pipeline = new HashMap<>(); + pipeline.put("disabled", false); + Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC); + UnusedPipelineDisablePollingNotificationAgent agent = + new UnusedPipelineDisablePollingNotificationAgent( + clusterLock, executionRepository, front50Service, clock, registry, 3600000, 30, false); + when(front50Service.getPipeline("pipeline1")).thenReturn(pipeline); + doThrow( + new SpinnakerHttpException( + RetrofitError.httpError( + "http://front50", + new Response("http://front50", 404, "", List.of(), null), + null, + null))) + .when(front50Service) + .updatePipeline(eq("pipeline1"), any()); + + agent.disableFront50PipelineConfigId("pipeline1"); + + verify(front50Service, times(1)).getPipeline("pipeline1"); + verify(front50Service, times(1)).updatePipeline(eq("pipeline1"), any()); + } + + @Test + void disableFront50PipelineConfigId_shouldThrowExceptionForOtherErrors() { + Map pipeline = new HashMap<>(); + pipeline.put("disabled", false); + Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC); + UnusedPipelineDisablePollingNotificationAgent agent = + new UnusedPipelineDisablePollingNotificationAgent( + clusterLock, executionRepository, front50Service, clock, registry, 3600000, 30, false); + when(front50Service.getPipeline("pipeline1")).thenReturn(pipeline); + doThrow( + new SpinnakerHttpException( + RetrofitError.httpError( + "http://front50", + new Response("http://front50", 500, "", List.of(), null), + null, + null))) + .when(front50Service) + .updatePipeline(eq("pipeline1"), any()); + + assertThrows( + SpinnakerHttpException.class, () -> agent.disableFront50PipelineConfigId("pipeline1")); + + verify(front50Service, times(1)).getPipeline("pipeline1"); + verify(front50Service, times(1)).updatePipeline(eq("pipeline1"), any()); + } +} diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index bfc6070edb..e8d1721eb8 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -510,6 +510,15 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet return List.of(); } + @Override + public @Nonnull List retrievePipelineConfigIdsForApplicationWithCriteria( + @Nonnull String application, @Nonnull ExecutionCriteria criteria) { + // TODO: not implemented yet - this method, at present, is primarily meant for the + // SqlExecutionRepository + // implementation. + return List.of(); + } + /* * There is no guarantee that the returned results will be sorted. * @param limit and the param @offset are only implemented in SqlExecutionRepository diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index abd49437d5..4edfd3513c 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -459,6 +459,38 @@ class SqlExecutionRepository( .fetch(0, String::class.java) } + override fun retrievePipelineConfigIdsForApplicationWithCriteria( + application: String, + criteria: ExecutionCriteria + ): List { + var baseQueryPredicate = field("application").eq(application) + var table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx") + else PIPELINE.tableName + + if (criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus.values().size) { + val statusStrings = criteria.statuses.map { it.toString() } + baseQueryPredicate = baseQueryPredicate + .and(field("status").`in`(*statusStrings.toTypedArray())) + + table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_status_starttime_idx") + else PIPELINE.tableName + } + if (criteria.startTimeCutoff != null) { + baseQueryPredicate = baseQueryPredicate + .and( + field("start_time").greaterThan(criteria.startTimeCutoff!!.toEpochMilli()) + ) + } + + withPool(poolName) { + return jooq.selectDistinct(field("config_id")) + .from(table) + .where(baseQueryPredicate) + .groupBy(field("config_id")) + .fetch(0, String::class.java) + } + } + /** * this function supports the following ExecutionCriteria currently: * 'limit', a.k.a page size and