Skip to content

Commit

Permalink
make Session.shutdown()/close, Transaction.close(), and PreparedState…
Browse files Browse the repository at this point in the history
…ment close() execute asynchronously
  • Loading branch information
t-horikawa committed Dec 4, 2024
1 parent 712a02a commit 4abe9a9
Show file tree
Hide file tree
Showing 11 changed files with 699 additions and 232 deletions.
62 changes: 62 additions & 0 deletions docs/internal/transaction_close.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Transaction closeに関する状態遷移
2024.12.04
NT horikawa

## 定義
### 状態
* initial
* committed
* rollbacked
* toBeClosed
* toBeClosedWithCommit
* toBeClosedWithRollback
* closed

### 変数
* FutureResponse<Void> commitResult
`synchronized (this)` により排他アクセスを保証

### 状態と変数等
| 状態 | commitResult | commit要求 | rollback要求 | delayedClose登録 | Transaction close<br>completed |
| ---- | ---- | ---- | ---- | ---- | ---- |
| initial | null | false | false | false | false |
| committed | not null | true| false | false | false |
| rollbacked | null | false | true | false | false |
| toBeClosed | null | false | false | true | false |
| toBeClosedWithCommit | not null | true | false | true | false |
| toBeClosedWithRollback | null | false | true | true | false |
| closed | - | - | - | - | false |

`-`はdon't careを示す。

## 状態遷移
| 状態 | event<br>(API call) | 遷移先 | 備考 |
| ---- | ---- | ---- | ---- |
| initial | commit() | committed | commitを要求する
| initial | rollback() | rollbacked | rollbackを要求する
| initial | close() | toBeClosed | DelayedCloseをDisposerに登録する
| committed | commit() | - | ccommitを要求している旨の例外を投げる
| committed | rollback() | - | commitを要求している旨の例外を投げる
| committed | close() | closed | commitのレスポンスが返ってきている場合、<br>doClose()を実行する
| committed | close() | toBeClosedWithCommit | commitのレスポンスが返ってきていない場合、<br>DelayedCloseをDisposerに登録する
| rollbacked | commit() | - | rollbackを要求している旨の例外を投げる
| rollbacked | rollback() | - | 正常終了を返す
| rollbacked | close() | toBeClosedWithRollback | DelayedCloseをDisposerに登録する
| toBeClosed | commit() | - | closeされている旨の例外を投げる
| toBeClosed | rollback() | - | closeされている旨の例外を投げる
| toBeClosed | close() | - | do nothing
| toBeClosed | doClose() | closed | rollbackとdisposeを要求する
| toBeClosedWithCommit | commit() | - | closeされている旨の例外を投げる
| toBeClosedWithCommit | rollback() | - | closeされている旨の例外を投げる
| toBeClosedWithCommit | close() | - | do nothing
| toBeClosedWithCommit | doClose() | closed | commitResultをget()し、不成功ならdisposeを要求する
| toBeClosedWithRollback | commit() | - | closeされている旨の例外を投げる
| toBeClosedWithRollback | rollback() | - | closeされている旨の例外を投げる
| toBeClosedWithRollback | close() | - | do nothing
| toBeClosedWithRollback | doClose() | closed | disposeを要求する
| closed | commit() | - | closeされている旨の例外を投げる
| closed | rollback() | - | closeされている旨の例外を投げる
| closed | close() | - | do nothing

`-`は「状態遷移しない」を示す。
doClose()はDisposerに登録したDelayedCloseのみからcallされる。他はpublicメソッド。
23 changes: 23 additions & 0 deletions docs/internal/transactoin_auto_dispose.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Transactionのauto_disposeについて
2024.11.28
NT horikawa

## 前提
* SQL実行エンジンに対するTransactionからのcommit要求が、SQL実行エンジンにおいてエラーなく終了しなかった場合、そのTransactionはgetSqlServiceException()によりエラー情報を取得できる。
* これに備え、SQL実行エンジンはTransactionからのcommit要求の処理がエラーなく終了しなかった場合、そのTransactionに対応するトランザクションリソースを解放せずに残しておく。
* 上記SQL実行エンジンが確保しているトランザクションリソースはTransactionからのdispose要求により解放される。
* Transactionからのrollback要求は、commit失敗時と同等の扱いとなる。

