Skip to content

Commit

Permalink
add TableMetaDataExecutorAdapter
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Aug 1, 2018
1 parent ea299b3 commit 1367542
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import io.shardingsphere.core.rule.ShardingDataSourceNames;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.core.rule.TableRule;
import lombok.RequiredArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.Setter;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -48,13 +49,16 @@
* @author zhaojun
* @author zhangliang
*/
@RequiredArgsConstructor
public abstract class ShardingTableMetaData {
@AllArgsConstructor
public class ShardingTableMetaData {

private final ListeningExecutorService executorService;

private final Map<String, TableMetaData> tableMetaDataMap = new HashMap<>();

@Setter
private TableMetaDataExecutorAdapter executorAdapter;

/**
* Initialize sharding meta data.
*
Expand Down Expand Up @@ -86,7 +90,7 @@ private void initDefaultTables(final ShardingRule shardingRule) throws SQLExcept

private Collection<String> getAllTableNames(final String dataSourceName) throws SQLException {
Collection<String> result = new LinkedList<>();
try (Connection connection = getConnection(dataSourceName);
try (Connection connection = executorAdapter.getConnection(dataSourceName);
ResultSet resultSet = connection.getMetaData().getTables(null, null, null, null)) {
while (resultSet.next()) {
result.add(resultSet.getString("TABLE_NAME"));
Expand All @@ -95,71 +99,42 @@ private Collection<String> getAllTableNames(final String dataSourceName) throws
return result;
}

protected abstract Connection getConnection(String dataSourceName) throws SQLException;

/**
* Refresh table meta data.
*
* @param logicTableName logic table name
* @param shardingRule sharding rule
*/
public void refresh(final String logicTableName, final ShardingRule shardingRule) {
refresh(logicTableName, shardingRule, Collections.<String, Connection>emptyMap());
tableMetaDataMap.put(logicTableName, loadTableMetaData(shardingRule.getTableRule(logicTableName), shardingRule.getShardingDataSourceNames()));
}

/**
* Refresh table meta data.
*
* @param logicTableName logic table name
* @param shardingRule sharding rule
* @param connectionMap connection map passing from sharding connection
*/
public void refresh(final String logicTableName, final ShardingRule shardingRule, final Map<String, Connection> connectionMap) {
tableMetaDataMap.put(logicTableName, loadTableMetaData(shardingRule.getTableRule(logicTableName), shardingRule.getShardingDataSourceNames(), connectionMap));
}

private TableMetaData loadTableMetaData(final TableRule tableRule, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) {
List<TableMetaData> actualTableMetaDataList = loadActualTableMetaDataList(tableRule.getActualDataNodes(), shardingDataSourceNames, connectionMap);
private TableMetaData loadTableMetaData(final TableRule tableRule, final ShardingDataSourceNames shardingDataSourceNames) {
List<TableMetaData> actualTableMetaDataList = loadActualTableMetaDataList(tableRule.getActualDataNodes(), shardingDataSourceNames);
checkUniformed(tableRule.getLogicTable(), actualTableMetaDataList);
return actualTableMetaDataList.iterator().next();
}

protected abstract TableMetaData loadTableMetaData(DataNode dataNode, Map<String, Connection> connectionMap) throws SQLException;

private List<TableMetaData> loadActualTableMetaDataList(final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) {
List<ListenableFuture<TableMetaData>> result = new LinkedList<>();
for (final DataNode each : actualDataNodes) {
result.add(executorService.submit(new Callable<TableMetaData>() {

@Override
public TableMetaData call() throws SQLException {
return loadTableMetaData(new DataNode(shardingDataSourceNames.getRawMasterDataSourceName(each.getDataSourceName()), each.getTableName()), connectionMap);
}
}));
}
try {
return Futures.allAsList(result).get();
} catch (final InterruptedException | ExecutionException ex) {
throw new ShardingException(ex);
private TableMetaData loadTableMetaData(final DataNode dataNode) throws SQLException {
if (executorAdapter.isAutoClose()) {
try (Connection connection = executorAdapter.getConnection(dataNode.getDataSourceName())) {
return loadTableMetaData(connection, dataNode);
}
}
return loadTableMetaData(executorAdapter.getConnection(dataNode.getDataSourceName()), dataNode);
}

private void checkUniformed(final String logicTableName, final List<TableMetaData> actualTableMetaDataList) {
final TableMetaData sample = actualTableMetaDataList.iterator().next();
for (TableMetaData each : actualTableMetaDataList) {
if (!sample.equals(each)) {
throw new ShardingException("Cannot get uniformed table structure for `%s`. The different meta data of actual tables are as follows:\n%s\n%s.", logicTableName, sample, each);
}
}
private TableMetaData loadTableMetaData(final Connection connection, final DataNode dataNode) throws SQLException {
return new TableMetaData(isTableExist(connection, dataNode.getTableName()) ? getColumnMetaDataList(connection, dataNode.getTableName()) : Collections.<ColumnMetaData>emptyList());
}

protected boolean isTableExist(final Connection connection, final String actualTableName) throws SQLException {
private boolean isTableExist(final Connection connection, final String actualTableName) throws SQLException {
try (ResultSet resultSet = connection.getMetaData().getTables(null, null, actualTableName, null)) {
return resultSet.next();
}
}

protected List<ColumnMetaData> getColumnMetaDataList(final Connection connection, final String actualTableName) throws SQLException {
private List<ColumnMetaData> getColumnMetaDataList(final Connection connection, final String actualTableName) throws SQLException {
List<ColumnMetaData> result = new LinkedList<>();
Collection<String> primaryKeys = getPrimaryKeys(connection, actualTableName);
try (ResultSet resultSet = connection.getMetaData().getColumns(null, null, actualTableName, null)) {
Expand All @@ -182,6 +157,33 @@ private Collection<String> getPrimaryKeys(final Connection connection, final Str
return result;
}

private List<TableMetaData> loadActualTableMetaDataList(final List<DataNode> actualDataNodes, final ShardingDataSourceNames shardingDataSourceNames) {
List<ListenableFuture<TableMetaData>> result = new LinkedList<>();
for (final DataNode each : actualDataNodes) {
result.add(executorService.submit(new Callable<TableMetaData>() {

@Override
public TableMetaData call() throws SQLException {
return loadTableMetaData(new DataNode(shardingDataSourceNames.getRawMasterDataSourceName(each.getDataSourceName()), each.getTableName()));
}
}));
}
try {
return Futures.allAsList(result).get();
} catch (final InterruptedException | ExecutionException ex) {
throw new ShardingException(ex);
}
}

private void checkUniformed(final String logicTableName, final List<TableMetaData> actualTableMetaDataList) {
final TableMetaData sample = actualTableMetaDataList.iterator().next();
for (TableMetaData each : actualTableMetaDataList) {
if (!sample.equals(each)) {
throw new ShardingException("Cannot get uniformed table structure for `%s`. The different meta data of actual tables are as follows:\n%s\n%s.", logicTableName, sample, each);
}
}
}

/**
* Judge contains table from table meta data or not.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.core.metadata.table;

import java.sql.Connection;
import java.sql.SQLException;

/**
* Table meta data executor adapter.
*
* @author zhangliang
*/
public interface TableMetaDataExecutorAdapter {

/**
* Get connection.
*
* @param dataSourceName data source name
* @return connection
* @throws SQLException SQL exception
*/
Connection getConnection(String dataSourceName) throws SQLException;

/**
* Is auto close created connection or not.
* @return auto close or not
*/
boolean isAutoClose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.shardingsphere.core.jdbc.adapter.AbstractDataSourceAdapter;
import io.shardingsphere.core.jdbc.core.ShardingContext;
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.core.jdbc.metadata.JDBCShardingTableMetaData;
import io.shardingsphere.core.jdbc.metadata.AutoClosedJDBCTableMetaDataExecutorAdapter;
import io.shardingsphere.core.metadata.table.ShardingTableMetaData;
import io.shardingsphere.core.rule.MasterSlaveRule;
import io.shardingsphere.core.rule.ShardingRule;
Expand Down Expand Up @@ -68,7 +68,7 @@ public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final Sha
shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
executorEngine = new ExecutorEngine(executorSize);
ShardingTableMetaData shardingTableMetaData = new JDBCShardingTableMetaData(executorEngine.getExecutorService(), dataSourceMap);
ShardingTableMetaData shardingTableMetaData = new ShardingTableMetaData(executorEngine.getExecutorService(), new AutoClosedJDBCTableMetaDataExecutorAdapter(dataSourceMap));
shardingTableMetaData.init(shardingRule);
boolean showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
shardingContext = new ShardingContext(dataSourceMap, shardingRule, getDatabaseType(), executorEngine, shardingTableMetaData, showSQL);
Expand All @@ -91,7 +91,7 @@ public void renew(final Map<String, DataSource> newDataSourceMap, final Sharding
originalExecutorEngine.close();
}
boolean newShowSQL = newShardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
ShardingTableMetaData shardingMetaData = new JDBCShardingTableMetaData(executorEngine.getExecutorService(), newDataSourceMap);
ShardingTableMetaData shardingMetaData = new ShardingTableMetaData(executorEngine.getExecutorService(), new AutoClosedJDBCTableMetaDataExecutorAdapter(newDataSourceMap));
shardingMetaData.init(newShardingRule);
shardingProperties = newShardingProperties;
shardingContext = new ShardingContext(newDataSourceMap, newShardingRule, getDatabaseType(), executorEngine, shardingMetaData, newShowSQL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.core.jdbc.core.resultset.GeneratedKeysResultSet;
import io.shardingsphere.core.jdbc.core.resultset.ShardingResultSet;
import io.shardingsphere.core.jdbc.metadata.ConnectionHoldJDBCTableMetaDataExecutorAdapter;
import io.shardingsphere.core.merger.JDBCQueryResult;
import io.shardingsphere.core.merger.MergeEngine;
import io.shardingsphere.core.merger.MergeEngineFactory;
import io.shardingsphere.core.merger.MergedResult;
import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.core.merger.event.EventMergeType;
import io.shardingsphere.core.merger.event.ResultSetMergeEvent;
import io.shardingsphere.core.metadata.table.ShardingTableMetaData;
import io.shardingsphere.core.parsing.parser.sql.dal.DALStatement;
import io.shardingsphere.core.parsing.parser.sql.dml.insert.InsertStatement;
import io.shardingsphere.core.parsing.parser.sql.dql.DQLStatement;
Expand Down Expand Up @@ -212,7 +214,9 @@ private void sqlRoute() {
private void refreshTableMetaData() throws SQLException {
if (null != routeResult && null != connection && SQLType.DDL == routeResult.getSqlStatement().getType() && !routeResult.getSqlStatement().getTables().isEmpty()) {
String logicTableName = routeResult.getSqlStatement().getTables().getSingleTableName();
connection.getShardingContext().getMetaData().getTable().refresh(logicTableName, connection.getShardingContext().getShardingRule(), connection.getConnections(logicTableName));
ShardingTableMetaData shardingTableMetaData = connection.getShardingContext().getMetaData().getTable();
shardingTableMetaData.setExecutorAdapter(new ConnectionHoldJDBCTableMetaDataExecutorAdapter(connection.getConnections(logicTableName)));
shardingTableMetaData.refresh(logicTableName, connection.getShardingContext().getShardingRule());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import io.shardingsphere.core.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.core.jdbc.core.resultset.GeneratedKeysResultSet;
import io.shardingsphere.core.jdbc.core.resultset.ShardingResultSet;
import io.shardingsphere.core.jdbc.metadata.ConnectionHoldJDBCTableMetaDataExecutorAdapter;
import io.shardingsphere.core.merger.JDBCQueryResult;
import io.shardingsphere.core.merger.MergeEngine;
import io.shardingsphere.core.merger.MergeEngineFactory;
import io.shardingsphere.core.merger.MergedResult;
import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.core.merger.event.EventMergeType;
import io.shardingsphere.core.merger.event.ResultSetMergeEvent;
import io.shardingsphere.core.metadata.table.ShardingTableMetaData;
import io.shardingsphere.core.parsing.parser.sql.dal.DALStatement;
import io.shardingsphere.core.parsing.parser.sql.dml.insert.InsertStatement;
import io.shardingsphere.core.parsing.parser.sql.dql.DQLStatement;
Expand Down Expand Up @@ -246,7 +248,9 @@ private void sqlRoute(final String sql) {
private void refreshTableMetaData() throws SQLException {
if (null != routeResult && null != connection && SQLType.DDL == routeResult.getSqlStatement().getType() && !routeResult.getSqlStatement().getTables().isEmpty()) {
String logicTableName = routeResult.getSqlStatement().getTables().getSingleTableName();
connection.getShardingContext().getMetaData().getTable().refresh(logicTableName, connection.getShardingContext().getShardingRule(), connection.getConnections(logicTableName));
ShardingTableMetaData shardingTableMetaData = connection.getShardingContext().getMetaData().getTable();
shardingTableMetaData.setExecutorAdapter(new ConnectionHoldJDBCTableMetaDataExecutorAdapter(connection.getConnections(logicTableName)));
shardingTableMetaData.refresh(logicTableName, connection.getShardingContext().getShardingRule());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.core.jdbc.metadata;

import io.shardingsphere.core.metadata.table.TableMetaDataExecutorAdapter;
import lombok.RequiredArgsConstructor;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;

/**
* Table meta data executor adapter for JDBC and auto closed.
*
* @author zhangliang
*/
@RequiredArgsConstructor
public final class AutoClosedJDBCTableMetaDataExecutorAdapter implements TableMetaDataExecutorAdapter {

private final Map<String, DataSource> dataSourceMap;

@Override
public Connection getConnection(final String dataSourceName) throws SQLException {
return dataSourceMap.get(dataSourceName).getConnection();
}

@Override
public boolean isAutoClose() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.core.jdbc.metadata;

import io.shardingsphere.core.metadata.table.TableMetaDataExecutorAdapter;
import lombok.RequiredArgsConstructor;

import java.sql.Connection;
import java.util.Map;

/**
* Table meta data executor adapter for JDBC and connection hold by creator.
*
* @author zhangliang
*/
@RequiredArgsConstructor
public final class ConnectionHoldJDBCTableMetaDataExecutorAdapter implements TableMetaDataExecutorAdapter {

private final Map<String, Connection> connectionMap;

@Override
public Connection getConnection(final String dataSourceName) {
return connectionMap.get(dataSourceName);
}

@Override
public boolean isAutoClose() {
return false;
}
}
Loading

0 comments on commit 1367542

Please sign in to comment.