Search This Blog

Loading...

Sunday, December 6, 2015

Maven Integration Testing of Spring Boot Cloud Netflix Eureka Services with Docker

Introduction


My previous blog was about using Spring Cloud Netflix and my experiences with it.  In this BLOG, I will share how one could perform a maven integration test that involves multiple Spring Boot services that use Netflix Eureka as a service registry. I have utilized a similar example as the one I used in my BLOG about Reactive Programming with Jersey and RxJava where a product is assembled from different JAX-RS microservices. For this BLOG though, each of the microservices are written in Spring Boot using Spring Cloud Netflix Eureka for service discovery. A service which I am calling the Product Gateway, assembles data from the independent product based microservices to hydrate a Product.

Product Gateway


A mention of the Product Gateway is warranted for completeness. The following represents the ProductGateway classes where the ObservableProductResource uses a ProductService which in turn utilizes the REST clients of the different services and RxJava to hydrate a Product.



The ObservableProductResource Spring Controller class is shown below which uses a DeferredResult for Asynchronous processing:
@RestController
public class ObservableProductResource {  
  @Inject
  private ProductService productService;
  
  @RequestMapping("/products/{productId}")
  public DeferredResult<Product> get(@PathVariable Long productId) {
    DeferredResult<Product> deferredResult = new DeferredResult<Product>();
    Observable<Product> productObservable = productService.getProduct(productId);

    productObservable.observeOn(Schedulers.io())
    .subscribe(productToSet -> deferredResult.setResult(productToSet), t1 -> deferredResult.setErrorResult(t1));
    
    return deferredResult;
  }
}
Each Micro Service client is declaratively created using Netflix Feign. As an example of one such client, the BaseProductClient, is shown below:
@FeignClient("baseproduct")
public interface BaseProductClient {
  @RequestMapping(method = RequestMethod.GET, value = "/baseProduct/{id}", consumes = "application/json")
  BaseProduct getBaseProduct(@PathVariable("id") Long id);
}

What does the Integration Test do?

The primary purpose is to test the actual end to end integration of the Product Gateway Service. As a maven integration test, the expectancy is that it would entail:
  • Starting an instance of Eureka
  • Starting Product related services registering them with Eureka
  • Starting Product Gateway Service and registering it with Eureka
  • Issuing a call to Product Gateway Service to obtain said Product
  • Product Gateway Service discovering instances of Product microservices like Inventory, Reviews and Price from Eureka
  • Product Gateway issuing calls to each of the services using RxJava and hydrating a Product
  • Asserting the retrieval of the Product and shutting down the different services
The test itself would be a maven JUnit integration test.
As the services are bundled as JARs with embedded containers, they present a challenge to start up and tear down during an integration test. 
One option is to create equivalent WAR based artifacts for testing purposes only and use the maven-cargo plugin to deploy each of them under a separate context of the container and test the gateway. That however does mean creating a WAR that might never really be used apart from testing purposes.
Another option to start the different services is using the exec maven plugin and/or some flavor(hack) to launch external JVMs.
Yet another option is write custom class loader logic [to prevent stomping of properties and classes of individual microservices] and launch the different services in the same integration test JVM.
All these are options but what appealed to me was to use Docker containers to start each of these microservice JVMs and run the integration test.  So why Docker? Docker seems a natural fit to compose an application and distribute it across a development environment as a consistent artifact. The benefits during micro service based integration testing where one can simply pull in different docker images such as services, data stores etc of specific versions without dealing with environment based conflicts is what I find appealing.

Creating Docker Images 


As part of building each of the web services, it would be ideal to create a Docker image. There are many maven plugins out there to create Docker images [actually too many]. In the example, we have used the one from Spotify.  The building of the Docker image using the Spotify plugin for Spring Boot applications is nicely explained in the BLOG from spring.io, Spring Boot with Docker.
What I would see happening is that as part of the build process, the Docker image would be published to a docker repository which is internal to an organization and then made available for other consumers.

Integration Test


As part of the pre-integration test phase of maven, we would like to start up the Docker containers representing the different services. In order for the gateway container to work with the other service containers, we need to be able to link the Docker containers. I was not able to find a way to do that using the Spotify plugin. What I instead found myself doing is utilizing another maven plugin for Docker by Roland HuB which has much better documentation and more features. Shown below is the plugin configuration for the integration test.

<plugin>
  <groupId>org.jolokia</groupId>
  <artifactId>docker-maven-plugin</artifactId>
  <version>0.13.6</version>
  <configuration>
    <logDate>default</logDate>
    <autoPull>true</autoPull>
    <images>
      <image>
        <!-- Eureka Server -->
        <alias>eureka</alias>
        <name>docker/eureka</name>
        <run>
          <wait>
            <http>
              <url>http://localhost:8761</url>
              <method>GET</method>
              <status>200</status>
            </http>
            <time>30000</time>
          </wait>
          <log>
            <prefix>EEEE</prefix>
            <color>green</color> <!-- Color the output green -->
          </log>
          <ports>
            <port>8761:8761</port> <!-- Local to container port mapping -->
          </ports>
          <env>
            <eureka.instance.hostname>eureka</eureka.instance.hostname> <!-- Override host name property -->
          </env>
        </run>
      </image>
      <image>
        <alias>baseproduct</alias>
        <name>docker/baseproduct</name>
        <run>
          <wait>
            <http>
              <url>http://localhost:9090</url>
              <method>GET</method>
              <status>200</status>
            </http>
            <time>30000</time>
          </wait>
          <log>
            <prefix>EEEE</prefix>
            <color>blue</color>
          </log>
          <ports>
            <port>9090:9090</port>
          </ports>
          <links>
            <link>eureka</link> <!-- Link to Eureka Docker image -->
          </links>
          <env>
            <!-- Notice the system property overriding of the eureka service Url -->
            <eureka.client.serviceUrl.defaultZone>http://eureka:8761/eureka/</eureka.client.serviceUrl.defaultZone>
          </env>
        </run>
      </image>
      <!--....Other service containers like price, review, inventory-->
      <image>
        <alias>product-gateway</alias>
        <name>docker/product-gateway</name>
        <run>
          <wait>
            <http>
              <url>http://localhost:9094</url>
              <method>GET</method>
              <status>200</status>
            </http>
            <time>30000</time>
          </wait>
          <log>
            <prefix>EEEE</prefix>
            <color>blue</color>
          </log>
          <ports>
            <port>9094:9094</port>
          </ports>
          <links>
            <!-- Links to all other containers -->
            <link>eureka</link>
            <link>baseproduct</link>
            <link>price</link>
            <link>inventory</link>
            <link>review</link>
          </links>
          <env>
            <eureka.client.serviceUrl.defaultZone>http://eureka:8761/eureka/</eureka.client.serviceUrl.defaultZone>
            <!-- Setting this property to prefer ip address, else Integration will fail as it does not know host name of product-gateway container-->
            <eureka.instance.prefer-ip-address>true</eureka.instance.prefer-ip-address>
          </env>
        </run>
      </image>
    </images>
  </configuration>
  <executions>
    <execution>
      <id>start</id>
      <phase>pre-integration-test</phase>
      <goals>
        <goal>start</goal>
      </goals>
    </execution>
    <execution>
      <id>stop</id>
      <phase>post-integration-test</phase>
      <goals>
        <goal>stop</goal>
      </goals>
    </execution>
  </executions>
</plugin>
One of the nice features is the syntax color prefix of each containers messages, this gives one a sense of visual separation among the multitude of containers that are started. The Integration Test itself is shown below:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { ProductGatewayIntegrationTest.IntegrationTestConfig.class })
public class ProductGatewayIntegrationTest {
  private static final Logger LOGGER = Logger.getLogger(ProductGatewayIntegrationTest.class);
  
  /**
   * A Feign Client to obtain a Product
   */
  @FeignClient("product-gateway")
  public static interface ProductClient {
    @RequestMapping(method = RequestMethod.GET, value = "/products/{productId}", consumes = "application/json")
    Product getProduct(@PathVariable("productId") Long productId);
  }

  @EnableFeignClients
  @EnableDiscoveryClient
  @EnableAutoConfiguration
  @ComponentScan
  @Configuration
  public static class IntegrationTestConfig {}

  // Ribbon Load Balancer Client used for testing to ensure an instance is available before invoking call
  @Autowired
  LoadBalancerClient loadBalancerClient;

  @Inject
  private ProductClient productClient;

