From e2800b326944f1d1e081ca1e2c0bef8d9383d763 Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Fri, 7 Feb 2025 17:38:58 -0800 Subject: [PATCH] Update the Supervisor endpoint to not restart the Supervisor if the spec was unmodified --- .../supervisor/SupervisorManager.java | 12 ++++++++ .../supervisor/SupervisorResource.java | 9 +++++- .../supervisor/SupervisorResourceTest.java | 10 +++---- .../metadata/MetadataSupervisorManager.java | 7 +++++ .../SQLMetadataSupervisorManager.java | 30 +++++++++++++++++++ 5 files changed, 62 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 731ddcaa1362..28da47ab7866 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -166,6 +166,18 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) } } + public boolean wasSupervisorSpecModified(SupervisorSpec spec) + { + Preconditions.checkState(started, "SupervisorManager not started"); + Preconditions.checkNotNull(spec, "spec"); + Preconditions.checkNotNull(spec.getId(), "spec.getId()"); + Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()"); + synchronized (lock) { + Preconditions.checkState(started, "SupervisorManager not started"); + return metadataSupervisorManager.wasSupervisorSpecModified(spec); + } + } + public boolean stopAndRemoveSupervisor(String id) { Preconditions.checkState(started, "SupervisorManager not started"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 3190835c3e67..ce4515031e41 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -56,6 +56,7 @@ import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -119,7 +120,10 @@ public SupervisorResource( @POST @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req) + public Response specPost( + final SupervisorSpec spec, + @Context final HttpServletRequest req, + @QueryParam("restartIfUnmodified") @DefaultValue("true") boolean restartIfUnmodified) { return asLeaderWithSupervisorManager( manager -> { @@ -151,6 +155,9 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe if (!authResult.allowAccessWithNoRestriction()) { throw new ForbiddenException(authResult.getErrorMessage()); } + if (!restartIfUnmodified && !manager.wasSupervisorSpecModified(spec)) { + return Response.ok(ImmutableMap.of("id", spec.getId())).build(); + } manager.createOrUpdateAndStartSupervisor(spec); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index f07c6c13ab88..21983ce2275d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -161,7 +161,7 @@ public List getDataSources() replayAll(); - Response response = supervisorResource.specPost(spec, request); + Response response = supervisorResource.specPost(spec, request, true); verifyAll(); Assert.assertEquals(200, response.getStatus()); @@ -171,7 +171,7 @@ public List getDataSources() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.specPost(spec, request); + response = supervisorResource.specPost(spec, request, true); verifyAll(); Assert.assertEquals(503, response.getStatus()); @@ -201,7 +201,7 @@ public List getDataSources() EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); replayAll(); - Response response = supervisorResource.specPost(spec, request); + Response response = supervisorResource.specPost(spec, request, true); verifyAll(); Assert.assertEquals(200, response.getStatus()); @@ -211,7 +211,7 @@ public List getDataSources() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.specPost(spec, request); + response = supervisorResource.specPost(spec, request, true); verifyAll(); Assert.assertEquals(503, response.getStatus()); @@ -241,7 +241,7 @@ public List getDataSources() EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); replayAll(); - Assert.assertThrows(ForbiddenException.class, () -> supervisorResource.specPost(spec, request)); + Assert.assertThrows(ForbiddenException.class, () -> supervisorResource.specPost(spec, request, true)); verifyAll(); } diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java index 70b25d95275f..99319aa7f6d1 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java @@ -63,4 +63,11 @@ public interface MetadataSupervisorManager * @return number of supervisor removed */ int removeTerminatedSupervisorsOlderThan(long timestamp); + /** + * Checks whether the submitted spec is different from the spec in the metastore + * + * @param SupervisorSpec spec being submitted + * @return whether the spec was modified + */ + boolean wasSupervisorSpecModified(SupervisorSpec spec); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java index 5564d715792e..353b3405278c 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; @@ -49,6 +50,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -285,4 +287,32 @@ private String getSupervisorsTable() { return dbTables.get().getSupervisorTable(); } + + @Override + public boolean wasSupervisorSpecModified(SupervisorSpec spec) + { + byte[] latestSpecAsBytes = dbi.withHandle( + handle -> handle.createQuery( + StringUtils.format( + "SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC LIMIT 1", + getSupervisorsTable() + )) + .bind("spec_id", spec.getId()) + .map((index, r, ctx) -> r.getBytes("payload")) + .first() + ); + if (latestSpecAsBytes != null) { + try { + byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec); + if (Arrays.equals(specAsBytes, latestSpecAsBytes)) { + return false; + } + } + catch (JsonProcessingException ex) { + log.warn("Failed to write spec as bytes for spec_id[%s]", spec.getId()); + return true; + } + } + return true; + } }