KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
shaofengshi authored and lidongsjtu committed Nov 14, 2016
1 parent 61335d9 commit e2adfdc
Showing 53 changed files with 450 additions and 500 deletions.
19 changes: 1 addition & 18 deletions examples/test_case_data/sandbox/hbase-site.xml
@@ -190,22 +190,5 @@
     <name>zookeeper.znode.parent</name>
     <value>/hbase-unsecure</value>
   </property>
-  <property>
-    <name>hbase.client.pause</name>
-    <value>100</value>
-    <description>General client pause value. Used mostly as value to wait
-    before running a retry of a failed get, region lookup, etc.
-    See hbase.client.retries.number for description of how we backoff from
-    this initial pause amount and how this pause works w/ retries.</description>
-  </property>
-  <property>
-    <name>hbase.client.retries.number</name>
-    <value>5</value>
-    <description>Maximum retries. Used as maximum for all retryable
-    operations such as the getting of a cell's value, starting a row update,
-    etc. Retry interval is a rough function based on hbase.client.pause. At
-    first we retry at this interval but then with backoff, we pretty quickly reach
-    retrying every ten seconds. See HConstants#RETRY_BACKOFF for how the backup
-    ramps up. Change this setting and hbase.client.pause to suit your workload.</description>
-  </property>
+
 </configuration>
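
The two client-retry properties above are dropped so the sandbox falls back to stock HBase defaults. A minimal sketch, assuming a test still wants the old tuning: both keys are standard HBase client settings, and the values mirror the ones removed here.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ClientRetryTuning {
        // Recreates the removed sandbox settings programmatically.
        public static Configuration tunedConf() {
            Configuration conf = HBaseConfiguration.create();
            conf.setInt("hbase.client.pause", 100);        // initial pause (ms) before a retry
            conf.setInt("hbase.client.retries.number", 5); // cap on retryable operations
            return conf;
        }
    }
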
@@ -35,8 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -55,6 +54,7 @@
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.apache.kylin.tool.StorageCleanupJob;
@@ -419,10 +419,10 @@ private int cleanupOldStorage() throws Exception {
     }

     private void checkHFilesInHBase(CubeSegment segment) throws IOException {
-        Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
-        String tableName = segment.getStorageLocationIdentifier();
-        try (HTable table = new HTable(conf, tableName)) {
-            HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
+        try (Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl())) {
+            String tableName = segment.getStorageLocationIdentifier();
+
+            HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
             Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
             long totalSize = 0;
             for (Long size : sizeMap.values()) {
@@ -448,5 +448,4 @@ private void checkHFilesInHBase(CubeSegment segment) throws IOException {
             }
         }
     }
-
 }
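
The hunk above is the heart of the HBase 1.x client migration: instead of building an HTable from a Configuration, the code asks a Connection for a lightweight Table handle, and HBaseRegionSizeCalculator now takes the table name plus the connection. A minimal sketch of the new access pattern, using ConnectionFactory in place of Kylin's cached HBaseConnection helper:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;

    public class TableAccessSketch {
        static void useTable(String tableName) throws IOException {
            Configuration conf = HBaseConfiguration.create();
            // One heavyweight Connection per process; Table handles are cheap and short-lived.
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf(tableName))) {
                // read or write through 'table' here
            }
        }
    }
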
12 changes: 6 additions & 6 deletions pom.xml
@@ -46,20 +46,20 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

     <!-- Hadoop versions -->
-    <hadoop2.version>2.6.0</hadoop2.version>
-    <yarn.version>2.6.0</yarn.version>
+    <hadoop2.version>2.7.1</hadoop2.version>
+    <yarn.version>2.7.1</yarn.version>

     <!-- Hive versions -->
-    <hive.version>0.14.0</hive.version>
-    <hive-hcatalog.version>0.14.0</hive-hcatalog.version>
+    <hive.version>1.2.1</hive.version>
+    <hive-hcatalog.version>1.2.1</hive-hcatalog.version>

     <!-- HBase versions -->
-    <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
+    <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
    <kafka.version>0.8.1</kafka.version>

     <!-- Hadoop deps, keep compatible with hadoop2.version -->
     <zookeeper.version>3.4.6</zookeeper.version>
-    <curator.version>2.6.0</curator.version>
+    <curator.version>2.7.1</curator.version>
     <jackson.version>2.2.4</jackson.version>
     <jsr305.version>3.0.1</jsr305.version>
     <guava.version>14.0</guava.version>
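
These property bumps move the build onto Hadoop 2.7.1, Hive 1.2.1, HBase 1.1.1, and Curator 2.7.1 as one coherent stack. A quick sketch for sanity-checking which HBase client actually lands on the runtime classpath, using HBase's own VersionInfo utility:

    import org.apache.hadoop.hbase.util.VersionInfo;

    public class HBaseVersionCheck {
        public static void main(String[] args) {
            // Should print 1.1.1 once the upgraded pom takes effect.
            System.out.println("HBase client version: " + VersionInfo.getVersion());
        }
    }
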
@@ -20,7 +20,7 @@

 import java.io.IOException;

-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;

 /**
  */
@@ -37,6 +37,6 @@ public interface AclHBaseStorage {

     String prepareHBaseTable(Class<?> clazz) throws IOException;

-    HTableInterface getTable(String tableName) throws IOException;
+    Table getTable(String tableName) throws IOException;

 }
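
With HTableInterface gone, getTable() now hands back an HBase 1.x Table, which is not thread-safe and is expected to be closed by the caller rather than cached. A hedged sketch of caller-side usage, assuming a storage reference of this interface type and a Put built elsewhere:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;

    public class AclWriteSketch {
        static void writeAclRow(AclHBaseStorage storage, String tableName, Put put) throws IOException {
            // Table extends Closeable, so try-with-resources releases the handle.
            try (Table aclTable = storage.getTable(tableName)) {
                aclTable.put(put);
            }
        }
    }
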
@@ -21,7 +21,7 @@
 import java.io.IOException;

 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.rest.service.AclService;
 import org.apache.kylin.rest.service.QueryService;
@@ -34,8 +34,8 @@ public class MockAclHBaseStorage implements AclHBaseStorage {
     private static final String aclTableName = "MOCK-ACL-TABLE";
     private static final String userTableName = "MOCK-USER-TABLE";

-    private HTableInterface mockedAclTable;
-    private HTableInterface mockedUserTable;
+    private Table mockedAclTable;
+    private Table mockedUserTable;
     private RealAclHBaseStorage realAcl;

     public MockAclHBaseStorage() {
@@ -65,7 +65,7 @@ public String prepareHBaseTable(Class<?> clazz) throws IOException {
     }

     @Override
-    public HTableInterface getTable(String tableName) throws IOException {
+    public Table getTable(String tableName) throws IOException {
         if (realAcl != null) {
             return realAcl.getTable(tableName);
         }
@@ -51,7 +51,7 @@
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -91,7 +91,7 @@
  * <li>remove some methods for loading data, checking values ...</li>
  * </ul>
  */
-public class MockHTable implements HTableInterface {
+public class MockHTable implements Table {
     private final String tableName;
     private final List<String> columnFamilies = new ArrayList<>();

@@ -114,14 +114,6 @@ public void addColumnFamily(String columnFamily) {
         this.columnFamilies.add(columnFamily);
     }

-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public byte[] getTableName() {
-        return tableName.getBytes();
-    }
-
     @Override
     public TableName getName() {
         return null;
@@ -200,8 +192,8 @@ public boolean exists(Get get) throws IOException {
     }

     @Override
-    public Boolean[] exists(List<Get> gets) throws IOException {
-        return new Boolean[0];
+    public boolean[] existsAll(List<Get> list) throws IOException {
+        return new boolean[0];
     }

     /**
@@ -302,15 +294,6 @@ public Result[] get(List<Get> gets) throws IOException {
         return results.toArray(new Result[results.size()]);
     }

-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-        // FIXME: implement
-        return null;
-    }
-
     /**
      * {@inheritDoc}
      */
@@ -446,7 +429,7 @@ private List<KeyValue> filter(Filter filter, List<KeyValue> kvs) throws IOExcept
          */
         }
         if (filter.hasFilterRow() && !filteredOnRowKey) {
-            filter.filterRow(nkvs);
+            filter.filterRow();
         }
         if (filter.filterRow() || filteredOnRowKey) {
             nkvs.clear();
@@ -535,6 +518,11 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] v
         return false;
     }

+    @Override
+    public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Put put) throws IOException {
+        return false;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -555,7 +543,7 @@ public void delete(Delete delete) throws IOException {
                 continue;
             }
             for (KeyValue kv : delete.getFamilyMap().get(family)) {
-                if (kv.isDeleteFamily()) {
+                if (kv.isDelete()) {
                     data.get(row).get(kv.getFamily()).clear();
                 } else {
                     data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
@@ -592,6 +580,11 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[
         return false;
     }

+    @Override
+    public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Delete delete) throws IOException {
+        return false;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -605,45 +598,14 @@ public Result increment(Increment increment) throws IOException {
      */
     @Override
     public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
-        return incrementColumnValue(row, family, qualifier, amount, true);
+        return incrementColumnValue(row, family, qualifier, amount, null);
     }

     @Override
     public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
         return 0;
     }

-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
-        if (check(row, family, qualifier, null)) {
-            Put put = new Put(row);
-            put.add(family, qualifier, Bytes.toBytes(amount));
-            put(put);
-            return amount;
-        }
-        long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount;
-        data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), Bytes.toBytes(newValue));
-        return newValue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isAutoFlush() {
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void flushCommits() throws IOException {
-    }
-
     /**
      * {@inheritDoc}
      */
Expand All @@ -669,29 +631,6 @@ public <T extends Service, R> void coprocessorService(Class<T> service, byte[] s

}

/**
* {@inheritDoc}
*/
@Override
public void setAutoFlush(boolean autoFlush) {
throw new NotImplementedException();

}

/**
* {@inheritDoc}
*/
@Override
public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
throw new NotImplementedException();

}

@Override
public void setAutoFlushTo(boolean autoFlush) {
throw new NotImplementedException();
}

/**
* {@inheritDoc}
*/
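
The mock now tracks the Table interface: getTableName(), getRowOrBefore(), the writeToWAL increment overload, and the autoflush knobs are gone; Boolean[] exists(List<Get>) becomes boolean[] existsAll(List<Get>); and checkAndPut/checkAndDelete gain CompareFilter.CompareOp overloads, stubbed here to return false. A sketch of a test touching the new signatures; the name-only MockHTable constructor is an assumption:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MockHTableSketch {
        public static void main(String[] args) throws Exception {
            MockHTable table = new MockHTable("acl"); // assumed name-only constructor
            table.addColumnFamily("i");

            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("i"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            table.put(put);

            // 1.x signature: boolean[] existsAll(List<Get>) replaces Boolean[] exists(List<Get>).
            List<Get> gets = Arrays.asList(new Get(Bytes.toBytes("row1")));
            boolean[] found = table.existsAll(gets); // the mock stubs this to an empty array

            // New CompareOp overload; the mock stubs it to return false.
            boolean applied = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("i"),
                    Bytes.toBytes("q"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("v"), put);
            System.out.println(found.length + " / " + applied);
        }
    }
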
@@ -21,7 +21,8 @@
 import java.io.IOException;

 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.rest.service.AclService;
 import org.apache.kylin.rest.service.QueryService;
@@ -57,11 +58,11 @@ public String prepareHBaseTable(Class<?> clazz) throws IOException {
     }

     @Override
-    public HTableInterface getTable(String tableName) throws IOException {
+    public Table getTable(String tableName) throws IOException {
         if (StringUtils.equals(tableName, aclTableName)) {
-            return HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
         } else if (StringUtils.equals(tableName, userTableName)) {
-            return HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
         } else {
             throw new IllegalStateException("getTable failed" + tableName);
         }
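
In the 1.x client, Connection.getTable() takes a TableName rather than a raw String, hence the TableName.valueOf() wrapping above. A one-method sketch of the same lookup, independent of Kylin's HBaseConnection helper:

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Table;

    public class TableLookupSketch {
        static Table open(Connection conn, String name) throws IOException {
            // TableName.valueOf also accepts "namespace:qualifier" style names.
            return conn.getTable(TableName.valueOf(name));
        }
    }
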