
Friday, May 8, 2015

RxNetty and Jersey 2.0

Introduction


I have been interested in reactive programming and micro-services recently, and a few things have caught my attention:
  • Netty - An asynchronous event-driven framework for high performance protocol servers and clients 
  • Netflix RxNetty - Reactive Extension Adapter for Netty
  • Netflix Karyon - A blueprint for a cloud ready application
Netty became my primary interest from a performance perspective, and some reading showed that Netflix had run benchmarks comparing RxNetty and Tomcat and found results in favor of RxNetty. Take a look at the following links -

So I wanted to see how I could use RxNetty for HTTP. One problem that surfaced is that when using RxNetty for HTTP, one is limited to a fairly basic HTTP framework, i.e., something not as rich as Jersey JAX-RS. That led me to Karyon, which has 'blocking' Jersey support. However, Karyon uses Jersey 1.X, and it did not look like the Karyon team would be moving to Jersey 2.X anytime soon. A related post from Netflix on the subject: https://github.com/Netflix/karyon/issues/56

Searching to see whether Jersey 2.X had support for Netty, I found a Netty-Jersey bridge project, but it does not use RxNetty. So I set out to build my own RxNetty Jersey container to see how it would work.

RxNettyHttpContainer


The following code demonstrates an RxNettyHttpContainer; it has been stripped of some methods for brevity. The container is modeled after Jersey's GrizzlyHttpContainer and code from Karyon.

public class RxNettyHttpContainer implements Container, RequestHandler<ByteBuf, ByteBuf> {

  private volatile ApplicationHandler appHandler;

  private final Type RequestTYPE = (new TypeLiteral<Ref<HttpServerRequest<ByteBuf>>>() {
  }).getType();
  private final Type ResponseTYPE = (new TypeLiteral<Ref<HttpServerResponse<ByteBuf>>>() {
  }).getType();
  
  private volatile ContainerLifecycleListener containerListener;
  
  /**
   * Referencing factory for RxNetty HttpServerRequest.
   */
  private static class RxNettyRequestReferencingFactory extends ReferencingFactory<HttpServerRequest<ByteBuf>> {
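    // A sketch (assumption): the constructor stripped from this post would simply delegate
    // to ReferencingFactory (javax.inject.Inject/Provider); the response referencing
    // factory below mirrors this.
    @Inject
    public RxNettyRequestReferencingFactory(Provider<Ref<HttpServerRequest<ByteBuf>>> referenceFactory) {
      super(referenceFactory);
    }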
  }

  /**
   * Referencing factory for RxNetty HttpServerResponse.
   */
  private static class RxNettyResponseReferencingFactory extends ReferencingFactory<HttpServerResponse<ByteBuf>> {
  }
  
  /**
   * An internal binder to enable RxNetty HTTP container specific types injection.
   */
  static class RxNettyBinder extends AbstractBinder {

    @Override
    protected void configure() {
       // Bind RxNetty HttpServerRequest and HttpServerResponse so that they are available for injection...
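      // A sketch (assumption), modeled on GrizzlyHttpContainer's GrizzlyBinder: the raw types
      // are bound for injection into resources (see the HK2 note further below), while the
      // Ref<> bindings back the referencing factories that handle() populates per request.
      bindFactory(RxNettyRequestReferencingFactory.class).to(HttpServerRequest.class)
          .proxy(false).in(RequestScoped.class);
      bindFactory(ReferencingFactory.<HttpServerRequest<ByteBuf>>referenceFactory())
          .to(new TypeLiteral<Ref<HttpServerRequest<ByteBuf>>>() {}).in(RequestScoped.class);

      bindFactory(RxNettyResponseReferencingFactory.class).to(HttpServerResponse.class)
          .proxy(false).in(RequestScoped.class);
      bindFactory(ReferencingFactory.<HttpServerResponse<ByteBuf>>referenceFactory())
          .to(new TypeLiteral<Ref<HttpServerResponse<ByteBuf>>>() {}).in(RequestScoped.class);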
    }
  }
  
  public RxNettyHttpContainer(Application application) {
    this.appHandler = new ApplicationHandler(application , new RxNettyBinder());
    this.containerListener = ConfigHelper.getContainerLifecycleListener(appHandler);
  }
  ...

  @Override
  public void reload(ResourceConfig configuration) {
    containerListener.onShutdown(this);
    appHandler = new ApplicationHandler(configuration, new RxNettyBinder());
    containerListener = ConfigHelper.getContainerLifecycleListener(appHandler);
    containerListener.onReload(this);
    containerListener.onStartup(this);
  }

  // This is the meat of the code where a request is handled
  public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
    HttpServerResponse<ByteBuf> response) {
    InputStream requestData = new HttpContentInputStream(response.getAllocator(),
        request.getContent());
    URI baseUri = toUri("/");
    URI requestUri = toUri(request.getUri());

    ContainerRequest containerRequest = new ContainerRequest(baseUri, requestUri, request
        .getHttpMethod().name(), getSecurityContext(requestUri, request),
        new MapPropertiesDelegate());

    containerRequest.setEntityStream(requestData);

    for (String headerName : request.getHeaders().names()) {
      containerRequest.headers(headerName, request.getHeaders().getAll(headerName));
    }

    containerRequest.setWriter(new RxNettyContainerResponseWriter(response));

    containerRequest.setRequestScopedInitializer(new RequestScopedInitializer() {

        @Override
        public void initialize(final ServiceLocator locator) {
            locator.<Ref<HttpServerRequest<ByteBuf>>>getService(RequestTYPE).set(request);
            locator.<Ref<HttpServerResponse<ByteBuf>>>getService(ResponseTYPE).set(response);
        }
    });

   return Observable.<Void> create(subscriber -> {
      try {
        appHandler.handle(containerRequest);
        subscriber.onCompleted();
      }
      finally {
        IOUtils.closeQuietly(requestData);
      }
    }).doOnTerminate(() -> response.close(true)).subscribeOn(Schedulers.io());
  }

  private SecurityContext getSecurityContext(URI requestUri, HttpServerRequest<ByteBuf> request) {
     // Not yet handling security
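    // A sketch (assumption): a pass-through SecurityContext until security is wired in;
    // isSecure is derived from the request URI scheme, and no principal or roles are resolved.
    return new SecurityContext() {
      @Override
      public java.security.Principal getUserPrincipal() {
        return null;
      }

      @Override
      public boolean isUserInRole(String role) {
        return false;
      }

      @Override
      public boolean isSecure() {
        return "https".equalsIgnoreCase(requestUri.getScheme());
      }

      @Override
      public String getAuthenticationScheme() {
        return null;
      }
    };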
  }

  // A custom ContainerResponseWriter to write to Netty response
  public static class RxNettyContainerResponseWriter implements ContainerResponseWriter {
    private final HttpServerResponse<ByteBuf> serverResponse;

    private final ByteBuf contentBuffer;

    public RxNettyContainerResponseWriter(HttpServerResponse<ByteBuf> serverResponse) {
      this.serverResponse = serverResponse;
      this.contentBuffer = serverResponse.getChannel().alloc().buffer();
    }

    @Override
    public void commit() {
      if (!serverResponse.isCloseIssued()) {
        serverResponse.writeAndFlush(contentBuffer);        
      }     
    }
    ...

    @Override
    public void failure(Throwable throwable) {
      LOGGER.error("Failure Servicing Request", throwable);
      if (!serverResponse.isCloseIssued()) {
        serverResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
        serverResponse.writeString("Request Failed...");     
      }
    }

    @Override
    public OutputStream writeResponseStatusAndHeaders(long contentLength,
      ContainerResponse containerResponse) throws ContainerException {
      
      serverResponse.setStatus(HttpResponseStatus.valueOf(containerResponse.getStatus()));
      HttpResponseHeaders responseHeaders = serverResponse.getHeaders();
      
      if (!containerResponse.getHeaders().containsKey(HttpHeaders.Names.CONTENT_LENGTH)) {
        responseHeaders.setHeader(HttpHeaders.Names.CONTENT_LENGTH, contentLength);
      }
      
      for (Map.Entry<String, List<String>> header : containerResponse.getStringHeaders().entrySet()) {
        responseHeaders.setHeader(header.getKey(), header.getValue());
      }
      
      return new ByteBufOutputStream(contentBuffer);
    }
  }
}
As mentioned, the above is heavily influenced by the code from Karyon's Jersey 1.X JerseyBasedRouter and NettyToJerseyBridge. It adds a few enhancements, notably the ability to inject RxNetty's HttpServerRequest and HttpServerResponse into a Jersey resource. One thing to note is that I could not find a way to inject the parameterized equivalents of HttpServerRequest and HttpServerResponse into a Jersey resource; unless I am greatly mistaken, it is a limitation of HK2 that prevents this. An example of a resource with the injections looks like:
public class SomeResource {
  private final HttpServerRequest<ByteBuf> request;

  private final HttpServerResponse<ByteBuf> response;

  @SuppressWarnings("unchecked")
  @Inject
  public SomeResource(@SuppressWarnings("rawtypes") HttpServerRequest request,
      @SuppressWarnings("rawtypes") HttpServerResponse response) {
    this.request = request;
    this.response = response;
  }
  ....
}  


Using the Container


In line with the other containers provided by Jersey, there is a factory class with methods to create and start the RxNetty Jersey 2.X container.
public class RxNettyHttpServerFactory {

  private static final int DEFAULT_HTTP_PORT = 80;

  public static HttpServer<ByteBuf, ByteBuf> createHttpServer(Application application) {
    return createHttpServer(DEFAULT_HTTP_PORT, application, false);
  }

  public static HttpServer<ByteBuf, ByteBuf> createHttpServer(Application application, boolean start) {
    return createHttpServer(DEFAULT_HTTP_PORT, application, start);
  }

  public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, Application application) {
    return createHttpServer(port, application, false);
  }

  public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, Application application,
    boolean start) {
    HttpServer<ByteBuf, ByteBuf> httpServer = RxNetty.createHttpServer(port,
      new RxNettyHttpContainer(application));
    if (start) {
      httpServer.start();
    }
    return httpServer;
  }
}
As an example:
public class Main {
  public static void main(String[] args) throws IOException {
    ResourceConfig resourceConfig = ResourceConfig.forApplication(new NotesApplication());
    RxNettyHttpServerFactory.createHttpServer(9090, resourceConfig, true);
    System.in.read(); // Wait for user input before shutting down
  }
}
With the above, you have an RxNetty-based Jersey 2.0 container that you can have fun with. Wait! What about an example?

Example Using RxNetty Jersey 2.0 Container


As always, I lean back on my Notes example using Jersey 2.0, where a Note can be CRUDded! In addition, my example would not be complete without a touch of Spring in it, so Spring is used for dependency injection. One sadness of the Jersey Spring integration from the jersey-spring3 artifact is that you need javax.servlet as a compile-time dependency even if you do not use it. The Notes client is developed using RxJersey. The example can be accessed on GitHub at https://github.com/sanjayvacharya/sleeplessinslc/tree/master/rxnetty-jersey2

Performance of the RxNetty Jersey 2 Container


A simple JMeter load test exists in the Notes integration-test project that tests the performance of the above RxNetty container against Grizzly.
As a disclaimer, I am not a proclaimed JMeter expert, so I could have gotten some things wrong. That said, the following are some results contrasting the performance of the Grizzly container with the RxNetty Jersey 2 container.

User Threads - 50 - Loop Count 25 (JMeter results chart)

User Threads - 500 - Loop Count 10 (JMeter results chart)

The two test runs create notes and fetch notes back. From both tests, it appears that the throughput and responsiveness of the RxNetty container serving the Notes service are considerably better than the Grizzly equivalent. Note that GC metrics have not been looked into. There is one gotcha that I have not yet mentioned: when a large payload is requested, the RxNetty Jersey 2.0 container chokes. As it is based on the Karyon Jersey 1.X implementation, I need to check whether Karyon Jersey has the same problem. I believe it has to do with the buffer copy between the HttpServerResponse and the content buffer (ByteBuf). To witness the problem in action, add the get-all-notes call to the JMeter script. There are two launchers, for the Grizzly and RxNetty containers respectively; both need to be started before the JMeter test is launched.

Conclusion


RxNetty looks pretty sweet, at least from my simple tests, and using it as a micro-container with Jersey 2 looks interesting. The container I have created does not support asynchronous (@Suspended) Jersey resources, similar to the jdk-http container and the Karyon Jersey bridge, and to be honest, I am not even sure it makes sense to do so.

The RxNetty Jersey 2.X container shown above -
  • Is only a proof of concept. It does not have the necessary unit tests or validation, although those could easily be added.
  • Utilizes a class from Karyon, HttpContentInputStream, and so has a dependency on a Karyon jar.
  • Suggests that Karyon's Jersey 1.X support could be migrated to Jersey 2.X fairly easily, based on the above code.
  • If you want to use Jersey 2 with Netty, I would look into the Netty Jersey 2.X container I mentioned earlier.
If anyone has thoughts around the container, load test, or example, ping me.