Skip to content

Commit

Permalink
#1569 | Introduced RawSchemaAdminClient interface, which provides met…
Browse files Browse the repository at this point in the history
…hods to read and modify the schema. Modified RawSchemaClient to provide only read methods.
  • Loading branch information
adriansobolewski committed Oct 18, 2023
1 parent f8fa305 commit 23b4ba8
Show file tree
Hide file tree
Showing 19 changed files with 355 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import pl.allegro.tech.hermes.metrics.PathsCompiler;
import pl.allegro.tech.hermes.schema.DirectCompiledSchemaRepository;
import pl.allegro.tech.hermes.schema.DirectSchemaVersionsRepository;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.RawSchemaAdminClient;
import pl.allegro.tech.hermes.schema.SchemaCompilersFactory;
import pl.allegro.tech.hermes.schema.SchemaRepository;
import pl.allegro.tech.hermes.tracker.frontend.Trackers;
Expand All @@ -55,11 +55,16 @@ static HermesServer provideHermesServer() throws IOException {
MetricsFacade metricsFacade = new MetricsFacade(new SimpleMeterRegistry(), hermesMetrics);
TopicsCache topicsCache = new InMemoryTopicsCache(metricsFacade, topic);
BrokerMessageProducer brokerMessageProducer = new InMemoryBrokerMessageProducer();
RawSchemaClient rawSchemaClient = new InMemorySchemaClient(topic.getName(), loadMessageResource("schema"), 1, 1);
RawSchemaAdminClient rawSchemaAdminClient = new InMemorySchemaAdminClient(topic.getName(), loadMessageResource("schema"), 1, 1);
Trackers trackers = new Trackers(Collections.emptyList());
AvroMessageContentWrapper avroMessageContentWrapper = new AvroMessageContentWrapper(Clock.systemDefaultZone());
HttpHandler httpHandler = provideHttpHandler(throughputLimiter, topicsCache,
brokerMessageProducer, rawSchemaClient, trackers, avroMessageContentWrapper);
HttpHandler httpHandler = provideHttpHandler(
throughputLimiter,
topicsCache,
brokerMessageProducer,
rawSchemaAdminClient,
trackers,
avroMessageContentWrapper);
SslProperties sslProperties = new SslProperties();
HermesServerProperties hermesServerProperties = new HermesServerProperties();
hermesServerProperties.setGracefulShutdownEnabled(false);
Expand All @@ -79,8 +84,8 @@ static HermesServer provideHermesServer() throws IOException {
}

private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimiter,
TopicsCache topicsCache, BrokerMessageProducer brokerMessageProducer,
RawSchemaClient rawSchemaClient, Trackers trackers, AvroMessageContentWrapper avroMessageContentWrapper) {
TopicsCache topicsCache, BrokerMessageProducer brokerMessageProducer,
RawSchemaAdminClient rawSchemaAdminClient, Trackers trackers, AvroMessageContentWrapper avroMessageContentWrapper) {
HeaderPropagationProperties headerPropagationProperties = new HeaderPropagationProperties();
HandlersChainProperties handlersChainProperties = new HandlersChainProperties();
TrackingHeadersExtractor trackingHeadersExtractor = new DefaultTrackingHeaderExtractor();
Expand All @@ -94,8 +99,8 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimite
new MessageValidators(Collections.emptyList()),
new MessageContentTypeEnforcer(),
new SchemaRepository(
new DirectSchemaVersionsRepository(rawSchemaClient),
new DirectCompiledSchemaRepository<>(rawSchemaClient, SchemaCompilersFactory.avroSchemaCompiler())
new DirectSchemaVersionsRepository(rawSchemaAdminClient),
new DirectCompiledSchemaRepository<>(rawSchemaAdminClient, SchemaCompilersFactory.avroSchemaCompiler())
),
new DefaultHeadersPropagator(headerPropagationProperties.isEnabled(), headerPropagationProperties.getAllowFilter()),
new BenchmarkMessageContentWrapper(avroMessageContentWrapper),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
import pl.allegro.tech.hermes.api.RawSchema;
import pl.allegro.tech.hermes.api.RawSchemaWithMetadata;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.RawSchemaAdminClient;
import pl.allegro.tech.hermes.schema.SchemaId;
import pl.allegro.tech.hermes.schema.SchemaVersion;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class InMemorySchemaClient implements RawSchemaClient {
public class InMemorySchemaAdminClient implements RawSchemaAdminClient {

private final TopicName schemaTopicName;
private final RawSchemaWithMetadata rawSchemaWithMetadata;

public InMemorySchemaClient(TopicName schemaTopicName, String schemaSource, int id, int version) {
public InMemorySchemaAdminClient(TopicName schemaTopicName, String schemaSource, int id, int version) {
this.schemaTopicName = schemaTopicName;
rawSchemaWithMetadata = RawSchemaWithMetadata.of(schemaSource, id, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public AvroCompiledSchemaRepositoryFactory(RawSchemaClient rawSchemaClient,
}

public CompiledSchemaRepository<Schema> provide() {
CompiledSchemaRepository<Schema> repository = new DirectCompiledSchemaRepository<>(rawSchemaClient,
SchemaCompilersFactory.avroSchemaCompiler());
CompiledSchemaRepository<Schema> repository = new DirectCompiledSchemaRepository<>(
rawSchemaClient,
SchemaCompilersFactory.avroSchemaCompiler()
);

if (cacheEnabled) {
return new CachedCompiledSchemaRepository<>(repository,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package pl.allegro.tech.hermes.common.schema;

import com.fasterxml.jackson.databind.ObjectMapper;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.SubjectNamingStrategy;
Expand All @@ -13,27 +11,24 @@ public class RawSchemaClientFactory {
private final String kafkaNamespace;
private final String kafkaNamespaceSeparator;
private final MetricsFacade metricsFacade;
private final ObjectMapper objectMapper;
private final SchemaRepositoryInstanceResolver resolver;
private final boolean subjectSuffixEnabled;
private final boolean subjectNamespaceEnabled;

public RawSchemaClientFactory(String kafkaNamespace,
String kafkaNamespaceSeparator,
MetricsFacade metricsFacade,
ObjectMapper objectMapper,
SchemaRepositoryInstanceResolver resolver,
boolean subjectSuffixEnabled,
boolean subjectNamespaceEnabled) {
this.kafkaNamespace = kafkaNamespace;
this.kafkaNamespaceSeparator = kafkaNamespaceSeparator;
this.metricsFacade = metricsFacade;
this.objectMapper = objectMapper;
this.resolver = resolver;
this.subjectSuffixEnabled = subjectSuffixEnabled;
this.subjectNamespaceEnabled = subjectNamespaceEnabled;
}

public RawSchemaClient provide() {
SubjectNamingStrategy subjectNamingStrategy = SubjectNamingStrategy.qualifiedName
.withValueSuffixIf(subjectSuffixEnabled)
Expand All @@ -45,11 +40,11 @@ public RawSchemaClient provide() {
)
);
return createMetricsTrackingClient(
new SchemaRegistryRawSchemaClient(resolver, objectMapper, subjectNamingStrategy)
new SchemaRegistryRawSchemaClient(resolver, subjectNamingStrategy)
);
}

private RawSchemaClient createMetricsTrackingClient(RawSchemaClient rawSchemaClient) {
return new ReadMetricsTrackingRawSchemaClient(rawSchemaClient, metricsFacade);
return new ReadMetricsTrackingRawSchemaClient(metricsFacade, rawSchemaClient);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pl.allegro.tech.hermes.common.schema;

import pl.allegro.tech.hermes.api.RawSchema;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.schema.RawSchemaAdminClient;

public class ReadMetricsTrackingRawSchemaAdminClient extends ReadMetricsTrackingRawSchemaClient implements RawSchemaAdminClient {
private final RawSchemaAdminClient rawSchemaAdminClient;

public ReadMetricsTrackingRawSchemaAdminClient(
RawSchemaAdminClient rawSchemaAdminClient,
MetricsFacade metricsFacade) {
super(metricsFacade, rawSchemaAdminClient);
this.rawSchemaAdminClient = rawSchemaAdminClient;
}

@Override
public void registerSchema(TopicName topic, RawSchema rawSchema) {
rawSchemaAdminClient.registerSchema(topic, rawSchema);
}

@Override
public void deleteAllSchemaVersions(TopicName topic) {
rawSchemaAdminClient.deleteAllSchemaVersions(topic);
}

@Override
public void validateSchema(TopicName topic, RawSchema rawSchema) {
rawSchemaAdminClient.validateSchema(topic, rawSchema);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pl.allegro.tech.hermes.common.schema;

import pl.allegro.tech.hermes.api.RawSchema;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import pl.allegro.tech.hermes.api.RawSchemaWithMetadata;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
Expand All @@ -10,19 +12,15 @@
import pl.allegro.tech.hermes.schema.SchemaId;
import pl.allegro.tech.hermes.schema.SchemaVersion;

import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class ReadMetricsTrackingRawSchemaClient implements RawSchemaClient {
private final RawSchemaClient rawSchemaClient;
private final MetricsFacade metricsFacade;
protected final RawSchemaClient rawSchemaClient;

public ReadMetricsTrackingRawSchemaClient(
RawSchemaClient rawSchemaClient,
MetricsFacade metricsFacade) {
this.rawSchemaClient = rawSchemaClient;
MetricsFacade metricsFacade,
RawSchemaClient rawSchemaClient) {
this.metricsFacade = metricsFacade;
this.rawSchemaClient = rawSchemaClient;
}

@Override
Expand All @@ -45,30 +43,15 @@ public List<SchemaVersion> getVersions(TopicName topic) {
return timedVersions(() -> rawSchemaClient.getVersions(topic));
}

@Override
public void registerSchema(TopicName topic, RawSchema rawSchema) {
rawSchemaClient.registerSchema(topic, rawSchema);
}

@Override
public void deleteAllSchemaVersions(TopicName topic) {
rawSchemaClient.deleteAllSchemaVersions(topic);
}

@Override
public void validateSchema(TopicName topic, RawSchema rawSchema) {
rawSchemaClient.validateSchema(topic, rawSchema);
}

private <T> T timedSchema(Supplier<? extends T> callable) {
protected <T> T timedSchema(Supplier<? extends T> callable) {
return timed(callable, metricsFacade.schemaClient().schemaTimer());
}

private <T> T timedVersions(Supplier<? extends T> callable) {
protected <T> T timedVersions(Supplier<? extends T> callable) {
return timed(callable, metricsFacade.schemaClient().versionsTimer());
}

private <T> T timed(Supplier<? extends T> callable, HermesTimer timer) {
protected <T> T timed(Supplier<? extends T> callable, HermesTimer timer) {
try (HermesTimerContext time = timer.time()) {
return callable.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public SchemaVersionsRepositoryFactory(RawSchemaClient rawSchemaClient,
public SchemaVersionsRepository provide() {
if (schemaVersionsRepositoryParameters.isCacheEnabled()) {
CachedSchemaVersionsRepository cachedSchemaVersionsRepository = new CachedSchemaVersionsRepository(
rawSchemaClient,
getVersionsReloader(),
schemaVersionsRepositoryParameters.getRefreshAfterWrite(),
schemaVersionsRepositoryParameters.getExpireAfterWrite());
rawSchemaClient,
getVersionsReloader(),
schemaVersionsRepositoryParameters.getRefreshAfterWrite(),
schemaVersionsRepositoryParameters.getExpireAfterWrite());

notificationsBus.registerTopicCallback(
new SchemaCacheRefresherCallback<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import pl.allegro.tech.hermes.common.metric.HermesMetrics
import pl.allegro.tech.hermes.common.metric.MetricsFacade
import pl.allegro.tech.hermes.common.metric.Timers
import pl.allegro.tech.hermes.metrics.PathsCompiler
import pl.allegro.tech.hermes.schema.RawSchemaClient
import pl.allegro.tech.hermes.schema.RawSchemaAdminClient
import pl.allegro.tech.hermes.schema.SchemaVersion
import pl.allegro.tech.hermes.test.helper.metrics.MicrometerUtils
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject

class ReadMetricsTrackingRawSchemaClientTest extends Specification {
class ReadMetricsTrackingRawSchemaAdminClientTest extends Specification {
@Shared
TopicName topicName = TopicName.fromQualifiedName("someGroup.someTopic")

Expand All @@ -33,10 +33,10 @@ class ReadMetricsTrackingRawSchemaClientTest extends Specification {

MetricsFacade metricsFacade = new MetricsFacade(meterRegistry, hermesMetrics)

RawSchemaClient rawSchemaClient = Mock()
RawSchemaAdminClient rawSchemaClient = Mock()

@Subject
RawSchemaClient readMetricsTrackingClient = new ReadMetricsTrackingRawSchemaClient(rawSchemaClient, metricsFacade)
RawSchemaAdminClient readMetricsTrackingClient = new ReadMetricsTrackingRawSchemaAdminClient(rawSchemaClient, metricsFacade)

def "should track latency metrics for schema retrieval"() {
expect:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ public CompiledSchemaRepository<Schema> avroCompiledSchemaRepository(RawSchemaCl

@Bean
public RawSchemaClient rawSchemaClient(MetricsFacade metricsFacade,
ObjectMapper objectMapper,
SchemaRepositoryInstanceResolver resolver,
SchemaProperties schemaProperties,
KafkaClustersProperties kafkaProperties) {
return new RawSchemaClientFactory(kafkaProperties.getNamespace(), kafkaProperties.getNamespaceSeparator(), metricsFacade,
objectMapper, resolver,
schemaProperties.getRepository().isSubjectSuffixEnabled(),
schemaProperties.getRepository().isSubjectNamespaceEnabled()).provide();
return new RawSchemaClientFactory(
kafkaProperties.getNamespace(),
kafkaProperties.getNamespaceSeparator(),
metricsFacade,
resolver,
schemaProperties.getRepository().isSubjectSuffixEnabled(),
schemaProperties.getRepository().isSubjectNamespaceEnabled()
).provide();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ public CompiledSchemaRepository<Schema> avroCompiledSchemaRepository(RawSchemaCl
@Bean
public RawSchemaClient rawSchemaClient(KafkaClustersProperties kafkaClustersProperties,
MetricsFacade metricsFacade,
ObjectMapper objectMapper,
SchemaRepositoryInstanceResolver resolver,
SchemaProperties schemaProperties) {
return new RawSchemaClientFactory(
kafkaClustersProperties.getNamespace(),
kafkaClustersProperties.getNamespaceSeparator(),
metricsFacade,
objectMapper,
resolver,
schemaProperties.getRepository().isSubjectSuffixEnabled(),
schemaProperties.getRepository().isSubjectNamespaceEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import java.net.URI;
import org.apache.avro.Schema;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
Expand All @@ -19,17 +20,16 @@
import pl.allegro.tech.hermes.schema.CompiledSchemaRepository;
import pl.allegro.tech.hermes.schema.DirectCompiledSchemaRepository;
import pl.allegro.tech.hermes.schema.DirectSchemaVersionsRepository;
import pl.allegro.tech.hermes.schema.RawSchemaAdminClient;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.SchemaCompilersFactory;
import pl.allegro.tech.hermes.schema.SchemaRepository;
import pl.allegro.tech.hermes.schema.SchemaVersionsRepository;
import pl.allegro.tech.hermes.schema.SubjectNamingStrategy;
import pl.allegro.tech.hermes.schema.confluent.SchemaRegistryRawSchemaClient;
import pl.allegro.tech.hermes.schema.confluent.SchemaRegistryRawSchemaAdminClient;
import pl.allegro.tech.hermes.schema.resolver.DefaultSchemaRepositoryInstanceResolver;
import pl.allegro.tech.hermes.schema.resolver.SchemaRepositoryInstanceResolver;

import java.net.URI;

import static pl.allegro.tech.hermes.schema.SubjectNamingStrategy.qualifiedName;

@Configuration
Expand Down Expand Up @@ -64,15 +64,15 @@ public SubjectNamingStrategy subjectNamingStrategy(KafkaClustersProperties kafka
}

@Bean
@ConditionalOnMissingBean(RawSchemaClient.class)
public RawSchemaClient schemaRegistryRawSchemaClient(
@ConditionalOnMissingBean(RawSchemaAdminClient.class)
public RawSchemaAdminClient schemaRegistryRawSchemaAdminClient(
SchemaRepositoryInstanceResolver schemaRepositoryInstanceResolver,
ObjectMapper objectMapper,
SubjectNamingStrategy subjectNamingStrategy
) {
return new SchemaRegistryRawSchemaClient(schemaRepositoryInstanceResolver, objectMapper,
schemaRepositoryProperties.isValidationEnabled(), schemaRepositoryProperties.getDeleteSchemaPathSuffix(),
subjectNamingStrategy);
return new SchemaRegistryRawSchemaAdminClient(schemaRepositoryInstanceResolver, objectMapper,
schemaRepositoryProperties.isValidationEnabled(), schemaRepositoryProperties.getDeleteSchemaPathSuffix(),
subjectNamingStrategy);
}

@Bean
Expand All @@ -85,7 +85,7 @@ public SchemaRepositoryInstanceResolver defaultSchemaRepositoryInstanceResolver(
public SchemaRepository aggregateSchemaRepository(RawSchemaClient rawSchemaClient) {
SchemaVersionsRepository versionsRepository = new DirectSchemaVersionsRepository(rawSchemaClient);
CompiledSchemaRepository<Schema> avroSchemaRepository = new DirectCompiledSchemaRepository<>(
rawSchemaClient, SchemaCompilersFactory.avroSchemaCompiler());
rawSchemaClient, SchemaCompilersFactory.avroSchemaCompiler());

return new SchemaRepository(versionsRepository, avroSchemaRepository);
}
Expand Down
Loading

0 comments on commit 23b4ba8

Please sign in to comment.