  static final Long PRODUCT_ID = 9310301L;

  @Test(timeout = 30000)
  public void getProduct() throws InterruptedException {
    waitForGatewayDiscovery();
    Product product = productClient.getProduct(PRODUCT_ID);
    assertNotNull(product);
  }

  /**
   * Waits for the product gateway service to register with Eureka
   * and be available on the client.
   */
  private void waitForGatewayDiscovery() {
    while (!Thread.currentThread().isInterrupted()) {
      LOGGER.debug("Checking to see if an instance of product-gateway is available..");
      ServiceInstance choose = loadBalancerClient.choose("product-gateway");
      if (choose != null) {
        LOGGER.debug("An instance of product-gateway was found. Test can proceed.");
        break;
      }
      try {
        LOGGER.debug("Sleeping for a second waiting for service discovery to catch up");
        Thread.sleep(1000);
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
The test uses the LoadBalancerClient client from Ribbon to ensure an instance of 'product-gateway' can be discovered from Eureka prior to using the Product client to invoke the gateway service to obtain back a product.

Running the Example


The first thing you need to do is make sure you have Docker installed on your machine. Once you have Docker installed, clone the example from github (https://github.com/sanjayvacharya/sleeplessinslc/tree/master/product-gateway-docker) and then execute a mvn install from the root level of the project. This will result in the creation of Docker images and the running of the Docker based integration tests of the product gateway. Cheers!

Friday, November 6, 2015

Spring Cloud Netflix Eureka Example...It's so Bootiful

Introduction


Been looking at Netflix Eureka for Service Registry/Discovery and thought I'd share some thoughts via an example. My interest was more around Eureka 2.0 and I was eagerly awaiting their release in early 2015 per swags(?) but that did not materialize with the team stating its better to adopt Eureka 1.0. Not very thrilled I must say :-(.

Looking at Eureka 1.0, there appears two routes that one could go:
  1. Use Stock Netflix Eureka
  2. Use  Spring Cloud Netflix which is a Netflix Eureka and garnished with other Netflix OSS components, heated to a Spring temperature and presented as a Spring Boot Dish.
This Blog will utilize Spring Cloud Netflix and my favorite Notes Web Service to demonstrate a workable project that one can check out play with to understand Service Discovery with Eureka. The example will register an instance of  the Notes Service with Eureka Server (the registry) and a Notes Client will discover the service instance and issue a request to it simulating a full integration. Additional Technologies used:
  • RestAssured - Rest Assured is used a fluent testing of the service API   
  • Spring Fox - Used to expose Swagger UI for service documentation and testing
  • Netlflix Ocelli - Client Side Load Balancing
For the Client side of the application, I choose to use Netflix Ocelli for Client side load balancing along with Spring Frameworks RestTemplate. Why Ocelli over Ribbon for Client Side Load Balancing? Well,  Netflix seems to be looking at Ocelli as the new Ribbon, at least that is what this email chain seems to indicate.

Eureka Components


The following are some components one deals with when working with Eureka

EurekaClientConfig

EurekaClientConfig provides the configuration required by eureka clients to register an instance with Eureka. Netflix Eureka provides an implementation DefaultEurekaClientConfig that is configured via a property file eureka.client.props. Spring Cloud Netflix extends EurekaClientConfig and provides a EurekaClientConfigBean which implements and EurekaClientConfig and works with Spring properties. Properties prefixed by eureka.client will map into the EurekaClientConfigBean via Project Lombok magic.

EurekaServerConfig

Follows a similar approach like the ClientConfig but this is used for Eureka Server properties

EurekaInstanceConfig

Provides the configuration required by an instance to register with Eureka server. If building a service, you might want to tweak some of  knobs here. You can control properties like:
  • Name of Service
  • Secure/Non-secure ports of operation
  • Lease renewal and expiration
  • Home page URL and Status page URL
If working with vanilla Netflix Eureka, you could use MyDataCenterInstanceConfig (for non-AWS data center or the CloudInstanceConfig (AWS). Spring Cloud Netflix users get a EurekaInstanceConfigBean spruced with Lombok magic to control their properties. The properties are prefixed by eureka.instance.

Discovery Client

Discovery Client uses a 'jersey 1' client to communicate with Eureka Server to:
  • Register a service instance
  • Renew the lease of a Service instance
  • Cancel of a lease of service instance
  • Query Eureka Server for registered instances
DiscoveryClient is configured by the DiscoveryManager with has information of the EurekaClientConfig and the EurekaInstanceConfig

The Notes Example

The Notes example defines a Server side component that will register with Netflix Eureka, a Client side library, the Notes Client which discovers a  Notes Service Instance from Eureka and an integration test that demonstrates this lifecycle.

Notes Service Code


The Heart of the Service code is the declaration of the Spring Boot Application as shown below:

@SpringBootApplication // Spring Boot 
@EnableEurekaClient    // Enable Eureka Client
@RestController 
@EnableSwagger2        // Swagger  
public class Application {
  @RequestMapping("/")
  public String home() {
    return "This is the Notes App";
  }

  @Bean
  public Docket swaggerSpringMvcPlugin() { // Minimalistic setting for Swagger Support
    return new Docket(DocumentationType.SWAGGER_2)
            .select()
            .paths(PathSelectors.any())
            .build().pathMapping("/");
  }

  public static void main(String args[]) {
    SpringApplication.run(Application.class, args);
  }
}

@RestController
@Api(basePath = "/notes", value = "Notes", description = "Note Creation and Retrieval", produces = "application/xml")
public class NotesController {
  @Inject
  private NotesService notesService;

  @RequestMapping(value = "/notes", method = RequestMethod.POST)
  public Long create(@RequestBody Note note) {
    return notesService.createNote(note);
  }

  @RequestMapping(value = "/notes/{id}", method = RequestMethod.GET, produces = MediaType.APPLICATION_XML_VALUE)
  public Note get(@PathVariable Long id) {
    return notesService.getNote(id);
  }
}
In the above Boot application, we enabled Eureka Client via the @EnableEurekaClient annotation and also set up swagger support by creating a Docket and adding the @EnableSwagger2 annotation. The JSON Swagger resource is available at http://localhost:9090/v2/api-docs and the Swagger HTML is available at http://localhost:9090/swagger-ui.html.  The configuration for the application is driven by a simple YAML file as shown below where the port, location of Eureka server instance and a custom instance Id are defined:
spring:
  application:
    name: Notes

server.port: 9090

eureka:
  client:
    serviceUrl:
          defaultZone: http://localhost:8761/eureka/v2/
  instance:
    metadataMap:
      instanceId: ${spring.application.name}:${spring.application.instance_id:${random.value}}
If one wished to integration test (exercise DB and other calls) in the Service application without having to worry about the Eureka registration and discovery, you could simply turn of the eureka client and use, say RestAssured to validate your service as shown below.
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
@WebAppConfiguration
@IntegrationTest({"server.port=0", // Pick an ephemeral port
                  "eureka.client.enabled=false" // Prevent registering/discovering with Eureka
})
public class NotesControllerTest {

  @Value("${local.server.port}")
 private  int port;

  @Before
  public void setUp() {
    RestAssured.port = port; 
  }

  @Test
  public void createAndGetNote() {
    given().contentType(MediaType.APPLICATION_XML)
           .body(new Note("Test"))
            .when()
            .post("http://localhost:" + port + "/notes")
            .then()
            .statusCode(200).body(equalTo("1"));

    when().get("/notes/{id}", 1).then()
            .assertThat()
            .statusCode(200)
             .body("note.content", equalTo("Test"));
  }
}

Notes Client Code


For the NoteClient, as mentioned, Ocelli is used as a Load Balancer with RestTemplate used to make the REST call.

public class NotesClientImpl implements NotesClient, Closeable {
  private final RestTemplate restTemplate;
  private final Subscription subscription;
  private final AtomicReference<List<InstanceInfo>> result;
  private final RoundRobinLoadBalancer<InstanceInfo> instanceInfoLoadBalancer;

  public NotesClientImpl(RestTemplate restTemplate, DiscoveryClient discoveryClient,
                         long refreshInterval, TimeUnit refreshIntervalTimeUnit) {
    this.restTemplate = restTemplate;
    this.result = new AtomicReference<List<InstanceInfo>>(new ArrayList<>());
    this.subscription = new EurekaInterestManager(discoveryClient).newInterest()
            .forApplication("Notes") // Notes Service
            .withRefreshInterval(refreshInterval, refreshIntervalTimeUnit) // Ocelli Refresh Interval
            .asObservable()
            .compose(InstanceCollector.<InstanceInfo>create())
            .subscribe(RxUtil.set(result));
    instanceInfoLoadBalancer = RoundRobinLoadBalancer.<InstanceInfo>create(); // Round Robin
  }

  private String getServiceInstanceUrl() {
    InstanceInfo instanceInfo = instanceInfoLoadBalancer.choose(result.get()); // Choose an instance
    if (instanceInfo != null) {
      return "http://" + instanceInfo.getIPAddr() + ":" + instanceInfo.getPort();
    }
    throw new RuntimeException("Service Not available");
  }


  @Override
  public Note getNote(Long noteId) {
    return restTemplate.getForEntity(getServiceInstanceUrl() + "/notes/{id}", Note.class, noteId).getBody(); 
  }
  ...
}
Ocelli uses an 'interest' manager to register a subscription for changes to Notes Service. A round robin load balancer is used in the example. The NotesClient is set up in the NotesClientConfig.

Notes Integration Test


NotesIntegrationTest  is where things get interesting. The integration test will -
  • Start a Eureka Server in the pre-integration-test phase of maven using Cargo. Note that we are using the stock Netflix eureka WAR for this. One could also use the Spring Cloud version via an exec maven plugin.
  • Have Notes Service start on a random port and register with the Eureka Server
  • Have the Notes Client create and obtain a 'Note' by discovering the Notes Service from Eureka
Eureka operations are usually set in range of 20 to 30 seconds for things like registration, heartbeart, renewal and registry fetching. That works in a production environment but in an integration test waiting for so long is not an option. Thankfully, the Spring Boot test framework allows for easy overriding of these properties.
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {Application.class, NotesClientConfig.class}) // Note the Import of Server and Client
@WebIntegrationTest(value = {"eureka.instance.leaseRenewalIntervalInSeconds=10", // Lease Interval
        "eureka.client.instanceInfoReplicationIntervalSeconds=1",
        "eureka.client.initialInstanceInfoReplicationIntervalSeconds=1",
        "eureka.client.registryFetchIntervalSeconds=1",
        "eureka.client.serviceUrl.defaultZone=http://localhost:9000/eureka/v2/"}, // Overriding eureka location
        randomPort =  true)
public class NotesIntegrationTest {

  @Inject
  private NotesClient notesClient; // Notes Client injected

  @Value("${ocelliRefreshIntervalSeconds}")
  private Long ocelliRefreshIntervalSeconds; // How often Ocelli Client has been set up to refresh
  
  @Test
  public void createAndGetNote() throws InterruptedException {
    // Wait for Notes App to register
    waitForNotesRegistration(); // Waits for Notes Service to register before proceeding
    Thread.sleep((ocelliRefreshIntervalSeconds * 1000) + 1000L); // Ocelli might not be in sync
    Note note = new Note("Test");
    Long noteId = notesClient.createNote(note); // Create a note
    assertEquals(note, notesClient.getNote(noteId)); // Read the node
  }

  @Inject
  private DiscoveryClient discoveryClient;

  private void waitForNotesRegistration() throws InterruptedException {
   /* Discovery client is used to make sure service is registered before the Notes Client gets a chance to discover and exercise API */
  }
}
The important thing here to note is that I am using the 'war' version of the Netflix Eureka server to start up before the integration test runs. It is equally possible to launch the Spring Boot 'jar' version of the eureka server via the exec maven plugin prior to the start of the integration test.

Parting Thoughts


For the curious, this Notes Spring Cloud Netflix Eureka example can be found at sleeplessinslc github.

I would also like to share the below as my personal 'opinion' only

Netflix Eureka

The good - 

Awesome project, a Service Registry for the Web. Apps in any language can participate. REST + Jersey, preaching to the choir. More importantly, developed with a profound understanding of failure scenarios around a Microservice 'service registry'. Nitesh Kants presentation is a must see.

Good starter documentation and responsive folks on the mailing list. 

The bad - 

I understand Eureka was designed for AWS usage primarily by Netflix.  Maybe a more modular (plugin) approach would have been nicer ensuring a clean separation between AWS and non-AWS code, especially considering it is a fine candidate for private cloud.

Discovery client fetches entire registry rather than information about specific services. Eureka 2 architecture looks better in that regard where one only gets data about services they are interested in.

Jersey 1 - Lets update to Jersey 2.X folks! 

Eureka UI looks pretty horrid and that is coming from a 'server side' only programmer :-). 

There is an issue where shutting down a service instance leads to shutting down all other services on that host which is concerning if hosting multiple services on a node.

Ocelli vs. Ribbon. Can we have clear direction? Ocelli has not had a release since July 10th, Ribbon though has a more recent release. So is Ocelli really the new Ribbon? 

Spring Cloud Netflix

The good -

If using a Spring ecosystem, a no brainer to adopt. Meshes really well with Spring properties and enables Hystrix and other Netflix OSS.

Spring Boot is pretty sweet but you need to work in a 'controlled' environment in the 'Spring Boot Way' to enjoy full benefit. 

Spring Cloud Netflix re-bundled Eureka server is much more visually pleasing than the core Eureka UI. I believe they also have a workaround with reference to the Shutdown bug I mentioned.

The bad - 

You sold yourself to the Bootiful devil :-)
  • Custom extension classes of Netflix interfaces (EurekaClientConfigBean..etc), rather than bridging
  • Version of Eureka is older
  • Documentation does not match up with implementation. Case in point the property "spring.cloud.client.hostname" defined in their documentation is not even part of current release. I spent some time chasing this
  • Do we really need Lombok?
So, what I want to do next?

Leaning back on my reactive example,  lets say I have an aggregator Product Service that I want to test that utilizes a Base Product Service, a Price Service and an Inventory Service, all developed with Spring Cloud Eureka, how would I write a maven integration test for this? Off to Docker land my friends, onward and upward!

Tuesday, September 29, 2015

RxJava Presentation

I have been an employee of Overstock.com for close to 8 years now. Every year Overstock has a Technology Day where employees present on technology and simply have fun. This year I presented on RxJava with a colleague of mine. We had a lot of fun preparing for the presentation while learning as well. We also won a small reward for being a being a popular presentation :-)

The following are the slides that we are sharing from our presentation. I would recommend you check the RxAir Demo by my colleague, it's pretty sweet.


Enjoy!

Friday, May 8, 2015

RxNetty and Jersey 2.0

Introduction


I have been interested in reactive programming and micro-services of recent and a few things have caught my attention:
  • Netty - An asynchronous event-driven framework for high performance protocol servers and clients 
  • Netflix RxNetty - Reactive Extension Adapter for Netty
  • Netflix Karyon - A blueprint for a cloud ready application
Netty became my primary interest from a performance perspective and some reading demonstrated that Netflix had done some benchmarks between RxNetty and Tomcat and found results in favor of RxNetty. Take a look at the following links -

So I wanted to see how I could use RxNetty for HTTP. One problem that surfaced is when using RxNetty and HTTP, one is limited to a basic  HTTP framework, i.e, something not as rich as Jersey JAX-RS. The same led me to Karyon which has 'blocking' Jersey support. However, Karyon uses Jersey 1.X and the Karyon team did not look like they would be making a move to Jersey 2.X anytime soon . A post from Netflix on the same https://github.com/Netflix/karyon/issues/56

Searching to see if Jersey 2.X had support for Netty, I found the following two items -
The above project is a Netty-Jersey bridge but does not use RxNetty. So I set out to make my own RxNetty Jersey Container to see how it would work.

RxNettyHttpContainer


The following code demonstrates a RxNettyHttpContainer. It has been stripped of some methods. The container has been modeled after GrizzlyHttpContainer and code from Karyon.

public class RxNettyHttpContainer implements Container, RequestHandler<ByteBuf, ByteBuf> {

  private volatile ApplicationHandler appHandler;

  private final Type RequestTYPE = (new TypeLiteral<Ref<HttpServerRequest<ByteBuf>>>() {
  }).getType();
  private final Type ResponseTYPE = (new TypeLiteral<Ref<HttpServerResponse<ByteBuf>>>() {
  }).getType();
  
  private volatile ContainerLifecycleListener containerListener;
  
  /**
   * Referencing factory for RxNetty HttpServerRequest.
   */
  private static class RxNettyRequestReferencingFactory extends ReferencingFactory<HttpServerRequest<ByteBuf>> {
  }

  /**
   * Referencing factory for RxNetty HttpServerResponse.
   */
  private static class RxNettyResponseReferencingFactory extends ReferencingFactory<HttpServerResponse<ByteBuf>> {
  }
  
  /**
   * An internal binder to enable RxNetty HTTP container specific types injection.
   */
  static class RxNettyBinder extends AbstractBinder {

    @Override
    protected void configure() {
       // Bind RxNetty HttpServerRequest and HttpServerResponse so that they are available for injection...
    }
  }
  
  public RxNettyHttpContainer(Application application) {
    this.appHandler = new ApplicationHandler(application , new RxNettyBinder());
    this.containerListener = ConfigHelper.getContainerLifecycleListener(appHandler);
  }
  ...

  @Override
  public void reload(ResourceConfig configuration) {
    containerListener.onShutdown(this);
    appHandler = new ApplicationHandler(configuration, new RxNettyBinder());
    containerListener = ConfigHelper.getContainerLifecycleListener(appHandler);
    containerListener.onReload(this);
    containerListener.onStartup(this);
  }

  // This is the meat of the code where a request is handled
  public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
    HttpServerResponse<ByteBuf> response) {
    InputStream requestData = new HttpContentInputStream(response.getAllocator(),
        request.getContent());
    URI baseUri = toUri("/");
    URI requestUri = toUri(request.getUri());

    ContainerRequest containerRequest = new ContainerRequest(baseUri, requestUri, request
        .getHttpMethod().name(), getSecurityContext(requestUri, request),
        new MapPropertiesDelegate());

    containerRequest.setEntityStream(requestData);

    for (String headerName : request.getHeaders().names()) {
      containerRequest.headers(headerName, request.getHeaders().getAll(headerName));
    }

    containerRequest.setWriter(new RxNettyContainerResponseWriter(response));

    containerRequest.setRequestScopedInitializer(new RequestScopedInitializer() {

        @Override
        public void initialize(final ServiceLocator locator) {
            locator.<Ref<HttpServerRequest<ByteBuf>>>getService(RequestTYPE).set(request);
            locator.<Ref<HttpServerResponse<ByteBuf>>>getService(ResponseTYPE).set(response);
        }
    });

   return Observable.<Void> create(subscriber -> {
      try {
        appHandler.handle(containerRequest);
        subscriber.onCompleted();
      }
      finally {
        IOUtils.closeQuietly(requestData);
      }
    }).doOnTerminate(() -> response.close(true)).subscribeOn(Schedulers.io());
  }

  private SecurityContext getSecurityContext(URI requestUri, HttpServerRequest<ByteBuf> request) {
     // Not yet handling security
  }

  // A custom ContainerResponseWriter to write to Netty response
  public static class RxNettyContainerResponseWriter implements ContainerResponseWriter {
    private final HttpServerResponse<ByteBuf> serverResponse;

    private final ByteBuf contentBuffer;

    public RxNettyContainerResponseWriter(HttpServerResponse<ByteBuf> serverResponse) {
      this.serverResponse = serverResponse;
      this.contentBuffer = serverResponse.getChannel().alloc().buffer();
    }

    @Override
    public void commit() {
      if (!serverResponse.isCloseIssued()) {
        serverResponse.writeAndFlush(contentBuffer);        
      }     
    }
    ..

    @Override
    public void failure(Throwable throwable) {
      LOGGER.error("Failure Servicing Request", throwable);
      if (!serverResponse.isCloseIssued()) {
        serverResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
        serverResponse.writeString("Request Failed...");     
      }
    }

    @Override
    public OutputStream writeResponseStatusAndHeaders(long contentLength,
      ContainerResponse containerResponse) throws ContainerException {
      
      serverResponse.setStatus(HttpResponseStatus.valueOf(containerResponse.getStatus()));
      HttpResponseHeaders responseHeaders = serverResponse.getHeaders();
      
      if (!containerResponse.getHeaders().containsKey(HttpHeaders.Names.CONTENT_LENGTH)) {
        responseHeaders.setHeader(HttpHeaders.Names.CONTENT_LENGTH, contentLength);
      }
      
      for (Map.Entry<String, List<String>> header : containerResponse.getStringHeaders().entrySet()) {
        responseHeaders.setHeader(header.getKey(), header.getValue());
      }
      
      return new ByteBufOutputStream(contentBuffer);
    }
  }
}
As mentioned, the above is  heavily influenced by the code from Karyon's Jersey 1.X JerseyBasedRouter and NettyToJerseyBridge. It has a few additional enhancements in being able to inject RxNetty's HttpServerRequest and HttpServerResponse into a Jersey Resource. One thing to note is that I could not find a way to inject the parameterized equivalents of HttpServerRequest and HttpServerResponse into a Jersey Resource and unless greatly mistaken it is limitation of HK2 that prevents the same. An example of the injected resource looks like:
public class SomeResource {
  private final HttpServerRequest<ByteBuf> request;

