Skip to content

Commit

Permalink
Merge branch 'main' into feat/operator-pod-disruption
Browse files Browse the repository at this point in the history
  • Loading branch information
EricWittmann authored Jan 27, 2025
2 parents 94435de + fb91990 commit e7a8776
Show file tree
Hide file tree
Showing 35 changed files with 922 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,14 @@ public class EnvironmentVariables {
public static final String APICURIO_REST_DELETION_ARTIFACT_ENABLED = "APICURIO_REST_DELETION_ARTIFACT_ENABLED";
public static final String APICURIO_REST_DELETION_GROUP_ENABLED = "APICURIO_REST_DELETION_GROUP_ENABLED";

public static final String APICURIO_REST_MUTABILITY_ARTIFACT_VERSION_CONTENT_ENABLED = "APICURIO_REST_MUTABILITY_ARTIFACT-VERSION-CONTENT_ENABLED";

private static final String KAFKA_PREFIX = "APICURIO_KAFKA_COMMON_";
public static final String KAFKASQL_SECURITY_PROTOCOL = KAFKA_PREFIX + "SECURITY_PROTOCOL";
public static final String KAFKASQL_SSL_KEYSTORE_TYPE = KAFKA_PREFIX + "SSL_KEYSTORE_TYPE";
public static final String KAFKASQL_SSL_KEYSTORE_LOCATION = KAFKA_PREFIX + "SSL_KEYSTORE_LOCATION";
public static final String KAFKASQL_SSL_KEYSTORE_PASSWORD = KAFKA_PREFIX + "SSL_KEYSTORE_PASSWORD";
public static final String KAFKASQL_SSL_TRUSTSTORE_TYPE = KAFKA_PREFIX + "SSL_TRUSTSTORE_TYPE";
public static final String KAFKASQL_SSL_TRUSTSTORE_LOCATION = KAFKA_PREFIX + "SSL_TRUSTSTORE_LOCATION";
public static final String KAFKASQL_SSL_TRUSTSTORE_PASSWORD = KAFKA_PREFIX + "SSL_TRUSTSTORE_PASSWORD";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.apicurio.registry.operator.feat;

import io.apicurio.registry.operator.EnvironmentVariables;
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3Spec;
import io.apicurio.registry.operator.api.v1.spec.ComponentSpec;
import io.apicurio.registry.operator.resource.ResourceFactory;
import io.apicurio.registry.operator.utils.IngressUtils;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/**
* Helper class used to handle CORS related configuration.
*/
public class Cors {
/**
* Configure the QUARKUS_HTTP_CORS_ORIGINS environment variable with the following:
* <ul>
* <li>Add the ingress host</li>
* <li>Override if QUARKUS_HTTP_CORS_ORIGINS is configured in the "env" section</li>
* </ul>
*
* @param primary
* @param envVars
*/
public static void configureAllowedOrigins(ApicurioRegistry3 primary,
LinkedHashMap<String, EnvVar> envVars) {
TreeSet<String> allowedOrigins = new TreeSet<>();

// If the QUARKUS_HTTP_CORS_ORIGINS env var is configured in the "env" section of the CR,
// then make sure to add those configured values to the set of allowed origins we want to
// configure.
Optional.ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp).map(ComponentSpec::getEnv)
.ifPresent(env -> {
env.stream().filter(
envVar -> envVar.getName().equals(EnvironmentVariables.QUARKUS_HTTP_CORS_ORIGINS))
.forEach(envVar -> {
Optional.ofNullable(envVar.getValue()).ifPresent(envVarValue -> {
Arrays.stream(envVarValue.split(",")).forEach(allowedOrigins::add);
});
});
});

// If not, let's try to figure it out from other sources.
if (allowedOrigins.isEmpty()) {
// If there is a configured Ingress host for the UI or the Studio UI, add them to the allowed
// origins.
Set.of(ResourceFactory.COMPONENT_UI, ResourceFactory.COMPONENT_STUDIO_UI).forEach(component -> {
String host = IngressUtils.getConfiguredHost(component, primary);
if (host != null) {
allowedOrigins.add("http://" + host);
allowedOrigins.add("https://" + host);
}
});
}

// If we still do not have anything, then default to "*"
if (allowedOrigins.isEmpty()) {
allowedOrigins.add("*");
}

// Join the values in allowedOrigins into a String and set it as the new value of the env var.
String envVarValue = String.join(",", allowedOrigins);
envVars.put(EnvironmentVariables.QUARKUS_HTTP_CORS_ORIGINS, new EnvVarBuilder()
.withName(EnvironmentVariables.QUARKUS_HTTP_CORS_ORIGINS).withValue(envVarValue).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,38 @@
import io.apicurio.registry.operator.api.v1.spec.StorageSpec;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

import static io.apicurio.registry.operator.api.v1.ContainerNames.REGISTRY_APP_CONTAINER_NAME;
import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar;
import static io.apicurio.registry.operator.utils.Utils.isBlank;
import static java.util.Optional.ofNullable;

public class KafkaSql {

private static final Logger log = LoggerFactory.getLogger(KafkaSql.class);

public static String ENV_STORAGE_KIND = "APICURIO_STORAGE_KIND";
public static String ENV_KAFKASQL_BOOTSTRAP_SERVERS = "APICURIO_KAFKASQL_BOOTSTRAP_SERVERS";

public static void configureKafkaSQL(ApicurioRegistry3 primary, Map<String, EnvVar> env) {
public static void configureKafkaSQL(ApicurioRegistry3 primary, Deployment deployment,
Map<String, EnvVar> env) {
ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp).map(AppSpec::getStorage)
.map(StorageSpec::getKafkasql).ifPresent(kafkasql -> {
if (!isBlank(kafkasql.getBootstrapServers())) {
addEnvVar(env,
new EnvVarBuilder().withName(ENV_STORAGE_KIND).withValue("kafkasql").build());
addEnvVar(env, new EnvVarBuilder().withName(ENV_KAFKASQL_BOOTSTRAP_SERVERS)
.withValue(kafkasql.getBootstrapServers()).build());

if (KafkaSqlTLS.configureKafkaSQLTLS(primary, deployment, REGISTRY_APP_CONTAINER_NAME,
env)) {
log.info("KafkaSQL storage with TLS security configured.");
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.apicurio.registry.operator.feat;

import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3Spec;
import io.apicurio.registry.operator.api.v1.spec.AppSpec;
import io.apicurio.registry.operator.api.v1.spec.KafkaSqlSpec;
import io.apicurio.registry.operator.api.v1.spec.KafkaSqlTLSSpec;
import io.apicurio.registry.operator.api.v1.spec.StorageSpec;
import io.apicurio.registry.operator.utils.SecretKeyRefTool;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.apps.Deployment;

import java.util.Map;
import java.util.Optional;

import static io.apicurio.registry.operator.EnvironmentVariables.*;
import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar;
import static java.util.Optional.ofNullable;

public class KafkaSqlTLS {

/**
* Plain KafkaSQL must be already configured.
*/
public static boolean configureKafkaSQLTLS(ApicurioRegistry3 primary, Deployment deployment,
String containerName, Map<String, EnvVar> env) {

// spotless:off
var keystore = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary)
.map(KafkaSqlTLSSpec::getKeystoreSecretRef)
.orElse(null), "user.p12");

var keystorePassword = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary)
.map(KafkaSqlTLSSpec::getKeystorePasswordSecretRef)
.orElse(null), "user.password");

var truststore = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary)
.map(KafkaSqlTLSSpec::getTruststoreSecretRef)
.orElse(null), "ca.p12");

var truststorePassword = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary)
.map(KafkaSqlTLSSpec::getTruststorePasswordSecretRef)
.orElse(null), "ca.password");
// spotless:on

if (truststore.isValid() && truststorePassword.isValid() && keystore.isValid()
&& keystorePassword.isValid()) {

addEnvVar(env, KAFKASQL_SECURITY_PROTOCOL, "SSL");

// ===== Keystore

addEnvVar(env, KAFKASQL_SSL_KEYSTORE_TYPE, "PKCS12");
keystore.applySecretVolume(deployment, containerName);
addEnvVar(env, KAFKASQL_SSL_KEYSTORE_LOCATION, keystore.getSecretVolumeKeyPath());
keystorePassword.applySecretEnvVar(env, KAFKASQL_SSL_KEYSTORE_PASSWORD);

// ===== Truststore

addEnvVar(env, KAFKASQL_SSL_TRUSTSTORE_TYPE, "PKCS12");
truststore.applySecretVolume(deployment, containerName);
addEnvVar(env, KAFKASQL_SSL_TRUSTSTORE_LOCATION, truststore.getSecretVolumeKeyPath());
truststorePassword.applySecretEnvVar(env, KAFKASQL_SSL_TRUSTSTORE_PASSWORD);

return true;
}
return false;
}

private static Optional<KafkaSqlTLSSpec> getKafkaSqlTLSSpec(ApicurioRegistry3 primary) {
// spotless:off
return ofNullable(primary)
.map(ApicurioRegistry3::getSpec)
.map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getStorage)
.map(StorageSpec::getKafkasql)
.map(KafkaSqlSpec::getTls);
// spotless:on
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,20 @@ public static <T> T deserialize(String path, Class<T> klass) {
}
}

public static <T> T deserialize(String path, Class<T> klass, ClassLoader classLoader) {
try {
return YAML_MAPPER.readValue(load(path, classLoader), klass);
} catch (JsonProcessingException ex) {
throw new OperatorException("Could not deserialize resource: " + path, ex);
}
}

public static String load(String path) {
try (var stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(path)) {
return load(path, Thread.currentThread().getContextClassLoader());
}

public static String load(String path, ClassLoader classLoader) {
try (var stream = classLoader.getResourceAsStream(path)) {
return new String(stream.readAllBytes(), Charset.defaultCharset());
} catch (Exception ex) {
throw new OperatorException("Could not read resource: " + path, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.apicurio.registry.operator.api.v1.spec.AppFeaturesSpec;
import io.apicurio.registry.operator.api.v1.spec.AppSpec;
import io.apicurio.registry.operator.api.v1.spec.StorageSpec;
import io.apicurio.registry.operator.feat.Cors;
import io.apicurio.registry.operator.feat.KafkaSql;
import io.apicurio.registry.operator.feat.PostgresSql;
import io.fabric8.kubernetes.api.model.Container;
Expand Down Expand Up @@ -51,7 +52,7 @@ public AppDeploymentResource() {
@Override
protected Deployment desired(ApicurioRegistry3 primary, Context<ApicurioRegistry3> context) {

var d = APP_DEPLOYMENT_KEY.getFactory().apply(primary);
var deployment = APP_DEPLOYMENT_KEY.getFactory().apply(primary);

var envVars = new LinkedHashMap<String, EnvVar>();
ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp).map(AppSpec::getEnv)
Expand All @@ -60,7 +61,6 @@ protected Deployment desired(ApicurioRegistry3 primary, Context<ApicurioRegistry
// spotless:off
addEnvVar(envVars, new EnvVarBuilder().withName(EnvironmentVariables.QUARKUS_PROFILE).withValue("prod").build());
addEnvVar(envVars, new EnvVarBuilder().withName(EnvironmentVariables.QUARKUS_HTTP_ACCESS_LOG_ENABLED).withValue("true").build());
addEnvVar(envVars, new EnvVarBuilder().withName(EnvironmentVariables.QUARKUS_HTTP_CORS_ORIGINS).withValue("*").build());

// Enable deletes if configured in the CR
boolean allowDeletes = Optional.ofNullable(primary.getSpec().getApp())
Expand All @@ -72,31 +72,37 @@ protected Deployment desired(ApicurioRegistry3 primary, Context<ApicurioRegistry
addEnvVar(envVars, new EnvVarBuilder().withName(EnvironmentVariables.APICURIO_REST_DELETION_ARTIFACT_ENABLED).withValue("true").build());
addEnvVar(envVars, new EnvVarBuilder().withName(EnvironmentVariables.APICURIO_REST_DELETION_GROUP_ENABLED).withValue("true").build());
}
// spotless:on

// This is enabled only if Studio is deployed. It is based on Service in case a custom Ingress is
// used.
// Configure the CORS_ALLOWED_ORIGINS env var based on the ingress host
Cors.configureAllowedOrigins(primary, envVars);

// Enable the "mutability" feature in Registry, but only if Studio is deployed. It is based on Service
// in case a custom Ingress is used.
var sOpt = context.getSecondaryResource(STUDIO_UI_SERVICE_KEY.getKlass(),
STUDIO_UI_SERVICE_KEY.getDiscriminator());
sOpt.ifPresent(s -> {
addEnvVar(envVars,
new EnvVarBuilder().withName("APICURIO_REST_MUTABILITY_ARTIFACT-VERSION-CONTENT_ENABLED")
new EnvVarBuilder().withName(EnvironmentVariables.APICURIO_REST_MUTABILITY_ARTIFACT_VERSION_CONTENT_ENABLED)
.withValue("true").build());
});

// spotless:on

// Configure the storage (Postgresql or KafkaSql).
ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp).map(AppSpec::getStorage)
.map(StorageSpec::getType).ifPresent(storageType -> {
switch (storageType) {
case POSTGRESQL -> PostgresSql.configureDatasource(primary, envVars);
case KAFKASQL -> KafkaSql.configureKafkaSQL(primary, envVars);
case KAFKASQL -> KafkaSql.configureKafkaSQL(primary, deployment, envVars);
}
});

var container = getContainerFromDeployment(d, REGISTRY_APP_CONTAINER_NAME);
// Set the ENV VARs on the deployment's container spec.
var container = getContainerFromDeployment(deployment, REGISTRY_APP_CONTAINER_NAME);
container.setEnv(envVars.values().stream().toList());

log.debug("Desired {} is {}", APP_DEPLOYMENT_KEY.getId(), toYAML(d));
return d;
log.debug("Desired {} is {}", APP_DEPLOYMENT_KEY.getId(), toYAML(deployment));
return deployment;
}

public static void addEnvVar(Map<String, EnvVar> map, EnvVar envVar) {
Expand All @@ -105,6 +111,10 @@ public static void addEnvVar(Map<String, EnvVar> map, EnvVar envVar) {
}
}

public static void addEnvVar(Map<String, EnvVar> map, String name, String value) {
addEnvVar(map, new EnvVarBuilder().withName(name).withValue(value).build());
}

/**
* Get container with a given name from the given Deployment.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static boolean update(ApicurioRegistry3 primary) {
var storageType = ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getStorage).map(StorageSpec::getType);
var bootstrapServers = ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getStorage).map(StorageSpec::getKafkasql).map(KafkasqlSpec::getBootstrapServers)
.map(AppSpec::getStorage).map(StorageSpec::getKafkasql).map(KafkaSqlSpec::getBootstrapServers)
.filter(x -> !isBlank(x));
if (prevBootstrapServers.isPresent()) {
log.warn("CR field `app.kafkasql.boostrapServers` is DEPRECATED and should not be used.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,39 @@ public final class IngressUtils {
private IngressUtils() {
}

public static String getHost(String component, ApicurioRegistry3 p) {
/**
* Get the host configured in the ingress. If no host is configured in the ingress then a null is
* returned.
*
* @param component
* @param primary
*/
public static String getConfiguredHost(String component, ApicurioRegistry3 primary) {
String host = switch (component) {
case COMPONENT_APP -> ofNullable(p.getSpec()).map(ApicurioRegistry3Spec::getApp)
case COMPONENT_APP -> ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getIngress).map(IngressSpec::getHost).filter(h -> !isBlank(h)).orElse(null);
case COMPONENT_UI -> ofNullable(p.getSpec()).map(ApicurioRegistry3Spec::getUi)
case COMPONENT_UI -> ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getUi)
.map(UiSpec::getIngress).map(IngressSpec::getHost).filter(h -> !isBlank(h)).orElse(null);
case COMPONENT_STUDIO_UI ->
ofNullable(p.getSpec()).map(ApicurioRegistry3Spec::getStudioUi).map(StudioUiSpec::getIngress)
.map(IngressSpec::getHost).filter(h -> !isBlank(h)).orElse(null);
case COMPONENT_STUDIO_UI -> ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getStudioUi)
.map(StudioUiSpec::getIngress).map(IngressSpec::getHost).filter(h -> !isBlank(h))
.orElse(null);
default -> throw new OperatorException("Unexpected value: " + component);
};
return host;
}

/**
* Get the host for an ingress. If not configured, a default value is returned.
*
* @param component
* @param primary
*/
public static String getHost(String component, ApicurioRegistry3 primary) {
String host = getConfiguredHost(component, primary);
if (host == null) {
// TODO: This is not used because of the current activation conditions.
host = "%s-%s.%s%s".formatted(p.getMetadata().getName(), component,
p.getMetadata().getNamespace(), Configuration.getDefaultBaseHost());
host = "%s-%s.%s%s".formatted(primary.getMetadata().getName(), component,
primary.getMetadata().getNamespace(), Configuration.getDefaultBaseHost());
}
log.debug("Host for component {} is {}", component, host);
return host;
Expand Down
Loading

0 comments on commit e7a8776

Please sign in to comment.