
Friday, April 17, 2015

Reactive Programming with Jersey and RxJava

Introduction


I have the reactive programming fever right now and am having fun using RxJava (Reactive Extensions for Java). When I found out about reactive support for Jersey, I figured a blog post might be warranted.

Problem Domain

It helps to have a problem domain. In our case, the company Acme had a monolithic retail web application that sold products. Many modules, such as CheckOut and Account Details, composed the monolith, all contributing to a single deployable. One of them was Product, which held core product information, pricing information and inventory, all obtained from a database.



The solution worked really well at first. However, over time, the company felt the following pain points -
  • One Deploy thus One Rollback - All or nothing
  • Long build and test times
  • Information such as price and reviews was needed elsewhere, and since some logic was associated with it, having it centralized for consumption would be nice.
  • A coordination nightmare among teams working on different areas to make sure everything came together.
The Architects decided to extract the concept of Product into separate services. Over time, the company created separate web services for the base product, inventory, reviews and price. These were aligned with the teams working on the respective areas and allowed each area to evolve and scale independently.



All the web service calls were written using JAX-RS/Jersey. The company had fully jumped on the microservices bandwagon. A product for the Web Site was now created by calling the different services and aggregating the results.

The following sample code, which uses a single Jersey client, demonstrates the above:
   @GET
   public void getProduct(@PathParam("id") Long productId,
    @Suspended final AsyncResponse response) {
    try {
      // Get the Product
      BaseProduct baseProduct = client.target(serviceUrls.getBaseProductUrl()).path("/products")
          .path(String.valueOf(productId)).request().get(BaseProduct.class);

      // Get reviews of the Product
      Reviews reviews = client.target(serviceUrls.getReviewsUrl()).path("/productReviews")
          .path(String.valueOf(productId)).request().get(Reviews.class);

      // Create a Result Product object - Base Product object does not have price and inventory
      Product resultProduct = resultProductFrom(baseProduct, reviews);

      // Obtain the Price for Result Product
      resultProduct.setPrice(client.target(serviceUrls.getPriceUrl()).path("/productPrice")
          .path(productId.toString()).request().get(ProductPrice.class).getPrice());

      // Get Price and Inventory of Product Options and set the same
      for (Long optionId : optionId(resultProduct.getOptions())) {        
        Double price = client.target(serviceUrls.getPriceUrl()).path("/productPrice")
            .path(optionId.toString()).request().get(ProductPrice.class).getPrice();

        ProductInventory inventory = client.target(serviceUrls.getInventoryUrl())
            .path("/productInventory").path(optionId.toString()).request()
            .get(ProductInventory.class);

        ProductOption option = resultProduct.getOption(optionId);

        option.setInventory(inventory.getCount());
        option.setPrice(price);  
      }

      response.resume(resultProduct);
    }
    catch (Exception e) {
      response.resume(e);
    }
  }

ACME soon found that calling these services performed noticeably worse than the old monolithic product service, which had obtained all product information with a database operation. The degradation was primarily attributed to the serial way in which the requests were invoked and their results subsequently composed, not to mention that a database join across the disparate tables had been far more performant. As an immediate fix, the Architects directed that these services be called in parallel where appropriate -
  • Call the Product Service serially to obtain core product data, since the base product information and its options are needed first
  • Call the Review Service asynchronously
  • Call the Pricing Service asynchronously to obtain the price of the Product and Options
  • Call the Inventory Service asynchronously to obtain the inventory of the different Product Options
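To make the "latches and Futures" pain concrete, here is a minimal sketch of what that first parallel version might have looked like in JDK 7 style, using an ExecutorService and a CountDownLatch to fan out the per-option Price and Inventory calls. The class and the fetchPrice/fetchInventory methods are hypothetical stand-ins for the Jersey client calls, not code from the example project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the first "call it in parallel" fix (JDK 7 style, no lambdas).
class LatchFanOutSketch {

  // Hypothetical stand-ins for the remote Price and Inventory services.
  static double fetchPrice(long optionId) {
    return optionId * 10.0;
  }

  static int fetchInventory(long optionId) {
    return (int) (optionId % 7);
  }

  // Fires one price call and one inventory call per option on the pool,
  // then blocks until all of them have finished (or 5 seconds have passed).
  static void fillOptions(List<Long> optionIds, final Map<Long, Double> prices,
      final Map<Long, Integer> inventories, ExecutorService pool) {
    final CountDownLatch latch = new CountDownLatch(optionIds.size() * 2);
    for (final Long optionId : optionIds) {
      pool.execute(new Runnable() {
        @Override
        public void run() {
          try {
            prices.put(optionId, fetchPrice(optionId));
          } finally {
            latch.countDown(); // count down even if the call throws
          }
        }
      });
      pool.execute(new Runnable() {
        @Override
        public void run() {
          try {
            inventories.put(optionId, fetchInventory(optionId));
          } finally {
            latch.countDown();
          }
        }
      });
    }
    try {
      if (!latch.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Timed out waiting for option data");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for option data", e);
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      Map<Long, Double> prices = new ConcurrentHashMap<Long, Double>();
      Map<Long, Integer> inventories = new ConcurrentHashMap<Long, Integer>();
      fillOptions(Arrays.asList(11L, 12L, 13L), prices, inventories, pool);
      System.out.println("prices=" + prices + " inventories=" + inventories);
    } finally {
      pool.shutdown();
    }
  }
}
```

Every additional parallel call means another task, another countDown() in a finally block and another shared map to check for missing entries when a call fails, which is exactly the bookkeeping the Architects wanted to get rid of.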
The above worked and things were more performant; however, the code looked a mess due to the different CountDownLatches, composition logic and Futures in play. The Architects met again. Hours were spent locked in a room while pizzas were slid under their door, until they heard of Reactive Programming in general and Reactive Jersey with RxJava in particular! Bling, bling! Light bulbs came on, illuminating their halos, and away they went, reacting to their new idea. They found that Reactive Jersey promotes a clean API to handle parallel execution and composition without having to worry about CountDownLatches and the like. The resulting code looked like this -
  
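  @GET
  public void observableProduct(@PathParam("id") final Long productId,
    @Suspended final AsyncResponse response) {

    // An Observable of the result Product, zipped from the base product and its reviews.
    // cache() replays the single result to every subscriber, so the base product and
    // reviews services should be called once even though 'product' is subscribed to
    // several times below (via productIds, optionIds and the final zipWith chain).
    Observable<Product> product = Observable.zip(baseProduct(productId), reviews(productId),
      new Func2<BaseProduct, Reviews, Product>() {

        @Override
        public Product call(BaseProduct product, Reviews review) {
          return resultProductFrom(product, review);
        }
      }).cache();

    // Ids of the Product and all of its Options
    Observable<Long> productIds = productAndOptionIds(product);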
    
    // Observable of Options only
    Observable<Long> optionIds = productIds.filter(new Func1<Long, Boolean>() {

      @Override
      public Boolean call(Long prodId) {
        return !prodId.equals(productId);
      }

    });
    
    // Set Inventory Data
    product
        .zipWith(inventories(productId, optionIds).toList(), new Func2<Product, List<ProductInventory>, Product>() {

          @Override
          public Product call(Product resultProduct, List<ProductInventory> productInventories) {
            for (ProductInventory inventory : productInventories) {
              if (!inventory.getProductId().equals(resultProduct.getProductId())) {
                resultProduct.getOption(inventory.getProductId())
                    .setInventory(inventory.getCount());
              }
            }
            return resultProduct;
          }
        })
        // Set Price Data
        .zipWith(prices(productIds).toList(), new Func2<Product, List<ProductPrice>, Product>() {

          @Override
          public Product call(Product resultProduct, List<ProductPrice> prices) {
            for (ProductPrice price : prices) {
              if (price.getProductId().equals(resultProduct.getProductId())) {
                resultProduct.setPrice(price.getPrice());
              }
              else {
                resultProduct.getOption(price.getProductId()).setPrice(price.getPrice());
              }
            }
            return resultProduct;
          }
        }).observeOn(Schedulers.io()).subscribe(new Action1<Product>() {

          @Override
          public void call(Product productToSet) {
            response.resume(productToSet);
          }

        }, new Action1<Throwable>() {

          @Override
          public void call(Throwable t1) {
            response.resume(t1);
          }
        });
  }
  
  /**
   * @return an Observable of the BaseProduct
   */
  private Observable<BaseProduct> baseProduct(Long productId) {
    return RxObservable
    .from(
      client.target(serviceUrls.getBaseProductUrl()).path("/products").path(String.valueOf(productId)))
    .request().rx().get(BaseProduct.class);
  }
  
  /**
   * @return An Observable of the Reviews
   */
  private Observable<Reviews> reviews(Long productId) {
    return RxObservable
        .from(client.target(serviceUrls.getReviewsUrl()).path("/productReviews").path(String.valueOf(productId)))
        .request().rx().get(Reviews.class);
  }
  
  /**
   * @return An Observable having Product and Option Ids
   */
  private Observable<Long> productAndOptionIds(Observable<Product> product) {
    return product.flatMap(new Func1<Product, Observable<Long>>() {

      @Override
      public Observable<Long> call(Product resultProduct) {
        return Observable.from(Iterables.concat(
          Lists.<Long> newArrayList(resultProduct.getProductId()),
          Iterables.transform(resultProduct.getOptions(), new Function<ProductOption, Long>() {

            @Override
            public Long apply(ProductOption option) {
              return option.getProductId();
            }
          })));
      }
    });
  }
  
  /**
   * Inventories returns back inventories of the Primary product and options. 
   * However, for the primary product, no web service call is invoked as inventory of the main product is the sum of 
   * inventories of all options. However, a dummy ProductInventory is created to maintain order during final concatenation.
   *
   * @param productId Id of the Product
   * @param optionIds Observable of OptionIds
   * @return An Observable of Product Inventory
   */
  private Observable<ProductInventory> inventories(Long productId, Observable<Long> optionIds) {
    return Observable.just(new ProductInventory(productId, 0))
        .concatWith(optionIds.flatMap(new Func1<Long, Observable<ProductInventory>>() {

      @Override
      public Observable<ProductInventory> call(Long optionId) {
        return RxObservable
            .from(client.target(serviceUrls.getInventoryUrl()).path("/productInventory").path("/{productId}"))
            .resolveTemplate("productId", optionId).request().rx().get(ProductInventory.class);
      }
    }));
  }
  
  /**
   * @return An Observable of ProductPrice for product Ids
   */
  private Observable<ProductPrice> prices(Observable<Long> productIds) {
    return productIds
        .flatMap(new Func1<Long, Observable<ProductPrice>>() {

          @Override
          public Observable<ProductPrice> call(Long productId) {
            return RxObservable
                .from(client.target(serviceUrls.getPriceUrl()).path("/productPrice").path("/{productId}"))
                .resolveTemplate("productId", productId.toString()).request().rx()
                .get(ProductPrice.class);
          }
        });
  }

Phew! A lot of code for something simple, huh? The above code is written for JDK 7; with JDK 8 lambdas it should be far more succinct. I will admit that there is more code, but (hopefully) not code that is unclear. The important thing is that we are not dealing with the 'callback hell' associated with Java Futures or InvocationCallbacks. That said, the performance difference between the serial execution and the Reactive Jersey version appears to strongly favor the Reactive Jersey version by a significant margin. If you take the serial calls and turn them into parallel execution, I expect you would get numbers closer to the Reactive Jersey version, but at the cost of having to maintain latches and the like.
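For comparison, the same zip-style composition can be sketched with JDK 8 lambdas using the JDK's own CompletableFuture rather than RxJava. This is a swapped-in technique, not Reactive Jersey's API: thenCombine plays the role of zip, thenCompose chains the inventory calls after the base product call, and allOf waits for the per-option fan-out. ProductView and the fetch* methods are hypothetical stubs standing in for the remote services; the product's own inventory is computed as the sum of its options' inventories, as described in the inventories() comment above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical sketch: zip-like composition with CompletableFuture instead of RxJava.
class CompletableFutureSketch {

  // Minimal stand-in for the aggregated result (hypothetical type).
  static final class ProductView {
    final long productId;
    final int reviewCount;
    final Map<Long, Integer> optionInventory;
    final int totalInventory;

    ProductView(long productId, int reviewCount, Map<Long, Integer> optionInventory,
        int totalInventory) {
      this.productId = productId;
      this.reviewCount = reviewCount;
      this.optionInventory = optionInventory;
      this.totalInventory = totalInventory;
    }
  }

  // Hypothetical stubs; in the gateway these would be Jersey client requests.
  static List<Long> fetchOptionIds(long productId) { return Arrays.asList(productId * 10 + 1, productId * 10 + 2); }
  static int fetchReviewCount(long productId) { return (int) (productId * 3); }
  static int fetchInventory(long optionId) { return (int) (optionId % 5); }

  static CompletableFuture<ProductView> product(long productId, Executor pool) {
    // Base product (with its option ids) and reviews are independent: request both at once.
    CompletableFuture<List<Long>> optionIds =
        CompletableFuture.supplyAsync(() -> fetchOptionIds(productId), pool);
    CompletableFuture<Integer> reviews =
        CompletableFuture.supplyAsync(() -> fetchReviewCount(productId), pool);

    // Inventory calls need the option ids, so chain them after the base product call.
    CompletableFuture<Map<Long, Integer>> inventories = optionIds.thenCompose(ids -> {
      List<CompletableFuture<Integer>> calls = ids.stream()
          .map(id -> CompletableFuture.supplyAsync(() -> fetchInventory(id), pool))
          .collect(Collectors.toList());
      return CompletableFuture.allOf(calls.toArray(new CompletableFuture<?>[0]))
          .thenApply(ignored -> {
            Map<Long, Integer> byOption = new LinkedHashMap<>();
            for (int i = 0; i < ids.size(); i++) {
              byOption.put(ids.get(i), calls.get(i).join()); // already complete, so no blocking
            }
            return byOption;
          });
    });

    // Like zip: combine reviews with inventories once both are available.
    return reviews.thenCombine(inventories, (reviewCount, byOption) -> {
      int total = byOption.values().stream().mapToInt(Integer::intValue).sum();
      return new ProductView(productId, reviewCount, byOption, total);
    });
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      ProductView view = product(7L, pool).join();
      System.out.println("reviews=" + view.reviewCount + " options=" + view.optionInventory
          + " total=" + view.totalInventory);
    } finally {
      pool.shutdown();
    }
  }
}
```

Running main prints `reviews=21 options={71=1, 72=2} total=3`. One design difference worth knowing: a CompletableFuture starts its work as soon as supplyAsync is called, whereas a cold RxJava Observable does nothing until it is subscribed.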

Running the Example


An example application can be obtained from https://github.com/sanjayvacharya/sleeplessinslc/tree/master/reactive-jersey. The example does not include the Acme web site but has a Product-Gateway that serves as an orchestration layer. There are two resources in the Product-Gateway: one that returns the product using serial orchestration and a second that uses Reactive Jersey clients. An Integration Test project uses a client to invoke both the Serial and Reactive resources and logs an average response time. Check out the project and simply execute 'mvn install' at the root level of the project.
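A minimal sketch of an average-response-time measurement, similar in spirit to what the integration test logs, is shown below. The AverageResponseTime class, its warm-up/run parameters and the sleeping stubs in main are hypothetical and not taken from the example project. The stubs simulate three 20 ms service calls made serially and then in parallel, which illustrates why fanning out helps: serial latency is roughly the sum of the calls, parallel latency roughly the slowest one.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical timing harness; not the example project's integration test.
class AverageResponseTime {

  // Runs `call` `warmup` times untimed, then `runs` times timed, and returns the mean in milliseconds.
  static double averageMillis(Runnable call, int warmup, int runs) {
    if (runs <= 0) {
      throw new IllegalArgumentException("runs must be positive");
    }
    for (int i = 0; i < warmup; i++) {
      call.run(); // let JIT compilation and connection setup settle first
    }
    long totalNanos = 0;
    for (int i = 0; i < runs; i++) {
      long start = System.nanoTime();
      call.run();
      totalNanos += System.nanoTime() - start;
    }
    return totalNanos / (double) runs / 1_000_000.0;
  }

  // Simulated remote call that takes roughly `millis` milliseconds.
  static void simulatedCall(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    // Three 20 ms calls, one after another.
    Runnable serial = () -> {
      simulatedCall(20);
      simulatedCall(20);
      simulatedCall(20);
    };
    // The same three calls issued concurrently, then awaited together
    // (default async executor is fine for a demo; real blocking I/O deserves its own pool).
    Runnable parallel = () -> CompletableFuture.allOf(
        CompletableFuture.runAsync(() -> simulatedCall(20)),
        CompletableFuture.runAsync(() -> simulatedCall(20)),
        CompletableFuture.runAsync(() -> simulatedCall(20))).join();

    System.out.printf("serial avg: %.1f ms%n", averageMillis(serial, 2, 10));
    System.out.printf("parallel avg: %.1f ms%n", averageMillis(parallel, 2, 10));
  }
}
```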

Summary

  • Reactive programming appears more verbose. It is possible that I have not optimized it well enough, so tips would be appreciated. JDK 8 lambdas would reduce the footprint regardless.
  • Requires a mind shift regarding how you write code.
  • Async behavior can be introduced as and when required using Observable.subscribeOn() and Observable.observeOn(). Read more about it in a Stack Overflow post.
  • It's a style of programming not a particular technology. RxJava seems to be emerging as the library of choice for Java.
  • Question whether something benefits from being Reactive before introducing Reactive code.
  • Reactive Jersey is a Jersey (GlassFish project) extension and not part of a JSR. So....
  • Going from a monolith to microservices might only be trading one problem for another, so exercise caution and be judicious in choosing what to make into a microservice.
  • A batch request that fetches multiple items is not necessarily faster than multiple requests, one for each item.
Finally, if there is a way to make my Reactive Jersey code less verbose, let me know... all I ask is that you don't say collapse the services back into the monolith :-)