  private final HttpServerResponse<ByteBuf> response;

  @SuppressWarnings("unchecked")
  @Inject
  public SomeResource(@SuppressWarnings("rawtypes") HttpServerRequest request,
      @SuppressWarnings("rawtypes") HttpServerResponse response) {
    this.request = request;
    this.response = response;
  }
  ....
}  


Using the Container


In line with other containers provided by Jersey, there is a factory class and methods to start the RxNetty Jersey 2.X container.
public class RxNettyHttpServerFactory {

  private static final int DEFAULT_HTTP_PORT = 80;

  public static HttpServer<<ByteBuf, ByteBuf> createHttpServer(Application application) {
    return createHttpServer(DEFAULT_HTTP_PORT, application, false);
  }

  public static HttpServer<ByteBuf, ByteBuf> createHttpServer(Application application, boolean start) {
    return createHttpServer(DEFAULT_HTTP_PORT, application, start);
  }

  public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, Application application) {
    return createHttpServer(port, application, false);
  }

  public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, Application application,
    boolean start) {
    HttpServer<ByteBuf, ByteBuf> httpServer = RxNetty.createHttpServer(port,
      new RxNettyHttpContainer(application));
    if (start) {
      httpServer.start();
    }
    return httpServer;
  }
}
As an example:
public class Main {
 public static void main(String args[]) {
   ResourceConfig resourceConfig = ResourceConfig.forApplication(new NotesApplication());
   RxNettyHttpServerFactory.createHttpServer(9090, resourceConfig, true);
   System.in.read(); // Wait for user input to shutdown
 }
}
With the above, you have a RxNetty based Jersey 2.0 Container that you can have fun with. Wait! What about an example?

