Skip to content

Confgurable executor service for IRP queue #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
132 changes: 42 additions & 90 deletions src/main/java/org/usb4java/javax/AbstractIrpQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
package org.usb4java.javax;

import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.usb.UsbControlIrp;
import javax.usb.UsbException;
Expand All @@ -27,18 +28,18 @@
*/
abstract class AbstractIrpQueue<T extends UsbIrp>
{
/** The queued packets. */
private final Queue<T> irps = new ConcurrentLinkedQueue<T>();

/** The queue processor thread. */
private volatile Thread processor;

/** If queue is currently aborting. */
private volatile boolean aborting;

/** The USB device. */
private final AbstractDevice device;

/** The non-parallel ExecutorService we will use for this queue on this device. */
private final ExecutorService singleThreadExecutor;

/** The job counter for active jobs in this queue. */
private final AtomicInteger activeJobs = new AtomicInteger(0);

/**
* Constructor.
*
Expand All @@ -50,6 +51,8 @@ abstract class AbstractIrpQueue<T extends UsbIrp>
if (device == null)
throw new IllegalArgumentException("device must be set");
this.device = device;

this.singleThreadExecutor = Services.getInstance().getConfig().newExecutorService();
}

/**
Expand All @@ -60,73 +63,29 @@ abstract class AbstractIrpQueue<T extends UsbIrp>
*/
public final void add(final T irp)
{
this.irps.add(irp);

// Start the queue processor if not already running.
if (this.processor == null)
{
this.processor = new Thread(new Runnable()
{
@Override
public void run()
{
process();
}
});
this.processor.setDaemon(true);
this.processor.setName("usb4java IRP Queue Processor");
this.processor.start();
}
}

/**
* Processes the queue. Methods returns when the queue is empty.
*/
final void process()
{
// Get the next IRP
T irp = this.irps.poll();

// If there are no IRPs to process then mark the thread as closing
// right away. Otherwise process the IRP (and more IRPs from the queue
// if present).
if (irp == null)
{
this.processor = null;
}
else
{
while (irp != null)
{
// Process the IRP
try
{
processIrp(irp);
singleThreadExecutor.execute(new Runnable() {
final T irp0 = irp;

@Override
public void run() {
activeJobs.incrementAndGet();

try {
if (!aborting) {
try {
processIrp(irp0);
} catch (final UsbException e) {
irp0.setUsbException(e);
}

irp0.complete();
finishIrp(irp0);
}
} finally {
activeJobs.decrementAndGet();
}
catch (final UsbException e)
{
irp.setUsbException(e);
}

// Get next IRP and mark the thread as closing before sending
// the events for the previous IRP
final T nextIrp = this.irps.poll();
if (nextIrp == null) this.processor = null;

// Finish the previous IRP
irp.complete();
finishIrp(irp);

// Process next IRP (if present)
irp = nextIrp;
}
}

// No more IRPs are present in the queue so terminate the thread.
synchronized (this.irps)
{
this.irps.notifyAll();
}
});
}

/**
Expand Down Expand Up @@ -156,22 +115,15 @@ final void process()
public final void abort()
{
this.aborting = true;
this.irps.clear();
while (isBusy())
{
try
{
synchronized (this.irps)
{
if (isBusy()) this.irps.wait();
}
}
catch (final InterruptedException e)
{
Thread.currentThread().interrupt();
}

singleThreadExecutor.shutdown();
try {
singleThreadExecutor.awaitTermination(4, TimeUnit.SECONDS);
this.aborting = false;
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
this.aborting = false;
}

/**
Expand All @@ -180,9 +132,9 @@ public final void abort()
*
* @return True if queue is busy, false if not.
*/
public final boolean isBusy()
public final synchronized boolean isBusy()
{
return !this.irps.isEmpty() || this.processor != null;
return activeJobs.get() > 0;
}

/**
Expand Down
66 changes: 66 additions & 0 deletions src/main/java/org/usb4java/javax/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.usb4java.javax;

import java.util.Properties;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Configuration.
Expand All @@ -29,12 +31,54 @@ final class Config
/** Key name for USB communication timeout. */
private static final String SCAN_INTERVAL_KEY = KEY_BASE + "scanInterval";

/** Key name for USB IRP executor. */
private static final String EXECUTOR_SERVICE_KEY = KEY_BASE + "irpExecutorService";

/** The timeout for USB communication in milliseconds. */
private int timeout = DEFAULT_TIMEOUT;

/** The scan interval in milliseconds. */
private int scanInterval = DEFAULT_SCAN_INTERVAL;

/** The executor service factory. */
private ExecutorServiceProvider executorService = new ExecutorServiceProvider() {
private final AtomicInteger poolNumber = new AtomicInteger(1);

class LocalThreadFactory extends Object implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

LocalThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "usb4java-irp-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
t.setDaemon(true);
if (t.getPriority() != Thread.MAX_PRIORITY)
t.setPriority(Thread.MAX_PRIORITY);
return t;
}
}

public ExecutorService newExecutorService() {
/* The default executor is a pool of max 1 thread, with 3s timeout. */
ThreadPoolExecutor es = new ThreadPoolExecutor(0, 1,
3L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
es.setThreadFactory(new LocalThreadFactory());
return es;
}
};

/**
* Constructs new configuration from the specified properties.
*
Expand All @@ -55,6 +99,22 @@ final class Config
this.scanInterval = Integer.valueOf(properties.getProperty(
SCAN_INTERVAL_KEY));
}

// Read the irp executor class
if (properties.containsKey(EXECUTOR_SERVICE_KEY))
{
try {
Class<?> cls = getClass().getClassLoader().loadClass(properties.getProperty(
EXECUTOR_SERVICE_KEY));
this.executorService = (ExecutorServiceProvider)cls.newInstance();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}

/**
Expand All @@ -76,4 +136,10 @@ public int getScanInterval()
{
return this.scanInterval;
}

/**
* Creates a new non-parallel execution service. Defaults to single thread exec
* @return new non-parallel ExecutorService
*/
public ExecutorService newExecutorService() { return this.executorService.newExecutorService(); }
}
7 changes: 7 additions & 0 deletions src/main/java/org/usb4java/javax/ExecutorServiceProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.usb4java.javax;

import java.util.concurrent.ExecutorService;

public interface ExecutorServiceProvider {
public ExecutorService newExecutorService();
}