Introduction
I recently had a chance to play with Kafka Streams and CQRS and wanted to share my learnings via an example. For the example, I have selected a domain that represents Sellable Inventory, i.e., a computation of inventory that denotes what you can sell based on what you have on hand and what has been reserved.
Kafka Streams
Many people have used Kafka in their workplace simply by virtue of it being the technology of choice for high-throughput messaging. Kafka Streams builds upon Kafka to provide:
- Fault-tolerant processing of streams, with support for joins, windowing, and aggregations
- Support for exactly-once semantics
- A rich notion of event time for processing, with support for event time, processing time, and ingestion time
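To make the windowing and aggregation support a little more concrete, here is a minimal sketch (not part of the inventory example) that counts events per key in tumbling five-minute windows. The page_views topic, the String serdes, and the application id are assumptions for illustration only.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-counter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();

// Count page views per key in tumbling 5-minute windows
KTable<Windowed<String>, Long> viewCounts = builder
    .stream("page_views", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
    .count();

// Print each windowed count (a real application would forward these downstream)
viewCounts.toStream().foreach((windowedKey, count) ->
    System.out.println(windowedKey.key() + " @ " + windowedKey.window().start() + " -> " + count));

new KafkaStreams(builder.build(), props).start();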
For more on Kafka Streams, this video is a good introduction:
Architecture and Domain for the Example
Neha Narkhede has a nice post on the Confluent site: Event Sourcing, CQRS, stream processing and Kafka: What's the connection? In that post, she discusses the concepts of Event Sourcing, CQRS, and stream processing and alludes to an example of a retail inventory solution implemented using those concepts, but does not provide the source code. The domain for this blog is loosely based on the example she mentions there and on the GCP recipe on Building Real-Time Inventory Systems for Retail.
inventory-mutator
This module is responsible for inventory mutation actions. Consider this the command part of the CQRS pattern. There are two operations that it handles:
- Adjustments - Warehouse updates of inventory quantities. These are increments to availability of inventory
- Reservations - Decrements to inventory as it gets sold on a Retail website
Both endpoints are part of the same application but emit mutations to separate Kafka topics, inventory_adjustments and inventory_reservations, as shown in the figure. In the real world one might separate these two operations, adjustments and reservations, into different microservices in the interest of separation of concerns and scale, but this example keeps it simple.
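For a flavor of what this command side might look like, here is a hedged sketch of a Spring REST controller that publishes mutation events with a KafkaTemplate. The InventoryEvent type, the String key, the literal topic names, and the response bodies are illustrative assumptions; the actual module uses Avro-generated classes and its own topic configuration.

// Hypothetical sketch of the command side: REST endpoints that publish mutation events.
// InventoryEvent, the key format, and the literal topic names are illustrative only.
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/inventory")
public class InventoryMutationController {

    private final KafkaTemplate<String, InventoryEvent> kafkaTemplate;

    public InventoryMutationController(KafkaTemplate<String, InventoryEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/adjustment")
    public ResponseEntity<String> adjust(@RequestBody InventoryEvent adjustment) {
        // Key by location and SKU so all mutations for the same item land on the same partition
        kafkaTemplate.send("inventory_adjustment",
            adjustment.getLocationId() + "-" + adjustment.getSku(), adjustment);
        return ResponseEntity.ok("Adjustment accepted");
    }

    @PostMapping("/reservation")
    public ResponseEntity<String> reserve(@RequestBody InventoryEvent reservation) {
        kafkaTemplate.send("inventory_reservation",
            reservation.getLocationId() + "-" + reservation.getSku(), reservation);
        return ResponseEntity.ok("Reservation accepted");
    }
}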
sellable-inventory-calculator
This is the stream processor that combines the adjustments and reservation streams to provide a stream of sellable inventory.
In the example, the sellable-inventory-calculator application is also a microservice that serves up the sellable inventory at a REST endpoint. The stream processor stores the partitioned sellable inventory data in a local state store. Every instance of the sellable-inventory-calculator application that embeds the Kafka Streams library hosts a subset of the application state, thus partitioning the data across the different instances. Kafka Streams provides fault tolerance for the local state by transparently logging all updates to the local state store to a separate Kafka topic.
The code that is representative of the above is shown below:
StreamsBuilder kafkaStreamBuilder = new StreamsBuilder();

// KTable of adjustments
KTable<LocationSku, Integer> adjustments = kafkaStreamBuilder
    .stream(Topics.INVENTORY_ADJUSTMENT.name(),
        Consumed.with(Topics.INVENTORY_ADJUSTMENT.keySerde(), Topics.INVENTORY_ADJUSTMENT.valueSerde()))
    .groupByKey(Serialized.<LocationSku, Integer>with(Topics.INVENTORY_ADJUSTMENT.keySerde(),
        Topics.INVENTORY_ADJUSTMENT.valueSerde()))
    .reduce((value1, value2) -> {
        return (value1 + value2);
    });

// KTable of reservations
KTable<LocationSku, Integer> inventoryReservations = kafkaStreamBuilder
    .stream(Topics.INVENTORY_RESERVATION.name(),
        Consumed.with(Topics.INVENTORY_RESERVATION.keySerde(), Topics.INVENTORY_RESERVATION.valueSerde()))
    .groupByKey(Serialized.<LocationSku, Integer>with(Topics.INVENTORY_RESERVATION.keySerde(),
        Topics.INVENTORY_RESERVATION.valueSerde()))
    .reduce((value1, value2) -> {
        return (value1 + value2);
    });

// KTable of sellable inventory (adjustments minus reservations)
KTable<LocationSku, Integer> sellableInventory = adjustments.leftJoin(inventoryReservations,
    (adjustment, reservation) -> {
        LOGGER.info("Adjustment:{} Reservation {}", adjustment, reservation);
        return (adjustment - (reservation == null ? 0 : reservation));
    },
    Materialized.<LocationSku, Integer>as(Stores.persistentKeyValueStore(SELLABLE_INVENTORY_STORE))
        .withKeySerde(Topics.SELLABLE_INVENTORY.keySerde())
        .withValueSerde(Topics.SELLABLE_INVENTORY.valueSerde()));

// Send the sellable inventory to the sellable_inventory topic
sellableInventory.toStream().map((key, value) -> {
    LOGGER.info("Streaming Key: {} Value: {}", key, value);
    return new KeyValue<LocationSku, Integer>(key, value);
}).to(Topics.SELLABLE_INVENTORY.name(),
    Produced.<LocationSku, Integer>with(Topics.SELLABLE_INVENTORY.keySerde(), Topics.SELLABLE_INVENTORY.valueSerde()));

sellableInventoryStream = new KafkaStreams(kafkaStreamBuilder.build(), streamsConfig);
sellableInventoryStream.start();
When an instance of the sellable-inventory-calculator starts, it sets the Streams APPLICATION_SERVER configuration to its own host:port, as shown below:

@Bean
public StreamsConfig streamConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sellableInventoryCalculator");
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
        getAppHostPort().getHost() + ":" + serverPort); // Server host/port
    ...
    return new StreamsConfig(props);
}

When a request arrives at a particular instance of the sellable-inventory-calculator, if that instance hosts the [location, SKU] combination it serves it up directly; otherwise it forwards the request to the instance of the service that has the data. This host/port computation is done using the stream's metadata, as shown below:
public HostPort hostPortForKey(LocationSku locationSku) throws NotFoundException {
    StreamsMetadata metadataForKey = sellableInventoryStream.metadataForKey(SELLABLE_INVENTORY_STORE,
        locationSku, Topics.SELLABLE_INVENTORY.keySerde().serializer());

    // Host:port of the instance that hosts the sellable inventory for this location-sku
    return new HostPort(metadataForKey.host(), metadataForKey.port());
}
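Below is a hedged sketch of how the REST layer of the calculator might use this lookup: serve the value from the local state store when this instance owns the key, otherwise forward the request to the owning instance. The localHost/localPort fields, the restTemplate, the HostPort accessors, the LocationSku constructor usage, and the URL path are assumptions for illustration; only hostPortForKey and SELLABLE_INVENTORY_STORE come from the example above.

@GetMapping("/inventory/locations/{locationId}/skus/{sku}")
public ResponseEntity<Integer> sellableInventory(@PathVariable String locationId,
                                                 @PathVariable String sku) throws NotFoundException {
    LocationSku locationSku = new LocationSku(locationId, sku);
    HostPort owner = hostPortForKey(locationSku);

    // Assumed localHost/localPort fields hold the same values put into APPLICATION_SERVER_CONFIG
    if (owner.getHost().equals(localHost) && owner.getPort() == localPort) {
        // This instance owns the key: read it straight out of the local Kafka Streams state store
        ReadOnlyKeyValueStore<LocationSku, Integer> store = sellableInventoryStream.store(
            SELLABLE_INVENTORY_STORE, QueryableStoreTypes.<LocationSku, Integer>keyValueStore());
        return ResponseEntity.ok(store.get(locationSku));
    }

    // Another instance owns the key: forward the request to it (the extra hop discussed below)
    String url = "http://" + owner.getHost() + ":" + owner.getPort()
        + "/inventory/locations/" + locationId + "/skus/" + sku;
    return ResponseEntity.ok(restTemplate.getForObject(url, Integer.class));
}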
This application serves as the query part of the CQRS architecture, if one were to choose this architectural style. Having an instance forward a request to the right server when it does not have the data may seem like an extra hop, but don't distributed database systems do exactly that behind the scenes?
The sellable-inventory-calculator also fires the Sellable Inventory to a Kafka Topic for downstream consumers to pick up.
sellable-inventory-service
The sellable-inventory-service is the query side of the CQRS architecture of the retail inventory example. This microservice (or a separate process) takes the sellable inventory events from the sellable inventory topic and persists them into its Cassandra store. It exposes a REST endpoint that serves up the sellable inventory for a [location, SKU] tuple. Wait, didn't the sellable-inventory-calculator already have a query side, so why this additional service? The only purpose is to demonstrate two different approaches to the query side: one with Kafka Streams and local storage, and the other with a dedicated Cassandra instance servicing the query.
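As a rough illustration of that second approach, the sketch below shows a Kafka listener that upserts each sellable-inventory event into Cassandra via a Spring Data repository. The listener, repository, row type, literal topic name, and Avro getters are assumptions; the actual service has its own generated types and configuration.

// Hypothetical sketch: consume sellable-inventory events and upsert them into Cassandra.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SellableInventoryListener {

    private final SellableInventoryRepository repository; // assumed Spring Data Cassandra repository

    public SellableInventoryListener(SellableInventoryRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "sellable_inventory")
    public void onSellableInventory(ConsumerRecord<LocationSku, Integer> record) {
        // Upsert the latest sellable count for this [location, SKU] tuple
        repository.save(new SellableInventoryRow(
            record.key().getLocationId(), record.key().getSku(), record.value()));
    }
}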
schema-registry
The schema-registry is included in the example for completeness. It stores a versioned history of Avro schemas. The example could easily have been done with JSON rather than Avro. For more on why schema registries are useful, read the Confluent article, Yes Virginia, You Really Do Need a Schema Registry. Note that the example shares a schema jar that contains Avro-generated classes across producers and consumers of topics. This is only for demonstration. In the real world, one would generate the schema classes from the definition in the registry, using the Schema Registry Maven plugin for example.
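For illustration, a serde for those Avro-generated classes might be wired against the Schema Registry roughly as follows; the registry URL and this factory class are assumptions, not the example's actual code. A serde built this way could then be handed to Consumed.with(...) or Produced.with(...) in the topology.

// Hypothetical sketch: configure an Avro serde against the Schema Registry so that
// Kafka Streams can (de)serialize the generated Avro classes. The URL is illustrative.
import java.util.Collections;
import java.util.Map;

import org.apache.avro.specific.SpecificRecord;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public class AvroSerdeFactory {

    public static <T extends SpecificRecord> SpecificAvroSerde<T> valueSerde() {
        Map<String, String> config =
            Collections.singletonMap("schema.registry.url", "http://localhost:8081");
        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
        serde.configure(config, false); // false -> this serde is used for record values, not keys
        return serde;
    }
}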
Running the Example
You can obtain the full example from here: https://github.com/sanjayvacharya/sleeplessinslc/tree/master/inventory-example
The different components of this architecture are Dockerized, and using docker-compose we can run the containers locally, linking them as needed. Before proceeding, ensure that you have Docker and docker-compose installed. From the root level of the project, issue the following to build all the modules:
>mvn clean install
The build should result in Docker images being created for the different artifacts. The docker-compose file has not been tried on any platform other than Mac OS, so it is quite likely that you will need some tweaking to get this to work elsewhere. On a Mac, issue the following:
>export DOCKER_KAFKA_HOST=$(ipconfig getifaddr en0)
If you would like to run the individual microservices in your IDE, then run the following command to start Docker containers for Kafka, the Schema Registry, and Cassandra:
>docker-compose -f docker-compose-dev.yml up
After this you should be able to start the individual microservices by invoking their Main classes as you would for any Spring Boot application. Ensure that you pass the VM argument '-Dspring.kafka.bootstrap.servers=${DOCKER_KAFKA_HOST}:9092' for each service, replacing DOCKER_KAFKA_HOST with the value computed earlier.
Alternatively, if you would prefer to have all the services started as Docker containers, start docker-compose as follows (make sure you have done a mvn install first):
>docker-compose up
The above will result in the start of a three-node Kafka cluster, Cassandra, the Schema Registry, inventory-mutator, sellable-inventory-service, and two instances of sellable-inventory-calculator. Why two instances of the latter? This helps demonstrate the forwarding of a request from an instance of sellable-inventory-calculator whose local stream store does NOT have the requested {LOCATION, SKU} to an instance that does have it. Note that when you run docker-compose, you might see services failing to start and then retrying. This is normal, as docker-compose does not have a way to ensure deterministic startup dependencies (at least in my findings).
To send an inventory adjustment, issue the following; you should get a response like 'Updated Inventory [inventory_adjustment-2@1]' indicating success:
>curl -H "Content-Type: application/json" -X POST -d '{"locationId":"STL","sku":"123123", "count": "100"}' http://localhost:9091/inventory/adjustment
To issue a reservation, you can send the following:
>curl -H "Content-Type: application/json" -X POST -d '{"locationId":"STL","sku":"123123", "count": "10"}' http://localhost:9091/inventory/reservation
You can obtain the sellable inventory value by querying either the sellable-inventory-service or one of the two instances of sellable-inventory-calculator. For the call to the sellable-inventory-service use port 9093, and for the two instances of sellable-inventory-calculator use ports 9094 and 9095. A successful response should look something like '{"locationId":"STL","sku":"123123","count":190}' and should be the same from all three components.
>curl -s http://localhost:{PORT}/inventory/locations/STL/skus/123123
There is an integration-test module as well, but it is not set up to build as a child of the parent pom. That test needs all the services to be up via the 'docker-compose up' mentioned earlier. The test class in that module will execute adjustments, reservations, and queries for sellable inventory.
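For a sense of what such an end-to-end flow looks like, here is a hypothetical sketch (not the actual test class) that posts an adjustment and a reservation and then polls the sellable-inventory-service until the projection catches up. The JSON payloads and ports follow the curl examples above; everything else is illustrative.

// Hypothetical end-to-end sketch: one adjustment (100) minus one reservation (10) should
// eventually surface as a sellable count of 90 on the query side.
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

public class SellableInventoryE2ESketch {

    public static void main(String[] args) throws InterruptedException {
        RestTemplate rest = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);

        rest.postForObject("http://localhost:9091/inventory/adjustment",
            new HttpEntity<>("{\"locationId\":\"STL\",\"sku\":\"123123\",\"count\":\"100\"}", headers),
            String.class);
        rest.postForObject("http://localhost:9091/inventory/reservation",
            new HttpEntity<>("{\"locationId\":\"STL\",\"sku\":\"123123\",\"count\":\"10\"}", headers),
            String.class);

        // The pipeline is asynchronous, so poll until the query-side projection catches up
        for (int attempt = 0; attempt < 20; attempt++) {
            String sellable = null;
            try {
                sellable = rest.getForObject(
                    "http://localhost:9093/inventory/locations/STL/skus/123123", String.class);
            } catch (RestClientException notReadyYet) {
                // The projection may not have the key yet; keep polling
            }
            if (sellable != null && sellable.contains("\"count\":90")) {
                System.out.println("Sellable inventory as expected: " + sellable);
                return;
            }
            Thread.sleep(500);
        }
        throw new AssertionError("Sellable inventory did not reach the expected count");
    }
}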
Conclusion
I have barely scratched the surface of the potential of Kafka Streams with this article. The ability to do low-latency transformations on a stream of data is powerful. Combine this with local state stores that are fault tolerant by virtue of being backed by Kafka, and you have an architecture that can rock. If you are looking for read-your-writes consistency, then while this solution approaches it, you will not have the comfort you would have with an ACID data store like Oracle. If you embrace eventual consistency, Kafka Streams opens up a splendorous world...