
Wednesday, February 7, 2018

Inventory Microservice example with Kafka Streams and CQRS

Introduction


I recently had a chance to play with Kafka Streams and CQRS and wanted to share my learnings via an example. For the example, I have selected a domain that represents Sellable Inventory, i.e., a computation of inventory that denotes what you can sell based on what you have on hand and what has been reserved.

Kafka Streams


Many people have used Kafka in their workplace simply by virtue of it being the technology of choice for high-throughput messaging. Kafka Streams builds upon Kafka to provide:
  • Fault-tolerant processing of streams, with support for joins, windowing and aggregations
  • Exactly-once processing semantics
  • A rich notion of time for event processing: event time, processing time and ingestion time (a minimal sketch of these capabilities follows this list)
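To make these concrete, here is a minimal, hypothetical sketch of a Streams topology that enables exactly-once semantics and counts events per key over tumbling five-minute windows. The topic name 'events' and the configuration values are assumptions for illustration, not part of this example's code:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-count-sketch");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Opt in to exactly-once processing semantics
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

    StreamsBuilder builder = new StreamsBuilder();

    // Count events per key over tumbling five-minute windows (event time by default)
    builder.<String, String>stream("events")
        .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
        .count();

    new KafkaStreams(builder.build(), props).start();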
For more on Kafka Streams, this video is a good introduction:


Architecture and Domain for the Example

Neha Narkhede has a nice post on the Confluent site: Event Sourcing, CQRS, stream processing and Kafka: What's the connection? In that post, she discusses the concepts of Event Sourcing, CQRS and stream processing, and alludes to an example of a Retail Inventory solution that is implemented using these concepts but does not provide the source code. The domain for this blog is loosely based on the example she mentions there and on the GCP recipe on Building Real-Time Inventory Systems for Retail.

The application has the following architecture:


inventory-mutator
This module is responsible for inventory mutation actions. Consider it the command part of the CQRS pattern. It handles two operations:
  • Adjustments - Warehouse updates of inventory quantities. These are increments to the availability of inventory
  • Reservations - Decrements to inventory as it gets sold on a Retail website
Both endpoints are part of the same application but emit mutations to separate Kafka topics as shown in the figure, inventory_adjustment and inventory_reservation. In the real world, one might separate these two operations into different Microservices in the interest of separation of concerns and scale, but this example keeps it simple.
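The actual endpoints live in the repository; as a rough, hypothetical sketch (the InventoryEvent DTO, topic names and response strings here are assumptions), the command side boils down to appending each mutation to its topic:

@RestController
public class InventoryMutationController {

  @Autowired
  private KafkaTemplate<LocationSku, Integer> kafkaTemplate;

  // Adjustments are positive increments to availability from the warehouse
  @PostMapping("/inventory/adjustment")
  public ResponseEntity<String> adjust(@RequestBody InventoryEvent event) {
    kafkaTemplate.send("inventory_adjustment",
        new LocationSku(event.getLocationId(), event.getSku()), event.getCount());
    return ResponseEntity.ok("Adjustment accepted");
  }

  // Reservations are recorded as positive counts; the stream processor subtracts them
  @PostMapping("/inventory/reservation")
  public ResponseEntity<String> reserve(@RequestBody InventoryEvent event) {
    kafkaTemplate.send("inventory_reservation",
        new LocationSku(event.getLocationId(), event.getSku()), event.getCount());
    return ResponseEntity.ok("Reservation accepted");
  }
}

Notice that the command side never reads or updates query state; it only appends facts to the log.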

sellable-inventory-calculator
This is the stream processor that combines the adjustment and reservation streams to provide a stream of sellable inventory.


In the example, the sellable-inventory-calculator application is also a Microservice that serves up the sellable inventory at a REST endpoint. The stream processor stores the partitioned sellable inventory data in a local state store. Every instance of the sellable-inventory-calculator application that embeds the Kafka Streams library hosts a subset of the application state, thus partitioning the data across the different instances. Kafka Streams provides fault tolerance for the local state by transparently logging all updates to the local state store to a separate Kafka topic.

The code that is representative of the above is shown below:
    StreamsBuilder kafkaStreamBuilder = new StreamsBuilder();

    // KTable of adjustments
    KTable<LocationSku, Integer> adjustments = kafkaStreamBuilder
        .stream(Topics.INVENTORY_ADJUSTMENT.name(),
            Consumed.with(Topics.INVENTORY_ADJUSTMENT.keySerde(), Topics.INVENTORY_ADJUSTMENT.valueSerde()))
        .groupByKey(Serialized.<LocationSku, Integer>with(Topics.INVENTORY_ADJUSTMENT.keySerde(),
            Topics.INVENTORY_ADJUSTMENT.valueSerde()))
        .reduce((value1, value2) -> value1 + value2);

    // KTable of reservations
    KTable<LocationSku, Integer> inventoryReservations = kafkaStreamBuilder
        .stream(Topics.INVENTORY_RESERVATION.name(),
            Consumed.with(Topics.INVENTORY_RESERVATION.keySerde(), Topics.INVENTORY_RESERVATION.valueSerde()))
        .groupByKey(Serialized.<LocationSku, Integer>with(Topics.INVENTORY_RESERVATION.keySerde(),
            Topics.INVENTORY_RESERVATION.valueSerde()))
        .reduce((value1, value2) -> value1 + value2);

    // KTable of sellable inventory
    KTable<LocationSku, Integer> sellableInventory = adjustments.leftJoin(inventoryReservations,
        (adjustment, reservation) -> {
          LOGGER.info("Adjustment:{} Reservation {}", adjustment, reservation);
          return  (adjustment - (reservation == null ? 0 : reservation));
        }, Materialized.<LocationSku, Integer>as(Stores.persistentKeyValueStore(SELLABLE_INVENTORY_STORE))
            .withKeySerde(Topics.SELLABLE_INVENTORY.keySerde()).withValueSerde(Topics.SELLABLE_INVENTORY.valueSerde()));


    // Send the sellable inventory to the sellable_inventory topic
    sellableInventory.toStream().map((key, value) -> {
      LOGGER.info("Streaming Key: {} Value: {}", key, value);
      return new KeyValue<LocationSku, Integer>(key, value);
    }).to(Topics.SELLABLE_INVENTORY.name(), Produced.<LocationSku, Integer>with(Topics.SELLABLE_INVENTORY.keySerde(),
        Topics.SELLABLE_INVENTORY.valueSerde()));

    sellableInventoryStream = new KafkaStreams(kafkaStreamBuilder.build(), streamsConfig);
    sellableInventoryStream.start();
When an instance of the sellable-inventory-calculator starts, it sets the Streams configuration APPLICATION_SERVER_CONFIG to its own host:port as shown below:
  @Bean
  public StreamsConfig streamConfig() {
   
    Map<String, Object> props = new HashMap<>();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sellableInventoryCalculator");
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, getAppHostPort().getHost() + ":" + serverPort); // Server host/port
    ... 
    return new StreamsConfig(props);
  }
When a request arrives at a particular instance of the sellable-inventory-calculator, if its local store has the [location, SKU] combination it serves the value directly; otherwise it forwards the request to the instance of the service that does have the data.

The host/port lookup is done using the stream's metadata as shown below:
public HostPort hostPortForKey(LocationSku locationSku) throws NotFoundException {
  StreamsMetadata metadataForKey = sellableInventoryStream.metadataForKey(SELLABLE_INVENTORY_STORE, locationSku,
      Topics.SELLABLE_INVENTORY.keySerde().serializer());

  return new HostPort(metadataForKey.host(), metadataForKey.port()); // Host:port that has the sellable inventory for this location-sku
}

This application serves as the query part of the CQRS architecture, if one were to choose this architectural style. Having an instance forward a request to the right server when it does not hold the data may seem like an extra hop, but don't distributed database systems do exactly that behind the scenes?
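A rough sketch of that read-or-forward logic, assuming an isThisHost helper and simple accessors on HostPort, LocationSku and a SellableInventory DTO (all assumptions, not necessarily what the repository uses):

public Integer sellableInventory(LocationSku locationSku) throws NotFoundException {
  HostPort owner = hostPortForKey(locationSku);
  if (isThisHost(owner)) {
    // This instance's local state store holds the key; read it directly
    ReadOnlyKeyValueStore<LocationSku, Integer> store =
        sellableInventoryStream.store(SELLABLE_INVENTORY_STORE, QueryableStoreTypes.keyValueStore());
    return store.get(locationSku);
  }
  // The extra hop: delegate to the instance whose local store owns this key
  String url = String.format("http://%s:%d/inventory/locations/%s/skus/%s",
      owner.getHost(), owner.getPort(), locationSku.getLocationId(), locationSku.getSku());
  return new RestTemplate().getForObject(url, SellableInventory.class).getCount();
}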

The sellable-inventory-calculator also publishes the sellable inventory to a Kafka topic for downstream consumers to pick up.

sellable-inventory-service

The sellable-inventory-service is the query side of the CQRS architecture of the Retail inventory example. This Microservice (or a separate process) takes the sellable inventory events from the sellable_inventory topic and persists them into its Cassandra store. The Microservice has a REST endpoint that serves up the sellable inventory for a [location, SKU] tuple. Wait up, didn't the sellable-inventory-calculator already have a query side, so why this additional service? Its only purpose is to demonstrate two approaches to the query side: one with Kafka Streams and a local store, and one with a dedicated Cassandra instance servicing the query.
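A sketch of what the consuming side might look like with a Spring Kafka listener and a Spring Data Cassandra repository (the SellableInventory entity and repository are assumed names):

@Service
public class SellableInventoryListener {

  private final SellableInventoryRepository repository;

  public SellableInventoryListener(SellableInventoryRepository repository) {
    this.repository = repository;
  }

  @KafkaListener(topics = "sellable_inventory")
  public void onSellableInventory(ConsumerRecord<LocationSku, Integer> record) {
    // Upserts are keyed by [location, SKU], so a replay of the topic
    // simply overwrites each row with the same latest value
    repository.save(new SellableInventory(
        record.key().getLocationId(), record.key().getSku(), record.value()));
  }
}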

schema-registry

The schema-registry is included in the example for completeness. It stores a versioned history of Avro schemas; the example could just as easily have been done with JSON rather than Avro. For more on why schema registries are useful, read the Confluent article, Yes Virginia, You Really Do Need a Schema Registry. Note that the example shares a schema jar containing the Avro-generated classes across producers and consumers of topics. This is only for demonstration; in the real world, one would generate the schema classes from the definitions in the registry, using the Schema Registry Maven plugin for example.
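For illustration, wiring a serde to the registry might look like the sketch below, where InventoryAdjustment stands in for an Avro-generated class and the URL assumes the registry's default port (both assumptions):

public static SpecificAvroSerde<InventoryAdjustment> adjustmentSerde() {
  Map<String, String> config =
      Collections.singletonMap("schema.registry.url", "http://localhost:8081");
  SpecificAvroSerde<InventoryAdjustment> serde = new SpecificAvroSerde<>();
  serde.configure(config, false); // false => this serde is for record values, not keys
  return serde;
}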


Running the Example


You can obtain the full example from here: https://github.com/sanjayvacharya/sleeplessinslc/tree/master/inventory-example

The different components of this architecture are Dockerized, and using docker-compose we can run the containers locally, linking them as needed. Before proceeding, ensure that you have Docker and docker-compose installed. From the root level of the project, issue the following to build all the modules:

>mvn clean install

The build should result in Docker images being created for the different artifacts. The docker-compose file has not been tried on any platform other than Mac OS, so it is quite likely that you will need some tweaking to get this to work elsewhere. On a Mac, issue the following:

>export DOCKER_KAFKA_HOST=$(ipconfig getifaddr en0)

If you would like to run the individual Microservices in your IDE, then run the following command to start docker containers for Kafka, the Schema Registry and Cassandra:
>docker-compose -f docker-compose-dev.yml up

After this you should be able to start the individual Microservices by invoking their Main classes, as you would for any Spring Boot application. Ensure that you pass the VM argument '-Dspring.kafka.bootstrap.servers=${DOCKER_KAFKA_HOST}:9092' to each service, replacing DOCKER_KAFKA_HOST with the value computed earlier.
Alternatively, if you would prefer to have all the services started as docker containers, start docker-compose as follows (make sure you have done a mvn install first):
>docker-compose up

The above will result in the start of a three-node Kafka cluster, Cassandra, the Schema Registry, inventory-mutator, sellable-inventory-service and two instances of sellable-inventory-calculator. Why two instances of the latter? This helps demonstrate the forwarding of a request from an instance of sellable-inventory-calculator whose local stream store does NOT have the requested [location, SKU] to an instance that does have it. Note that when you run docker-compose, you might see services failing to start and then retrying. This is normal, as docker-compose does not have a way to ensure deterministic startup dependencies (at least in my findings).
To send an inventory adjustment, issue the following; a successful response looks like 'Updated Inventory [inventory_adjustment-2@1]':
>curl -H "Content-Type: application/json" -X POST -d '{"locationId":"STL","sku":"123123", "count": "100"}' http://localhost:9091/inventory/adjustment

To issue a reservation, you can send the following:
>curl -H "Content-Type: application/json" -X POST -d '{"locationId":"STL","sku":"123123", "count": "10"}' http://localhost:9091/inventory/reservation

You can obtain the sellable inventory value by querying either the sellable-inventory-service or one of the two instances of sellable-inventory-calculator. For the call to the sellable-inventory-service use port 9093; for the two instances of sellable-inventory-calculator, use ports 9094 and 9095. A successful response should look like '{"locationId":"STL","sku":"123123","count":90}' (the adjustment of 100 minus the reservation of 10) and should be the same from all three components.
>curl -s http://localhost:{PORT}/inventory/locations/STL/skus/123123

There is an integration-test module as well, but it is not set up to build as a child of the parent pom. That test needs all the services to be up via the 'docker-compose up' mentioned earlier. The test class in that module executes adjustments, reservations and queries for sellable inventory.

Conclusion



I have barely scratched the surface of the potential of Kafka Streams with this article. The ability to do low-latency transformations on a stream of data is powerful. Combine this with local state stores that are fault tolerant by virtue of being backed by Kafka, and you have an architecture that can rock. If you are looking for read-your-writes consistency, this solution only approaches it; you will not have the guarantees of an ACID data store like Oracle. But if you embrace eventual consistency, Kafka Streams opens up a splendorous world...