Propagating http headers in Kafka headers (#1612)
druminski authored Nov 3, 2023
1 parent c8bff9a commit cbf655c
Showing 43 changed files with 448 additions and 283 deletions.
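Summary of the change, as visible in the diff below: the frontend captures HTTP headers from incoming publish requests (DefaultHeadersPropagator, now driven by the new HTTPHeadersProperties) and hands them to KafkaHeaderFactory so they are written onto the Kafka record as Kafka headers under a configurable key prefix. On the consumer side, KafkaHeaderExtractor reads the prefixed headers back (prefix "h-" by default) and exposes them as the message's external metadata, falling back to the metadata embedded in the message body for records produced before this change. The frontend's local backup storage (BackupMessage, ChronicleMapEntryValue) persists the propagated headers as well.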
File: (path not shown)
@@ -30,6 +30,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyMap;
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic;

@Fork(1)
@@ -63,7 +64,7 @@ public void setup() throws IOException {
private Message generateMessage() {
byte[] messageContent = UUID.randomUUID().toString().getBytes();
String id = MessageIdGenerator.generate();
return new JsonMessage(id, messageContent, System.currentTimeMillis(), "partition-key");
return new JsonMessage(id, messageContent, System.currentTimeMillis(), "partition-key", emptyMap());
}

private File prepareFile() throws IOException {
@@ -109,7 +110,8 @@ public void save(Message message, Topic topic) {
topic.getQualifiedName(),
message.getPartitionKey(),
null,
null);
null,
emptyMap());
map.put(message.getId(), entryValue);
}

File: (path not shown)
@@ -9,8 +9,8 @@
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.config.HTTPHeadersProperties;
import pl.allegro.tech.hermes.frontend.config.HandlersChainProperties;
import pl.allegro.tech.hermes.frontend.config.HeaderPropagationProperties;
import pl.allegro.tech.hermes.frontend.config.HermesServerProperties;
import pl.allegro.tech.hermes.frontend.config.SchemaProperties;
import pl.allegro.tech.hermes.frontend.config.SslProperties;
@@ -81,7 +81,7 @@ static HermesServer provideHermesServer() throws IOException {
private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimiter,
TopicsCache topicsCache, BrokerMessageProducer brokerMessageProducer,
RawSchemaClient rawSchemaClient, Trackers trackers, AvroMessageContentWrapper avroMessageContentWrapper) {
HeaderPropagationProperties headerPropagationProperties = new HeaderPropagationProperties();
HTTPHeadersProperties httpHeadersProperties = new HTTPHeadersProperties();
HandlersChainProperties handlersChainProperties = new HandlersChainProperties();
TrackingHeadersExtractor trackingHeadersExtractor = new DefaultTrackingHeaderExtractor();
SchemaProperties schemaProperties = new SchemaProperties();
@@ -97,7 +97,7 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimiter,
new DirectSchemaVersionsRepository(rawSchemaClient),
new DirectCompiledSchemaRepository<>(rawSchemaClient, SchemaCompilersFactory.avroSchemaCompiler())
),
new DefaultHeadersPropagator(headerPropagationProperties.isEnabled(), headerPropagationProperties.getAllowFilter()),
new DefaultHeadersPropagator(httpHeadersProperties),
new BenchmarkMessageContentWrapper(avroMessageContentWrapper),
Clock.systemDefaultZone(),
schemaProperties.isIdHeaderEnabled()
File: HTTPHeadersPropagationAsKafkaHeadersProperties.java (new file)
@@ -0,0 +1,7 @@
package pl.allegro.tech.hermes.common.kafka;

public interface HTTPHeadersPropagationAsKafkaHeadersProperties {
boolean isEnabled();

String getPrefix();
}
File: KafkaHeaderNameParameters.java
@@ -8,6 +8,4 @@ public interface KafkaHeaderNameParameters {

String getMessageId();

String getTimestamp();

}
File: ConsumerHTTPHeadersPropagationAsKafkaHeadersProperties.java (new file)
@@ -0,0 +1,29 @@
package pl.allegro.tech.hermes.consumers.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import pl.allegro.tech.hermes.common.kafka.HTTPHeadersPropagationAsKafkaHeadersProperties;

@ConfigurationProperties(prefix = "consumer.http.headers.propagation-as-kafka-headers")
public class ConsumerHTTPHeadersPropagationAsKafkaHeadersProperties implements HTTPHeadersPropagationAsKafkaHeadersProperties {

public boolean enabled = true;
public String prefix = "h-";

@Override
public boolean isEnabled() {
return enabled;
}

@Override
public String getPrefix() {
return prefix;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public void setPrefix(String prefix) {
this.prefix = prefix;
}
}
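The defaults above make propagation opt-out: enabled, with Kafka header keys prefixed "h-". A minimal sketch of using the class directly; the Spring property keys follow from the @ConfigurationProperties prefix above, while the demo class itself is illustrative:

import pl.allegro.tech.hermes.consumers.config.ConsumerHTTPHeadersPropagationAsKafkaHeadersProperties;

public class PropagationPropertiesDemo {
    public static void main(String[] args) {
        ConsumerHTTPHeadersPropagationAsKafkaHeadersProperties props =
                new ConsumerHTTPHeadersPropagationAsKafkaHeadersProperties();
        System.out.println(props.isEnabled()); // true: propagation is on by default
        System.out.println(props.getPrefix()); // "h-": prefix for propagated Kafka header keys
        // Under Spring Boot the same fields are bound from configuration, e.g.:
        //   consumer.http.headers.propagation-as-kafka-headers.enabled=false
        //   consumer.http.headers.propagation-as-kafka-headers.prefix=meta-
    }
}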
File: ConsumerReceiverConfiguration.java
@@ -3,6 +3,7 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.common.kafka.HTTPHeadersPropagationAsKafkaHeadersProperties;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
@@ -27,7 +28,8 @@
KafkaConsumerProperties.class,
KafkaClustersProperties.class,
CommonConsumerProperties.class,
KafkaHeaderNameProperties.class
KafkaHeaderNameProperties.class,
ConsumerHTTPHeadersPropagationAsKafkaHeadersProperties.class
})
public class ConsumerReceiverConfiguration {

@@ -78,7 +80,9 @@ public MessageContentReaderFactory messageContentReaderFactory(CompositeMessageContentWrapper
}

@Bean
public KafkaHeaderExtractor kafkaHeaderExtractor(KafkaHeaderNameProperties kafkaHeaderNameProperties) {
return new KafkaHeaderExtractor(kafkaHeaderNameProperties);
public KafkaHeaderExtractor kafkaHeaderExtractor(
KafkaHeaderNameProperties kafkaHeaderNameProperties,
HTTPHeadersPropagationAsKafkaHeadersProperties httpHeadersPropagationAsKafkaHeadersProperties) {
return new KafkaHeaderExtractor(kafkaHeaderNameProperties, httpHeadersPropagationAsKafkaHeadersProperties);
}
}
File: KafkaHeaderNameProperties.java
@@ -12,8 +12,6 @@ public class KafkaHeaderNameProperties implements KafkaHeaderNameParameters {

private String messageId = "id";

private String timestamp = "ts";

@Override
public String getSchemaVersion() {
return schemaVersion;
@@ -41,12 +39,4 @@ public void setMessageId(String messageId) {
this.messageId = messageId;
}

@Override
public String getTimestamp() {
return timestamp;
}

public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
}
File: KafkaConsumerRecordToMessageConverter.java
@@ -37,6 +37,13 @@ public KafkaConsumerRecordToMessageConverter(Topic topic,
public Message convertToMessage(ConsumerRecord<byte[], byte[]> record, long partitionAssignmentTerm) {
KafkaTopic kafkaTopic = topics.get(record.topic());
UnwrappedMessageContent unwrappedContent = messageContentReader.read(record, kafkaTopic.contentType());

Map<String, String> externalMetadata = kafkaHeaderExtractor.extractHTTPHeadersIfEnabled(record.headers());
// compatibility condition, can be removed when all messages have HTTP headers propagated via Kafka headers.
if (externalMetadata.isEmpty()) {
externalMetadata = unwrappedContent.getMessageMetadata().getExternalMetadata();
}

return new Message(
kafkaHeaderExtractor.extractMessageId(record.headers()),
topic.getQualifiedName(),
@@ -47,7 +54,7 @@ public Message convertToMessage(ConsumerRecord<byte[], byte[]> record, long partitionAssignmentTerm) {
clock.millis(),
new PartitionOffset(kafkaTopic.name(), record.offset(), record.partition()),
partitionAssignmentTerm,
unwrappedContent.getMessageMetadata().getExternalMetadata(),
externalMetadata,
subscription.getHeaders(),
subscription.getName(),
subscription.isSubscriptionIdentityHeadersEnabled()
File: KafkaHeaderExtractor.java
@@ -3,17 +3,28 @@
import com.google.common.primitives.Ints;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import pl.allegro.tech.hermes.common.kafka.HTTPHeadersPropagationAsKafkaHeadersProperties;
import pl.allegro.tech.hermes.consumers.config.KafkaHeaderNameProperties;

import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.StreamSupport.stream;

public class KafkaHeaderExtractor {

private final KafkaHeaderNameProperties kafkaHeaderNameProperties;
private final boolean isHTTPheadersPropagationAsKafkaHeadersEnabled;
private final String httpHeadersPrefix;

public KafkaHeaderExtractor(KafkaHeaderNameProperties kafkaHeaderNameProperties) {
public KafkaHeaderExtractor(KafkaHeaderNameProperties kafkaHeaderNameProperties,
HTTPHeadersPropagationAsKafkaHeadersProperties httpHeadersPropagationAsKafkaHeadersProperties) {

this.kafkaHeaderNameProperties = kafkaHeaderNameProperties;
this.isHTTPheadersPropagationAsKafkaHeadersEnabled = httpHeadersPropagationAsKafkaHeadersProperties.isEnabled();
this.httpHeadersPrefix = httpHeadersPropagationAsKafkaHeadersProperties.getPrefix();
}

public Integer extractSchemaVersion(Headers headers) {
@@ -42,4 +53,15 @@ public String extractMessageId(Headers headers) {
return new String(header.value(), UTF_8);
}

public Map<String, String> extractHTTPHeadersIfEnabled(Headers headers) {
return isHTTPheadersPropagationAsKafkaHeadersEnabled
?
stream(headers.spliterator(), false)
.filter(h -> h.key().startsWith(httpHeadersPrefix))
.collect(toMap(
h -> h.key().substring(httpHeadersPrefix.length()),
h -> new String(h.value(), UTF_8)))
:
emptyMap();
}
}
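A hedged usage sketch of the new extraction path. It assumes KafkaHeaderExtractor's package is importable from the demo (its package declaration is outside this excerpt) and that KafkaHeaderNameProperties is instantiable with its defaults, which its field initializers above suggest:

import org.apache.kafka.common.header.internals.RecordHeaders;
import pl.allegro.tech.hermes.common.kafka.HTTPHeadersPropagationAsKafkaHeadersProperties;
import pl.allegro.tech.hermes.consumers.config.ConsumerHTTPHeadersPropagationAsKafkaHeadersProperties;
import pl.allegro.tech.hermes.consumers.config.KafkaHeaderNameProperties;

import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;

public class KafkaHeaderExtractorDemo {
    public static void main(String[] args) {
        HTTPHeadersPropagationAsKafkaHeadersProperties propagation =
                new ConsumerHTTPHeadersPropagationAsKafkaHeadersProperties(); // enabled, prefix "h-"
        KafkaHeaderExtractor extractor =
                new KafkaHeaderExtractor(new KafkaHeaderNameProperties(), propagation);

        RecordHeaders headers = new RecordHeaders();
        headers.add("h-trace-id", "abc123".getBytes(UTF_8));   // propagated HTTP header
        headers.add("id", "some-message-id".getBytes(UTF_8));  // internal Hermes header, no prefix

        // Only keys starting with "h-" are returned, prefix stripped: {trace-id=abc123}
        Map<String, String> httpHeaders = extractor.extractHTTPHeadersIfEnabled(headers);
        System.out.println(httpHeaders);
    }
}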
File: BackupMessage.java
@@ -2,6 +2,7 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

public class BackupMessage implements Serializable {
@@ -13,16 +14,18 @@ public class BackupMessage implements Serializable {
private final String partitionKey;
private final Integer schemaVersion;
private final Integer schemaId;
private final Map<String, String> propagatedHTTPHeaders;

public BackupMessage(String messageId, byte[] data, long timestamp, String qualifiedTopicName, String partitionKey,
Integer schemaVersion, Integer schemaId) {
Integer schemaVersion, Integer schemaId, Map<String, String> propagatedHTTPHeaders) {
this.messageId = messageId;
this.data = data;
this.timestamp = timestamp;
this.qualifiedTopicName = qualifiedTopicName;
this.partitionKey = partitionKey;
this.schemaVersion = schemaVersion;
this.schemaId = schemaId;
this.propagatedHTTPHeaders = propagatedHTTPHeaders;
}

public String getMessageId() {
@@ -53,6 +56,10 @@ public Integer getSchemaId() {
return schemaId;
}

public Map<String, String> getPropagatedHTTPHeaders() {
return propagatedHTTPHeaders;
}

@Override
public boolean equals(Object o) {
if (this == o) {
File: (path not shown)
@@ -189,12 +189,12 @@ private Message createAvroMessageFromVersion(BackupMessage backupMessage, Optional

private Message createAvroMessage(BackupMessage backupMessage, CompiledSchema<Schema> schema) {
return new AvroMessage(backupMessage.getMessageId(), backupMessage.getData(), backupMessage.getTimestamp(), schema,
backupMessage.getPartitionKey());
backupMessage.getPartitionKey(), backupMessage.getPropagatedHTTPHeaders());
}

private Message createJsonMessage(BackupMessage backupMessage) {
return new JsonMessage(backupMessage.getMessageId(), backupMessage.getData(), backupMessage.getTimestamp(),
backupMessage.getPartitionKey());
backupMessage.getPartitionKey(), backupMessage.getPropagatedHTTPHeaders());
}

private boolean sendMessageIfNeeded(Message message, String topicQualifiedName, Optional<CachedTopic> cachedTopic, String contextName) {
File: ChronicleMapEntryValue.java
@@ -2,6 +2,7 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

public class ChronicleMapEntryValue implements Serializable {
@@ -13,15 +14,17 @@ public class ChronicleMapEntryValue implements Serializable {
private final String partitionKey;
private final Integer schemaVersion;
private final Integer schemaId;
private final Map<String, String> propagatedHttpHeaders;

public ChronicleMapEntryValue(byte[] data, long timestamp, String qualifiedTopicName, String partitionKey,
Integer schemaVersion, Integer schemaId) {
Integer schemaVersion, Integer schemaId, Map<String, String> propagatedHttpHeaders) {
this.data = data;
this.timestamp = timestamp;
this.qualifiedTopicName = qualifiedTopicName;
this.partitionKey = partitionKey;
this.schemaVersion = schemaVersion;
this.schemaId = schemaId;
this.propagatedHttpHeaders = propagatedHttpHeaders;
}

public byte[] getData() {
@@ -48,6 +51,10 @@ public Integer getSchemaId() {
return schemaId;
}

public Map<String, String> getPropagatedHttpHeaders() {
return propagatedHttpHeaders;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -60,11 +67,14 @@ public boolean equals(Object o) {
return Objects.equals(timestamp, that.timestamp)
&& Arrays.equals(data, that.data)
&& Objects.equals(qualifiedTopicName, that.qualifiedTopicName)
&& Objects.equals(partitionKey, that.partitionKey);
&& Objects.equals(partitionKey, that.partitionKey)
&& Objects.equals(propagatedHttpHeaders, that.propagatedHttpHeaders);
}

@Override
public int hashCode() {
return Objects.hash(data, timestamp, qualifiedTopicName, partitionKey, schemaVersion, schemaId);
return Objects.hash(data, timestamp, qualifiedTopicName, partitionKey, schemaVersion, schemaId,
propagatedHttpHeaders);
}

}
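Note: ChronicleMapEntryValue is Serializable and is what the ChronicleMap-backed backup storage writes to disk, so the propagated-headers map stored in it must be serializable as well; the call sites visible in this commit pass either the message's own header map or Collections.emptyMap().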
File: (path not shown)
@@ -68,7 +68,7 @@ public void save(Message message, Topic topic) {
new ChronicleMapEntryValue(
message.getData(), message.getTimestamp(), topic.getQualifiedName(),
message.getPartitionKey(), message.getCompiledSchema().map(v -> v.getVersion().value()).orElse(null),
message.getCompiledSchema().map(v -> v.getId().value()).orElse(null)));
message.getCompiledSchema().map(v -> v.getId().value()).orElse(null), message.getHTTPHeaders()));
} finally {
lock.unlock();
}
@@ -92,7 +92,7 @@ public void close() {
private BackupMessage toBackupMessage(String id, ChronicleMapEntryValue entryValue) {
return new BackupMessage(id, entryValue.getData(), entryValue.getTimestamp(),
entryValue.getQualifiedTopicName(), entryValue.getPartitionKey(), entryValue.getSchemaVersion(),
entryValue.getSchemaId());
entryValue.getSchemaId(), entryValue.getPropagatedHttpHeaders());
}

private class LoggingMapSizePreShutdownHook implements Runnable {
File: FrontendProducerConfiguration.java
@@ -20,7 +20,8 @@
SchemaProperties.class,
KafkaHeaderNameProperties.class,
KafkaProducerProperties.class,
KafkaClustersProperties.class
KafkaClustersProperties.class,
HTTPHeadersProperties.class
})
public class FrontendProducerConfiguration {

@@ -33,8 +34,9 @@ public BrokerMessageProducer kafkaBrokerMessageProducer(Producers producers,
}

@Bean
public KafkaHeaderFactory kafkaHeaderFactory(KafkaHeaderNameProperties kafkaHeaderNameProperties) {
return new KafkaHeaderFactory(kafkaHeaderNameProperties);
public KafkaHeaderFactory kafkaHeaderFactory(KafkaHeaderNameProperties kafkaHeaderNameProperties,
HTTPHeadersProperties httpHeadersProperties) {
return new KafkaHeaderFactory(kafkaHeaderNameProperties, httpHeadersProperties.getPropagationAsKafkaHeaders());
}

@Bean(destroyMethod = "close")
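KafkaHeaderFactory now receives the propagation settings (HTTPHeadersProperties.getPropagationAsKafkaHeaders()). Its body is outside this excerpt, but it has to perform the mirror image of KafkaHeaderExtractor.extractHTTPHeadersIfEnabled above. A minimal sketch under that assumption; the helper name and shape are hypothetical, not the commit's actual code:

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.util.List;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;

public class HttpToKafkaHeadersSketch {

    // Hypothetical helper: prefix each captured HTTP header key so the consumer
    // can recognise the header as propagated and strip the prefix on read.
    static List<Header> toKafkaHeaders(Map<String, String> propagatedHttpHeaders, String prefix) {
        return propagatedHttpHeaders.entrySet().stream()
                .map(e -> (Header) new RecordHeader(prefix + e.getKey(), e.getValue().getBytes(UTF_8)))
                .collect(toList());
    }

    public static void main(String[] args) {
        // An HTTP header "trace-id: abc123" becomes the Kafka header "h-trace-id".
        System.out.println(toKafkaHeaders(Map.of("trace-id", "abc123"), "h-"));
    }
}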
File: (path not shown)
@@ -38,7 +38,6 @@
@EnableConfigurationProperties({
ThroughputProperties.class,
MessagePreviewProperties.class,
HeaderPropagationProperties.class,
HandlersChainProperties.class,
SchemaProperties.class
})
@@ -95,8 +94,8 @@ public MessageFactory messageFactory(MessageValidators validators,
}

@Bean
public HeadersPropagator defaultHeadersPropagator(HeaderPropagationProperties headerPropagationProperties) {
return new DefaultHeadersPropagator(headerPropagationProperties.isEnabled(), headerPropagationProperties.getAllowFilter());
public HeadersPropagator defaultHeadersPropagator(HTTPHeadersProperties httpHeadersProperties) {
return new DefaultHeadersPropagator(httpHeadersProperties);
}

@Bean