Example Using RxNetty Jersey 2.0 Container


As always, I lean back to my Notes example using Jersey 2.0 where a Note can be CRUDded! In addition, my example would not be complete without a touch of Spring in it, so we use Spring for DI. One of the sadness on the Jersey Spring integration from the artifact jersey-spring3 is that one needs to have javax.servlet as compile time dependency even if one does not use it. The Notes client is developed using RxJersey and the example has Spring for injection. and can be downloaded from HERE or accessed at github at https://github.com/sanjayvacharya/sleeplessinslc/tree/master/rxnetty-jersey2

Performance of the RxNetty Jersey 2 Container


A simple JMeter Load Tests exists in the Notes integration test project that tests the performance of the above RxNetty Container and Grizzly.
Now as a disclaimer, I am not a proclaimed JMeter expert so I could have got some stuff wrong. That said the following are some of the results contrasting the performance of the Grizzly container with the RxNetty Jersey 2 Container.

User Threads - 50 - Loop Count 25



User Threads - 500 - Loop Count 10


The two tests runs are to create notes and to obtain back notes. It appears that from both the tests, the throughput and responsiveness of the RxNettyContainer on the Notes Service is far better than the Grizzly equivalent. Note that GC metrics have not been looked into. There is one gotcha that I have not mentioned to the above. When a large payload is requested, the RxNetty Jersey 2.0 container chokes. As it is based of the Karyon Jersey 1.X implementation, I need to check and see if Karyon Jersey has the same problem. I believe it has to do with buffer copy between the HttpServerResponse and the content buffer (ByteBuf). To witness the problem in action, add the get all notes call to the JMeter script. There are two launchers for the Grizzly and RxNetty Containers respectively. Both of these need to be started before the JMeter test is launched.

Conclusion


