Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
<packaging>jar</packaging>
<name>rest-utils</name>

<properties>
<junit.version>4.11</junit.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
Expand Down Expand Up @@ -83,5 +87,12 @@
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
57 changes: 56 additions & 1 deletion core/src/main/java/io/confluent/rest/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.glassfish.jersey.server.validation.ValidationFeature;
import org.glassfish.jersey.servlet.ServletContainer;

import java.util.concurrent.CountDownLatch;

import javax.ws.rs.core.Configurable;

import io.confluent.rest.exceptions.ConstraintViolationExceptionMapper;
Expand All @@ -39,6 +41,8 @@
*/
public abstract class Application<T extends RestConfig> {
protected T config;
protected Server server = null;
protected CountDownLatch shutdownLatch = new CountDownLatch(1);

public Application() {}

Expand Down Expand Up @@ -82,11 +86,25 @@ public Server createServer() throws RestConfigException {
// Configure the servlet container
ServletContainer servletContainer = new ServletContainer(resourceConfig);
ServletHolder servletHolder = new ServletHolder(servletContainer);
Server server = new Server(getConfiguration().getInt(RestConfig.PORT_CONFIG));
server = new Server(getConfiguration().getInt(RestConfig.PORT_CONFIG)) {
@Override
protected void doStop() throws Exception {
super.doStop();
Application.this.onShutdown();
Application.this.shutdownLatch.countDown();
}
};
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
context.addServlet(servletHolder, "/*");
server.setHandler(context);

int gracefulShutdownMs = getConfiguration().getInt(RestConfig.SHUTDOWN_GRACEFUL_MS_CONFIG);
if (gracefulShutdownMs > 0) {
server.setGracefulShutdown(gracefulShutdownMs);
}
server.setStopAtShutdown(true);

return server;
}

Expand All @@ -112,5 +130,42 @@ public void configureBaseApplication(Configurable<?> config) {
public T getConfiguration() {
return this.config;
}

/**
* Start the server (creating it if necessary).
* @throws Exception
*/
public void start() throws Exception {
if (server == null) {
createServer();
}
server.start();
}

/**
* Wait for the server to exit, allowing existing requests to complete if graceful shutdown is
* enabled and invoking the shutdown hook before returning.
* @throws InterruptedException
*/
public void join() throws InterruptedException {
server.join();
shutdownLatch.await();
}

/**
* Request that the server shutdown.
* @throws Exception
*/
public void stop() throws Exception {
server.stop();
}

/**
* Shutdown hook that is invoked after the Jetty server has processed the shutdown request,
* stopped accepting new connections, and tried to gracefully finish existing requests. At this
* point it should be safe to clean up any resources used while processing requests.
*/
public void onShutdown() {
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this an abstract method to force the subclasses to define onShutdown()? Most apps probably have some resources that they need to clean up on shutdown.

Other than that, the patch looks good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered that since I agree most apps probably have some resources that should get cleaned up gracefully, but doing so doesn't scale down to tests and simple apps like the example. You end up having to implement methods just to leave them empty. I'm going to leave it as is for now but we can always revisit.

}

12 changes: 10 additions & 2 deletions core/src/main/java/io/confluent/rest/RestConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Map;
import java.util.TreeMap;

public abstract class RestConfig extends AbstractConfig {
public class RestConfig extends AbstractConfig {
protected static final ConfigDef config;

public static final String DEBUG_CONFIG = "debug";
Expand All @@ -48,6 +48,11 @@ public abstract class RestConfig extends AbstractConfig {
"an Accept header.";
protected static final String RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DEFAULT = "application/json";

public static final String SHUTDOWN_GRACEFUL_MS_CONFIG = "shutdown.graceful.ms";
protected static final String SHUTDOWN_GRACEFUL_MS_DOC =
"Amount of time to wait after a shutdown request for outstanding requests to complete.";
protected static final String SHUTDOWN_GRACEFUL_MS_DEFAULT = "1000";

static {
config = new ConfigDef()
.define(DEBUG_CONFIG, Type.BOOLEAN,
Expand All @@ -59,7 +64,10 @@ public abstract class RestConfig extends AbstractConfig {
RESPONSE_MEDIATYPE_PREFERRED_CONFIG_DOC)
.define(RESPONSE_MEDIATYPE_DEFAULT_CONFIG, Type.STRING,
RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DEFAULT, Importance.HIGH,
RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DOC);
RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DOC)
.define(SHUTDOWN_GRACEFUL_MS_CONFIG, Type.INT,
SHUTDOWN_GRACEFUL_MS_DEFAULT, Importance.LOW,
SHUTDOWN_GRACEFUL_MS_DOC);
}

public RestConfig() {
Expand Down
149 changes: 149 additions & 0 deletions core/src/test/java/io/confluent/rest/ShutdownTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/**
* Copyright 2014 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package io.confluent.rest;

import org.junit.Test;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Configurable;

import static org.junit.Assert.*;

public class ShutdownTest {
@Test
public void testShutdownHook() throws Exception {
Properties props = new Properties();
props.put("shutdown.graceful.ms", "50");
ShutdownApplication app = new ShutdownApplication(new RestConfig(props));
app.start();

StopThread stop = new StopThread(app);
stop.start();

app.join();
assertTrue(app.shutdown.get());
}

@Test
public void testGracefulShutdown() throws Exception {
Properties props = new Properties();
props.put("shutdown.graceful.ms", "50");
final RestConfig config = new RestConfig(props);
ShutdownApplication app = new ShutdownApplication(config);
app.start();

RequestThread req = new RequestThread(config);
req.start();
app.resource.requestProcessingStarted.await();

StopThread stop = new StopThread(app);
stop.start();

app.join();

assertTrue(req.finished);
assertEquals("done", req.response);
}


private static class ShutdownApplication extends Application<RestConfig> {
public AtomicBoolean shutdown = new AtomicBoolean(false);
public SlowResource resource = new SlowResource();

ShutdownApplication(RestConfig props) {
super(props);
}

@Override
public void onShutdown() {
shutdown.set(true);
}

@Override
public void setupResources(Configurable<?> config, RestConfig appConfig) {
config.register(resource);
}
}

@Path("/")
public static class SlowResource {
public CountDownLatch requestProcessingStarted = new CountDownLatch(1);

@GET
public String test() throws InterruptedException {
requestProcessingStarted.countDown();
Thread.sleep(25);
return "done";
}
}

private static class StopThread extends Thread {
ShutdownApplication app;

StopThread(ShutdownApplication app) {
this.app = app;
}

@Override
public void run() {
try {
app.stop();
} catch (Exception e) {
}
}
};

private static class RequestThread extends Thread {
RestConfig config;
boolean finished = false;
String response = null;

RequestThread(RestConfig config) {
this.config = config;
}
@Override
public void run() {
// It seems that the server isn't necessarily listening when start() returns, which makes it
// difficult to know when it is safe to make this request. Just retry until we're able to make
// the request.
while(true) {
try {
Client client = ClientBuilder.newClient();
response = client
.target("http://localhost:" + config.getInt(RestConfig.PORT_CONFIG))
.path("/")
.request()
.get(String.class);
finished = true;
return;
} catch (javax.ws.rs.ProcessingException e) {
// ignore and retry
}
}
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.confluent.rest.examples.helloworld;

import org.eclipse.jetty.server.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -60,10 +59,9 @@ public static void main(String[] args) {
}
HelloWorldRestConfig config = new HelloWorldRestConfig(settings);
HelloWorldApplication app = new HelloWorldApplication(config);
Server server = app.createServer();
server.start();
app.start();
log.info("Server started, listening for requests...");
server.join();
app.join();
} catch (RestConfigException e) {
log.error("Server configuration failed: " + e.getMessage());
System.exit(1);
Expand Down