Introduction
I have been interested in reactive programming and microservices recently, and a few things have caught my attention:
Netty became my primary interest from a performance perspective, and some reading showed that Netflix had benchmarked RxNetty against Tomcat and found results in favor of RxNetty. Take a look at the following links -
So I wanted to see how I could use RxNetty for HTTP. One problem that surfaced is that with RxNetty and HTTP, one is limited to a basic HTTP framework, i.e., something not as rich as Jersey JAX-RS. That led me to Karyon, which has 'blocking' Jersey support. However, Karyon uses Jersey 1.X, and the Karyon team did not look like they would be moving to Jersey 2.X anytime soon. A post from Netflix on the same: https://github.com/Netflix/karyon/issues/56
Searching to see if Jersey 2.X had support for Netty, I found the following two items -
- An email thread indicating Jersey has no intent of building support for Netty as a container, especially RxNetty - https://java.net/projects/jersey/lists/users/archive/2015-01/message/48
- A nice GitHub project with a Netty container for Jersey - https://github.com/Graylog2/jersey-netty. I hope this project makes it into Jersey as a container.
RxNettyHttpContainer
The following code demonstrates a RxNettyHttpContainer. It has been stripped of some methods. The container has been modeled after GrizzlyHttpContainer and code from Karyon.
public class RxNettyHttpContainer implements Container, RequestHandler<ByteBuf, ByteBuf> {

    private volatile ApplicationHandler appHandler;

    private final Type RequestTYPE = (new TypeLiteral<Ref<HttpServerRequest<ByteBuf>>>() { }).getType();
    private final Type ResponseTYPE = (new TypeLiteral<Ref<HttpServerResponse<ByteBuf>>>() { }).getType();

    private volatile ContainerLifecycleListener containerListener;

    /**
     * Referencing factory for RxNetty HttpServerRequest.
     */
    private static class RxNettyRequestReferencingFactory extends ReferencingFactory<HttpServerRequest<ByteBuf>> { }

    /**
     * Referencing factory for RxNetty HttpServerResponse.
     */
    private static class RxNettyResponseReferencingFactory extends ReferencingFactory<HttpServerResponse<ByteBuf>> { }

    /**
     * An internal binder to enable RxNetty HTTP container specific types injection.
     */
    static class RxNettyBinder extends AbstractBinder {
        @Override
        protected void configure() {
            // Bind RxNetty HttpServerRequest and HttpServerResponse so that they are available for injection...
        }
    }

    public RxNettyHttpContainer(Application application) {
        this.appHandler = new ApplicationHandler(application, new RxNettyBinder());
        this.containerListener = ConfigHelper.getContainerLifecycleListener(appHandler);
    }

    ...

    @Override
    public void reload(ResourceConfig configuration) {
        containerListener.onShutdown(this);
        appHandler = new ApplicationHandler(configuration, new RxNettyBinder());
        containerListener = ConfigHelper.getContainerLifecycleListener(appHandler);
        containerListener.onReload(this);
        containerListener.onStartup(this);
    }

    // This is the meat of the code where a request is handled
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        InputStream requestData = new HttpContentInputStream(response.getAllocator(), request.getContent());
        URI baseUri = toUri("/");
        URI requestUri = toUri(request.getUri());

        ContainerRequest containerRequest = new ContainerRequest(baseUri, requestUri,
                request.getHttpMethod().name(), getSecurityContext(requestUri, request),
                new MapPropertiesDelegate());
        containerRequest.setEntityStream(requestData);

        // Copy every request header (all values) across to the Jersey request
        for (String headerName : request.getHeaders().names()) {
            containerRequest.headers(headerName, request.getHeaders().getAll(headerName));
        }

        containerRequest.setWriter(new RxNettyContainerResponseWriter(response));

        // Make the RxNetty request and response available for injection in this request scope
        containerRequest.setRequestScopedInitializer(new RequestScopedInitializer() {
            @Override
            public void initialize(final ServiceLocator locator) {
                locator.<Ref<HttpServerRequest<ByteBuf>>>getService(RequestTYPE).set(request);
                locator.<Ref<HttpServerResponse<ByteBuf>>>getService(ResponseTYPE).set(response);
            }
        });

        // Run the blocking Jersey call on the IO scheduler and close the response when done
        return Observable.<Void> create(subscriber -> {
            try {
                appHandler.handle(containerRequest);
                subscriber.onCompleted();
            } finally {
                IOUtils.closeQuietly(requestData);
            }
        }).doOnTerminate(() -> response.close(true)).subscribeOn(Schedulers.io());
    }

    private SecurityContext getSecurityContext(URI requestUri, HttpServerRequest<ByteBuf> request) {
        // Not yet handling security
    }

    // A custom ContainerResponseWriter to write to Netty response
    public static class RxNettyContainerResponseWriter implements ContainerResponseWriter {

        private final HttpServerResponse<ByteBuf> serverResponse;
        private final ByteBuf contentBuffer;

        public RxNettyContainerResponseWriter(HttpServerResponse<ByteBuf> serverResponse) {
            this.serverResponse = serverResponse;
            this.contentBuffer = serverResponse.getChannel().alloc().buffer();
        }

        @Override
        public void commit() {
            if (!serverResponse.isCloseIssued()) {
                serverResponse.writeAndFlush(contentBuffer);
            }
        }

        ..

        @Override
        public void failure(Throwable throwable) {
            LOGGER.error("Failure Servicing Request", throwable);
            if (!serverResponse.isCloseIssued()) {
                serverResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
                serverResponse.writeString("Request Failed...");
            }
        }

        @Override
        public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse containerResponse)
                throws ContainerException {
            serverResponse.setStatus(HttpResponseStatus.valueOf(containerResponse.getStatus()));
            HttpResponseHeaders responseHeaders = serverResponse.getHeaders();

            if (!containerResponse.getHeaders().containsKey(HttpHeaders.Names.CONTENT_LENGTH)) {
                responseHeaders.setHeader(HttpHeaders.Names.CONTENT_LENGTH, contentLength);
            }

            for (Map.Entry<String, List<String>> header : containerResponse.getStringHeaders().entrySet()) {
                responseHeaders.setHeader(header.getKey(), header.getValue());
            }

            return new ByteBufOutputStream(contentBuffer);
        }
    }
}

As mentioned, the above is heavily influenced by the code from Karyon's Jersey 1.X JerseyBasedRouter and NettyToJerseyBridge. It has a few additional enhancements in being able to inject RxNetty's HttpServerRequest and HttpServerResponse into a Jersey Resource. One thing to note is that I could not find a way to inject the parameterized equivalents of HttpServerRequest and HttpServerResponse into a Jersey Resource and, unless I am greatly mistaken, it is a limitation of HK2 that prevents this. An example of the injected resource looks like:
public class SomeResource {

    private final HttpServerRequest<ByteBuf> request;
    private final HttpServerResponse<ByteBuf> response;

    @SuppressWarnings("unchecked")
    @Inject
    public SomeResource(@SuppressWarnings("rawtypes") HttpServerRequest request,
                        @SuppressWarnings("rawtypes") HttpServerResponse response) {
        this.request = request;
        this.response = response;
    }

    ....
}
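On the parameterized-type point: the container builds RequestTYPE and ResponseTYPE from an anonymous TypeLiteral subclass, which is the usual way to keep generic arguments that a plain class literal loses to erasure. Here is a minimal JDK-only sketch of that trick. TypeToken is a hypothetical stand-in written for this illustration, not the HK2 or Jersey class, and Map<String, List<Integer>> is just an arbitrary example type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

public class TypeTokenSketch {

    /** Hypothetical stand-in for a TypeLiteral-style class; not the HK2 or Jersey API. */
    public abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            // An anonymous subclass records its generic superclass, including the argument for T.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }

        public Type getType() {
            return type;
        }
    }

    /** Returns the name of the full generic type captured by an anonymous TypeToken subclass. */
    public static String captured() {
        Type type = new TypeToken<Map<String, List<Integer>>>() { }.getType();
        return type.getTypeName();
    }

    /** Returns what a plain class literal can express: the raw type only. */
    public static String raw() {
        return Map.class.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(captured());
        System.out.println(raw());
    }
}
```

The class literal only ever names the raw type, while the token keeps the nested type arguments. That is why the container can look up Ref<HttpServerRequest<ByteBuf>> by Type even though the resource constructor above falls back to raw types.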
Using the Container
public class RxNettyHttpServerFactory {

    private static final int DEFAULT_HTTP_PORT = 80;

    public static HttpServer<ByteBuf, ByteBuf> createHttpServer(Application application) {
        return createHttpServer(DEFAULT_HTTP_PORT, application, false);
    }

    public static HttpServer<ByteBuf, ByteBuf> createHttpServer(Application application, boolean start) {
        return createHttpServer(DEFAULT_HTTP_PORT, application, start);
    }

    public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, Application application) {
        return createHttpServer(port, application, false);
    }

    public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, Application application, boolean start) {
        HttpServer<ByteBuf, ByteBuf> httpServer = RxNetty.createHttpServer(port, new RxNettyHttpContainer(application));
        if (start) {
            httpServer.start();
        }
        return httpServer;
    }
}

As an example:
public class Main {

    // System.in.read() can throw IOException, so main declares it
    public static void main(String args[]) throws IOException {
        ResourceConfig resourceConfig = ResourceConfig.forApplication(new NotesApplication());
        RxNettyHttpServerFactory.createHttpServer(9090, resourceConfig, true);
        System.in.read(); // Wait for user input to shutdown
    }
}

With the above, you have an RxNetty-based Jersey 2.0 container that you can have fun with. Wait! What about an example?
Example Using RxNetty Jersey 2.0 Container
Performance of the RxNetty Jersey 2 Container
User Threads - 50 - Loop Count 25
User Threads - 500 - Loop Count 10
The two test runs create notes and read them back. From both tests, it appears that the throughput and responsiveness of the RxNetty container on the Notes Service are far better than those of the Grizzly equivalent. Note that GC metrics have not been looked into. There is one gotcha that I have not mentioned above: when a large payload is requested, the RxNetty Jersey 2.0 container chokes. As it is based on the Karyon Jersey 1.X implementation, I need to check whether Karyon Jersey has the same problem. I believe it has to do with the buffer copy between the HttpServerResponse and the content buffer (ByteBuf). To witness the problem in action, add the get all notes call to the JMeter script. There are two launchers, for the Grizzly and RxNetty containers respectively. Both need to be started before the JMeter test is launched.
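One direction worth experimenting with for the large-payload gotcha is to stop accumulating the whole response in a single buffer and instead hand it downstream in fixed-size chunks. The sketch below shows only that idea with JDK classes. It is an assumption about where the problem lies, not a confirmed fix, and ChunkingOutputStream, its Consumer sink, and the chunk size are all hypothetical names and values that appear in neither RxNetty nor Karyon.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ChunkedWriteSketch {

    /** Hypothetical stream that hands fixed-size chunks to a sink as they fill up. */
    public static class ChunkingOutputStream extends OutputStream {
        private final byte[] chunk;
        private final Consumer<byte[]> sink;
        private int used;

        /** chunkSize is assumed to be positive. */
        public ChunkingOutputStream(int chunkSize, Consumer<byte[]> sink) {
            this.chunk = new byte[chunkSize];
            this.sink = sink;
        }

        @Override
        public void write(int b) {
            chunk[used++] = (byte) b;
            if (used == chunk.length) {
                flush(); // chunk is full, pass it on instead of growing a buffer
            }
        }

        @Override
        public void flush() {
            if (used > 0) {
                sink.accept(Arrays.copyOf(chunk, used));
                used = 0;
            }
        }

        @Override
        public void close() {
            flush(); // emit the final, possibly partial, chunk
        }
    }

    /** Writes payloadSize bytes through the stream and returns the size of each chunk emitted. */
    public static List<Integer> chunkSizes(int payloadSize, int chunkSize) {
        List<Integer> sizes = new ArrayList<>();
        try (OutputStream out = new ChunkingOutputStream(chunkSize, bytes -> sizes.add(bytes.length))) {
            out.write(new byte[payloadSize]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(10, 4)); // prints [4, 4, 2]
    }
}
```

In the container, the sink would be whatever writes a chunk to the HttpServerResponse. Whether that actually relieves the choke would need to be measured with the get all notes call in the JMeter script.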
Conclusion
RxNetty looks pretty sweet, at least from my simple tests. Using it as a micro-container with Jersey 2 looks interesting. The container I have created does not support @Async on a Jersey Resource, similar to the jdk-http container and the Karyon Jersey bridge, and to be honest, I am not even sure it makes sense to do so.
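Part of why async support may matter less here: the container's handle method already moves the blocking Jersey call off the calling thread with subscribeOn(Schedulers.io()). As a rough JDK-only analogy of that offloading pattern, here is a sketch that uses CompletableFuture in place of RxJava. The blockingHandle method, the pool size, and the thread name are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadSketch {

    /** Hypothetical blocking handler standing in for appHandler.handle(...). */
    static String blockingHandle(String request) {
        return "handled:" + request + " on " + Thread.currentThread().getName();
    }

    /** Runs the blocking handler on a worker pool and returns a future for its result. */
    public static CompletableFuture<String> handle(String request, ExecutorService workers) {
        return CompletableFuture.supplyAsync(() -> blockingHandle(request), workers);
    }

    /** Handles one request on a small daemon pool and waits for the result. */
    public static String demo() {
        ExecutorService workers = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            return handle("/notes", workers).join();
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller gets a future back immediately and the blocking work runs on the worker thread, which is roughly the role the IO scheduler plays for the container.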
The RxNetty Jersey 2.X container shown above -
- Is only a proof of concept. It does not have the necessary unit tests and/or validation, though that could easily be added.
- Utilizes a class from Karyon, HttpContentInputStream, and so has a dependency on a Karyon jar
- Suggests that Karyon Jersey 1.X could easily be migrated to Jersey 2.X based on the above code
- If you wanted to use Jersey 2 with Netty, I would look into the Netty Jersey 2.X Container I mentioned earlier.