Skip to content

Commit

Permalink
misc - XA datasource implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Apr 27, 2021
1 parent 34f9ba7 commit 2999238
Show file tree
Hide file tree
Showing 13 changed files with 589 additions and 56 deletions.
22 changes: 2 additions & 20 deletions src/main/java/org/mariadb/jdbc/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public class Configuration {
private TransactionIsolation transactionIsolation = TransactionIsolation.REPEATABLE_READ;
private int defaultFetchSize = 0;
private int maxQuerySizeToLog = 1024;
private boolean pinGlobalTxToPhysicalConnection = false;
private String geometryDefaultType = null;
private String restrictedAuth = null;

Expand Down Expand Up @@ -157,7 +156,6 @@ private Configuration(
TransactionIsolation transactionIsolation,
int defaultFetchSize,
int maxQuerySizeToLog,
boolean pinGlobalTxToPhysicalConnection,
String geometryDefaultType,
String restrictedAuth,
String socketFactory,
Expand Down Expand Up @@ -222,7 +220,6 @@ private Configuration(
this.transactionIsolation = transactionIsolation;
this.defaultFetchSize = defaultFetchSize;
this.maxQuerySizeToLog = maxQuerySizeToLog;
this.pinGlobalTxToPhysicalConnection = pinGlobalTxToPhysicalConnection;
this.geometryDefaultType = geometryDefaultType;
this.restrictedAuth = restrictedAuth;
this.socketFactory = socketFactory;
Expand Down Expand Up @@ -286,7 +283,6 @@ private Configuration(
String user,
String password,
String enabledSslProtocolSuites,
Boolean pinGlobalTxToPhysicalConnection,
String socketFactory,
Integer connectTimeout,
String pipe,
Expand Down Expand Up @@ -354,8 +350,6 @@ private Configuration(
this.user = user;
this.password = password;
this.enabledSslProtocolSuites = enabledSslProtocolSuites;
if (pinGlobalTxToPhysicalConnection != null)
this.pinGlobalTxToPhysicalConnection = pinGlobalTxToPhysicalConnection;
this.socketFactory = socketFactory;
if (connectTimeout != null) this.connectTimeout = connectTimeout;
this.pipe = pipe;
Expand Down Expand Up @@ -650,8 +644,8 @@ private static HaMode parseHaMode(String url, int separator) {

public Configuration clone(String username, String password) {
return new Configuration(
username,
password,
username != null && username.isEmpty() ? null : username,
password != null && password.isEmpty() ? null : password,
this.database,
this.addresses,
this.haMode,
Expand All @@ -661,7 +655,6 @@ public Configuration clone(String username, String password) {
this.transactionIsolation,
this.defaultFetchSize,
this.maxQuerySizeToLog,
this.pinGlobalTxToPhysicalConnection,
this.geometryDefaultType,
this.restrictedAuth,
this.socketFactory,
Expand Down Expand Up @@ -765,10 +758,6 @@ public String enabledSslProtocolSuites() {
return enabledSslProtocolSuites;
}

public boolean pinGlobalTxToPhysicalConnection() {
return pinGlobalTxToPhysicalConnection;
}

public String socketFactory() {
return socketFactory;
}
Expand Down Expand Up @@ -1155,7 +1144,6 @@ public static final class Builder implements Cloneable {
private Boolean autocommit;
private Integer defaultFetchSize;
private Integer maxQuerySizeToLog;
private Boolean pinGlobalTxToPhysicalConnection;
private String geometryDefaultType;
private String restrictedAuth;
private String transactionIsolation;
Expand Down Expand Up @@ -1278,11 +1266,6 @@ public Builder enabledSslProtocolSuites(String enabledSslProtocolSuites) {
return this;
}

public Builder pinGlobalTxToPhysicalConnection(Boolean pinGlobalTxToPhysicalConnection) {
this.pinGlobalTxToPhysicalConnection = pinGlobalTxToPhysicalConnection;
return this;
}

public Builder database(String database) {
this.database = database;
return this;
Expand Down Expand Up @@ -1695,7 +1678,6 @@ public Configuration build() throws SQLException {
this.user,
this.password,
this.enabledSslProtocolSuites,
this.pinGlobalTxToPhysicalConnection,
this.socketFactory,
this.connectTimeout,
this.pipe,
Expand Down
24 changes: 14 additions & 10 deletions src/main/java/org/mariadb/jdbc/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,20 @@ public PreparedStatement prepareInternal(
throws SQLException {
checkNotClosed();
if (useBinary) {
return new ServerPreparedStatement(
NativeSql.parse(sql, client.getContext()),
this,
lock,
canUseServerTimeout,
canUseServerMaxRows,
autoGeneratedKeys,
resultSetType,
resultSetConcurrency,
defaultFetchSize);
try {
return new ServerPreparedStatement(
NativeSql.parse(sql, client.getContext()),
this,
lock,
canUseServerTimeout,
canUseServerMaxRows,
autoGeneratedKeys,
resultSetType,
resultSetConcurrency,
defaultFetchSize);
} catch (SQLException e) {
// failover to client
}
}
return new ClientPreparedStatement(
NativeSql.parse(sql, client.getContext()),
Expand Down
23 changes: 15 additions & 8 deletions src/main/java/org/mariadb/jdbc/MariaDbDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@
import java.sql.*;
import java.sql.Connection;
import java.util.logging.Logger;
import javax.sql.ConnectionPoolDataSource;
import javax.sql.DataSource;
import javax.sql.PooledConnection;
import javax.sql.*;

public class MariaDbDataSource implements DataSource, ConnectionPoolDataSource {
public class MariaDbDataSource implements DataSource, ConnectionPoolDataSource, XADataSource {

private final Configuration conf;

Expand Down Expand Up @@ -161,15 +159,24 @@ public Logger getParentLogger() {

@Override
public PooledConnection getPooledConnection() throws SQLException {
org.mariadb.jdbc.Connection connection = Driver.connect(conf);
return new MariaDbPoolConnection(connection);
return new MariaDbPoolConnection(Driver.connect(conf));
}

@Override
public PooledConnection getPooledConnection(String username, String password)
throws SQLException {
Configuration conf = this.conf.clone(username, password);
org.mariadb.jdbc.Connection connection = Driver.connect(conf);
return new MariaDbPoolConnection(connection);
return new MariaDbPoolConnection(Driver.connect(conf));
}

@Override
public XAConnection getXAConnection() throws SQLException {
return new MariaDbPoolConnection(Driver.connect(conf));
}

@Override
public XAConnection getXAConnection(String username, String password) throws SQLException {
Configuration conf = this.conf.clone(username, password);
return new MariaDbPoolConnection(Driver.connect(conf));
}
}
180 changes: 179 additions & 1 deletion src/main/java/org/mariadb/jdbc/MariaDbPoolConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
package org.mariadb.jdbc;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.sql.*;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.mariadb.jdbc.util.StringUtils;

public class MariaDbPoolConnection implements PooledConnection {
public class MariaDbPoolConnection implements PooledConnection, XAConnection {

private final Connection connection;
private final List<ConnectionEventListener> connectionEventListeners;
Expand Down Expand Up @@ -86,4 +92,176 @@ public void close() throws SQLException {
connection.setPoolConnection(null);
connection.close();
}

public static String xidToString(Xid xid) {
return "0x"
+ StringUtils.byteArrayToHexString(xid.getGlobalTransactionId())
+ ",0x"
+ StringUtils.byteArrayToHexString(xid.getBranchQualifier())
+ ",0x"
+ Integer.toHexString(xid.getFormatId());
}

@Override
public XAResource getXAResource() throws SQLException {
return new MariaDbXAResource();
}

private class MariaDbXAResource implements XAResource {

private String flagsToString(int flags) {
switch (flags) {
case TMJOIN:
return "JOIN";
case TMONEPHASE:
return "ONE PHASE";
case TMRESUME:
return "RESUME";
case TMSUSPEND:
return "SUSPEND";
default:
return "";
}
}

private XAException mapXaException(SQLException sqle) {
int xaErrorCode;

switch (sqle.getErrorCode()) {
case 1397:
xaErrorCode = XAException.XAER_NOTA;
break;
case 1398:
xaErrorCode = XAException.XAER_INVAL;
break;
case 1399:
xaErrorCode = XAException.XAER_RMFAIL;
break;
case 1400:
xaErrorCode = XAException.XAER_OUTSIDE;
break;
case 1401:
xaErrorCode = XAException.XAER_RMERR;
break;
case 1402:
xaErrorCode = XAException.XA_RBROLLBACK;
break;
default:
xaErrorCode = 0;
break;
}
XAException xaException;
if (xaErrorCode != 0) {
xaException = new XAException(xaErrorCode);
} else {
xaException = new XAException(sqle.getMessage());
}
xaException.initCause(sqle);
return xaException;
}

private void execute(String command) throws XAException {
try {
connection.createStatement().execute(command);
} catch (SQLException sqle) {
throw mapXaException(sqle);
}
}

@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
execute("XA COMMIT " + xidToString(xid) + ((onePhase) ? " ONE PHASE" : ""));
}

@Override
public void end(Xid xid, int flags) throws XAException {
if (flags != TMSUCCESS && flags != TMSUSPEND && flags != TMFAIL) {
throw new XAException(XAException.XAER_INVAL);
}

execute("XA END " + xidToString(xid) + " " + flagsToString(flags));
}

@Override
public void forget(Xid xid) throws XAException {
// Not implemented by the server
}

@Override
public int getTransactionTimeout() throws XAException {
// not implemented
return 0;
}

public Configuration getConf() {
return connection.getContext().getConf();
}

@Override
public boolean isSameRM(XAResource xaResource) throws XAException {
if (xaResource instanceof MariaDbXAResource) {
MariaDbXAResource other = (MariaDbXAResource) xaResource;
return other.getConf().equals(this.getConf());
}
return false;
}

@Override
public int prepare(Xid xid) throws XAException {
execute("XA PREPARE " + xidToString(xid));
return XA_OK;
}

@Override
public Xid[] recover(int flags) throws XAException {
if (((flags & TMSTARTRSCAN) == 0) && ((flags & TMENDRSCAN) == 0) && (flags != TMNOFLAGS)) {
throw new XAException(XAException.XAER_INVAL);
}

if ((flags & TMSTARTRSCAN) == 0) {
return new MariaDbXid[0];
}

try {
ResultSet rs = connection.createStatement().executeQuery("XA RECOVER");
ArrayList<MariaDbXid> xidList = new ArrayList<>();

while (rs.next()) {
int formatId = rs.getInt(1);
int len1 = rs.getInt(2);
int len2 = rs.getInt(3);
byte[] arr = rs.getBytes(4);

byte[] globalTransactionId = new byte[len1];
byte[] branchQualifier = new byte[len2];
System.arraycopy(arr, 0, globalTransactionId, 0, len1);
System.arraycopy(arr, len1, branchQualifier, 0, len2);
xidList.add(new MariaDbXid(formatId, globalTransactionId, branchQualifier));
}
Xid[] xids = new Xid[xidList.size()];
xidList.toArray(xids);
return xids;
} catch (SQLException sqle) {
throw mapXaException(sqle);
}
}

@Override
public void rollback(Xid xid) throws XAException {
execute("XA ROLLBACK " + xidToString(xid));
}

@Override
public boolean setTransactionTimeout(int i) throws XAException {
return false;
}

@Override
public void start(Xid xid, int flags) throws XAException {
if (flags != TMJOIN && flags != TMRESUME && flags != TMNOFLAGS) {
throw new XAException(XAException.XAER_INVAL);
}
execute("XA START " + xidToString(xid) + " " + flagsToString(flags));
}
}
}
Loading

0 comments on commit 2999238

Please sign in to comment.