From 2d12eedc8ff2e900fb1f2f50a97a2a6955cb94b7 Mon Sep 17 00:00:00 2001 From: hns Date: Wed, 6 Feb 2008 16:52:33 +0000 Subject: [PATCH] * Transactor does not extend java.lang.Thread anymore, uses a java.lang.ThreadLocal for thread-transactor mapping instead. --- src/helma/framework/ResponseBean.java | 8 +- .../framework/core/RequestEvaluator.java | 81 ++++++++++--------- src/helma/objectmodel/db/DbSource.java | 5 +- src/helma/objectmodel/db/Node.java | 36 +++++---- src/helma/objectmodel/db/NodeManager.java | 30 ++++--- src/helma/objectmodel/db/Transactor.java | 39 ++++++--- 6 files changed, 112 insertions(+), 87 deletions(-) diff --git a/src/helma/framework/ResponseBean.java b/src/helma/framework/ResponseBean.java index c8c244a71..47e3b7514 100644 --- a/src/helma/framework/ResponseBean.java +++ b/src/helma/framework/ResponseBean.java @@ -626,8 +626,8 @@ public String getBuffer() { * @throws Exception thrown if commit fails */ public void commit() throws Exception { - if (Thread.currentThread() instanceof Transactor) { - Transactor tx = (Transactor) Thread.currentThread(); + Transactor tx = Transactor.getInstance(); + if (tx != null) { String tname = tx.getTransactionName(); tx.commit(); tx.begin(tname); @@ -640,8 +640,8 @@ public void commit() throws Exception { * @throws Exception thrown if rollback fails */ public void rollback() throws Exception { - if (Thread.currentThread() instanceof Transactor) { - Transactor tx = (Transactor) Thread.currentThread(); + Transactor tx = Transactor.getInstance(); + if (tx != null) { String tname = tx.getTransactionName(); tx.abort(); tx.begin(tname); diff --git a/src/helma/framework/core/RequestEvaluator.java b/src/helma/framework/core/RequestEvaluator.java index 783e6251c..d53685051 100644 --- a/src/helma/framework/core/RequestEvaluator.java +++ b/src/helma/framework/core/RequestEvaluator.java @@ -53,7 +53,7 @@ public final class RequestEvaluator implements Runnable { private volatile ResponseTrans res; // the one and only transactor thread - private volatile Transactor rtx; + private volatile Thread thread; // the type of request to be serviced, // used to coordinate worker and waiter threads @@ -128,13 +128,13 @@ protected synchronized void initScriptingEngine() { public void run() { // first, set a local variable to the current transactor thread so we know // when it's time to quit because another thread took over. - Transactor localrtx = (Transactor) Thread.currentThread(); + Thread localThread = Thread.currentThread(); // spans whole execution loop - close connections in finally clause try { // while this thread is serving requests - while (localrtx == rtx) { + while (localThread == thread) { // object reference to ressolve request path Object currentElement; @@ -153,7 +153,7 @@ public void run() { String functionName = function instanceof String ? (String) function : null; - while (!done && localrtx == rtx) { + while (!done && localThread == thread) { // catch errors in path resolution and script execution try { @@ -196,7 +196,7 @@ public void run() { txname.append((error == null) ? req.getPath() : "error"); // begin transaction - localrtx.begin(txname.toString()); + Transactor.getInstance(app.nmgr).begin(txname.toString()); Object root = app.getDataRoot(); initGlobals(root, requestPath); @@ -412,7 +412,7 @@ public void run() { } // check if request is still valid, or if the requesting thread has stopped waiting already - if (localrtx != rtx) { + if (localThread != thread) { return; } commitTransaction(); @@ -459,13 +459,13 @@ public void run() { ScriptingEngine.ARGS_WRAP_XMLRPC, false); // check if request is still valid, or if the requesting thread has stopped waiting already - if (localrtx != rtx) { + if (localThread != thread) { return; } commitTransaction(); } catch (Exception x) { // check if request is still valid, or if the requesting thread has stopped waiting already - if (localrtx != rtx) { + if (localThread != thread) { return; } abortTransaction(); @@ -473,7 +473,7 @@ public void run() { // If the transactor thread has been killed by the invoker thread we don't have to // bother for the error message, just quit. - if (localrtx != rtx) { + if (localThread != thread) { return; } @@ -495,13 +495,13 @@ public void run() { ScriptingEngine.ARGS_WRAP_DEFAULT, true); // check if request is still valid, or if the requesting thread has stopped waiting already - if (localrtx != rtx) { + if (localThread != thread) { return; } commitTransaction(); } catch (Exception x) { // check if request is still valid, or if the requesting thread has stopped waiting already - if (localrtx != rtx) { + if (localThread != thread) { return; } abortTransaction(); @@ -509,7 +509,7 @@ public void run() { // If the transactor thread has been killed by the invoker thread we don't have to // bother for the error message, just quit. - if (localrtx != rtx) { + if (localThread != thread) { return; } @@ -524,7 +524,7 @@ public void run() { // res.abort() just aborts the transaction and // leaves the response untouched // check if request is still valid, or if the requesting thread has stopped waiting already - if (localrtx != rtx) { + if (localThread != thread) { return; } abortTransaction(); @@ -535,7 +535,7 @@ public void run() { if (++tries < 8) { // try again after waiting some period // check if request is still valid, or if the requesting thread has stopped waiting already - if (localrtx != rtx) { + if (localThread != thread) { return; } abortTransaction(); @@ -549,11 +549,11 @@ public void run() { res.reportError(interrupt); done = true; // and release resources and thread - rtx = null; + thread = null; } } else { // check if request is still valid, or if the requesting thread has stopped waiting already - if (localrtx != rtx) { + if (localThread != thread) { return; } abortTransaction(); @@ -563,16 +563,15 @@ public void run() { done = true; } } catch (Throwable x) { - String txname = localrtx.getTransactionName(); // check if request is still valid, or if the requesting thread has stopped waiting already - if (localrtx != rtx) { + if (localThread != thread) { return; } abortTransaction(); // If the transactor thread has been killed by the invoker thread we don't have to // bother for the error message, just quit. - if (localrtx != rtx) { + if (localThread != thread) { return; } @@ -589,6 +588,7 @@ public void run() { done = false; error = x; + String txname = Transactor.getInstance(app.nmgr).getTransactionName(); app.logError(txname + ": " + error, x); if (req.isXmlRpc()) { @@ -618,7 +618,7 @@ public void run() { } } finally { - localrtx.closeConnections(); + Transactor.getInstance(app.nmgr).closeConnections(); } } @@ -627,10 +627,12 @@ public void run() { * @throws Exception transaction couldn't be committed */ synchronized void commitTransaction() throws Exception { - Transactor localrtx = (Transactor) Thread.currentThread(); + Thread localThread = Thread.currentThread(); - if (localrtx == rtx) { - localrtx.commit(); + if (localThread == thread) { + Transactor tx = Transactor.getInstance(); + if (tx != null) + tx.commit(); } else { throw new TimeoutException(); } @@ -640,7 +642,7 @@ synchronized void commitTransaction() throws Exception { * Called by the transactor thread when the request didn't terminate successfully. */ synchronized void abortTransaction() { - Transactor localrtx = (Transactor) Thread.currentThread(); + Transactor localrtx = Transactor.getInstance(app.nmgr); localrtx.abort(); } @@ -652,11 +654,11 @@ private synchronized void startTransactor() { throw new ApplicationStoppedException(); } - if ((rtx == null) || !rtx.isAlive()) { + if ((thread == null) || !thread.isAlive()) { // app.logEvent ("Starting Thread"); - rtx = new Transactor(this, app.threadgroup, app.nmgr); - rtx.setContextClassLoader(app.getClassLoader()); - rtx.start(); + thread = new Thread(app.threadgroup, this); + thread.setContextClassLoader(app.getClassLoader()); + thread.start(); } else { notifyAll(); } @@ -666,14 +668,17 @@ private synchronized void startTransactor() { * Tell waiting thread that we're done, then wait for next request */ synchronized void notifyAndWait() { - Transactor localrtx = (Transactor) Thread.currentThread(); + Thread localThread = Thread.currentThread(); // make sure there is only one thread running per instance of this class // if localrtx != rtx, the current thread has been aborted and there's no need to notify - if (localrtx != rtx) { + if (localThread != thread) { // A new request came in while we were finishing the last one. // Return to run() to get the work done. - localrtx.closeConnections(); + Transactor tx = Transactor.getInstance(); + if (tx != null) { + tx.closeConnections(); + } return; } @@ -685,16 +690,16 @@ synchronized void notifyAndWait() { wait(1000 * 60 * 10); } catch (InterruptedException ix) { // we got interrrupted, releases resources and thread - rtx = null; + thread = null; } // if no request arrived, release ressources and thread - if ((reqtype == NONE) && (rtx == localrtx)) { + if ((reqtype == NONE) && (thread == localThread)) { // comment this in to release not just the thread, but also the scripting engine. // currently we don't do this because of the risk of memory leaks (objects from // framework referencing into the scripting engine) // scriptingEngine = null; - rtx = null; + thread = null; } } @@ -704,8 +709,8 @@ synchronized void notifyAndWait() { * thread. If currently active kill the request, otherwise just notify. */ synchronized boolean stopTransactor() { - Transactor t = rtx; - rtx = null; + Transactor t = Transactor.getInstance(); + thread = null; boolean stopped = false; if (t != null && t.isActive()) { // let the scripting engine know that the @@ -1092,8 +1097,8 @@ public synchronized ResponseTrans getResponse() { * * @return the current transactor thread */ - public synchronized Transactor getThread() { - return rtx; + public synchronized Thread getThread() { + return thread; } /** diff --git a/src/helma/objectmodel/db/DbSource.java b/src/helma/objectmodel/db/DbSource.java index 7d89e72b5..2e18179c9 100644 --- a/src/helma/objectmodel/db/DbSource.java +++ b/src/helma/objectmodel/db/DbSource.java @@ -69,9 +69,8 @@ public DbSource(String name, ResourceProperties props) public synchronized Connection getConnection() throws ClassNotFoundException, SQLException { Connection con; - Transactor tx = null; - if (Thread.currentThread() instanceof Transactor) { - tx = (Transactor) Thread.currentThread(); + Transactor tx = Transactor.getInstance(); + if (tx != null) { con = tx.getConnection(this); } else { con = getThreadLocalConnection(); diff --git a/src/helma/objectmodel/db/Node.java b/src/helma/objectmodel/db/Node.java index 8265ab1d9..1aacde3de 100644 --- a/src/helma/objectmodel/db/Node.java +++ b/src/helma/objectmodel/db/Node.java @@ -266,9 +266,10 @@ synchronized void checkWriteLock() { return; // no need to lock transient node } - Transactor current = (Transactor) Thread.currentThread(); + Thread thread = Thread.currentThread(); + Transactor tx = Transactor.getInstance(nmgr.nmgr); - if (!current.isActive()) { + if (!tx.isActive()) { throw new helma.framework.TimeoutException(); } @@ -279,14 +280,14 @@ synchronized void checkWriteLock() { " was invalidated by another thread."); } - if ((lock != null) && (lock != current) && lock.isAlive() && lock.isActive()) { + if ((lock != null) && (lock != tx) && /* lock.isAlive() && */ lock.isActive()) { // nmgr.logEvent("Concurrency conflict for " + this + ", lock held by " + lock); throw new ConcurrencyException("Tried to modify " + this + " from two threads at the same time."); } - current.visitDirtyNode(this); - lock = current; + tx.visitDirtyNode(this); + lock = tx; } /** @@ -306,9 +307,8 @@ void markAs(int s) { state = s; - if (Thread.currentThread() instanceof Transactor) { - Transactor tx = (Transactor) Thread.currentThread(); - + Transactor tx = Transactor.getInstance(); + if (tx != null) { if (s == CLEAN) { clearWriteLock(); tx.dropDirtyNode(this); @@ -332,9 +332,11 @@ void registerSubnodeChange() { // the process of being persistified - except if "manual" subnoderelation is set. if ((state == TRANSIENT || state == NEW) && subnodeRelation == null) { return; - } else if (Thread.currentThread() instanceof Transactor) { - Transactor tx = (Transactor) Thread.currentThread(); - tx.visitParentNode(this); + } else { + Transactor tx = Transactor.getInstance(); + if (tx != null) { + tx.visitParentNode(this); + } } } @@ -934,7 +936,7 @@ public INode addNode(INode elem, int where) { } if (state != TRANSIENT) { - Transactor tx = (Transactor) Thread.currentThread(); + Transactor tx = Transactor.getInstance(nmgr.nmgr); SyntheticKey key = new SyntheticKey(this.getKey(), prop); tx.visitCleanNode(key, node); nmgr.registerNode(node, key); @@ -1250,7 +1252,7 @@ protected Node getGroupbySubnode(String sid, boolean create) { // nodemanager. Otherwise, we just evict whatever was there before if (create) { // register group node with transactor - Transactor tx = (Transactor) Thread.currentThread(); + Transactor tx = Transactor.getInstance(nmgr.nmgr); tx.visitCleanNode(node); nmgr.registerNode(node); } else { @@ -2387,7 +2389,7 @@ public void setNode(String propname, INode value) { // this is done anyway when the node becomes persistent. if (n.state != TRANSIENT) { // check node in with transactor cache - Transactor tx = (Transactor) Thread.currentThread(); + Transactor tx = Transactor.getInstance(nmgr.nmgr); // tx.visitCleanNode (new DbKey (dbm, nID), n); // UPDATE: using n.getKey() instead of manually constructing key. HW 2002/09/13 @@ -2539,9 +2541,9 @@ private void makePersistable() { getHandle().becomePersistent(); // register node with the transactor - Transactor current = (Transactor) Thread.currentThread(); - current.visitDirtyNode(this); - current.visitCleanNode(this); + Transactor tx = Transactor.getInstance(nmgr.nmgr); + tx.visitDirtyNode(this); + tx.visitCleanNode(this); // recursively make children persistable makeChildrenPersistable(); diff --git a/src/helma/objectmodel/db/NodeManager.java b/src/helma/objectmodel/db/NodeManager.java index 04a8a9a41..f07120d6d 100644 --- a/src/helma/objectmodel/db/NodeManager.java +++ b/src/helma/objectmodel/db/NodeManager.java @@ -149,7 +149,7 @@ public void shutdown() throws DatabaseException { public void deleteNode(Node node) throws Exception { if (node != null) { synchronized (this) { - Transactor tx = (Transactor) Thread.currentThread(); + Transactor tx = Transactor.getInstance(app.getNodeManager()); node.setState(Node.INVALID); deleteNode(db, tx.txn, node); @@ -162,7 +162,7 @@ public void deleteNode(Node node) throws Exception { * a reference to another node via a NodeHandle/Key. */ public Node getNode(Key key) throws Exception { - Transactor tx = (Transactor) Thread.currentThread(); + Transactor tx = Transactor.getInstance(app.getNodeManager()); // See if Transactor has already come across this node Node node = tx.getCleanNode(key); @@ -212,7 +212,7 @@ public Node getNode(Node home, String kstr, Relation rel) return null; } - Transactor tx = (Transactor) Thread.currentThread(); + Transactor tx = Transactor.getInstance(app.getNodeManager()); Key key; DbMapping otherDbm = rel == null ? null : rel.otherType; @@ -408,8 +408,9 @@ public void evictNodeByKey(Key key) { public void evictKey(Key key) { cache.remove(key); // also drop key from thread-local transactor cache - if (Thread.currentThread() instanceof Transactor) { - ((Transactor) Thread.currentThread()).dropCleanNode(key); + Transactor tx = Transactor.getInstance(); + if (tx != null) { + tx.dropCleanNode(key); } } @@ -1856,14 +1857,17 @@ public Node createNode(DbMapping dbm, ResultSet rs, DbColumn[] columns, int offs if (id == null) { return null; - } else if (Thread.currentThread() instanceof Transactor) { - // Check if the node is already registered with the transactor - - // it may be in the process of being DELETED, but do return the - // new node if the old one has been marked as INVALID. - DbKey key = new DbKey(dbmap, id); - Node dirtyNode = ((Transactor) Thread.currentThread()).getDirtyNode(key); - if (dirtyNode != null && dirtyNode.getState() != Node.INVALID) { - return dirtyNode; + } else { + Transactor tx = Transactor.getInstance(); + if (tx != null) { + // Check if the node is already registered with the transactor - + // it may be in the process of being DELETED, but do return the + // new node if the old one has been marked as INVALID. + DbKey key = new DbKey(dbmap, id); + Node dirtyNode = tx.getDirtyNode(key); + if (dirtyNode != null && dirtyNode.getState() != Node.INVALID) { + return dirtyNode; + } } } diff --git a/src/helma/objectmodel/db/Transactor.java b/src/helma/objectmodel/db/Transactor.java index 8edc1eec7..0c159a831 100644 --- a/src/helma/objectmodel/db/Transactor.java +++ b/src/helma/objectmodel/db/Transactor.java @@ -28,7 +28,7 @@ * A subclass of thread that keeps track of changed nodes and triggers * changes in the database when a transaction is commited. */ -public class Transactor extends Thread { +public class Transactor { // The associated node manager NodeManager nmgr; @@ -61,15 +61,15 @@ public class Transactor extends Thread { // a name to log the transaction. For HTTP transactions this is the rerquest path private String tname; + private static final ThreadLocal txtor = new ThreadLocal (); + /** * Creates a new Transactor object. * - * @param runnable ... - * @param group ... * @param nmgr ... */ - public Transactor(Runnable runnable, ThreadGroup group, NodeManager nmgr) { - super(group, runnable, group.getName()); + private Transactor(NodeManager nmgr) { + // super(group, runnable, group.getName()); this.nmgr = nmgr; dirtyNodes = new HashMap(); @@ -82,6 +82,19 @@ public Transactor(Runnable runnable, ThreadGroup group, NodeManager nmgr) { killed = false; } + public static Transactor getInstance() { + return txtor.get(); + } + + public static Transactor getInstance(NodeManager nmgr) { + Transactor t = txtor.get(); + if (t == null) { + t = new Transactor(nmgr); + txtor.set(t); + } + return t; + } + /** * Mark a Node as modified/created/deleted during this transaction * @@ -430,28 +443,30 @@ public synchronized void abort() { * Kill this transaction thread. Used as last measure only. */ public synchronized void kill() { + Thread thread = Thread.currentThread(); + killed = true; - interrupt(); + thread.interrupt(); // Interrupt the thread if it has not noticed the flag (e.g. because it is busy // reading from a network socket). - if (isAlive()) { - interrupt(); + if (thread.isAlive()) { + thread.interrupt(); try { - join(1000); + thread.join(1000); } catch (InterruptedException ir) { // interrupted by other thread } } - if (isAlive() && "true".equals(nmgr.app.getProperty("requestTimeoutStop"))) { + if (thread.isAlive() && "true".equals(nmgr.app.getProperty("requestTimeoutStop"))) { // still running - check if we ought to stop() it try { Thread.sleep(2000); - if (isAlive()) { + if (thread.isAlive()) { // thread is still running, pull emergency break nmgr.app.logEvent("Stopping Thread for Transactor " + this); - stop(); + thread.stop(); } } catch (InterruptedException ir) { // interrupted by other thread