RxNetty looks pretty sweet at least from my simple tests. Using it as a Micro-container looks interesting with Jersey 2. The container I have created does not support @Aysnc on a Jersey Resource, similar to the jdk-http container and the Karyon jersey bridge, and to be honest, I am not even sure it makes sense to do so.

The RxNetty Jersey 2.X container shown above -
  • Is only a proof of concept. It does not have the necessary unit-tests and or validation. Something that could be easily done though.
  • Utilizes a class from Karyon - HttpContentInputStream so has a dependency on a Karyon jar
  • Karyon Jersey 1.X could easily be migrated to Jersey 2.X based of the above code it appears
  • If you wanted to use Jersey 2 with Netty, I would look into the Netty Jersey 2.X Container I mentioned earlier.
If anyone has thoughts around the container, load test, or example, ping me.

Friday, April 17, 2015

Reactive Programming with Jersey and RxJava

Introduction


I have the reactive programming fever right now and am having fun using RxJava (Reactive Extensions for Java).  When I found out about reactive support for Jersey, I figured a BLOG might be warranted.

Problem Domain

It's kinda nice to have problem domain. In our case company Acme had a monolithic retail web app application that sold products. There were many modules composing the monolith such as CheckOut, Account details, etc all contributing to a single deployable. One module among them was Product which had had core product information, pricing information and inventory that were obtained from a database.



The solution worked really well at first. However, over time, the company felt the following pain points -
  • One Deploy thus One Rollback - All or nothing
  • Long build time and testing
  • Information such as price, reviews etc were needed elsewhere as there was some logic associated with these thus having it centralized for consumption would be nice.
  • Coordination nightmare among teams working on different areas to make sure that all came together.
The Architects  decided to extract the concept of Product into separate services.  Over time, the company created separate web services for the base product, inventory, reviews and price. These were aligned with the different teams working on the respective areas and allowed for each area to evolve and scale independently.



All the web service calls were written using JAX-RS/Jersey. The company had totally adopted the micro services bandwagon.  A product would be created for the Web Site by calling the different services and aggregating the result as shown below  -

The following is a sample code that uses just one jersey client that demonstrates the above :
   @GET
   public void getProduct(@PathParam("id") Long productId,
    @Suspended final AsyncResponse response) {
    try {
      // Get the Product
      BaseProduct baseProduct = client.target(serviceUrls.getBaseProductUrl()).path("/products")
          .path(String.valueOf(productId)).request().get(BaseProduct.class);

      // Get reviews of the Product
      Reviews reviews = client.target(serviceUrls.getReviewsUrl()).path("/productReviews")
          .path(String.valueOf(productId)).request().get(Reviews.class);

      // Create a Result Product object - Base Product object does not have price and inventory
      Product resultProduct = resultProductFrom(baseProduct, reviews);

      // Obtain the Price for Result Product
      resultProduct.setPrice(client.target(serviceUrls.getPriceUrl()).path("/productPrice")
          .path(productId.toString()).request().get(ProductPrice.class).getPrice());

      // Get Price and Inventory of Product Options and set the same
      for (Long optionId : optionId(resultProduct.getOptions())) {        
        Double price = client.target(serviceUrls.getPriceUrl()).path("/productPrice")
            .path(optionId.toString()).request().get(ProductPrice.class).getPrice();

        ProductInventory inventory = client.target(serviceUrls.getInventoryUrl())
            .path("/productInventory").path(optionId.toString()).request()
            .get(ProductInventory.class);

        ProductOption option = resultProduct.getOption(optionId);

        option.setInventory(inventory.getCount());
        option.setPrice(price);  
      }

      response.resume(resultProduct);
    }
    catch (Exception e) {
      response.resume(e);
    }
  }

ACME soon found that calling these services had a performance degradation compared to when they were a monolithic product service that obtained all product information using a database operation.The performance degradation was primarily attributed to the serial nature in which the requests were being invoked and results subsequently composed not counting the fact that a database join among disparate tables was far more performant. As an immediate fix, the direction from the Architects was to ensure these services were called in parallel when appropriate -
  • Call the Product Service to obtain core product data serially - You need base product information and its options
  • Call the Review Service asynchronously
  • Call the Pricing Service asynchronously to obtain the price of the Product and Options
  • Call the Inventory Service asynchronously to obtain the inventory of the different Product Options
The above worked and things were more performant, however, the code looked a mess due to the different CountDownLatches, composition logic and Futures they had in play. The Architects met again, hours were spent locked in a room while pizza's where delivered under their door until they heard of Reactive Programming in general and Reactive Jersey with RxJava in particular! Bling, Bling, bulbs coming to light illuminating their halo's and away they go reacting to their new idea. They found that Reactive Jersey promotes a clean API to handle parallel execution and composition while not having to worry about Countdown latches and the likes. The resulting code developed looked like -
  
 @GET
  public void observableProduct(@PathParam("id") final Long productId,
    @Suspended final AsyncResponse response) {

    // An Observable of a Result Product from 
    Observable<Product> product = Observable.zip(baseProduct(productId), reviews(productId),
      new Func2<BaseProduct, Reviews, Product>() {

        @Override
        public Product call(BaseProduct product, Reviews review) {
          return resultProductFrom(product, review);
        }
      });

    // All Product
    Observable<Long> productIds = productAndOptionIds(product);
    
    // Observable of Options only
    Observable<Long> optionIds = productIds.filter(new Func1<Long, Boolean>() {

      @Override
      public Boolean call(Long prodId) {
        return !prodId.equals(productId);
      }

    });
    
    // Set Inventory Data
    product
        .zipWith(inventories(productId, optionIds).toList(), new Func2<Product, List<ProductInventory>, Product>() {

          @Override
          public Product call(Product resultProduct, List<ProductInventory> productInventories) {
            for (ProductInventory inventory : productInventories) {
              if (!inventory.getProductId().equals(resultProduct.getProductId())) {
                resultProduct.getOption(inventory.getProductId())
                    .setInventory(inventory.getCount());
              }
            }
            return resultProduct;
          }
        })
        // Set Price Data
        .zipWith(prices(productIds).toList(), new Func2<Product, List<ProductPrice>, Product>() {

          @Override
          public Product call(Product resultProduct, List<ProductPrice> prices) {
            for (ProductPrice price : prices) {
              if (price.getProductId().equals(resultProduct.getProductId())) {
                resultProduct.setPrice(price.getPrice());
              }
              else {
                resultProduct.getOption(price.getProductId()).setPrice(price.getPrice());
              }
            }
            return resultProduct;
          }
        }).observeOn(Schedulers.io()).subscribe(new Action1<Product>() {

          @Override
          public void call(Product productToSet) {
            response.resume(productToSet);
          }

        }, new Action1<Throwable>() {

          @Override
          public void call(Throwable t1) {
            response.resume(t1);
          }
        });
  }
  
  /**
   * @return an Observable of the BaseProduct
   */
  private Observable<BaseProduct> baseProduct(Long productId) {
    return RxObservable
    .from(
      client.target(serviceUrls.getBaseProductUrl()).path("/products").path(String.valueOf(productId)))
    .request().rx().get(BaseProduct.class);
  }
  
  /**
   * @return An Observable of the Reviews
   */
  private Observable<Reviews> reviews(Long productId) {
    return RxObservable
        .from(client.target(serviceUrls.getReviewsUrl()).path("/productReviews").path(String.valueOf(productId)))
        .request().rx().get(Reviews.class);
  }
  
  /**
   * @return An Observable having Product and Option Ids
   */
  private Observable<Long> productAndOptionIds(Observable<Product> product) {
    return product.flatMap(new Func1<Product, Observable<Long>>() {

      @Override
      public Observable<Long> call(Product resultProduct) {
        return Observable.from(Iterables.concat(
          Lists.<Long> newArrayList(resultProduct.getProductId()),
          Iterables.transform(resultProduct.getOptions(), new Function<ProductOption, Long>() {

            @Override
            public Long apply(ProductOption option) {
              return option.getProductId();
            }
          })));
      }
    });
  }
  
  /**
   * Inventories returns back inventories of the Primary product and options. 
   * However, for the primary product, no web service call is invoked as inventory of the main product is the sum of 
   * inventories of all options. However, a dummy ProductInventory is created to maintain order during final concatenation.
   *
   * @param productId Id of the Product
   * @param optionIds Observable of OptionIds
   * @return An Observable of Product Inventory
   */
  private Observable<ProductInventory> inventories(Long productId, Observable<Long> optionIds) {
    return Observable.just(new ProductInventory(productId, 0))
        .concatWith(optionIds.flatMap(new Func1<Long, Observable<ProductInventory>>() {

      @Override
      public Observable<ProductInventory> call(Long optionId) {
        return RxObservable
            .from(client.target(serviceUrls.getInventoryUrl()).path("/productInventory").path("/{productId}"))
            .resolveTemplate("productId", optionId).request().rx().get(ProductInventory.class);
      }
    }));
  }
  
  /**
   * @return An Observable of ProductPrice for product Ids
   */
  private Observable<ProductPrice> prices(Observable<Long> productIds) {
    return productIds
        .flatMap(new Func1<Long, Observable<ProductPrice>>() {

          @Override
          public Observable<ProductPrice> call(Long productId) {
            return RxObservable
                .from(client.target(serviceUrls.getPriceUrl()).path("/productPrice").path("/{productId}"))
                .resolveTemplate("productId", productId.toString()).request().rx()
                .get(ProductPrice.class);
          }
        });
  }

