Skip to content

Commit

Permalink
refactor(operator): code improvements from PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
jsenko committed Jan 27, 2025
1 parent 81c2b3c commit ca3931f
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,12 @@ public class EnvironmentVariables {

public static final String APICURIO_REST_MUTABILITY_ARTIFACT_VERSION_CONTENT_ENABLED = "APICURIO_REST_MUTABILITY_ARTIFACT-VERSION-CONTENT_ENABLED";

private static final String KAFKA_PREFIX = "APICURIO_KAFKA_COMMON_";
public static final String KAFKASQL_SECURITY_PROTOCOL = KAFKA_PREFIX + "SECURITY_PROTOCOL";
public static final String KAFKASQL_SSL_KEYSTORE_TYPE = KAFKA_PREFIX + "SSL_KEYSTORE_TYPE";
public static final String KAFKASQL_SSL_KEYSTORE_LOCATION = KAFKA_PREFIX + "SSL_KEYSTORE_LOCATION";
public static final String KAFKASQL_SSL_KEYSTORE_PASSWORD = KAFKA_PREFIX + "SSL_KEYSTORE_PASSWORD";
public static final String KAFKASQL_SSL_TRUSTSTORE_TYPE = KAFKA_PREFIX + "SSL_TRUSTSTORE_TYPE";
public static final String KAFKASQL_SSL_TRUSTSTORE_LOCATION = KAFKA_PREFIX + "SSL_TRUSTSTORE_LOCATION";
public static final String KAFKASQL_SSL_TRUSTSTORE_PASSWORD = KAFKA_PREFIX + "SSL_TRUSTSTORE_PASSWORD";
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,87 +11,70 @@
import io.fabric8.kubernetes.api.model.apps.Deployment;

import java.util.Map;
import java.util.Optional;

import static io.apicurio.registry.operator.EnvironmentVariables.*;
import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar;
import static java.util.Optional.ofNullable;

public class KafkaSqlTLS {

public static final String ENV_KAFKASQL_SECURITY_PROTOCOL = "APICURIO_KAFKA_COMMON_SECURITY_PROTOCOL";

public static final String ENV_KAFKASQL_SSL_KEYSTORE_TYPE = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_TYPE";
public static final String ENV_KAFKASQL_SSL_KEYSTORE_LOCATION = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_LOCATION";
public static final String ENV_KAFKASQL_SSL_KEYSTORE_PASSWORD = "APICURIO_KAFKA_COMMON_SSL_KEYSTORE_PASSWORD";

public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_TYPE = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_TYPE";
public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_LOCATION = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_LOCATION";
public static final String ENV_KAFKASQL_SSL_TRUSTSTORE_PASSWORD = "APICURIO_KAFKA_COMMON_SSL_TRUSTSTORE_PASSWORD";

/**
* Plain KafkaSQL must be already configured.
*/
public static boolean configureKafkaSQLTLS(ApicurioRegistry3 primary, Deployment deployment,
String containerName, Map<String, EnvVar> env) {

// spotless:off
var keystore = new SecretKeyRefTool(ofNullable(primary)
.map(ApicurioRegistry3::getSpec)
.map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getStorage)
.map(StorageSpec::getKafkasql)
.map(KafkaSqlSpec::getTls)
var keystore = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary)
.map(KafkaSqlTLSSpec::getKeystoreSecretRef)
.orElse(null), "user.p12");

var keystorePassword = new SecretKeyRefTool(ofNullable(primary)
.map(ApicurioRegistry3::getSpec)
.map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getStorage)
.map(StorageSpec::getKafkasql)
.map(KafkaSqlSpec::getTls)
var keystorePassword = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary)
.map(KafkaSqlTLSSpec::getKeystorePasswordSecretRef)
.orElse(null), "user.password");

var truststore = new SecretKeyRefTool(ofNullable(primary)
.map(ApicurioRegistry3::getSpec)
.map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getStorage)
.map(StorageSpec::getKafkasql)
.map(KafkaSqlSpec::getTls)
var truststore = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary)
.map(KafkaSqlTLSSpec::getTruststoreSecretRef)
.orElse(null), "ca.p12");

var truststorePassword = new SecretKeyRefTool(ofNullable(primary)
.map(ApicurioRegistry3::getSpec)
.map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getStorage)
.map(StorageSpec::getKafkasql)
.map(KafkaSqlSpec::getTls)
var truststorePassword = new SecretKeyRefTool(getKafkaSqlTLSSpec(primary)
.map(KafkaSqlTLSSpec::getTruststorePasswordSecretRef)
.orElse(null), "ca.password");
// spotless:on

if (truststore.isValid() && truststorePassword.isValid() && keystore.isValid()
&& keystorePassword.isValid()) {

addEnvVar(env, ENV_KAFKASQL_SECURITY_PROTOCOL, "SSL");
addEnvVar(env, KAFKASQL_SECURITY_PROTOCOL, "SSL");

// ===== Keystore

addEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_TYPE, "PKCS12");
addEnvVar(env, KAFKASQL_SSL_KEYSTORE_TYPE, "PKCS12");
keystore.applySecretVolume(deployment, containerName);
addEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_LOCATION, keystore.getSecretVolumeKeyPath());
keystorePassword.applySecretEnvVar(env, ENV_KAFKASQL_SSL_KEYSTORE_PASSWORD);
addEnvVar(env, KAFKASQL_SSL_KEYSTORE_LOCATION, keystore.getSecretVolumeKeyPath());
keystorePassword.applySecretEnvVar(env, KAFKASQL_SSL_KEYSTORE_PASSWORD);

