Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix multithreaded problem with orphaned timeouts #214

Merged
merged 1 commit into from
Feb 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix multithreaded problem with orphaned timeouts that appears in high…
…ly concurrent non-atomic sequence cancel() -> schedule()
  • Loading branch information
Alim Akbashev committed Feb 11, 2015
commit aca0205a9675326630389ebff8b1a54748a70d78
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.corundumstudio.socketio;

import com.corundumstudio.socketio.scheduler.HashedWheelTimeoutScheduler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
Expand Down Expand Up @@ -82,7 +83,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer<Channel> impl
private EncoderHandler encoderHandler;
private WrongUrlHandler wrongUrlHandler;

private CancelableScheduler scheduler = new HashedWheelScheduler();
private CancelableScheduler scheduler = new HashedWheelTimeoutScheduler();

private InPacketHandler packetHandler;
private SSLContext sslContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ public void cancelPingTimeout() {
}

public void schedulePingTimeout() {
cancelPingTimeout();
SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, sessionId);
disconnectScheduler.schedule(key, new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Modified version of HashedWheelScheduler specially for timeouts handling.
* Difference:
* - handling old timeout with same key after adding new one
* fixes multithreaded problem that appears in highly concurrent non-atomic sequence cancel() -> schedule()
*
* (c) Alim Akbashev, 2015-02-11
*/

/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.corundumstudio.socketio.scheduler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

public class HashedWheelTimeoutScheduler implements CancelableScheduler {

private final ConcurrentMap<SchedulerKey, Timeout> scheduledFutures = PlatformDependent.newConcurrentHashMap();
private final HashedWheelTimer executorService = new HashedWheelTimer();

private volatile ChannelHandlerContext ctx;

@Override
public void update(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

public void cancel(SchedulerKey key) {
Timeout timeout = scheduledFutures.remove(key);
if (timeout != null) {
timeout.cancel();
}
}

public void schedule(final Runnable runnable, long delay, TimeUnit unit) {
executorService.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
runnable.run();
}
}, delay, unit);
}

public void scheduleCallback(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) {
Timeout timeout = executorService.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} finally {
scheduledFutures.remove(key);
}
}
});
}
}, delay, unit);

replaceScheduledFuture(key, timeout);
}

public void schedule(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) {
Timeout timeout = executorService.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
try {
runnable.run();
} finally {
scheduledFutures.remove(key);
}
}
}, delay, unit);

replaceScheduledFuture(key, timeout);
}

public void shutdown() {
executorService.stop();
}

private void replaceScheduledFuture(final SchedulerKey key, final Timeout newTimeout) {
final Timeout oldTimeout;

if (newTimeout.isExpired()) {
// no need to put already expired timeout to scheduledFutures map.
// simply remove old timeout
oldTimeout = scheduledFutures.remove(key);
} else {
oldTimeout = scheduledFutures.put(key, newTimeout);
}

// if there was old timeout, cancel it
if (oldTimeout != null) {
oldTimeout.cancel();
}
}
}