Phew! A lot of code for something simple huh? The above code is demonstrated using jdk 7. With jdk 8 Lambda's, it should be far more succinct. However, I will admit that there is more code but not code that is not clear (hopefully). The important thing is that we are not dealing with the 'callback hell' associated with Java Futures or Invocation callbacks. That said, the performance difference between the serial execution and the Reactive Jersey version appears to strongly favor the Reactive Jersey version by a significant magnitude. By taking the serial call and turning in into parallel execution, I am certain you will achieve closer numbers to the Reactive Jersey version but at the cost of having to maintain latches etc.

Running the Example


An example application can be obtained from https://github.com/sanjayvacharya/sleeplessinslc/tree/master/reactive-jersey. The example does not have the Acme web site but has a Product-Gateway that serves as an orchestration layer. There are two resources in the Product-Gateway, one that returns the product using serial orchestration and a second that uses Reactive Jersey clients.  An Integration Test project exists where a client is used to invoke both the Serial and Reactive resources and logs an average response time.  Checkout the project, and simply execute a 'mvn install' at the root level of the project.

Summary

  • Reactive programming appears more verbose. Possible that I have not optimized it well enough, tips would be appreciated. jdk-8 Lamda's would regardless reduce the footprint.
  • Requires a mind shift regarding how you write code.
  • Aysnc could be introduced as required using Observable.subscribeOn() and Observable.observeOn() features as an when required. Read more about it on a Stack Overflow post.
  • It's a style of programming not a particular technology. RxJava seems to be emerging as the library of choice for Java.
  • Question whether something benefits from being Reactive before introducing Reactive code.
  • Jersey Reactive is a Glassfish implementation and not part of a JSR. So....
  • Monlith to Microservices might only be trading one problem for another so observe caution and be judicious in your selection of what you choose to make into a microservice
  • Batch request that fetches multiple items is not necessarily faster than multiple requests for a each item.
Finally, if there is a way to make my Reactive jersey code less verbose, let me know...all I ask is don't say collapse the services back to the monolith :-)

Monday, September 29, 2014

Spring @Value and javax.inject.Provider for Changeable Property Injection

Introduction


With Spring and other DI containers one works with 'properties' that are used to configure values. Some values might needs to be changed dynamically at runtime while others might be statically configured. A changeable property lends itself to a javax.inject.Provider style where the property is obtained as required and used. Sure that one could inject a java.util.Properties class and use it to obtain the corresponding property. It just seems more in line with The Law of Demeter to inject the actual property or the next closest thing, i.e., a dedicated Provider of the property.

Spring has the support for injecting a property via @Value annotation as :
public class Foo {
 @Value("${fooVal}") 
 private Integer integer;
 ...
}
or Constructor Injection
public class Foo {

 private final Integer integer;
 
 public Foo(@Value("${fooVal}") Integer integer) {
   this.integer = integer;
 }
 ...
}
or Setter Injection
public class Foo {

 private Integer integer;
 
 @Inject
 public setInteger(@Value("${fooVal}") Integer integer) {
   this.integer = integer;
 }
 ...
}

Or if using JavaConfig:
@Configuration
public class FooConfig {
  @Bean
  public Foo foo(@Value("${fooVal}") Integer integer) {
    return new Foo(integer);
  }
}

If fooVal never changes during the execution of the program, great, you inject it and then its set. What if it were to change, due to it being a configurable value? What would be nice to do is the following:
public class Foo {
  private final Provider<Integer> integerProvider;
  
  public Foo(Provider<Integer> integerProvider) {
    this.integerProvider = integerProvider;
  }
  
  public void someOperation() {
    Integer integer = integerProvider.get(); // Current value of fooVal
    ..
  }
}

@Configuration
public class FooConfig {
  @Bean
  public Foo foo(@Value("${fooVal}") Provider<Integer> integerProvider) {
    return new Foo(integer);
  }
}
Looks pretty reasonable, however, when you attempt the same with Spring Java Config, you end up with:
org.springframework.beans.TypeMismatchException: Failed to convert value of type 'java.lang.String' to required type 'java.lang.Integer'; nested exception is java.lang.IllegalArgumentException: MethodParameter argument must have its nestingLevel set to 1
 at org.springframework.beans.TypeConverterSupport.doConvert(TypeConverterSupport.java:77)
 at org.springframework.beans.TypeConverterSupport.convertIfNecessary(TypeConverterSupport.java:47)
 at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:875)

This should just work. Simplest way I could get it to work is to fool spring that the nesting level of the Provider<Integer&gt; is actual one level lesser. The way I did it was to create my own org.spring.framework.beans.TypeConverter that delegated to the default TypeConverter used by Spring, i.e., the SimpleTypeConverter:
public class CustomTypeConverter implements TypeConverter {
  private final SimpleTypeConverter simpleTypeConverter;

  public CustomTypeConverter() {
    simpleTypeConverter = new SimpleTypeConverter(); // This is the default used by Spring
  }

  public <T> T convertIfNecessary(Object newValue, Class<T> requiredType,
    MethodParameter methodParam) throws IllegalArgumentException {
    Type type = methodParam.getGenericParameterType();

    MethodParameter parameterTarget = null;

    if (type instanceof ParameterizedType) {
      ParameterizedType paramType = ParameterizedType.class.cast(type);
      Type rawType = paramType.getRawType();

      if (rawType.equals(Provider.class)
          && methodParam.hasParameterAnnotation(Value.class)) { 
        // If the Raw type is javax.inject.Provider, reduce the nesting level
        parameterTarget = new MethodParameter(methodParam); 
       // Send a new Method Parameter down stream, don't want to fiddle with original
        parameterTarget.decreaseNestingLevel();
      }
    }

    return simpleTypeConverter.convertIfNecessary(newValue, requiredType, parameterTarget);
  }
  ...// Delegate other methods to simpleTypeConverter
}
With the above you could do the following:
public class SpringTest {

  @Test
  public void test() {
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
    context.getBeanFactory().setTypeConverter(new CustomTypeConverter());
    context.register(SimpleConfig.class);
    context.refresh();
    
    context.getBean(SimpleBean.class).print();
  }
  
  public static class SimpleConfig {
    
    @Bean(name = "props") // Change this to a something that detects file system changes and pulls it in
    public PropertyPlaceholderConfigurer propertyPlaceHolderConfigurer() {
      PropertyPlaceholderConfigurer configurer = new PropertyPlaceholderConfigurer();
      Resource location = new ClassPathResource("appProperties.properties");
      configurer.setLocation(location);
      return configurer;
    }
    
    @Bean
    public SimpleBean simpleBean(@Value("${fooVal}") Provider<Integer> propValue,
      @Value("${barVal}") Provider<Boolean> booleanVal) {
      return new SimpleBean(propValue, booleanVal);
    }
  }
  
  public static class SimpleBean {
    private final Provider<Integer> propVal;
    private final Provider<Boolean> booleanVal;
    
    public SimpleBean(Provider<Integer>, Provider<Boolean> booleanVal) {
      this.propVal = propValue;
      this.booleanVal = booleanVal;
    }
    
    public void print() {
      System.out.println(propVal.get() + "," + booleanVal.get());
    }
  }
}
Now with the above if your properties changes, as you are injecting a javax.inject.Provider to the SimpleBean, at runtime, it will obtain the current value to use.

All this is great, I understand that I am hacking Spring to get what I want, is this the best way to handle something like dynamically changeable properties? Thoughts and suggestion welcomed.