// ===== Truststore

addEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_TYPE, "PKCS12");
addEnvVar(env, KAFKASQL_SSL_TRUSTSTORE_TYPE, "PKCS12");
truststore.applySecretVolume(deployment, containerName);
addEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_LOCATION, truststore.getSecretVolumeKeyPath());
truststorePassword.applySecretEnvVar(env, ENV_KAFKASQL_SSL_TRUSTSTORE_PASSWORD);
addEnvVar(env, KAFKASQL_SSL_TRUSTSTORE_LOCATION, truststore.getSecretVolumeKeyPath());
truststorePassword.applySecretEnvVar(env, KAFKASQL_SSL_TRUSTSTORE_PASSWORD);

return true;
}
return false;
}

private static Optional<KafkaSqlTLSSpec> getKafkaSqlTLSSpec(ApicurioRegistry3 primary) {
// spotless:off
return ofNullable(primary)
.map(ApicurioRegistry3::getSpec)
.map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getStorage)
.map(StorageSpec::getKafkasql)
.map(KafkaSqlSpec::getTls);
// spotless:on
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public AppDeploymentResource() {
@Override
protected Deployment desired(ApicurioRegistry3 primary, Context<ApicurioRegistry3> context) {

var d = APP_DEPLOYMENT_KEY.getFactory().apply(primary);
var deployment = APP_DEPLOYMENT_KEY.getFactory().apply(primary);

var envVars = new LinkedHashMap<String, EnvVar>();
ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp).map(AppSpec::getEnv)
Expand Down Expand Up @@ -93,16 +93,16 @@ protected Deployment desired(ApicurioRegistry3 primary, Context<ApicurioRegistry
.map(StorageSpec::getType).ifPresent(storageType -> {
switch (storageType) {
case POSTGRESQL -> PostgresSql.configureDatasource(primary, envVars);
case KAFKASQL -> KafkaSql.configureKafkaSQL(primary, d, envVars);
case KAFKASQL -> KafkaSql.configureKafkaSQL(primary, deployment, envVars);
}
});

// Set the ENV VARs on the deployment's container spec.
var container = getContainerFromDeployment(d, REGISTRY_APP_CONTAINER_NAME);
var container = getContainerFromDeployment(deployment, REGISTRY_APP_CONTAINER_NAME);
container.setEnv(envVars.values().stream().toList());

log.debug("Desired {} is {}", APP_DEPLOYMENT_KEY.getId(), toYAML(d));
return d;
log.debug("Desired {} is {}", APP_DEPLOYMENT_KEY.getId(), toYAML(deployment));
return deployment;
}

public static void addEnvVar(Map<String, EnvVar> map, EnvVar envVar) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.getContainerFromDeployment;
import static io.apicurio.registry.operator.utils.Utils.isBlank;

/**
* This is a utility wrapper around {@link SecretKeyRef}, that helps with using the Secret reference in the
* target Deployment. Usually, a secret value is either provided in an env. variable or accessed as a file.
* This class helps with both use cases.
*/
public class SecretKeyRefTool {

private SecretKeyRef secretKeyRef;
Expand Down Expand Up @@ -54,11 +59,17 @@ private String getSecretVolumeMountPath() {
return "/etc/" + getSecretVolumeName();
}

/**
* @return a path to a file mounted within the container that contains the value from the
* {@link SecretKeyRef}
*/
public String getSecretVolumeKeyPath() {
return getSecretVolumeMountPath() + "/" + secretKeyRef.getKey();
}

/**
* Mount the Secret as a volume and configure its mount inside the container.
* <p>
* Use this method in case the {@link SecretKeyRef} references a file to be mounted into the pod. You can
* then use {@link #getSecretVolumeKeyPath()} to get the path to the file within the pod.
*/
Expand All @@ -69,6 +80,8 @@ public void applySecretVolume(Deployment deployment, String containerName) {
}

/**
* Add an env. variable that references a value from the Secret.
* <p>
* Use this method in case the {@link SecretKeyRef} references data to be provided as an env. variable.
*/
public void applySecretEnvVar(Map<String, EnvVar> env, String envVarName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ void testKafkaSQLPlain() {
client.load(KafkaSqlITTest.class
.getResourceAsStream("/k8s/examples/kafkasql/plain/example-cluster.kafka.yaml")).create();
final var clusterName = "example-cluster";
// client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/plain/example-cluster.kafka.yaml"))
// .createOrReplace();
// final var clusterName = "example-cluster";

await().ignoreExceptions().untilAsserted(() ->
// Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
Expand All @@ -52,8 +49,6 @@ void testKafkaSQLPlain() {
var registry = deserialize(
"k8s/examples/kafkasql/plain/example-kafkasql-plain.apicurioregistry3.yaml",
ApicurioRegistry3.class);
// var registry = deserialize("k8s/examples/kafkasql/plain/kafkasql-plain.apicurioregistry3.yaml",
// ApicurioRegistry3.class);
registry.getMetadata().setNamespace(namespace);
registry.getSpec().getApp().getStorage().getKafkasql().setBootstrapServers(bootstrapServers);

Expand Down

0 comments on commit ca3931f

Please sign in to comment.