
Commit

DEVX-1881: Update microservices code to accept a config file
Additionally, the examples are converted to use commons-cli for command-line argument
parsing instead of relying on positional arguments.
rspurgeon committed Jun 30, 2020
1 parent 642787d commit d5f561e
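For reference, a service can now be launched along these lines (the jar path and properties file below are illustrative, not part of this commit):

java -cp target/kafka-streams-examples-standalone.jar \
  io.confluent.examples.streams.microservices.EmailService \
  --bootstrap-server broker:9092 \
  --schema-registry http://localhost:8081 \
  --config-file /path/to/kafka-client.properties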
Showing 28 changed files with 912 additions and 203 deletions.
5 changes: 5 additions & 0 deletions pom.xml
@@ -106,6 +106,11 @@
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
src/main/java/io/confluent/examples/streams/microservices/AddInventory.java
@@ -4,33 +4,39 @@
import io.confluent.examples.streams.microservices.domain.Schemas;
import io.confluent.examples.streams.microservices.domain.Schemas.Topics;
import io.confluent.examples.streams.utils.MonitoringInterceptorUtils;
import org.apache.commons.cli.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import static io.confluent.examples.streams.avro.microservices.Product.JUMPERS;
import static io.confluent.examples.streams.avro.microservices.Product.UNDERPANTS;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.ProductTypeSerde;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.*;
import static java.util.Arrays.asList;

public class AddInventory {

private static void sendInventory(final List<KeyValue<Product, Integer>> inventory,
final Schemas.Topic<Product, Integer> topic,
final String bootstrapServers) {
final String bootstrapServers,
final Properties defaultConfig) {

final Properties producerConfig = new Properties();
producerConfig.putAll(defaultConfig);
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "inventory-generator");
MonitoringInterceptorUtils.maybeConfigureInterceptorsProducer(producerConfig);

final ProductTypeSerde productSerde = new ProductTypeSerde();

try (final KafkaProducer<Product, Integer> stockProducer = new KafkaProducer<>(
@@ -46,19 +52,50 @@ private static void sendInventory(final List<KeyValue<Product, Integer>> inventory,
}
}

public static void main(final String[] args) {
public static void main(final String[] args) throws Exception {

final Options opts = new Options();
opts.addOption(Option.builder("b")
.longOpt("bootstrap-server").hasArg().desc("Kafka cluster bootstrap server string").build())
.addOption(Option.builder("c")
.longOpt("config-file").hasArg().desc("Java properties file with configurations for Kafka Clients").build())
.addOption(Option.builder("h")
.longOpt("help").hasArg(false).desc("Show usage information").build())
.addOption(Option.builder("u")
.longOpt("underpants").hasArg().desc("Quantity of underpants to add to inventory").build())
.addOption(Option.builder("j")
.longOpt("jumpers").hasArg().desc("Quantity of jumpers to add to inventory").build());

final CommandLine cl = new DefaultParser().parse(opts, args);

if (cl.hasOption("h")) {
final HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Add Inventory", opts);
return;
}

final int quantityUnderpants = Integer.parseInt(cl.getOptionValue("u", "20"));
final int quantityJumpers = Integer.parseInt(cl.getOptionValue("j", "10"));

final String bootstrapServers = cl.getOptionValue("b", DEFAULT_BOOTSTRAP_SERVERS);

final int quantityUnderpants = args.length > 0 ? Integer.parseInt(args[0]) : 20;
final int quantityJumpers = args.length > 1 ? Integer.parseInt(args[1]) : 10;
final String bootstrapServers = args.length > 2 ? args[2] : "localhost:9092";
final Properties defaultConfig = Optional.ofNullable(cl.getOptionValue("config-file", null))
.map(path -> {
try {
return buildPropertiesFromConfigFile(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
})
.orElse(new Properties());

// Send Inventory
final List<KeyValue<Product, Integer>> inventory = asList(
new KeyValue<>(UNDERPANTS, quantityUnderpants),
new KeyValue<>(JUMPERS, quantityJumpers)
);
System.out.printf("Send inventory to %s%n", Topics.WAREHOUSE_INVENTORY);
sendInventory(inventory, Topics.WAREHOUSE_INVENTORY, bootstrapServers);
sendInventory(inventory, Topics.WAREHOUSE_INVENTORY, bootstrapServers, defaultConfig);

}
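
The buildPropertiesFromConfigFile helper used above lives in MicroserviceUtils and is not part of the loaded diff; a minimal sketch of its expected behavior, assuming it simply loads a java.util.Properties file from the given path:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public static Properties buildPropertiesFromConfigFile(final String configFile) throws IOException {
  // Sketch (assumption): fail fast if the file is missing, otherwise load it
  // as a standard Java properties file.
  if (!Files.exists(Paths.get(configFile))) {
    throw new IOException(configFile + " not found.");
  }
  final Properties properties = new Properties();
  try (final InputStream input = new FileInputStream(configFile)) {
    properties.load(input);
  }
  return properties;
}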

src/main/java/io/confluent/examples/streams/microservices/EmailService.java
@@ -4,18 +4,23 @@
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.ORDERS;
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.ORDERS_ENRICHED;
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.PAYMENTS;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.addShutdownHookAndBlock;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.baseStreamsConfig;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.parseArgsAndConfigure;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.*;

import io.confluent.examples.streams.avro.microservices.Customer;
import io.confluent.examples.streams.avro.microservices.Order;
import io.confluent.examples.streams.avro.microservices.OrderEnriched;
import io.confluent.examples.streams.avro.microservices.Payment;

import io.confluent.examples.streams.microservices.domain.Schemas;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.commons.cli.*;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams.State;
@@ -48,8 +53,10 @@ public EmailService(final Emailer emailer) {
}

@Override
public void start(final String bootstrapServers, final String stateDir) {
streams = processStreams(bootstrapServers, stateDir);
public void start(final String bootstrapServers,
final String stateDir,
final Properties defaultConfig) {
streams = processStreams(bootstrapServers, stateDir, defaultConfig);
streams.cleanUp(); //don't do this in prod as it clears your state stores
final CountDownLatch startLatch = new CountDownLatch(1);
streams.setStateListener((newState, oldState) -> {
@@ -70,7 +77,9 @@ public void start(final String bootstrapServers, final String stateDir) {

}

private KafkaStreams processStreams(final String bootstrapServers, final String stateDir) {
private KafkaStreams processStreams(final String bootstrapServers,
final String stateDir,
final Properties defaultConfig) {

final StreamsBuilder builder = new StreamsBuilder();

@@ -106,12 +115,62 @@ private KafkaStreams processStreams(final String bootstrapServers, final String stateDir) {
//TopicNameExtractor to get the topic name (i.e., customerLevel) from the enriched order record being sent
.to((orderId, orderEnriched, record) -> orderEnriched.getCustomerLevel(), Produced.with(ORDERS_ENRICHED.keySerde(), ORDERS_ENRICHED.valueSerde()));

return new KafkaStreams(builder.build(), baseStreamsConfig(bootstrapServers, stateDir, SERVICE_APP_ID));
return new KafkaStreams(builder.build(),
baseStreamsConfig(bootstrapServers, stateDir, SERVICE_APP_ID, defaultConfig));
}

public static void main(final String[] args) throws Exception {
final Options opts = new Options();
opts.addOption(Option.builder("b")
.longOpt("bootstrap-server")
.hasArg()
.desc("Kafka cluster bootstrap server string (ex: broker:9092)")
.build());
opts.addOption(Option.builder("s")
.longOpt("schema-registry")
.hasArg()
.desc("Schema Registry URL")
.build());
opts.addOption(Option.builder("c")
.longOpt("config-file")
.hasArg()
.desc("Java properties file with configurations for Kafka Clients")
.build());
opts.addOption(Option.builder("t")
.longOpt("state-dir")
.hasArg()
.desc("The directory for state storage")
.build());
opts.addOption(Option.builder("h").longOpt("help").hasArg(false).desc("Show usage information").build());

final CommandLine cl = new DefaultParser().parse(opts, args);

if (cl.hasOption("h")) {
final HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Email Service", opts);
return;
}
final EmailService service = new EmailService(new LoggingEmailer());
service.start(parseArgsAndConfigure(args), "/tmp/kafka-streams");

final Properties defaultConfig = Optional.ofNullable(cl.getOptionValue("config-file", null))
.map(path -> {
try {
return buildPropertiesFromConfigFile(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
})
.orElse(new Properties());


final String schemaRegistryUrl = cl.getOptionValue("schema-registry", DEFAULT_SCHEMA_REGISTRY_URL);
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
Schemas.configureSerdes(defaultConfig);

service.start(
cl.getOptionValue("bootstrap-server", DEFAULT_BOOTSTRAP_SERVERS),
cl.getOptionValue("state-dir", "/tmp/kafka-streams-examples"),
defaultConfig);
addShutdownHookAndBlock(service);
}
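
baseStreamsConfig gains a fourth parameter in this commit; the overload itself is not shown in the loaded diff, but it presumably merges the config-file defaults under the per-service settings. A sketch, assuming the helper sets only the keys visible in this diff:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public static Properties baseStreamsConfig(final String bootstrapServers,
                                           final String stateDir,
                                           final String appId,
                                           final Properties defaultConfig) {
  // Sketch (assumption): start from the config-file defaults, then let the
  // per-service settings take precedence.
  final Properties config = new Properties();
  config.putAll(defaultConfig);
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
  return config;
}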

@@ -137,8 +196,8 @@ interface Emailer {

public static class EmailTuple {

public Order order;
public Payment payment;
final public Order order;
final public Payment payment;
public Customer customer;

public EmailTuple(final Order order, final Payment payment) {
src/main/java/io/confluent/examples/streams/microservices/FraudService.java
@@ -5,8 +5,14 @@
import io.confluent.examples.streams.avro.microservices.OrderValidation;
import io.confluent.examples.streams.avro.microservices.OrderValue;
import io.confluent.examples.streams.microservices.domain.Schemas;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.commons.cli.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@@ -32,9 +38,7 @@
import static io.confluent.examples.streams.avro.microservices.OrderValidationType.FRAUD_CHECK;
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.ORDERS;
import static io.confluent.examples.streams.microservices.domain.Schemas.Topics.ORDER_VALIDATIONS;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.addShutdownHookAndBlock;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.baseStreamsConfig;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.parseArgsAndConfigure;
import static io.confluent.examples.streams.microservices.util.MicroserviceUtils.*;


/**
Expand All @@ -52,8 +56,10 @@ public class FraudService implements Service {
private KafkaStreams streams;

@Override
public void start(final String bootstrapServers, final String stateDir) {
streams = processStreams(bootstrapServers, stateDir);
public void start(final String bootstrapServers,
final String stateDir,
final Properties defaultConfig) {
streams = processStreams(bootstrapServers, stateDir, defaultConfig);
streams.cleanUp(); //don't do this in prod as it clears your state stores
final CountDownLatch startLatch = new CountDownLatch(1);
streams.setStateListener((newState, oldState) -> {
@@ -75,7 +81,9 @@ public void start(final String bootstrapServers, final String stateDir) {
log.info("Started Service " + getClass().getSimpleName());
}

private KafkaStreams processStreams(final String bootstrapServers, final String stateDir) {
private KafkaStreams processStreams(final String bootstrapServers,
final String stateDir,
final Properties defaultConfig) {

//Latch onto instances of the orders and inventory topics
final StreamsBuilder builder = new StreamsBuilder();
@@ -121,7 +129,7 @@ private KafkaStreams processStreams(final String bootstrapServers, final String stateDir) {
//as caching in Kafka Streams will conflate subsequent updates for the same key. Disabling caching ensures
//we get a complete "changelog" from the aggregate(...) step above (i.e. every input event will have a
//corresponding output event).
final Properties props = baseStreamsConfig(bootstrapServers, stateDir, SERVICE_APP_ID);
final Properties props = baseStreamsConfig(bootstrapServers, stateDir, SERVICE_APP_ID, defaultConfig);
props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

return new KafkaStreams(builder.build(), props);
@@ -132,8 +140,56 @@ private OrderValue simpleMerge(final OrderValue a, final OrderValue b) {
}

public static void main(final String[] args) throws Exception {

final Options opts = new Options();
opts.addOption(Option.builder("b")
.longOpt("bootstrap-server")
.hasArg()
.desc("Kafka cluster bootstrap server string (ex: broker:9092)")
.build());
opts.addOption(Option.builder("s")
.longOpt("schema-registry")
.hasArg()
.desc("Schema Registry URL")
.build());
opts.addOption(Option.builder("c")
.longOpt("config-file")
.hasArg()
.desc("Java properties file with configurations for Kafka Clients")
.build());
opts.addOption(Option.builder("t")
.longOpt("state-dir")
.hasArg()
.desc("The directory for state storage")
.build());
opts.addOption(Option.builder("h").longOpt("help").hasArg(false).desc("Show usage information").build());

final CommandLine cl = new DefaultParser().parse(opts, args);

if (cl.hasOption("h")) {
final HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Fraud Service", opts);
return;
}
final FraudService service = new FraudService();
service.start(parseArgsAndConfigure(args), "/tmp/kafka-streams");
final Properties defaultConfig = Optional.ofNullable(cl.getOptionValue("config-file", null))
.map(path -> {
try {
return buildPropertiesFromConfigFile(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
})
.orElse(new Properties());

final String schemaRegistryUrl = cl.getOptionValue("schema-registry", DEFAULT_SCHEMA_REGISTRY_URL);
defaultConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
Schemas.configureSerdes(defaultConfig);

service.start(
cl.getOptionValue("bootstrap-server", DEFAULT_BOOTSTRAP_SERVERS),
cl.getOptionValue("state-dir", "/tmp/kafka-streams-examples"),
defaultConfig);
addShutdownHookAndBlock(service);
}
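
Both services now implement a three-argument start, so the shared Service interface (not visible in the loaded portion of this diff) presumably changed with them; a sketch:

import java.util.Properties;

public interface Service {
  // Sketch (assumption): the interface gains the defaultConfig parameter
  // that EmailService and FraudService now accept.
  void start(String bootstrapServers, String stateDir, Properties defaultConfig);
  void stop();
}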

(The remaining 24 changed files are not loaded in this view.)
