Skip to content

Commit

Permalink
* Transactor does not extend java.lang.Thread anymore, uses a java.la…
Browse files Browse the repository at this point in the history
…ng.ThreadLocal

  for thread-transactor mapping instead.
  • Loading branch information
hns committed Feb 6, 2008
1 parent 8b55032 commit 2d12eed
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 87 deletions.
8 changes: 4 additions & 4 deletions src/helma/framework/ResponseBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,8 @@ public String getBuffer() {
* @throws Exception thrown if commit fails
*/
public void commit() throws Exception {
if (Thread.currentThread() instanceof Transactor) {
Transactor tx = (Transactor) Thread.currentThread();
Transactor tx = Transactor.getInstance();
if (tx != null) {
String tname = tx.getTransactionName();
tx.commit();
tx.begin(tname);
Expand All @@ -640,8 +640,8 @@ public void commit() throws Exception {
* @throws Exception thrown if rollback fails
*/
public void rollback() throws Exception {
if (Thread.currentThread() instanceof Transactor) {
Transactor tx = (Transactor) Thread.currentThread();
Transactor tx = Transactor.getInstance();
if (tx != null) {
String tname = tx.getTransactionName();
tx.abort();
tx.begin(tname);
Expand Down
81 changes: 43 additions & 38 deletions src/helma/framework/core/RequestEvaluator.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public final class RequestEvaluator implements Runnable {
private volatile ResponseTrans res;

// the one and only transactor thread
private volatile Transactor rtx;
private volatile Thread thread;

// the type of request to be serviced,
// used to coordinate worker and waiter threads
Expand Down Expand Up @@ -128,13 +128,13 @@ protected synchronized void initScriptingEngine() {
public void run() {
// first, set a local variable to the current transactor thread so we know
// when it's time to quit because another thread took over.
Transactor localrtx = (Transactor) Thread.currentThread();
Thread localThread = Thread.currentThread();

// spans whole execution loop - close connections in finally clause
try {

// while this thread is serving requests
while (localrtx == rtx) {
while (localThread == thread) {

// object reference to ressolve request path
Object currentElement;
Expand All @@ -153,7 +153,7 @@ public void run() {
String functionName = function instanceof String ?
(String) function : null;

while (!done && localrtx == rtx) {
while (!done && localThread == thread) {
// catch errors in path resolution and script execution
try {

Expand Down Expand Up @@ -196,7 +196,7 @@ public void run() {
txname.append((error == null) ? req.getPath() : "error");

// begin transaction
localrtx.begin(txname.toString());
Transactor.getInstance(app.nmgr).begin(txname.toString());

Object root = app.getDataRoot();
initGlobals(root, requestPath);
Expand Down Expand Up @@ -412,7 +412,7 @@ public void run() {
}

// check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) {
if (localThread != thread) {
return;
}
commitTransaction();
Expand Down Expand Up @@ -459,21 +459,21 @@ public void run() {
ScriptingEngine.ARGS_WRAP_XMLRPC,
false);
// check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) {
if (localThread != thread) {
return;
}
commitTransaction();
} catch (Exception x) {
// check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) {
if (localThread != thread) {
return;
}
abortTransaction();
app.logError(txname + ": " + error, x);

// If the transactor thread has been killed by the invoker thread we don't have to
// bother for the error message, just quit.
if (localrtx != rtx) {
if (localThread != thread) {
return;
}

Expand All @@ -495,21 +495,21 @@ public void run() {
ScriptingEngine.ARGS_WRAP_DEFAULT,
true);
// check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) {
if (localThread != thread) {
return;
}
commitTransaction();
} catch (Exception x) {
// check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) {
if (localThread != thread) {
return;
}
abortTransaction();
app.logError(txname + ": " + error, x);

// If the transactor thread has been killed by the invoker thread we don't have to
// bother for the error message, just quit.
if (localrtx != rtx) {
if (localThread != thread) {
return;
}

Expand All @@ -524,7 +524,7 @@ public void run() {
// res.abort() just aborts the transaction and
// leaves the response untouched
// check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) {
if (localThread != thread) {
return;
}
abortTransaction();
Expand All @@ -535,7 +535,7 @@ public void run() {
if (++tries < 8) {
// try again after waiting some period
// check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) {
if (localThread != thread) {
return;
}
abortTransaction();
Expand All @@ -549,11 +549,11 @@ public void run() {
res.reportError(interrupt);
done = true;
// and release resources and thread
rtx = null;
thread = null;
}
} else {
// check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) {
if (localThread != thread) {
return;
}
abortTransaction();
Expand All @@ -563,16 +563,15 @@ public void run() {
done = true;
}
} catch (Throwable x) {
String txname = localrtx.getTransactionName();
// check if request is still valid, or if the requesting thread has stopped waiting already
if (localrtx != rtx) {
if (localThread != thread) {
return;
}
abortTransaction();

// If the transactor thread has been killed by the invoker thread we don't have to
// bother for the error message, just quit.
if (localrtx != rtx) {
if (localThread != thread) {
return;
}

Expand All @@ -589,6 +588,7 @@ public void run() {
done = false;
error = x;

String txname = Transactor.getInstance(app.nmgr).getTransactionName();
app.logError(txname + ": " + error, x);

if (req.isXmlRpc()) {
Expand Down Expand Up @@ -618,7 +618,7 @@ public void run() {

}
} finally {
localrtx.closeConnections();
Transactor.getInstance(app.nmgr).closeConnections();
}
}

Expand All @@ -627,10 +627,12 @@ public void run() {
* @throws Exception transaction couldn't be committed
*/
synchronized void commitTransaction() throws Exception {
Transactor localrtx = (Transactor) Thread.currentThread();
Thread localThread = Thread.currentThread();

if (localrtx == rtx) {
localrtx.commit();
if (localThread == thread) {
Transactor tx = Transactor.getInstance();
if (tx != null)
tx.commit();
} else {
throw new TimeoutException();
}
Expand All @@ -640,7 +642,7 @@ synchronized void commitTransaction() throws Exception {
* Called by the transactor thread when the request didn't terminate successfully.
*/
synchronized void abortTransaction() {
Transactor localrtx = (Transactor) Thread.currentThread();
Transactor localrtx = Transactor.getInstance(app.nmgr);
localrtx.abort();
}

Expand All @@ -652,11 +654,11 @@ private synchronized void startTransactor() {
throw new ApplicationStoppedException();
}

if ((rtx == null) || !rtx.isAlive()) {
if ((thread == null) || !thread.isAlive()) {
// app.logEvent ("Starting Thread");
rtx = new Transactor(this, app.threadgroup, app.nmgr);
rtx.setContextClassLoader(app.getClassLoader());
rtx.start();
thread = new Thread(app.threadgroup, this);
thread.setContextClassLoader(app.getClassLoader());
thread.start();
} else {
notifyAll();
}
Expand All @@ -666,14 +668,17 @@ private synchronized void startTransactor() {
* Tell waiting thread that we're done, then wait for next request
*/
synchronized void notifyAndWait() {
Transactor localrtx = (Transactor) Thread.currentThread();
Thread localThread = Thread.currentThread();

// make sure there is only one thread running per instance of this class
// if localrtx != rtx, the current thread has been aborted and there's no need to notify
if (localrtx != rtx) {
if (localThread != thread) {
// A new request came in while we were finishing the last one.
// Return to run() to get the work done.
localrtx.closeConnections();
Transactor tx = Transactor.getInstance();
if (tx != null) {
tx.closeConnections();
}
return;
}

Expand All @@ -685,16 +690,16 @@ synchronized void notifyAndWait() {
wait(1000 * 60 * 10);
} catch (InterruptedException ix) {
// we got interrrupted, releases resources and thread
rtx = null;
thread = null;
}

// if no request arrived, release ressources and thread
if ((reqtype == NONE) && (rtx == localrtx)) {
if ((reqtype == NONE) && (thread == localThread)) {
// comment this in to release not just the thread, but also the scripting engine.
// currently we don't do this because of the risk of memory leaks (objects from
// framework referencing into the scripting engine)
// scriptingEngine = null;
rtx = null;
thread = null;
}
}

Expand All @@ -704,8 +709,8 @@ synchronized void notifyAndWait() {
* thread. If currently active kill the request, otherwise just notify.
*/
synchronized boolean stopTransactor() {
Transactor t = rtx;
rtx = null;
Transactor t = Transactor.getInstance();
thread = null;
boolean stopped = false;
if (t != null && t.isActive()) {
// let the scripting engine know that the
Expand Down Expand Up @@ -1092,8 +1097,8 @@ public synchronized ResponseTrans getResponse() {
*
* @return the current transactor thread
*/
public synchronized Transactor getThread() {
return rtx;
public synchronized Thread getThread() {
return thread;
}

/**
Expand Down
5 changes: 2 additions & 3 deletions src/helma/objectmodel/db/DbSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ public DbSource(String name, ResourceProperties props)
public synchronized Connection getConnection()
throws ClassNotFoundException, SQLException {
Connection con;
Transactor tx = null;
if (Thread.currentThread() instanceof Transactor) {
tx = (Transactor) Thread.currentThread();
Transactor tx = Transactor.getInstance();
if (tx != null) {
con = tx.getConnection(this);
} else {
con = getThreadLocalConnection();
Expand Down
Loading

0 comments on commit 2d12eed

Please sign in to comment.