diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/EnvironmentVariables.java b/operator/controller/src/main/java/io/apicurio/registry/operator/EnvironmentVariables.java index 3d72573cf4..aeaeee269f 100644 --- a/operator/controller/src/main/java/io/apicurio/registry/operator/EnvironmentVariables.java +++ b/operator/controller/src/main/java/io/apicurio/registry/operator/EnvironmentVariables.java @@ -12,4 +12,12 @@ public class EnvironmentVariables { public static final String APICURIO_REST_MUTABILITY_ARTIFACT_VERSION_CONTENT_ENABLED = "APICURIO_REST_MUTABILITY_ARTIFACT-VERSION-CONTENT_ENABLED"; + private static final String KAFKA_PREFIX = "APICURIO_KAFKA_COMMON_"; + public static final String KAFKASQL_SECURITY_PROTOCOL = KAFKA_PREFIX + "SECURITY_PROTOCOL"; + public static final String KAFKASQL_SSL_KEYSTORE_TYPE = KAFKA_PREFIX + "SSL_KEYSTORE_TYPE"; + public static final String KAFKASQL_SSL_KEYSTORE_LOCATION = KAFKA_PREFIX + "SSL_KEYSTORE_LOCATION"; + public static final String KAFKASQL_SSL_KEYSTORE_PASSWORD = KAFKA_PREFIX + "SSL_KEYSTORE_PASSWORD"; + public static final String KAFKASQL_SSL_TRUSTSTORE_TYPE = KAFKA_PREFIX + "SSL_TRUSTSTORE_TYPE"; + public static final String KAFKASQL_SSL_TRUSTSTORE_LOCATION = KAFKA_PREFIX + "SSL_TRUSTSTORE_LOCATION"; + public static final String KAFKASQL_SSL_TRUSTSTORE_PASSWORD = KAFKA_PREFIX + "SSL_TRUSTSTORE_PASSWORD"; } diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlTLS.java b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlTLS.java index 76fa36959a..4e34f886e9 100644 --- a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlTLS.java +++ b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlTLS.java @@ -11,22 +11,14 @@ import io.fabric8.kubernetes.api.model.apps.Deployment; import java.util.Map; +import java.util.Optional; +import static io.apicurio.registry.operator.EnvironmentVariables.*; import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar; import static java.util.Optional.ofNullable; public class KafkaSqlTLS { - public static final String ENV_KAFKASQL_SECURITY_PROTOCOL = "APICURIO_KAFKA_COMMON_SECURITY_PROTOCOL"; - - public static final String ENV_KAFKASQL_SSL_KEYSTORE_TYPE = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_TYPE"; - public static final String ENV_KAFKASQL_SSL_KEYSTORE_LOCATION = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_LOCATION"; - public static final String ENV_KAFKASQL_SSL_KEYSTORE_PASSWORD = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_PASSWORD"; - - public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_TYPE = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_TYPE"; - public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_LOCATION = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_LOCATION"; - public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_PASSWORD = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_PASSWORD"; - /** * Plain KafkaSQL must be already configured. */ @@ -34,39 +26,19 @@ public static boolean configureKafkaSQLTLS(ApicurioRegistry3 primary, Deployment String containerName, Map env) { // spotless:off - var keystore = new SecretKeyRefTool(ofNullable(primary) - .map(ApicurioRegistry3::getSpec) - .map(ApicurioRegistry3Spec::getApp) - .map(AppSpec::getStorage) - .map(StorageSpec::getKafkasql) - .map(KafkaSqlSpec::getTls) + var keystore = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary) .map(KafkaSqlTLSSpec::getKeystoreSecretRef) .orElse(null), "user.p12"); - var keystorePassword = new SecretKeyRefTool(ofNullable(primary) - .map(ApicurioRegistry3::getSpec) - .map(ApicurioRegistry3Spec::getApp) - .map(AppSpec::getStorage) - .map(StorageSpec::getKafkasql) - .map(KafkaSqlSpec::getTls) + var keystorePassword = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary) .map(KafkaSqlTLSSpec::getKeystorePasswordSecretRef) .orElse(null), "user.password"); - var truststore = new SecretKeyRefTool(ofNullable(primary) - .map(ApicurioRegistry3::getSpec) - .map(ApicurioRegistry3Spec::getApp) - .map(AppSpec::getStorage) - .map(StorageSpec::getKafkasql) - .map(KafkaSqlSpec::getTls) + var truststore = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary) .map(KafkaSqlTLSSpec::getTruststoreSecretRef) .orElse(null), "ca.p12"); - var truststorePassword = new SecretKeyRefTool(ofNullable(primary) - .map(ApicurioRegistry3::getSpec) - .map(ApicurioRegistry3Spec::getApp) - .map(AppSpec::getStorage) - .map(StorageSpec::getKafkasql) - .map(KafkaSqlSpec::getTls) + var truststorePassword = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary) .map(KafkaSqlTLSSpec::getTruststorePasswordSecretRef) .orElse(null), "ca.password"); // spotless:on @@ -74,24 +46,35 @@ public static boolean configureKafkaSQLTLS(ApicurioRegistry3 primary, Deployment if (truststore.isValid() && truststorePassword.isValid() && keystore.isValid() && keystorePassword.isValid()) { - addEnvVar(env, ENV_KAFKASQL_SECURITY_PROTOCOL, "SSL"); + addEnvVar(env, KAFKASQL_SECURITY_PROTOCOL, "SSL"); // ===== Keystore - addEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_TYPE, "PKCS12"); + addEnvVar(env, KAFKASQL_SSL_KEYSTORE_TYPE, "PKCS12"); keystore.applySecretVolume(deployment, containerName); - addEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_LOCATION, keystore.getSecretVolumeKeyPath()); - keystorePassword.applySecretEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_PASSWORD); + addEnvVar(env, KAFKASQL_SSL_KEYSTORE_LOCATION, keystore.getSecretVolumeKeyPath()); + keystorePassword.applySecretEnvVar(env, KAFKASQL_SSL_KEYSTORE_PASSWORD); // ===== Truststore - addEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_TYPE, "PKCS12"); + addEnvVar(env, KAFKASQL_SSL_TRUSTSTORE_TYPE, "PKCS12"); truststore.applySecretVolume(deployment, containerName); - addEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_LOCATION, truststore.getSecretVolumeKeyPath()); - truststorePassword.applySecretEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_PASSWORD); + addEnvVar(env, KAFKASQL_SSL_TRUSTSTORE_LOCATION, truststore.getSecretVolumeKeyPath()); + truststorePassword.applySecretEnvVar(env, KAFKASQL_SSL_TRUSTSTORE_PASSWORD); return true; } return false; } + + private static Optional getKafkaSqlTLSSpec(ApicurioRegistry3 primary) { + // spotless:off + return ofNullable(primary) + .map(ApicurioRegistry3::getSpec) + .map(ApicurioRegistry3Spec::getApp) + .map(AppSpec::getStorage) + .map(StorageSpec::getKafkasql) + .map(KafkaSqlSpec::getTls); + // spotless:on + } } diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/resource/app/AppDeploymentResource.java b/operator/controller/src/main/java/io/apicurio/registry/operator/resource/app/AppDeploymentResource.java index 69cb626b3e..611147fcf9 100644 --- a/operator/controller/src/main/java/io/apicurio/registry/operator/resource/app/AppDeploymentResource.java +++ b/operator/controller/src/main/java/io/apicurio/registry/operator/resource/app/AppDeploymentResource.java @@ -52,7 +52,7 @@ public AppDeploymentResource() { @Override protected Deployment desired(ApicurioRegistry3 primary, Context context) { - var d = APP_DEPLOYMENT_KEY.getFactory().apply(primary); + var deployment = APP_DEPLOYMENT_KEY.getFactory().apply(primary); var envVars = new LinkedHashMap(); ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp).map(AppSpec::getEnv) @@ -93,16 +93,16 @@ protected Deployment desired(ApicurioRegistry3 primary, Context { switch (storageType) { case POSTGRESQL -> PostgresSql.configureDatasource(primary, envVars); - case KAFKASQL -> KafkaSql.configureKafkaSQL(primary, d, envVars); + case KAFKASQL -> KafkaSql.configureKafkaSQL(primary, deployment, envVars); } }); // Set the ENV VARs on the deployment's container spec. - var container = getContainerFromDeployment(d, REGISTRY_APP_CONTAINER_NAME); + var container = getContainerFromDeployment(deployment, REGISTRY_APP_CONTAINER_NAME); container.setEnv(envVars.values().stream().toList()); - log.debug("Desired {} is {}", APP_DEPLOYMENT_KEY.getId(), toYAML(d)); - return d; + log.debug("Desired {} is {}", APP_DEPLOYMENT_KEY.getId(), toYAML(deployment)); + return deployment; } public static void addEnvVar(Map map, EnvVar envVar) { diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/utils/SecretKeyRefTool.java b/operator/controller/src/main/java/io/apicurio/registry/operator/utils/SecretKeyRefTool.java index d0098cd740..5789e12c62 100644 --- a/operator/controller/src/main/java/io/apicurio/registry/operator/utils/SecretKeyRefTool.java +++ b/operator/controller/src/main/java/io/apicurio/registry/operator/utils/SecretKeyRefTool.java @@ -13,6 +13,11 @@ import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.getContainerFromDeployment; import static io.apicurio.registry.operator.utils.Utils.isBlank; +/** + * This is a utility wrapper around {@link SecretKeyRef}, that helps with using the Secret reference in the + * target Deployment. Usually, a secret value is either provided in an env. variable or accessed as a file. + * This class helps with both use cases. + */ public class SecretKeyRefTool { private SecretKeyRef secretKeyRef; @@ -54,11 +59,17 @@ private String getSecretVolumeMountPath() { return "/etc/" + getSecretVolumeName(); } + /** + * @return a path to a file mounted within the container that contains the value from the + * {@link SecretKeyRef} + */ public String getSecretVolumeKeyPath() { return getSecretVolumeMountPath() + "/" + secretKeyRef.getKey(); } /** + * Mount the Secret as a volume and configure its mount inside the container. + *

* Use this method in case the {@link SecretKeyRef} references a file to be mounted into the pod. You can * then use {@link #getSecretVolumeKeyPath()} to get the path to the file within the pod. */ @@ -69,6 +80,8 @@ public void applySecretVolume(Deployment deployment, String containerName) { } /** + * Add an env. variable that references a value from the Secret. + *

* Use this method in case the {@link SecretKeyRef} references data to be provided as an env. variable. */ public void applySecretEnvVar(Map env, String envVarName) { diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlITTest.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlITTest.java index 1a8ba65428..a9eb359c78 100644 --- a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlITTest.java +++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlITTest.java @@ -36,9 +36,6 @@ void testKafkaSQLPlain() { client.load(KafkaSqlITTest.class .getResourceAsStream("/k8s/examples/kafkasql/plain/example-cluster.kafka.yaml")).create(); final var clusterName = "example-cluster"; - // client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/plain/example-cluster.kafka.yaml")) - // .createOrReplace(); - // final var clusterName = "example-cluster"; await().ignoreExceptions().untilAsserted(() -> // Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods @@ -52,8 +49,6 @@ void testKafkaSQLPlain() { var registry = deserialize( "k8s/examples/kafkasql/plain/example-kafkasql-plain.apicurioregistry3.yaml", ApicurioRegistry3.class); - // var registry = deserialize("k8s/examples/kafkasql/plain/kafkasql-plain.apicurioregistry3.yaml", - // ApicurioRegistry3.class); registry.getMetadata().setNamespace(namespace); registry.getSpec().getApp().getStorage().getKafkasql().setBootstrapServers(bootstrapServers);