Skip to content

Commit

Permalink
API Tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
tetriscode committed Aug 17, 2019
1 parent 209ba76 commit 0088ffc
Show file tree
Hide file tree
Showing 9 changed files with 573 additions and 348 deletions.
Original file line number Diff line number Diff line change
@@ -1,70 +1,70 @@
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.context.env.Environment;
import io.micronaut.core.util.StringUtils;
import io.micronaut.runtime.ApplicationConfiguration;
import org.apache.kafka.streams.StreamsConfig;

import java.io.File;
import java.util.Properties;

/**
* Abstract streams configuration.
*
* @author graemerocher
* @since 1.0
* @param <K> The key deserializer type
* @param <V> The value deserializer type
*/
public abstract class AbstractKafkaStreamsConfiguration<K, V> extends AbstractKafkaConfiguration<K, V> implements KafkaStreamsConfiguration {

/**
* Construct a new {@link AbstractKafkaStreamsConfiguration} for the given defaults.
*
* @param defaultConfiguration The default configuration
*/
protected AbstractKafkaStreamsConfiguration(KafkaDefaultConfiguration defaultConfiguration) {
super(new Properties());
Properties config = getConfig();
config.putAll(defaultConfiguration.getConfig());
}

/**
* Shared initialization.
*
* @param applicationConfiguration The application config
* @param environment The env
* @param config The config to be initialized
*/
protected void init(ApplicationConfiguration applicationConfiguration, Environment environment, Properties config) {
// set the default application id
String applicationName = applicationConfiguration.getName().orElse(Environment.DEFAULT_NAME);
config.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);

if (environment.getActiveNames().contains(Environment.TEST)) {
String tmpDir = System.getProperty("java.io.tmpdir");
if (StringUtils.isNotEmpty(tmpDir)) {
if (new File(tmpDir, applicationName).mkdirs()) {
config.putIfAbsent(StreamsConfig.STATE_DIR_CONFIG, tmpDir);
}
}
}
}
}
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.context.env.Environment;
import io.micronaut.core.util.StringUtils;
import io.micronaut.runtime.ApplicationConfiguration;
import org.apache.kafka.streams.StreamsConfig;

import java.io.File;
import java.util.Properties;

/**
* Abstract streams configuration.
*
* @param <K> The key deserializer type
* @param <V> The value deserializer type
* @author graemerocher
* @since 1.0
*/
public abstract class AbstractKafkaStreamsConfiguration<K, V> extends AbstractKafkaConfiguration<K, V> implements KafkaStreamsConfiguration {

/**
* Construct a new {@link AbstractKafkaStreamsConfiguration} for the given defaults.
*
* @param defaultConfiguration The default configuration
*/
protected AbstractKafkaStreamsConfiguration(KafkaDefaultConfiguration defaultConfiguration) {
super(new Properties());
Properties config = getConfig();
config.putAll(defaultConfiguration.getConfig());
}

/**
* Shared initialization.
*
* @param applicationConfiguration The application config
* @param environment The env
* @param config The config to be initialized
*/
protected void init(ApplicationConfiguration applicationConfiguration, Environment environment, Properties config) {
// set the default application id
String applicationName = applicationConfiguration.getName().orElse(Environment.DEFAULT_NAME);
config.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);

if (environment.getActiveNames().contains(Environment.TEST)) {
String tmpDir = System.getProperty("java.io.tmpdir");
if (StringUtils.isNotEmpty(tmpDir)) {
if (new File(tmpDir, applicationName).mkdirs()) {
config.putIfAbsent(StreamsConfig.STATE_DIR_CONFIG, tmpDir);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,50 +1,51 @@
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import org.apache.kafka.streams.StreamsBuilder;

import javax.annotation.Nonnull;
import java.util.Properties;

/**
* Extended version of {@link StreamsBuilder} that can be configured.
*
* @author graemerocher
* @since 1.0
*/
public class ConfiguredStreamBuilder extends StreamsBuilder {

private final Properties configuration = new Properties();

/**
* Default constructor.
*
* @param configuration The configuration
*/
public ConfiguredStreamBuilder(KafkaStreamsConfiguration configuration) {
this.configuration.putAll(configuration.getConfig());
}

/**
* The configuration. Can be mutated.
*
* @return The configuration
*/
public @Nonnull Properties getConfiguration() {
return configuration;
}
}
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import org.apache.kafka.streams.StreamsBuilder;

import javax.annotation.Nonnull;
import java.util.Properties;

/**
* Extended version of {@link StreamsBuilder} that can be configured.
*
* @author graemerocher
* @since 1.0
*/
public class ConfiguredStreamBuilder extends StreamsBuilder {

private final Properties configuration = new Properties();

/**
* Default constructor.
*
* @param configuration The configuration
*/
public ConfiguredStreamBuilder(KafkaStreamsConfiguration configuration) {
this.configuration.putAll(configuration.getConfig());
}

/**
* The configuration. Can be mutated.
*
* @return The configuration
*/
public @Nonnull
Properties getConfiguration() {
return configuration;
}
}
Original file line number Diff line number Diff line change
@@ -1,54 +1,54 @@
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.ApplicationConfiguration;

import javax.inject.Named;
import javax.inject.Singleton;

/**
* The default streams configuration is non other is present.
*
* @author graemerocher
* @since 1.0
* @param <K>
* @param <V>
*/
@Requires(missingProperty = NamedKafkaStreamsConfiguration.PREFIX + ".default")
@Singleton
@Requires(beans = KafkaDefaultConfiguration.class)
@Named("default")
@Primary
public class DefaultKafkaStreamsConfiguration<K, V> extends AbstractKafkaStreamsConfiguration<K, V> {
/**
* Construct a new {@link DefaultKafkaStreamsConfiguration} for the given defaults.
*
* @param defaultConfiguration The default configuration
* @param applicationConfiguration The application configuration
* @param environment The environment
*/
public DefaultKafkaStreamsConfiguration(KafkaDefaultConfiguration defaultConfiguration,
ApplicationConfiguration applicationConfiguration,
Environment environment) {
super(defaultConfiguration);
init(applicationConfiguration, environment, getConfig());
}
}
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.ApplicationConfiguration;

import javax.inject.Named;
import javax.inject.Singleton;

/**
* The default streams configuration is non other is present.
*
* @param <K>
* @param <V>
* @author graemerocher
* @since 1.0
*/
@Requires(missingProperty = NamedKafkaStreamsConfiguration.PREFIX + ".default")
@Singleton
@Requires(beans = KafkaDefaultConfiguration.class)
@Named("default")
@Primary
public class DefaultKafkaStreamsConfiguration<K, V> extends AbstractKafkaStreamsConfiguration<K, V> {
/**
* Construct a new {@link DefaultKafkaStreamsConfiguration} for the given defaults.
*
* @param defaultConfiguration The default configuration
* @param applicationConfiguration The application configuration
* @param environment The environment
*/
public DefaultKafkaStreamsConfiguration(KafkaDefaultConfiguration defaultConfiguration,
ApplicationConfiguration applicationConfiguration,
Environment environment) {
super(defaultConfiguration);
init(applicationConfiguration, environment, getConfig());
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.micronaut.configuration.kafka.streams;

import java.util.Properties;

public interface KafkaStreamsConfiguration {
Properties getConfig();
}
package io.micronaut.configuration.kafka.streams;

import java.util.Properties;

public interface KafkaStreamsConfiguration {
Properties getConfig();
}
Loading

0 comments on commit 0088ffc

Please sign in to comment.