Update :
I filed an enhancement request with Spring maintainers and they were fast to respond with this actually being a bug and are back porting fixes to previous versions as well as ensuring current and future versions have this feature available. https://jira.spring.io/browse/SPR-12297 

Sunday, September 14, 2014

Dynamic Service Discovery with Apache Curator

Service Discovery is a core part of most Service Oriented Architectures in particular and Distributed Network Component Architectures in general.

Traditional usages to find the location of a service have been to use Local file system, Shared File System, Database or some 3rd party Service Registry that statically define service locations. In many cases, one knows exactly the number of services that will be used and their deployment locations thus leading to a statically defined registry as an ideal candidate for discovery. In larger scale systems statically defined locations can become harder to manage as -
  • Number of services in the ecosystem grow (micro-services)
  • Number of deployment 'environments' grow - Test Environment 1, Test Enviroment 2...etc
  • Elasticity scalability becomes desirable - More important with Cloud based environments
  • Multiple services versions need to be supported simultaneously (A/B test)
Enter the concept of Dynamic Service Registration and Discovery. Services on becoming available register themselves as a member ready to participate with some 'registry'. Service Consumers are notified of the availability of the new service and avail its usage. If the service terminates for whatever reason, the Service Consumer is notified of the same and can stop availing its usage. This is not a novel concept and has likely been addressed in different ways.

Any Dynamic Service Registration System presents the following challenges -
  • Availability - The Service Registry can become the SPOF (Single Point Of Failure) for an entire ecosystem
  • Reliability - Keeping the registry and thus the service consumers in sync with which service instance is currently available. A service might go down and if the registry and service consumer are not notified immediately, service calls might fail.
  • Generality - Registry that supports multiple operating systems and technologies.
Again, the above is solvable by different ways. The benefit we have is that there are a number of tools that intrinsically support the requirements of a dynamic registry such as ZooKeeper, Netflix Eureka, Linked In's Dynamic Discovery, Doozer etc.  Jason Wilder has an excellent BLOG on Open Source Service Discovery and a comparison of popular open source service registries.

I have been looking into ZooKeeper and thus will share a few of my findings on using it but before I do the same, I would like to take a moment to discuss Load Balancing.

In standard H/A architectures a Load Balancer serves to balance traffic across the different service instances as shown below -




The same works well but poses the following challenges -
  • A new Load Balancer for every service and every environment
  • An additional hop between a Service Consumer and Service
  • Adding/Removing services per need and time to do the same
  • Intelligent Routing and Supporting Service Degradation
A Dynamic Service Registry could help with the mentioned concerns  where different instances of Service C register themselves with ZooKeeper and Service A and Service B on obtaining locations of Service C execute calls directly to them  -




ZooKeeper's Ephermal nodes serve well to maintain registered service instances as an Ephermal node only exists as long the client is alive, so if a client dies, the Ephermal node is deleted.  It is possible to use ZooKeeper directly and develop code that will register services, discover registered service etc. However, there is a library called Apache Curator that was initially created by Netflix to make writing ZooKeeper based application easier and more reliable. It was then incorporated as an Apache Project. Curator provides 'recipes' to common use cases one would use ZooKeeper for, i.e., Leader Election, Queue, Semaphore, Distributed Locks etc. In addition to these 'recipes', it also provides a mechanism for Service Registration and Discovery.

The remainder of this BLOG will look at providing a simple example that demonstrates the use of Apache Curator in the context of a simple JAX-RS web service that gets registered and is subsequently dynamically discovered by clients. For the sake of the example, I will continue to use the Notes JAX-RS Web Service that I have used on previous posts.

Service Registration -

In the example domain, all services will be registered under the ZooKeeper path "/services".  When the Notes Service starts, it registers itself with ZooKeeper as an ephermal node in the path "/services/notes/XXXXXXXX", where XXX indicates the ephermal instance.

A simple class called ServiceRegistrar is responsible for registering the Notes Service:
public class ServiceRegistrarImpl implements ServiceRegistrar {
   ...
  private static final String BASE_PATH = "/services";
  private static final String SERVICE_NAME = "notes";  

  private final CuratorFramework client; // A Curator Client
  
  private ServiceDiscovery<InstanceDetails> serviceDiscovery;
  
  // This instance of the service
  private ServiceInstance<InstanceDetails> thisInstance;
  
  private final JsonInstanceSerializer<InstanceDetails> serializer; 
  
  public ServiceRegistrarImpl(CuratorFramework client, int servicePort) throws UnknownHostException, Exception {
    this.client = client;
    serializer = new JsonInstanceSerializer<>(InstanceDetails.class); // Payload Serializer
    
    UriSpec uriSpec = new UriSpec("{scheme}://{address}:{port}"); // Scheme, address and port
      
    thisInstance = ServiceInstance.<InstanceDetails>builder().name(SERVICE_NAME)
      .uriSpec(uriSpec)
      .address(InetAddress.getLocalHost().getHostAddress()) // Service information 
      .payload(new InstanceDetails()).port(servicePort) // Port and payload
      .build(); // this instance definition
  }

  @Override 
  public void registerService() throws Exception {
    serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class)
        .client(client)
        .basePath(BASE_PATH).serializer(serializer).thisInstance(thisInstance)
        .build();
    serviceDiscovery.start(); // Registers this service
  }
  
  @Override 
  public void close() throws IOException {
    try {
      serviceDiscovery.close();
    }
    catch (Exception e) {
      LOG.info("Error Closing Discovery", e);
    }
  }
}
A Servlet Listener is defined whose responsibility is to start the registrar -
public class ServiceRegistrarListener implements ServletContextListener {
  private static final Logger LOG = Logger.getLogger(ServiceRegistrarListener.class);
  
  @Override
  public void contextDestroyed(ServletContextEvent sc) {  
  }

  @Override
  public void contextInitialized(ServletContextEvent sc) {
    try {
      WebApplicationContextUtils.getRequiredWebApplicationContext(sc.getServletContext())
      .getBean(ServiceRegistrar.class).registerService();
    }
    catch (Exception e) {
      LOG.error("Error Registering Service", e);
      throw new RuntimeException("Exception Registering Service", e);
    }   
  }
}

Service Discovery -
A Simple Discoverer class is shown below which uses the Curator library in obtaining instances of registered Notes Services. Curator provides a ServiceDiscovery class that is made aware of a Service Registry. A ServiceProvider is then used to obtain registered service instances for a particular service.
public class ServiceDiscoverer {
  private static final Logger LOG = Logger.getLogger(ServiceDiscovery.class);
  private static final String BASE_PATH = "/services";

  private final CuratorFramework curatorClient;

  private final ServiceDiscovery<InstanceDetails> serviceDiscovery;

  private final ServiceProvider<InstanceDetails> serviceProvider;

  private final JsonInstanceSerializer<InstanceDetails> serializer;

  private final List<Closeable> closeAbles = Lists.newArrayList();

  public ServiceDiscoverer(String zookeeperAddress, String serviceName) throws Exception {
    curatorClient = CuratorFrameworkFactory.newClient(zookeeperAddress,
      new ExponentialBackoffRetry(1000, 3)); // Ideally this would be injected

    serializer = new JsonInstanceSerializer<>(InstanceDetails.class); // Payload Serializer

    serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(curatorClient)
        .basePath(BASE_PATH).serializer(serializer).build(); // Service Discovery

    serviceProvider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).build(); // Service Provider for a particular service
  }

  public void start() {
    try {
      curatorClient.start();
      closeAbles.add(curatorClient);
      serviceDiscovery.start();
      closeAbles.add(0, serviceDiscovery);
      serviceProvider.start();
      closeAbles.add(0, serviceProvider);
    }
    catch (Exception e) {
      throw new RuntimeException("Error starting Service Discoverer", e);
    }
  }

  public void close() {
    for (Closeable closeable : closeAbles) {
      // Close all
      ...
    }
  }
  
  public ServiceInstance<InstanceDetails> getServiceUrl() {
    try {
      return instance = serviceProvider.getInstance();
    }
    catch (Exception e) {
      throw new RuntimeException("Error obtaining service url", e);
    }
  }
}
The ServiceProvider is used in the Notes Service Client as follows:
public class NotesClientImpl implements NotesClient {
  private static final Logger LOG = Logger.getLogger(NotesClientImpl.class);

  public static final String SERVICE_NAME = "notes";

