
Tuesday, November 27, 2018

API First Services with Open API 3.0 and Spring Boot

API First Approach


The term API First design seems to be quite prevalent with Microservices, especially those on the cloud. Swagger.io defines API first as: "An API-first approach means that for any given development project, your APIs are treated as “first-class citizens.” That everything about a project revolves around the idea that the end product will be consumed by mobile devices, and that APIs will be consumed by client applications." An API first approach involves giving more thought ahead of time to the design of the API. It involves planning across stakeholders and gathering feedback on the API before the API is translated to code.

Swagger.io also lists some characteristics of APIs that are developed using an API First approach:
- They are consistent. This makes sense; in an ecosystem of APIs, one would like the APIs to have a level of consistency
- They can be described using an API definition language
- They are re-usable

An API first approach seems to focus on the API itself first, rather than on building separate versions of it for mobile, web and so on.

Kevin Hoffman, in his blog post An API-first approach for cloud-native app development, describes a cloud Microservice ecosystem with multiple services in play that, without discipline, represents a nightmarish scenario of integration failures. He goes on to state: "To avoid these integration failures, and to formally recognize your API as a first-class artifact of the development process, API first gives teams the ability to work against each other’s public contracts without interfering with internal development processes." Kevin also stresses that designing your API first enables one to have a discussion with stakeholders with some tangible material (the API), leading to the creation of user stories, mocked APIs and documentation that socialize the scope of the service.

Swagger defines the following as benefits of an API First Approach to creating services:

  • Parallel development among teams, as the interface or contract lends itself to mocking and stubbing.
  • Reduced cost of developing applications, as API code can be re-used across multiple applications. Focusing on API design up front allows the problem to be thought through well before any code is invested in.
  • Speed to market, due to the automation support for generating code from the API specification.
  • A good developer experience, as the API is consistent, well documented and well designed.
  • Reduced risk of failure for the business, as the API first approach ensures that APIs are reliable, consistent and easy for developers to use.
So is this just old wine in a new bottle? Take SOAP based services as an example. They satisfied the parallel development point above, because one could create the IDL and then generate service and client stubs. XML schemas used in the IDLs were re-usable. The service developed would be re-usable across multiple applications. The generation of code from the API meant speed to market. The developer experience was good, as the API was well documented with first class RPC type operations. APIs were consistent, and developers simply used the generated artifacts and focused on building the logic backing them.

I think the beauty of API first development is not so much the technology as the methodology and principles the approach advocates, where the API is worked on ahead of the code and is bought into by the stakeholders before investment is made in the effort. The API in such an ecosystem is considered a first class citizen of the business. The simplicity of the IDL, along with the tooling available, makes the job of writing the IDL pretty easy. Coupled with the fact that the IDL itself is very readable, this makes it easy for people to understand what the capabilities of the API will be. Code generation tools also allow the generation of service/client stubs in a multitude of languages, which makes polyglot programming easy.

Open API and Swagger


The Open API Specification is a community driven specification that defines a programming language agnostic IDL for REST APIs. The IDL is defined in a way that is intuitive for humans to read and author without requiring additional code. Open API documents are authored in YAML or JSON. The Open API specification evolved from the Swagger 2.0 specification with input from the community.

The specification allows authoring of an API's endpoints, authentication mechanisms, operation parameters, request/response types and licensing information.

Swagger is a collection of tools that implement the specification, such as the Swagger Editor and the Swagger Code Generator.

Open API Example


The full source code for this example can be found at: open-api-example
For the Open API example, I chose to use my favorite simplistic notes service. I then placed the following requirements on it:

  • Needed to generate the API definition first and have a simple way to make it available for the service and its consumers to use
  • Have the Open API definition support basic authentication
  • Use the API definition to generate the service stubs
  • Use the API definitions to generate client stubs
  • Use the client stubs to invoke the service in a simple integration test

For the first requirement, the api.yaml file that describes the notes API is bundled in a notes-api Maven JAR. This JAR contains only the API file; it does not bundle any classes. The YAML file is set up to require Basic Authentication for all of its resources. Part of the API specification is shown below:

openapi: "3.0.0"
info:
   version: '1.0'
   title: Notes Service
   description: Demo project of open api 3 using API first development
   termsOfService: http://localhost:8080/terms-of-service
   contact:
    name: Sanjay Acharya
    email: foo@bar.com


security:
  - BasicAuth: []

paths:
   /notes:
      get:
         summary: Get all notes
         operationId: getNotes
         tags:
            - notes
         parameters:
            - name: page
              in: query
              description: Page Number
              required: false
              schema:
                 type: integer
                 format: int32
            - name: page-size
              in: query
              description: Number of items to return at one time
              required: false
              schema:
                 type: integer
                 format: int32
         responses:
            '200':
              description: A paged array of notes
              content:
                application/json:
                  schema:
                    $ref: "#/components/schemas/NotesPage"
            '404':
              $ref: "#/components/responses/NotFound"

            default:
              $ref: "#/components/responses/UnexpectedError"
   ....
   ....


To generate the server stubs, I used the swagger-codegen-maven-plugin configured as shown:

  <plugin>
        <groupId>io.swagger.codegen.v3</groupId>
        <artifactId>swagger-codegen-maven-plugin</artifactId>
        <version>3.0.2</version>
        <dependencies>
          <!-- Puts the notes-api JAR on the classpath during plugin execution -->
          <dependency>
            <groupId>com.welflex.example</groupId>
            <artifactId>notes-api</artifactId>
            <version>1.0-SNAPSHOT</version>
          </dependency>
        </dependencies>
        <executions>
          <execution>
            <id>server</id>
            <goals>
              <goal>generate</goal>
            </goals>
            <configuration>
              <inputSpec>/api.yaml</inputSpec> <!-- API definition read from the classpath -->
              <language>spring</language>
              <library>spring-boot</library> <!-- Generate Spring Boot based stubs -->
              <output>${project.build.directory}/generated-sources</output>
              <invokerPackage>com.welflex.notes</invokerPackage>
              <apiPackage>com.welflex.notes.api.generated</apiPackage>
              ....

            </configuration>
          </execution>
        </executions>
      </plugin>

The plugin uses the notes-api JAR as a dependency to build the stubs. Think of the notes-api JAR as an artifact that the service creators have authored and published to some Maven repository for clients to use. Typically these artifacts would be authored and published in an API registry, but for the scope of this example the simpler JAR method works. Of importance in the generated code are the NotesApi interface, which defines the different operations and documents them with content from the API definition, and the NotesApiDelegate, which defines the API stubs, as shown below:

@Api(value = "notes", description = "the notes API")
public interface NotesApi {

    NotesApiDelegate getDelegate();

    @ApiOperation(value = "Creates a new note", nickname = "createNote", notes = "", response = Note.class, authorizations = {
        @Authorization(value = "BasicAuth")
    }, tags={ "notes", })
    @ApiResponses(value = {
        @ApiResponse(code = 201, message = "A simple string response", response = Note.class),
        @ApiResponse(code = 200, message = "Unexpected Error", response = Error.class) })
    @RequestMapping(value = "/notes",
        produces = { "application/json" },
        consumes = { "application/json" },
        method = RequestMethod.POST)
    default ResponseEntity<Note> createNote(@ApiParam(value = "The note contents" ,required=true )  @Valid @RequestBody Note body) {
        return getDelegate().createNote(body);
    }
 

    @ApiOperation(value = "Gets a Note", nickname = "getNote", notes = "", response = Note.class, authorizations = {
        @Authorization(value = "BasicAuth")
    }, tags={ "notes", })
    @ApiResponses(value = {
        @ApiResponse(code = 200, message = "The note requested", response = Note.class),
        @ApiResponse(code = 404, message = "The specified resource was not found", response = Error.class),
        @ApiResponse(code = 200, message = "Unexpected Error", response = Error.class) })
    @RequestMapping(value = "/notes/{id}",
        produces = { "application/json" },
        method = RequestMethod.GET)
    default ResponseEntity<Note> getNote(@ApiParam(value = "The id of the note to retrieve",required=true) @PathVariable("id") Integer id) {
        return getDelegate().getNote(id);
    }
   ....
}

 public interface NotesApiDelegate {
....
    /**
     * @see NotesApi#createNote
     */
    default ResponseEntity<Note> createNote( Note  body) {
        if(getObjectMapper().isPresent() && getAcceptHeader().isPresent()) {
        } else {
            log.warn("ObjectMapper or HttpServletRequest not configured in default NotesApi interface so no example is generated");
        }
        return new ResponseEntity<>(HttpStatus.NOT_IMPLEMENTED);
    }

 
    /**
     * @see NotesApi#getNote
     */
    default ResponseEntity<Note> getNote( Integer  id) {
        if(getObjectMapper().isPresent() && getAcceptHeader().isPresent()) {
        } else {
            log.warn("ObjectMapper or HttpServletRequest not configured in default NotesApi interface so no example is generated");
        }
        return new ResponseEntity<>(HttpStatus.NOT_IMPLEMENTED);
    }

 ..
}

The generated stub has a number of default method signatures that need to be implemented. The generated code does not have any security level annotations or stub code for security; it leaves that to the implementor. Shown below is the NotesApiDelegateImpl that a developer would author, which implements the NotesApiDelegate and uses a Spring Data JPA NotesRepository for the CRUD operations:
public class NotesApiDelegateImpl implements NotesApiDelegate {

  private final NotesRepository notesRepository; // Yes, Yes, we could delegate to a service and then to a repository, it is an example :-)
 
  @Autowired
  public NotesApiDelegateImpl(NotesRepository notesRepository) {
    this.notesRepository = notesRepository;
  }

  /**
   * @see NotesApi#getNote
   */
  public ResponseEntity<Note> getNote( Integer  id) {

    Optional<NoteModel> noteModel = notesRepository.findById(id);

    if (noteModel.isPresent()) {
      return ResponseEntity.ok(toNote(noteModel.get()));
    }

    throw new NoteNotFoundException(id);
  }

  public ResponseEntity<NotesPage> getNotes( Integer  page,
       Integer  pageSize) {

    int pageRequest = page == null? 0 : page;
    int limitRequested = pageSize == null ? 100 : pageSize;

    Page<NoteModel> dataPage = notesRepository.findAll(PageRequest.of(pageRequest, limitRequested));

    PageMetadata pageMetadata = new PageMetadata().pageNumber(pageRequest).pageSize(limitRequested).resultCount(dataPage.getNumberOfElements())
        .totalResults(dataPage.getTotalElements());

    Notes notes = new Notes();

    notes.addAll(dataPage.stream().map(t -> toNote(t)).collect(Collectors.toList()));

    NotesPage notesPage = new NotesPage().items(notes).metadata(pageMetadata);

    return ResponseEntity.<NotesPage>ok(notesPage);
  }
...
}

For Basic Authentication, the example uses Spring Security, configured as shown below with support for a single demo user with username 'user' and password 'password':
@Configuration
@EnableWebSecurity
public class SecurityConfiguration extends WebSecurityConfigurerAdapter {

  @Override
  protected void configure(HttpSecurity http) throws Exception {
    http.csrf().disable().authorizeRequests().antMatchers("/index.html", "/api.yaml", "/api-docs")
        .permitAll().anyRequest().authenticated().and().httpBasic();

    super.configure(http);
  }

  @Autowired
  public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
    PasswordEncoder encoder = PasswordEncoderFactories.createDelegatingPasswordEncoder();

    auth.inMemoryAuthentication().withUser("user").password(encoder.encode("password"))
        .roles("USER");
  }
}

The service is configured to serve the api.yaml file via a REST endpoint as well.
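One simple way to serve the bundled YAML in Spring is a static resource handler. The following is a minimal sketch of that idea; the actual project may wire this differently, and the handler path and resource location here are assumptions:

@Configuration
public class ApiDocConfiguration implements WebMvcConfigurer {

  @Override
  public void addResourceHandlers(ResourceHandlerRegistry registry) {
    // Expose the api.yaml bundled in the notes-api JAR (on the classpath) at /api.yaml
    registry.addResourceHandler("/api.yaml").addResourceLocations("classpath:/");
  }
}
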
A Maven module called notes-client uses the same plugin to generate the client side artifacts, including the model artifacts. The service maintainers would typically not distribute such a client (see my blog post on Shared Client Libraries) but would instead expect consuming applications to generate the client in a similar manner, using the notes-api Maven dependency directly.
      <plugin>
        <groupId>io.swagger.codegen.v3</groupId>
        <artifactId>swagger-codegen-maven-plugin</artifactId>
        <version>3.0.2</version>
        <dependencies>
          <!-- Puts the notes-api JAR on the classpath during plugin execution -->
          <dependency>
            <groupId>com.welflex.example</groupId>
            <artifactId>notes-api</artifactId>
            <version>${project.version}</version>
          </dependency>
        </dependencies>
        <executions>
          <execution>
            <id>client</id>
            <goals>
              <goal>generate</goal>
            </goals>
            <configuration>
              <inputSpec>/api.yaml</inputSpec> <!-- API definition read from the classpath -->
              <language>java</language>
              <library>resttemplate</library> <!-- Generate a RestTemplate based client -->
              <output>${project.build.directory}/generated-sources</output>
              <ignoreFileOverride>${project.basedir}/src/main/resources/.openapi-codegen-ingore</ignoreFileOverride>
              <apiPackage>com.welflex.notes.api.generated</apiPackage>
            </configuration>
          </execution>
        </executions>
      </plugin>

The generated client stubs utilize RestTemplate and provide a neat client abstraction for communicating with the service. The client stubs also pleasantly include built-in Basic Authentication support, supplying credentials as required to invoke the API. The sample integration test demonstrates the use of the generated Notes client stubs.
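As an illustration, consuming the service through the generated client looks roughly like the sketch below. The getApiClient()/setUsername/setPassword accessors come from the swagger-codegen resttemplate templates; the base path, credentials and the Note fields used here are assumptions:

// A sketch of using the generated client; base path, credentials and Note fields are assumptions
NotesApi notesApi = new NotesApi();
notesApi.getApiClient().setBasePath("http://localhost:8080");
notesApi.getApiClient().setUsername("user");
notesApi.getApiClient().setPassword("password");

// Create a note and read it back through the API
Note created = notesApi.createNote(new Note().title("Groceries").content("Milk, eggs"));
Note fetched = notesApi.getNote(created.getId());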

When the server module is built, a Docker image is created. For this reason, you need to have Docker installed on your machine before playing with this.

The integration test module launches the service container using the Testcontainers framework directly from a JUnit test and subsequently uses the notes-client to invoke the API methods.
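For reference, launching the container from JUnit with Testcontainers looks something like the sketch below; the image name and exposed port are assumptions based on this example's build:

// A sketch of starting the service container with Testcontainers (JUnit 4);
// the image name and port are assumptions based on this example's build
@ClassRule
public static GenericContainer<?> notesServer =
    new GenericContainer<>("notes-server:1.0-SNAPSHOT").withExposedPorts(8080);

@Test
public void createAndGetNote() {
  String basePath = "http://" + notesServer.getContainerIpAddress() + ":" + notesServer.getMappedPort(8080);
  // ... configure the generated NotesApi client with this base path and invoke operations
}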

The full source code for this project can be found at open-api-example.

Parting Thoughts


I really liked the Open API specification, as I felt that defining the API is pretty intuitive. If you use the Swagger Editor UI, authoring the API definition is pretty straightforward, with the tool assisting in making corrections. Using something like SwaggerHub to collaboratively author the API, where different stakeholders participate in the creation/evolution of the API, is valuable. There are also some really good Eclipse plugins that help with authoring the API if you don't want to use the Swagger Editor.

The swagger-codegen plugin did a pretty good job of generating the Spring Boot service and the client that uses RestTemplate. It did nothing, however, for Spring Security and the Basic Authentication pieces. Maybe a future version will allow a security-framework tag to generate security classes. One pain point was that the generated service code did not support alternate execution paths well. For example, there was no easy way in the generated code to respond with a different entity type for an error condition like a 404. This could lead to drift between what is documented and what is actually returned. I used a hack where I defined a Spring exception handler that interrogates the annotation on the NotesApi and pulls the response message from it, as shown below:
@ControllerAdvice
public class ExceptionMapper extends ResponseEntityExceptionHandler {

  @ExceptionHandler(value = {NoteNotFoundException.class})
  public ResponseEntity<?> handleException(Exception e, HandlerMethod handlerMethod) {
    if (e instanceof NoteNotFoundException) {
      return handleNoteNotFoundException(NoteNotFoundException.class.cast(e), handlerMethod);
    }
    else {
      Error error = new Error().message("Unexpected Error:"  + e.getMessage());
      return new ResponseEntity<Error>(error, HttpStatus.INTERNAL_SERVER_ERROR);
    }
  }

  private ResponseEntity<?> handleNoteNotFoundException(NoteNotFoundException e,
    HandlerMethod handlerMethod) {
    ApiResponse apiResponse = getApiResponseForErrorCode(404, handlerMethod);
    Error error = new Error().message(apiResponse != null ? apiResponse.message() : "Note not found:" + e.getNoteId()); // Use the message as defined in the annotation

    return new ResponseEntity<Error>(error, HttpStatus.NOT_FOUND);
  }

  private ApiResponse getApiResponseForErrorCode(int errorCode, HandlerMethod method) {
   ...
    return method.getMethodAnnotation(ApiResponse.class);
  }
}

Another thing I found hard to do on the client side was to get at the Location header returned by a POST request (containing the location of the created note). The generated client did not provide any way to expose that header to the consumer of the generated code.
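If the header matters, one workaround is to bypass the generated method for that particular call and use RestTemplate directly. A sketch, assuming the demo credentials and a locally running service (HttpHeaders.setBasicAuth requires Spring 5.1+):

// Sketch: read the Location header by calling the endpoint directly,
// bypassing the generated client for this one operation
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setBasicAuth("user", "password");
headers.setContentType(MediaType.APPLICATION_JSON);
ResponseEntity<Note> response = restTemplate.exchange("http://localhost:8080/notes",
    HttpMethod.POST, new HttpEntity<>(newNote, headers), Note.class);
URI location = response.getHeaders().getLocation(); // URI of the created note, if the service sets it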
Overall I really liked the Open API specification and the collaborative definition of an API. I also liked the idea that teams spend time thinking of what their API should be before they get down to coding. Service consumers should use the API to generate their own consumer clients.

Wednesday, February 7, 2018

Inventory Microservice example with Kafka Streams and CQRS

Introduction


I recently had a chance to play with Kafka Streams and CQRS and wanted to share my learnings via an example. For the example, I have selected a domain that represents Sellable Inventory, i.e., a computation of inventory that denotes what you can sell based on what you have on-hand and what has been reserved.

Kafka Streams


Many people have used Kafka in their workplace simply by virtue of it being the technology of choice for high throughput messaging. Kafka Streams builds upon Kafka to provide:
  • Fault tolerant processing of streams, with support for joins, windowing and aggregations
  • Support for exactly once semantics
  • A rich notion of time for event processing, with support for event time, processing time and ingestion time


Architecture and Domain for the Example

Neha Narkhede has a nice post on the Confluent site, Event Sourcing, CQRS, stream processing and Kafka: What's the connection? In that post she discusses the concepts of Event Sourcing, CQRS and stream processing, and alludes to an example of a retail inventory solution implemented using these concepts, but does not provide the source code. The domain for this blog post is loosely based on the example she mentions there and on the GCP recipe Building Real-Time Inventory Systems for Retail.

The application has the following architecture:


inventory-mutator
This module is responsible for inventory mutation actions. Consider this the command part of the CQRS pattern. There are two operations that it handles:
  • Adjustments - Warehouse updates of inventory quantities. These are increments to availability of inventory
  • Reservations - Decrements to inventory as it gets sold on a Retail website
Both endpoints are part of the same application but emit mutations to separate Kafka topics, inventory_adjustments and inventory_reservations, as shown in the figure. In the real world one might separate these two operations, adjustments and reservations, into different Microservices in the interest of separation of concerns and scale, but this example keeps it simple. A sketch of the command side is shown below.
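The mutation endpoints essentially accept a request and publish it to the corresponding topic. The following is a simplified sketch using Spring Kafka's KafkaTemplate; the controller and the InventoryMutation type are illustrative, and the actual example publishes Avro records:

@RestController
@RequestMapping("/inventory")
public class InventoryMutationController {

  private final KafkaTemplate<LocationSku, Integer> kafkaTemplate;

  public InventoryMutationController(KafkaTemplate<LocationSku, Integer> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  @PostMapping("/adjustment")
  public ResponseEntity<String> adjust(@RequestBody InventoryMutation mutation) {
    // Adjustments increment availability; keying by (location, SKU) keeps all
    // mutations for the same key on the same partition
    kafkaTemplate.send(Topics.INVENTORY_ADJUSTMENT.name(),
        new LocationSku(mutation.getLocationId(), mutation.getSku()), mutation.getCount());
    return ResponseEntity.ok("Updated Inventory");
  }

  @PostMapping("/reservation")
  public ResponseEntity<String> reserve(@RequestBody InventoryMutation mutation) {
    // Reservations are published as positive counts; the stream processor
    // subtracts the reservation total from the adjustment total
    kafkaTemplate.send(Topics.INVENTORY_RESERVATION.name(),
        new LocationSku(mutation.getLocationId(), mutation.getSku()), mutation.getCount());
    return ResponseEntity.ok("Updated Inventory");
  }
}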

sellable-inventory-calculator
This is the stream processor that combines the adjustments and reservation streams to provide a stream of sellable inventory.


In the example, the sellable-inventory-calculator application is also a Microservice that serves up the sellable inventory at a REST endpoint. The stream processor stores the partitioned sellable inventory data in a local state store. Every instance of the sellable-inventory-calculator application that embeds the Kafka Streams library hosts a subset of the application state, thus partitioning the data across the different instances. Fault tolerance for the local state is provided by Kafka Streams, which transparently logs all updates to the local state store to a separate Kafka topic.

The code that is representative of the above is shown below:
   StreamsBuilder kafkaStreamBuilder = new StreamsBuilder();

    // KTable of adjustments
    KTable<LocationSku, Integer> adjustments = kafkaStreamBuilder
        .stream(Topics.INVENTORY_ADJUSTMENT.name(),
            Consumed.with(Topics.INVENTORY_ADJUSTMENT.keySerde(), Topics.INVENTORY_ADJUSTMENT.valueSerde()))
        .groupByKey(Serialized.<LocationSku, Integer>with(Topics.INVENTORY_ADJUSTMENT.keySerde(),
            Topics.INVENTORY_ADJUSTMENT.valueSerde()))
        .reduce((value1, value2) -> {
          return (value1 + value2);
        }); 

    // KTable of reservations
    KTable<LocationSku, Integer> inventoryReservations = kafkaStreamBuilder
        .stream(Topics.INVENTORY_RESERVATION.name(),
            Consumed.with(Topics.INVENTORY_RESERVATION.keySerde(), Topics.INVENTORY_RESERVATION.valueSerde()))
        .groupByKey(Serialized.<LocationSku, Integer>with(Topics.INVENTORY_RESERVATION.keySerde(),
            Topics.INVENTORY_RESERVATION.valueSerde()))
        .reduce((value1, value2) -> {
          return (value1 + value2);
        }); 

    // KTable of sellable inventory
    KTable<LocationSku, Integer> sellableInventory = adjustments.leftJoin(inventoryReservations,
        (adjustment, reservation) -> {
          LOGGER.info("Adjustment:{} Reservation {}", adjustment, reservation);
          return  (adjustment - (reservation == null ? 0 : reservation));
        }, Materialized.<LocationSku, Integer>as(Stores.persistentKeyValueStore(SELLABLE_INVENTORY_STORE))
            .withKeySerde(Topics.SELLABLE_INVENTORY.keySerde()).withValueSerde(Topics.SELLABLE_INVENTORY.valueSerde()));


    // Send the sellable inventory to the sellable_inventory topic
    sellableInventory.toStream().map((key, value) -> {
      LOGGER.info("Streaming Key: {} Value: {}", key, value);
      return new KeyValue<LocationSku, Integer>(key, value);
    }).to(Topics.SELLABLE_INVENTORY.name(), Produced.<LocationSku, Integer>with(Topics.SELLABLE_INVENTORY.keySerde(),
        Topics.SELLABLE_INVENTORY.valueSerde()));

    sellableInventoryStream = new KafkaStreams(kafkaStreamBuilder.build(), streamsConfig);
    sellableInventoryStream.start();
When an instance of the sellable-inventory-calculator starts, it sets the Streams configuration APPLICATION_SERVER_CONFIG to its own host:port, as shown below:
  @Bean
  public StreamsConfig streamConfig() {
   
    Map<String, Object> props = new HashMap<>();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sellableInventoryCalculator");
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, getAppHostPort().getHost() + ":" + serverPort); // Server host/port
    ... 
    return new StreamsConfig(props);
  }
When a request arrives at a particular instance of the sellable-inventory-calculator, if that instance's local store has the [location, SKU] combination it serves it up; otherwise it forwards the request to the instance that does have the data.

This host/port computation is done using the stream's metadata, as shown below:
public HostPort hostPortForKey(LocationSku locationSku) throws NotFoundException {
 StreamsMetadata metadataForKey = sellableInventoryStream.metadataForKey(SELLABLE_INVENTORY_STORE, locationSku,
        Topics.SELLABLE_INVENTORY.keySerde().serializer());

 return new HostPort(metadataForKey.host(), metadataForKey.port()); // HOST:Port that has the sellable inventory for location-sku
}
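
Putting it together, the REST endpoint either reads from its own state store or proxies to the owning instance. A sketch, where thisInstanceHostPort(), the SellableInventory type and the RestTemplate wiring are illustrative:

public SellableInventory getSellableInventory(LocationSku locationSku) throws NotFoundException {
  HostPort owner = hostPortForKey(locationSku);

  if (thisInstanceHostPort().equals(owner)) {
    // The key is local: read straight out of the Kafka Streams state store
    ReadOnlyKeyValueStore<LocationSku, Integer> store = sellableInventoryStream.store(
        SELLABLE_INVENTORY_STORE, QueryableStoreTypes.<LocationSku, Integer>keyValueStore());
    Integer count = store.get(locationSku);
    if (count == null) {
      throw new NotFoundException();
    }
    return new SellableInventory(locationSku, count);
  }

  // The key lives on another instance: forward the request there
  return restTemplate.getForObject("http://" + owner.getHost() + ":" + owner.getPort()
      + "/inventory/locations/" + locationSku.getLocationId() + "/skus/" + locationSku.getSku(),
      SellableInventory.class);
}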

This application serves as the query part of the CQRS architecture, if one chooses this architectural style. The part about an instance having to forward to the right server when it does not have the data seems like an extra hop, but don't distributed database systems do exactly that behind the scenes?

The sellable-inventory-calculator also publishes the sellable inventory to a Kafka topic for downstream consumers to pick up.

sellable-inventory-service

The sellable-inventory-service is the query side of the CQRS architecture of the retail inventory example. The Microservice (or a separate process) takes the sellable inventory events from the sellable inventory topic and persists them into its Cassandra store. The Microservice has a REST endpoint that serves up the sellable inventory for a [location, SKU] tuple. Wait up, didn't the sellable-inventory-calculator already have a query side, so why this additional service? The only purpose is to demonstrate two different approaches to the query side: one with Kafka Streams and local storage, and a second with a dedicated Cassandra instance servicing the query. A sketch of the consuming side follows.
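
The consuming side can be as simple as a listener that upserts each event into Cassandra. A minimal sketch using Spring Kafka and Spring Data Cassandra; the listener, entity and repository names are assumptions:

@Component
public class SellableInventoryListener {

  private final SellableInventoryRepository repository; // a Spring Data Cassandra repository

  public SellableInventoryListener(SellableInventoryRepository repository) {
    this.repository = repository;
  }

  @KafkaListener(topics = "sellable_inventory")
  public void onSellableInventory(ConsumerRecord<LocationSku, Integer> record) {
    // Upsert the latest computed sellable count for this (location, SKU)
    repository.save(new SellableInventoryEntity(
        record.key().getLocationId(), record.key().getSku(), record.value()));
  }
}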

schema-registry

The schema-registry is included in the example for completeness. It stores a versioned history of Avro schemas. The example could easily have been done with JSON rather than Avro. For more on why schema registries are useful, read the Confluent article, Yes Virginia, You Really Do Need a Schema Registry. Note that the example shares a schema JAR containing Avro generated classes across producers and consumers of topics. This is only for demonstration; in the real world, one would generate the schema classes from the definition in the registry, using the Schema Registry Maven plugin for example.


Running the Example


You can obtain the full example from here: https://github.com/sanjayvacharya/sleeplessinslc/tree/master/inventory-example

The different components of this architecture are Dockerized, and using docker-compose we can run the containers locally, linking them as needed. Before proceeding, ensure that you have Docker and docker-compose installed. From the root level of the project, issue the following to build all the modules:

>mvn clean install

The build should result in Docker images being created for the different artifacts. The docker-compose file has not been tried on any platform other than Mac OS, so it is quite likely that you might need some tweaking to get this to work elsewhere. On a Mac, issue the following:

>export DOCKER_KAFKA_HOST=$(ipconfig getifaddr en0)

If you would like to run the individual Microservices in your IDE, then run the following command to start docker containers for Kafka, the Schema Registry and Cassandra:
>docker-compose -f docker-compose-dev.yml up

After this you should be able to start the individual Microservices by invoking their Main classes as you would for any Spring Boot application. Ensure that you pass the VM argument '-Dspring.kafka.bootstrap.servers=${DOCKER_KAFKA_HOST}:9092' for each service, replacing DOCKER_KAFKA_HOST with the value computed earlier.
Alternatively, if you would prefer to have all the services started as Docker containers, start docker-compose as follows (make sure you have done a mvn install first):
>docker-compose up

The above will result in the start of a three node Kafka cluster, Cassandra, the Schema Registry, inventory-mutator, sellable-inventory-service and two instances of sellable-inventory-calculator. Why two instances of the latter? This helps demonstrate the forwarding of a request from an instance of sellable-inventory-calculator whose local stream store does NOT have the requested [location, SKU] to an instance that does have it. Note that when you run docker-compose, you might see services failing to start and then retrying. This is normal, as docker-compose does not have a way to ensure deterministic startup dependencies (at least in my findings).
To send an inventory adjustment, issue the following. You should get a response like 'Updated Inventory [inventory_adjustment-2@1]' indicating success:
>curl -H "Content-Type: application/json" -X POST -d '{"locationId":"STL","sku":"123123", "count": "100"}' http://localhost:9091/inventory/adjustment

To issue a reservation, you can send the following:
>curl -H "Content-Type: application/json" -X POST -d '{"locationId":"STL","sku":"123123", "count": "10"}' http://localhost:9091/inventory/reservation

You can obtain the sellable inventory value by querying either the sellable-inventory-service or one of the two instances of sellable-inventory-calculator. For the call to the sellable-inventory-service use port 9093; for the two instances of sellable-inventory-calculator, use ports 9094 and 9095. Given the adjustment of 100 and the reservation of 10 above, a successful response should look like '{"locationId":"STL","sku":"123123","count":90}' and should be the same from all three components:
>curl -s http://localhost:{PORT}/inventory/locations/STL/skus/123123

There is an integration-test module as well, but it is not set up to build as a child of the parent pom. The test needs all the services to be up via the 'docker-compose up' mentioned earlier. The test class in that module executes adjustments, reservations and queries for sellable inventory, roughly along the lines of the sketch below.
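This sketch assumes the ports from the docker-compose setup above and illustrative request/response types:

@Test
public void adjustReserveAndQuery() {
  RestTemplate restTemplate = new RestTemplate();

  // Command side: adjust by 100, then reserve 10, via the inventory-mutator on port 9091
  restTemplate.postForEntity("http://localhost:9091/inventory/adjustment",
      new InventoryMutation("STL", "123123", 100), String.class);
  restTemplate.postForEntity("http://localhost:9091/inventory/reservation",
      new InventoryMutation("STL", "123123", 10), String.class);

  // Query side: all three query components should converge on the same count (100 - 10 = 90);
  // the pipeline is eventually consistent, so a real test may need to poll before asserting
  SellableInventory inventory = restTemplate.getForObject(
      "http://localhost:9093/inventory/locations/STL/skus/123123", SellableInventory.class);
  assertEquals(90, (int) inventory.getCount());
}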

Conclusion



I have barely scratched the surface of the potential of Kafka Streams with this article. The ability to do low latency transformations on a stream of data is powerful. Combine this with local state stores that are fault tolerant by virtue of being backed by Kafka, and you have an architecture that can rock. If you are looking for read-your-writes consistency, this solution approaches it, but you will not have the comfort of an ACID data store like Oracle. If you embrace eventual consistency, KStreams opens a splendorous world....