Commit: merge 1.5.3
shaofengshi committed Jul 25, 2016
1 parent 1b6ed08 commit 90a0c5b
Showing 11 changed files with 85 additions and 72 deletions.

@@ -58,6 +58,7 @@
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
@@ -419,7 +420,7 @@ private int cleanupOldStorage() throws Exception {
}

private void checkHFilesInHBase(CubeSegment segment) throws IOException {
-Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
+Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
String tableName = segment.getStorageLocationIdentifier();

HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
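
Note: the change above swaps a per-call HBaseConfiguration for Kylin's cached, process-wide Connection. A minimal sketch of the resulting lookup path, assuming HBaseRegionSizeCalculator exposes a getRegionSizeMap() accessor (error handling elided):

    Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
    String tableName = segment.getStorageLocationIdentifier();

    // The calculator reuses the pooled Connection instead of building its own.
    HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
    Map<byte[], Long> regionSizes = cal.getRegionSizeMap(); // region name -> size in bytes
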

@@ -34,8 +34,8 @@ public class MockAclHBaseStorage implements AclHBaseStorage {
private static final String aclTableName = "MOCK-ACL-TABLE";
private static final String userTableName = "MOCK-USER-TABLE";

-private HTableInterface mockedAclTable;
-private HTableInterface mockedUserTable;
+private Table mockedAclTable;
+private Table mockedUserTable;
private RealAclHBaseStorage realAcl;

public MockAclHBaseStorage() {
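
Note: HTableInterface is gone from the HBase 1.x client; Table replaces it and is fetched per use from a shared Connection. A sketch of the idiom (hypothetical row key; imports as elsewhere in this commit):

    // Table handles are lightweight and not thread-safe: open, use, close.
    Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
    Table aclTable = conn.getTable(TableName.valueOf("MOCK-ACL-TABLE"));
    try {
        Result row = aclTable.get(new Get(Bytes.toBytes("acl-row-key"))); // hypothetical key
    } finally {
        IOUtils.closeQuietly(aclTable);
    }
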

@@ -29,6 +29,7 @@
import java.util.WeakHashMap;

import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import java.util.*;
import org.apache.kylin.common.KylinConfig;
@@ -65,6 +66,7 @@
import org.apache.kylin.source.hive.HiveSourceTableLoader;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,7 +46,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -184,14 +183,10 @@ public List<Query> getQueries(final String creator) throws IOException {
List<Query> queries = new ArrayList<Query>();
Table htable = null;
try {
-<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
-HConnection conn = HBaseConnection.get(hbaseUrl);
+org.apache.hadoop.hbase.client.Connection conn = HBaseConnection.get(hbaseUrl);
HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);

-htable = conn.getTable(userTableName);
-=======
-htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
->>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+htable = conn.getTable(TableName.valueOf(userTableName));
Get get = new Get(Bytes.toBytes(creator));
get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
Result result = htable.get(get);
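
Note: with the conflict markers dropped, the resolved body keeps the shared Connection plus the 1.x TableName-based lookup. A sketch of the intended final shape (fragment; names as in this file):

    Connection conn = HBaseConnection.get(hbaseUrl);
    HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY);

    // Connection.getTable() takes a TableName in the 1.x API, not a raw String.
    htable = conn.getTable(TableName.valueOf(userTableName));
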

@@ -146,17 +146,12 @@ public void createUser(UserDetails user) {
public void updateUser(UserDetails user) {
Table htable = null;
try {
byte[] userAuthorities = serialize(user.getAuthorities());
htable = aclHBaseStorage.getTable(userTableName);

-<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
Pair<byte[], byte[]> pair = userToHBaseRow(user);
Put put = new Put(pair.getKey());
-put.add(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());
-=======
-Put put = new Put(Bytes.toBytes(user.getUsername()));
-put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities);
->>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/UserService.java

+put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());

htable.put(put);
} catch (IOException e) {
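
Note: this conflict reduces to the Put API rename: the pre-1.x put.add(family, qualifier, value) became put.addColumn(...). A sketch of the resolved write path (fragment; assuming userToHBaseRow returns the row key and serialized authorities as a Pair):

    Pair<byte[], byte[]> pair = userToHBaseRow(user);
    Put put = new Put(pair.getKey());

    // HBase 1.x: addColumn() replaces the removed add() overloads.
    put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY),
            Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond());
    htable.put(put);
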
@@ -219,13 +214,8 @@ public List<UserDetails> listUsers() {
Scan s = new Scan();
s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));

-<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
List<UserDetails> all = new ArrayList<UserDetails>();
-HTableInterface htable = null;
-=======
-List<String> authorities = new ArrayList<String>();
Table htable = null;
->>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/UserService.java
ResultScanner scanner = null;
try {
htable = aclHBaseStorage.getTable(userTableName);

@@ -21,7 +21,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.StorageException;
import org.apache.kylin.engine.mr.HadoopUtil;
@@ -51,14 +52,20 @@ public class HBaseConnection {

private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);

-private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
-private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>();
+private static final Map<String, Configuration> configCache = new ConcurrentHashMap<String, Configuration>();
+private static final Map<String, Connection> connPool = new ConcurrentHashMap<String, Connection>();
+
+private static final ThreadLocal<Configuration> configThreadLocal = new ThreadLocal<>();
+
+private static ExecutorService coprocessorPool = null;

static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
-for (Connection conn : ConnPool.values()) {
+closeCoprocessorPool();
+
+for (Connection conn : connPool.values()) {
try {
conn.close();
} catch (IOException e) {
@@ -68,19 +75,62 @@ public void run() {
}
});
}

public static ExecutorService getCoprocessorPool() {
if (coprocessorPool != null) {
return coprocessorPool;
}

synchronized (HBaseConnection.class) {
if (coprocessorPool != null) {
return coprocessorPool;
}

KylinConfig config = KylinConfig.getInstanceFromEnv();

// copy from HConnectionImplementation.getBatchPool()
int maxThreads = config.getHBaseMaxConnectionThreads();
int coreThreads = config.getHBaseCoreConnectionThreads();
long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds();
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100);
ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, //
Threads.newDaemonThreadFactory("kylin-coproc-"));
tpe.allowCoreThreadTimeOut(true);

logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads);

coprocessorPool = tpe;
return coprocessorPool;
}
}

private static void closeCoprocessorPool() {
if (coprocessorPool == null)
return;

coprocessorPool.shutdown();
try {
if (!coprocessorPool.awaitTermination(10, TimeUnit.SECONDS)) {
coprocessorPool.shutdownNow();
}
} catch (InterruptedException e) {
coprocessorPool.shutdownNow();
}
}
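
Note: the pool exists so that endpoint coprocessor calls issued from many query threads share one bounded executor instead of each Table spawning its own. A usage sketch (hypothetical segment table name):

    Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
    // Coprocessor RPCs for this Table fan out on the shared daemon pool.
    Table segTable = conn.getTable(TableName.valueOf("KYLIN_SEGMENT_X"), // hypothetical name
            HBaseConnection.getCoprocessorPool());

The unsynchronized first read of coprocessorPool relies on a benign race; declaring the field volatile would make the fast path strictly safe under the Java memory model.
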


public static void clearConnCache() {
-ConnPool.clear();
+connPool.clear();
}

-private static final ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();

public static Configuration getCurrentHBaseConfiguration() {
-if (hbaseConfig.get() == null) {
+if (configThreadLocal.get() == null) {
String storageUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
-hbaseConfig.set(newHBaseConfiguration(storageUrl));
+configThreadLocal.set(newHBaseConfiguration(storageUrl));
}
-return hbaseConfig.get();
+return configThreadLocal.get();
}

private static Configuration newHBaseConfiguration(String url) {
@@ -128,20 +178,20 @@ public static String makeQualifiedPathInHBaseCluster(String path) {
@SuppressWarnings("resource")
public static Connection get(String url) {
// find configuration
-Configuration conf = ConfigCache.get(url);
+Configuration conf = configCache.get(url);
if (conf == null) {
conf = newHBaseConfiguration(url);
-ConfigCache.put(url, conf);
+configCache.put(url, conf);
}

-Connection connection = ConnPool.get(url);
+Connection connection = connPool.get(url);
try {
while (true) {
// I don't use DCL since recreating a connection is not a big issue.
if (connection == null || connection.isClosed()) {
logger.info("connection is null or closed, creating a new one");
connection = ConnectionFactory.createConnection(conf);
-ConnPool.put(url, connection);
+connPool.put(url, connection);
}

if (connection == null || connection.isClosed()) {
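
Note: as the comment says, plain check-then-create is preferred over double-checked locking because the worst case is one redundant Connection. On Java 8 a single-winner variant would be computeIfAbsent (a sketch of the alternative, not what the code does; it also skips the isClosed() recheck handled by the loop above):

    Connection connection = connPool.computeIfAbsent(url, u -> {
        try {
            return ConnectionFactory.createConnection(configCache.get(u));
        } catch (IOException e) {
            throw new RuntimeException("Error opening connection to " + u, e);
        }
    });
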

@@ -32,14 +32,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
@@ -286,7 +281,6 @@ protected void deleteResourceImpl(String resPath) throws IOException {

Delete del = new Delete(Bytes.toBytes(resPath));
table.delete(del);
-table.flushCommits();

if (hdfsResourceExist) { // remove hdfs cell value
Path redirectPath = bigCellHDFSPath(resPath);
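
Note: table.flushCommits() disappears because Table, unlike HTableInterface, does not buffer writes; each put()/delete() is sent immediately. Where buffering is actually wanted, the 1.x replacement is BufferedMutator (a sketch; contentBytes is a hypothetical payload):

    try (BufferedMutator mutator = getConnection().getBufferedMutator(
            TableName.valueOf(getAllInOneTableName()))) {
        Put put = new Put(Bytes.toBytes(resPath));
        put.addColumn(B_FAMILY, B_COLUMN, contentBytes); // buffered, not yet sent
        mutator.mutate(put);
    } // close() flushes anything still buffered
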
@@ -308,15 +302,15 @@ protected String getReadableResourcePathImpl(String resPath) {
}

private Result getFromHTable(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
-HTableInterface table = getConnection().getTable(getAllInOneTableName());
+Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
try {
return internalGetFromHTable(table, path, fetchContent, fetchTimestamp);
} finally {
IOUtils.closeQuietly(table);
}
}

-private Result internalGetFromHTable(HTableInterface table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
+private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
byte[] rowkey = Bytes.toBytes(path);

Get get = new Get(rowkey);
@@ -330,14 +324,9 @@ private Result internalGetFromHTable(HTableInterface table, String path, boolean
get.addColumn(B_FAMILY, B_COLUMN_TS);
}

-Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
-try {
-Result result = table.get(get);
-boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists()));
-return exists ? result : null;
-} finally {
-IOUtils.closeQuietly(table);
-}
+Result result = table.get(get);
+boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists()));
+return exists ? result : null;
}
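
Note: the rewrite also removes a leftover bug: the method used to open a second Table for the same row even though the caller already passes one in. When only existence matters, the 1.x API can skip transferring cells entirely (sketch):

    Get probe = new Get(rowkey);
    probe.setCheckExistenceOnly(true); // server answers with a flag, no cell payload
    boolean exists = table.exists(probe);
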

private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {

@@ -360,7 +360,7 @@ public void run() {
final boolean[] abnormalFinish = new boolean[1];

try {
-HTableInterface table = conn.get(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool());
+Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());

final CubeVisitRequest request = builder.build();
final byte[] startKey = epRange.getFirst();
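
Note: the old helper's conn.get(...) becomes the standard 1.x overload Connection.getTable(TableName, ExecutorService), which keeps coprocessor fan-out on Kylin's shared pool (fragment; names as in this file):

    Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
            HBaseConnection.getCoprocessorPool());
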

@@ -136,7 +136,6 @@ public final void flush() throws IOException {
logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
puts.clear();
}
logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
puts.clear();
}


@@ -27,16 +27,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.RawResource;
@@ -45,7 +37,6 @@
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -463,11 +454,7 @@ private static void doOpt(Opt opt) throws IOException, InterruptedException {
put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
destAclHtable.put(put);
}
-Put put = new Put(Bytes.toBytes(cubeId));
-put.add(family, column, value);
-destAclHtable.put(put);
}
-destAclHtable.flushCommits();
} finally {
IOUtils.closeQuietly(srcAclHtable);
IOUtils.closeQuietly(destAclHtable);
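
Note: the resolved copy loop preserves each cell's coordinates via CellUtil instead of the stale put.add(family, column, value) block removed above. The idiom, as a sketch (assuming result iterates the source ACL row):

    for (Cell cell : result.rawCells()) {
        Put put = new Put(CellUtil.cloneRow(cell));
        put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
                CellUtil.cloneValue(cell));
        destAclHtable.put(put);
    }
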
@@ -533,13 +520,12 @@ private static void undo(Opt opt) throws IOException, InterruptedException {
case COPY_ACL: {
String cubeId = (String) opt.params[0];
String modelId = (String) opt.params[1];
-HTableInterface destAclHtable = null;
+Table destAclHtable = null;
try {
-destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));

destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
-destAclHtable.flushCommits();
} finally {
IOUtils.closeQuietly(destAclHtable);
}

@@ -44,6 +44,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.CliCommandExecutor;
@@ -167,10 +168,10 @@ private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
}

class DeleteHTableRunnable implements Callable {
-HBaseAdmin hbaseAdmin;
+Admin hbaseAdmin;
String htableName;

-DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) {
+DeleteHTableRunnable(Admin hbaseAdmin, String htableName) {
this.hbaseAdmin = hbaseAdmin;
this.htableName = htableName;
}
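
Note: DeleteHTableRunnable now takes the 1.x Admin interface, which comes from a Connection rather than new HBaseAdmin(conf). A sketch of how a caller might obtain and use it (error handling condensed):

    Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
    Admin hbaseAdmin = conn.getAdmin();
    try {
        TableName table = TableName.valueOf(htableName);
        if (hbaseAdmin.tableExists(table)) {
            hbaseAdmin.disableTable(table);
            hbaseAdmin.deleteTable(table);
        }
    } finally {
        hbaseAdmin.close();
    }
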
