Introduction
I have the reactive programming fever right now and am having fun using RxJava (Reactive Extensions for Java). When I found out about reactive support for Jersey, I figured a blog post might be warranted.
Problem Domain
It's kinda nice to have a problem domain. In our case, the company Acme had a monolithic retail web application that sold products. Many modules made up the monolith, such as CheckOut, Account details, etc., all contributing to a single deployable. One of these modules was Product, which held core product information, pricing information and inventory, all obtained from a database.
The solution worked really well at first. However, over time, the company felt the following pain points:
- One deploy and thus one rollback: all or nothing.
- Long build and test times.
- Information such as price and reviews was needed elsewhere, and there was logic associated with it, so having it centralized for consumption would be nice.
- A coordination nightmare among teams working on different areas to make sure that everything came together.
All the web service calls were written using JAX-RS/Jersey. The company had fully jumped on the microservices bandwagon. A product for the web site would be assembled by calling the different services and aggregating the results.
The following sample code, which uses a single Jersey client, demonstrates the above:
@GET
public void getProduct(@PathParam("id") Long productId, @Suspended final AsyncResponse response) {
  try {
    // Get the Product
    BaseProduct baseProduct = client.target(serviceUrls.getBaseProductUrl()).path("/products")
        .path(String.valueOf(productId)).request().get(BaseProduct.class);
    // Get reviews of the Product
    Reviews reviews = client.target(serviceUrls.getReviewsUrl()).path("/productReviews")
        .path(String.valueOf(productId)).request().get(Reviews.class);
    // Create a Result Product object - Base Product object does not have price and inventory
    Product resultProduct = resultProductFrom(baseProduct, reviews);
    // Obtain the Price for Result Product
    resultProduct.setPrice(client.target(serviceUrls.getPriceUrl()).path("/productPrice")
        .path(productId.toString()).request().get(ProductPrice.class).getPrice());
    // Get Price and Inventory of Product Options and set the same
    for (Long optionId : optionId(resultProduct.getOptions())) {
      Double price = client.target(serviceUrls.getPriceUrl()).path("/productPrice")
          .path(optionId.toString()).request().get(ProductPrice.class).getPrice();
      ProductInventory inventory = client.target(serviceUrls.getInventoryUrl())
          .path("/productInventory").path(optionId.toString()).request()
          .get(ProductInventory.class);
      ProductOption option = resultProduct.getOption(optionId);
      option.setInventory(inventory.getCount());
      option.setPrice(price);
    }
    response.resume(resultProduct);
  } catch (Exception e) {
    response.resume(e);
  }
}
The same orchestration, rewritten with the Reactive Jersey client, is meant to work as follows:
- Call the Product Service to obtain core product data serially. You need the base product information and its options first.
- Call the Review Service asynchronously.
- Call the Pricing Service asynchronously to obtain the price of the Product and its Options.
- Call the Inventory Service asynchronously to obtain the inventory of the different Product Options.
@GET
public void observableProduct(@PathParam("id") final Long productId,
    @Suspended final AsyncResponse response) {
  // An Observable of a Result Product built from the base product and its reviews
  Observable<Product> product = Observable.zip(baseProduct(productId), reviews(productId),
    new Func2<BaseProduct, Reviews, Product>() {
      @Override
      public Product call(BaseProduct product, Reviews review) {
        return resultProductFrom(product, review);
      }
    });
  // Ids of the Product and all of its Options
  Observable<Long> productIds = productAndOptionIds(product);
  // Observable of Options only
  Observable<Long> optionIds = productIds.filter(new Func1<Long, Boolean>() {
    @Override
    public Boolean call(Long prodId) {
      return !prodId.equals(productId);
    }
  });
  // Set Inventory Data
  product
    .zipWith(inventories(productId, optionIds).toList(),
      new Func2<Product, List<ProductInventory>, Product>() {
        @Override
        public Product call(Product resultProduct, List<ProductInventory> productInventories) {
          for (ProductInventory inventory : productInventories) {
            if (!inventory.getProductId().equals(resultProduct.getProductId())) {
              resultProduct.getOption(inventory.getProductId())
                .setInventory(inventory.getCount());
            }
          }
          return resultProduct;
        }
      })
    // Set Price Data
    .zipWith(prices(productIds).toList(),
      new Func2<Product, List<ProductPrice>, Product>() {
        @Override
        public Product call(Product resultProduct, List<ProductPrice> prices) {
          for (ProductPrice price : prices) {
            if (price.getProductId().equals(resultProduct.getProductId())) {
              resultProduct.setPrice(price.getPrice());
            } else {
              resultProduct.getOption(price.getProductId()).setPrice(price.getPrice());
            }
          }
          return resultProduct;
        }
      })
    .observeOn(Schedulers.io())
    .subscribe(new Action1<Product>() {
      @Override
      public void call(Product productToSet) {
        response.resume(productToSet);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable t1) {
        response.resume(t1);
      }
    });
}

/**
 * @return an Observable of the BaseProduct
 */
private Observable<BaseProduct> baseProduct(Long productId) {
  return RxObservable
    .from(client.target(serviceUrls.getBaseProductUrl()).path("/products")
      .path(String.valueOf(productId)))
    .request().rx().get(BaseProduct.class);
}

/**
 * @return An Observable of the Reviews
 */
private Observable<Reviews> reviews(Long productId) {
  return RxObservable
    .from(client.target(serviceUrls.getReviewsUrl()).path("/productReviews")
      .path(String.valueOf(productId)))
    .request().rx().get(Reviews.class);
}

/**
 * @return An Observable having Product and Option Ids
 */
private Observable<Long> productAndOptionIds(Observable<Product> product) {
  return product.flatMap(new Func1<Product, Observable<Long>>() {
    @Override
    public Observable<Long> call(Product resultProduct) {
      return Observable.from(Iterables.concat(
        Lists.<Long> newArrayList(resultProduct.getProductId()),
        Iterables.transform(resultProduct.getOptions(), new Function<ProductOption, Long>() {
          @Override
          public Long apply(ProductOption option) {
            return option.getProductId();
          }
        })));
    }
  });
}

/**
 * Returns the inventories of the primary product and its options.
 * For the primary product, no web service call is invoked, as the inventory of the main product
 * is the sum of the inventories of all options. A dummy ProductInventory is still created for it
 * to maintain order during the final concatenation.
 *
 * @param productId Id of the Product
 * @param optionIds Observable of OptionIds
 * @return An Observable of Product Inventory
 */
private Observable<ProductInventory> inventories(Long productId, Observable<Long> optionIds) {
  return Observable.just(new ProductInventory(productId, 0))
    .concatWith(optionIds.flatMap(new Func1<Long, Observable<ProductInventory>>() {
      @Override
      public Observable<ProductInventory> call(Long optionId) {
        return RxObservable
          .from(client.target(serviceUrls.getInventoryUrl()).path("/productInventory")
            .path("/{productId}"))
          .resolveTemplate("productId", optionId).request().rx().get(ProductInventory.class);
      }
    }));
}

/**
 * @return An Observable of ProductPrice for product Ids
 */
private Observable<ProductPrice> prices(Observable<Long> productIds) {
  return productIds.flatMap(new Func1<Long, Observable<ProductPrice>>() {
    @Override
    public Observable<ProductPrice> call(Long productId) {
      return RxObservable
        .from(client.target(serviceUrls.getPriceUrl()).path("/productPrice")
          .path("/{productId}"))
        .resolveTemplate("productId", productId.toString()).request().rx()
        .get(ProductPrice.class);
    }
  });
}
Phew! A lot of code for something simple, huh? The above code is written for JDK 7. With JDK 8 lambdas, it should be far more succinct. I will admit that there is more code, but hopefully not code that is unclear. The important thing is that we are not dealing with the 'callback hell' associated with Java Futures or invocation callbacks. That said, the performance difference between the serial execution and the Reactive Jersey version appears to strongly favor the Reactive Jersey version by a significant margin. By taking the serial calls and turning them into parallel execution, I am certain you would get numbers closer to the Reactive Jersey version, but at the cost of having to maintain latches and the like.
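As a point of comparison for the parallel-execution remark above, here is a minimal sketch of the same fan-out shape using only JDK 8's CompletableFuture and lambdas, with no latches. It is not RxJava, not the Jersey client, and not the code from the example project. The class name and the fetchReviews, fetchPrice and fetchInventory methods are hypothetical stubs standing in for the remote services, and the result is flattened into a string purely to keep the sketch short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ParallelProductSketch {

    // Hypothetical stubs standing in for the remote review, price and inventory services.
    static String fetchReviews(long productId) { return "reviews-for-" + productId; }
    static double fetchPrice(long id) { return 10.0 + id; }
    static int fetchInventory(long optionId) { return (int) optionId * 2; }

    // Price and inventory of one option are fetched concurrently and combined when both arrive.
    static CompletableFuture<String> optionSummary(long optionId) {
        CompletableFuture<Double> price = CompletableFuture.supplyAsync(() -> fetchPrice(optionId));
        CompletableFuture<Integer> inventory = CompletableFuture.supplyAsync(() -> fetchInventory(optionId));
        return price.thenCombine(inventory, (p, inv) -> optionId + ":" + p + ":" + inv);
    }

    // Starts every call first, then waits for the results; no explicit latch is needed.
    static String aggregate(long productId, List<Long> optionIds) {
        CompletableFuture<String> reviews = CompletableFuture.supplyAsync(() -> fetchReviews(productId));
        CompletableFuture<Double> price = CompletableFuture.supplyAsync(() -> fetchPrice(productId));
        List<CompletableFuture<String>> options = optionIds.stream()
            .map(ParallelProductSketch::optionSummary)
            .collect(Collectors.toList()); // collecting forces all option calls to start
        String optionPart = options.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.joining(","));
        return productId + "|" + reviews.join() + "|" + price.join() + "|" + optionPart;
    }

    public static void main(String[] args) {
        // prints 1|reviews-for-1|11.0|2:12.0:4,3:13.0:6
        System.out.println(aggregate(1L, Arrays.asList(2L, 3L)));
    }
}
```

The join() calls still block the calling thread at the end, so this is closer to "parallel execution" than to a fully reactive pipeline; it only shows that lambdas shrink the ceremony considerably.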
Running the Example
An example application can be obtained from https://github.com/sanjayvacharya/sleeplessinslc/tree/master/reactive-jersey. The example does not have the Acme web site but has a Product-Gateway that serves as an orchestration layer. There are two resources in the Product-Gateway: one that returns the product using serial orchestration and a second that uses Reactive Jersey clients. An Integration Test project exists in which a client invokes both the Serial and Reactive resources and logs an average response time for each. Check out the project and simply execute 'mvn install' at the root level of the project.
Summary
- Reactive programming appears more verbose. It is possible that I have not optimized it well enough, so tips would be appreciated. JDK 8 lambdas would reduce the footprint regardless.
- Requires a mind shift regarding how you write code.
- Async behavior can be introduced as and when required using Observable.subscribeOn() and Observable.observeOn(). Read more about it on a Stack Overflow post.
- It's a style of programming not a particular technology. RxJava seems to be emerging as the library of choice for Java.
- Question whether something benefits from being Reactive before introducing Reactive code.
- Jersey Reactive is a GlassFish implementation and not part of a JSR. So....
- Moving from a monolith to microservices might only be trading one problem for another, so be cautious and judicious about what you choose to make into a microservice.
- A batch request that fetches multiple items is not necessarily faster than multiple requests, one for each item.
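On the threading point in the summary above, the idea of choosing where each stage runs can be illustrated without RxJava. The sketch below is only a rough JDK 8 analogy, assuming that supplyAsync with an executor plays a role loosely similar to subscribeOn (where the source work runs) and thenApplyAsync with an executor plays a role loosely similar to observeOn (where downstream work runs). The class name and thread names are made up for illustration, and this is not how RxJava schedulers are implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SchedulerAnalogySketch {

    // Runs a "fetch" stage on one named thread and a "transform" stage on another,
    // returning the names of the threads that actually ran each stage.
    static String[] stageThreads() {
        ExecutorService ioPool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService workPool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "work-thread"));
        try {
            return CompletableFuture
                // Source work runs on ioPool (loosely comparable to subscribeOn).
                .supplyAsync(() -> Thread.currentThread().getName(), ioPool)
                // Downstream work runs on workPool (loosely comparable to observeOn).
                .thenApplyAsync(
                    fetchedOn -> new String[] {fetchedOn, Thread.currentThread().getName()},
                    workPool)
                .join();
        } finally {
            ioPool.shutdown();
            workPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = stageThreads();
        // prints fetched on io-thread, transformed on work-thread
        System.out.println("fetched on " + threads[0] + ", transformed on " + threads[1]);
    }
}
```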