### 実装における要注意点
* DisposeTransactionメッセージ内のtransaction_handle(https://github.com/project-tsurugi/tsubakuro/blob/712a02a98329338b744d8a63289793374c461d46/modules/proto/src/main/protos/jogasaki/proto/sql/request.proto#L435 )はSQL実行エンジンにおいて一意ではなく、あるTransactionに対応するトランザクションリソースが解放されると、そのTransactionのものと同じ値のtransaction_handleが再利用される可能性がある。
* 上記のため、あるTransactionからのauto_dispose(https://github.com/project-tsurugi/tsubakuro/blob/712a02a98329338b744d8a63289793374c461d46/modules/proto/src/main/protos/jogasaki/proto/sql/request.proto#L384 )をtrueとしたcommit要求が成功し、SQL実行エンジンがTransactionに対応するトランザクションリソースを解放した後は、そのTransactionのものと同じ値のtransaction_handleが再利用される可能がある。

### Transactionに必要な制約
「実装における要注意点」に示したSQLエンジン挙動のため、Transactionは下記制約を充足する必要がある。
* あるTransactionからのauto_disposeをtrueとしたcommit要求をSQL実行エンジンが受け付け、それが成功した場合、以降、そのTransactionからSQL実行エンジンに対してgetSqlServiceException()やdispose要求を行ってはならない。
* なぜなら、SQL実行エンジンがtransaction_handleを再利用した場合、最初のTransactionが保有するtransaction_handleは既に別のTransactionと対応しているため、最初のTransactionからgetSqlServiceException()やdispose要求を行ってはならない、特にdispose要求の場合はSQL実行エンジンの誤動作を引き起こすことになるからである。

## TsubakuroのTrancacionImplクラスにおける制約の充足方法
「Transactionに必要な制約」を充足するため、TransactionImplは下記の制御を行う。
* auto_disposeをtrueとしたTransaction.commit()が実行された際は、戻り値であるFutureResponse<Void>をTransaction内に格納する。
* TransactionのgetSqlServiceException()やclose()が実行された際、前記FutureResponse<Void>が格納されていると、それをget()してcommitが成功したか否かを調べ、commitが失敗していた場合にのみSQL実行エンジンにgetSqlServiceExceptionやdispose要求を送る。これにより、auto_disposeをtrueとしたcommitが成功した場合、そのTransactionからgetSqlServiceExceptionやdispose要求をSQL実行エンジンに送らないように制御する。
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
*/
package com.tsurugidb.tsubakuro.channel.common.connection;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ChannelResponse;
import com.tsurugidb.tsubakuro.client.SessionAlreadyClosedException;
// import com.tsurugidb.tsubakuro.client.SessionAlreadyClosedException;
import com.tsurugidb.tsubakuro.exception.ServerException;
import com.tsurugidb.tsubakuro.util.ServerResource;

/**
Expand All @@ -39,32 +39,55 @@ public class Disposer extends Thread {

private AtomicBoolean started = new AtomicBoolean();

private AtomicBoolean sessionClosed = new AtomicBoolean();
private Queue<ForegroundFutureResponse<?>> futureResponseQueue = new ArrayDeque<>();

private Queue<DelayedClose> serverResourceQueue = new ArrayDeque<>();

private AtomicBoolean empty = new AtomicBoolean();

private final AtomicReference<DelayedShutdown> shutdown = new AtomicReference<>();

private Queue<ForegroundFutureResponse<?>> queue = new ArrayDeque<>();
private final AtomicReference<DelayedClose> close = new AtomicReference<>();

private AtomicBoolean queueHasEntry = new AtomicBoolean();
/**
* Enclodure of delayed clean up procedure.
*/
public interface DelayedShutdown {
/**
* clean up procedure.
* @throws IOException An error was occurred while cleanUP() is executed.
*/
void shutdown() throws IOException;
}

private ServerResource session;
/**
* Enclodure of delayed clean up procedure.
*/
public interface DelayedClose {
/**
* invoke the close() procedure of its belonging object.
* @throws ServerException if error was occurred while disposing this session
* @throws IOException if I/O error was occurred while disposing this session
* @throws InterruptedException if interrupted while disposing this session
*/
void delayedClose() throws ServerException, IOException, InterruptedException;
}

/**
* Creates a new instance.
* @param session the current session which this blongs to
*/
public Disposer(@Nonnull ServerResource session) {
Objects.requireNonNull(session);
this.session = session;
public Disposer() {
}

@Override
public void run() {
Exception exception = null;
boolean shutdownProcessed = false;

while (true) {
ForegroundFutureResponse<?> futureResponse;
synchronized (queue) {
futureResponse = queue.poll();
}
if (sessionClosed.get() && futureResponse == null) {
break;
synchronized (futureResponseQueue) {
futureResponse = futureResponseQueue.poll();
}
if (futureResponse != null) {
try {
Expand All @@ -77,80 +100,174 @@ public void run() {
// Server resource has not created at the server
continue;
} catch (SessionAlreadyClosedException e) {
// Server resource has been disposed by the session close
throw new AssertionError("SessionAlreadyClosedException should not occur in the current server implementation"); // FIXME remove this line
// continue;
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
continue;
} catch (TimeoutException e) {
// Let's try again
queue.add(futureResponse);
futureResponseQueue.add(futureResponse);
continue;
} catch (Exception e) {
// should not occur
LOG.info(e.getMessage());
} catch (Exception e) { // should not occur
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
continue;
}
} else {
notifyQueueIsEmpty();
}
if (!sessionClosed.get()) {
DelayedClose serverResource;
synchronized (serverResourceQueue) {
serverResource = serverResourceQueue.poll();
}
if (serverResource != null) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// No problem, it's OK
serverResource.delayedClose();
} catch (ServerException | IOException | InterruptedException e) {
exception = addSuppressed(exception, e);
}
continue;
}
notifyEmpty();
if (!shutdownProcessed) {
try {
var sh = shutdown.get();
if (sh != null) {
sh.shutdown();
shutdownProcessed = true;
}
} catch (IOException e) {
exception = addSuppressed(exception, e);
}
}
var cl = close.get();
if (cl != null) {
if (shutdownProcessed || shutdown.get() == null) {
try {
cl.delayedClose();
} catch (ServerException | IOException | InterruptedException e) {
exception = addSuppressed(exception, e);
}
break;
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// No problem, it's OK
}
}

if (exception != null) {
LOG.info(exception.getMessage());
throw new UncheckedIOException(new IOException(exception));
}
}

private Exception addSuppressed(Exception exception, Exception e) {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
return exception;
}

synchronized void add(ForegroundFutureResponse<?> futureResponse) {
if (close.get() != null) {
throw new AssertionError("Session already closed");
}
synchronized (futureResponseQueue) {
futureResponseQueue.add(futureResponse);
}
if (!started.getAndSet(true)) {
this.start();
}
}

/**
* Add a DelayedClose object containing a close procedure for a certain ServerResource object.
* If disposer thread has not started, a disposer thread will be started.
* @param resource the DelayedClose to be added
*/
public synchronized void add(DelayedClose resource) {
if (close.get() != null) {
throw new AssertionError("Session already closed");
}
synchronized (serverResourceQueue) {
serverResourceQueue.add(resource);
}
if (!started.getAndSet(true)) {
this.start();
}
// FIXME Revive these lines when the server implementation improves.
// try {
// session.close();
// } catch (Exception e) {
// LOG.error(e.getMessage());
// }
}

/**
* Receive notification that the session is to be closed soon and
* let the caller know if the session can be closed immediately.
* NOTE: This method is assumed to be called from Session.close() only.
* @return true if the session can be closed immediately
* as the queue that stores unhandled ForegroundFutureResponse is empty
* Register a delayed shutdown procesure of the Session.
* If disposer thread has not started, c.shoutdown() is immediately executed.
* NOTE: This method is assumed to be called only in close and/or shutdown of a Session.
* @param c the clean up procesure to be registered
* @throws IOException An error was occurred in c.shoutdown() execution.
*/
public boolean prepareCloseAndIsEmpty() {
// FIXME Remove the following line when the server implementation improves.
waitForFinishDisposal();
synchronized (queue) {
sessionClosed.set(true);
return queue.isEmpty();
public synchronized void registerDelayedShutdown(DelayedShutdown c) throws IOException {
if (!started.getAndSet(true)) {
empty.set(true);
c.shutdown();
return;
}
shutdown.set(c);
}

void add(ForegroundFutureResponse<?> futureResponse) {
/**
* Register a delayed close object in charge of asynchronous close of the Session.
* If disposer thread has not started or both queue is empty, c.delayedClose() is immediately executed.
* NOTE: This method is assumed to be called only in close and/or shutdown of a Session.
* @param c the clean up procesure to be registered
* @throws ServerException if server error was occurred while disposing the session
* @throws IOException if I/O error was occurred while disposing the session
* @throws InterruptedException if interrupted while disposing the session
*/
public synchronized void registerDelayedClose(DelayedClose c) throws ServerException, IOException, InterruptedException {
if (!started.getAndSet(true)) {
this.start();
empty.set(true);
c.delayedClose();
return;
}
synchronized (queue) {
queue.add(futureResponse);
queueHasEntry.set(true);
if (futureResponseQueue.isEmpty() && serverResourceQueue.isEmpty()) {
c.delayedClose();
close.set(new DelayedClose() {
@Override
public void delayedClose() {
// do nothing
}
});
return;
}
close.set(c);
}

/**
* Wait until the release of the server resource corresponding to the response
* closed without getting is completed.
* NOTE: This method must be called with the guarantee that no subsequent add() will be called.
*/
public synchronized void waitForFinishDisposal() {
while (queueHasEntry.get()) {
try {
wait();
} catch (InterruptedException e) {
continue;
public synchronized void waitForEmpty() {
if (started.get()) {
while (!empty.get()) {
try {
wait();
} catch (InterruptedException e) {
continue;
}
}
}
}

private synchronized void notifyQueueIsEmpty() {
queueHasEntry.set(false);
private synchronized void notifyEmpty() {
empty.set(true);
notify();
}
}
Loading

0 comments on commit 4abe9a9

Please sign in to comment.