  private final Client webServiceClient; // JAX-RS Client

  private final ServiceDiscoverer serviceDiscoverer; // Service Disoverer

  /**
   * @param getNotesServerUrl() Server URI
   * @throws Exception
   */
  public NotesClientImpl(String zookeeperAddress) throws Exception {
    serviceDiscoverer = new ServiceDiscoverer(zookeeperAddress, SERVICE_NAME);
    serviceDiscoverer.start();

    ClientConfig config = new ClientConfig();
    webServiceClient = ClientBuilder.newClient(config);
  }
  

  @Override
  public NoteResult create(Note note) {
    ServiceInstance<InstanceDetails> instance = serviceDiscoverer.getServiceInstance();
    
    try {
      return webServiceClient
          .target(UriBuilder.fromUri(instance.buildUriSpec()).path("/notes").build())
          .request(MediaType.APPLICATION_XML)
          .post(Entity.entity(note, MediaType.APPLICATION_XML_TYPE), NoteResult.class);
    }
    catch (ProcessingException e) {
      // If a ProcessingException occurs, the current ServiceInstance is marked as down
      serviceDiscoverer.noteError(instance);
      throw e;
    }
  }
In the above code, a ServiceInstance is obtained from the ServiceDiscoverer. It is important to 'not' locally hold onto the instance but use it during the processing of a call and then let go of it. If a javax.ws.rs.ProcessingException occurs say due to a Service Instance dying abruptly during processing for the request, then, notifying Curator to not use that instance for a certain period of time is the recommended direction. There is a time out delay between when the service instance dies and ZooKeeper times out the ephermal node associated with the service. So unless the instance is marked as being in an error state on the service client, the ServiceDiscoverer would continue to provide the downed service instance for usage.

Integration Test -

A simple Integration Test has been shown where a number of Notes Server instances are started up and they register themselves with a test ZooKeeper instance. The Notes Client is able to round robin to each of those instances. Each of the servers are designed to randomly die after a period of time leading to them being un-registered .When a Notes Server Instance goes down, it will no longer be invoked as part of the test. There is a Notes Server started that runs for a long time and acts as a fall back for demonstration completeness.
public class SimpleIntegrationTest {
  private TestingServer testServer; // ZooKeeper Server

  private NotesServer longRunningNotesServer; // A Notes Server that does not die for the demo
  
  private static final int MAX_SERVERS = 10; // Max Servers to use
 
  @Before
  public void setUp() throws Exception {
    testServer = new TestingServer(9001);   // Start the Curator ZooKeeper Test Server
  }  

  @After
  public void tearDown() throws IOException {...}

  @Test
  public void integration() throws Exception {
    CyclicBarrier barrier = new CyclicBarrier(MAX_SERVERS + 1);
    
    for (Integer port : ports()) { // Start Instances of Notes Server on different ports
      new NotesServer(port, barrier).start(); // Start an Instance of the Notes Server at the provided port
      Thread.sleep(1000);
    }    
    
    longRunningNotesServer = new NotesServer(9210, 500000L, null); // Start a Long Running Notes Server
    longRunningNotesServer.start();
    
    barrier.await(); // Wait for all Servers start
    
    NotesClient notesClient = new NotesClientImpl(testServer.getConnectString());
    List<NoteResult> noteResults = Lists.newArrayList();
    
    int i = 0;
    int maxNotes = 10000;
    
    while (i < maxNotes) {
      try {
        Note note = new Note("" + i, "" + i);
        NoteResult result = notesClient.create(note);
        noteResults.add(result);      
        i++;
      }
      catch (Exception e) {
        e.printStackTrace(); // If an error occurs on one instance, try again
      }
    }   

    // Ensure that all Notes Servers have been utilized
    assertEquals(MAX_SERVERS +1, notesClient.getServiceInstancesUsed().size());
  }
      
  private static Set<Integer> ports() {
    // Provide a unique set of ports for running multiple Notes Server
  }
  ....
}

Thoughts -
  • ZooKeeper is CP so in the event of Partition, writes can suffer.
  • ZooKeeper is SPOF of the ecosystem. Should have a contingency strategy.
    • Linked In's Dynamic Discovery Architecture addresses something like this but has a lot of moving parts
    • Simplicity of a Load Balancer makes one think YAGNI
      • Tools like Puppet Labs might make a lesser case for burden of standing up environments as well
  • Service Discovery allow clients to be 'smart'. Support for Service Degradation, alerting, and monitoring kind of go hand in hand.
  • Service Registration and De-Registration can be smartly built as well. For example, a Service could itself determine it's not healthy for whatever reason and de-registerer itself from the registry.
  • Supporting multiple environments where an ecosystem is self discoverable is simpler than having to provide overrides across different environments.
  • Managing  downed nodes with Curator needs to be handled carefully. Idempotence is your friend here.
  • Curator can definitely help with intelligent load balancing and different Load Balancing Strategies
    • Round Robin
    • Random
    • Custom one that involves Service Degradation
  • Curator provides a REST service that helps non-java clients wishing to participate in service registration and discovery.
  • If Netflix, Linked In and Amazon do something like this, it warrants looking into :-)


Friday, August 22, 2014

JVM Native memory leak

The Problem


There are times I learn something new of something that has been known since ages. Today was one of those times. Had a  Java Virtual Machine (JVM) whose heap usage looked great but the JVM process ended up growing native memory and finally died with an exception stating unable to allocate native memory. One such example might look like:
java.lang.OutOfMemoryError at java.util.zip.Inflater.init(Native Method) at java.util.zip.Inflater.(Inflater.java:101) at java.util.zip.ZipFile.getInflater(ZipFile.java:450) 

Or
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 184 bytes for AllocateHeap
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.

The application in question that manifested the above is a simple jersey web service that serves up and consumes JSON using JSON JAXB (Clue...). Initial interrogation yielded the following:
  • Thread usage is normal (-Xss settings has no benefit)
  •  –XXMaxDirectMemorySize - No difference
  • Heap usage is healthy
  • Heap dumps look healthy - no leaks
  • Perm Gen is looking good
  • Native file descriptors are within normal usage limits
  • java.net.Socket creation did not appear to cause issues
  • JDK version latest (1.7.X)
  • Happens on Virtual machines or stand alone boxes
Some Stack Overflow and other assistance:
  • Using the G1 Garbage collector
  • Or Setting Xms and Xmx to smaller values, i.e., lowering heap size
Right, so using G1 Garbage collector worked, or setting max heap to a lower value also worked? WTF!

What happened?


Creation of  a JAXB Context on every request/response. The application in question was not caching JAXB Contexts but was creating them on every request/response. Initial thought is, yeah, that is a performance problem, but why will it count towards native process memory growth? Consider the following benign code:


public class SimpleTest extends Thread { 
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      JSONJAXBContext context = new JSONJAXBContext(SomeDTOClass.class); // From jersey-json.jar
    }
  }

  public static void main(String args[]) {
    new SimpleTest().start();
  }
} 

Well, the problem is that JAXB Contexts indirectly involve in the invoking of native memory allocations. How does it do it and what happens underneath? Well, that is an exercise for the reader to figure out and comment on this BLOG with findings ;-).

The root cause can be dialed down to the following issue posted: http://bugs.java.com/view_bug.do?bug_id=4797189

public class Bug {
  public static void main( String args[] ) {
    while ( true ) {
      /* If ANY of these two lines is not commented, the JVM runs out of memory */
      final Deflater deflater = new Deflater( 9, true );
      final Inflater inflater = new Inflater( true );
    }
 }
}

The same code that might work without native OOM:
public class Bug {
  public static void main( String args[] ) {
    int count = 0;
    while ( true ) {
      final Deflater deflater = new Deflater( 9, true );
      final Inflater inflater = new Inflater( true );
      i++;
      if (i % 100 == 0) {
       System.gc(); // Don't, please don't do this!
      }
    }
 }
}

As shown above, you might allocate 'small' objects in the JVM but underneath they might end up allocating native memory for their operations. The intention is noble, that when the objects are finalized, the native operating system memory will be de-allocated as well. However, the freeing of the native memory is contingent on the Garbage collector running and leading to finalization of these 'small' objects. Its a matter of relative pace, the native memory is growing far faster compared to JVM heap and the lack of Garbage collection occurring is what leads to running out of native space. When using the G1 collector or lowering max heap settings, garbage collection occurs more often, thus preventing the native heap from exploding over time.

Peace!