Skip to content

Commit

Permalink
KYLIN-3275: add unit tests for the storage cleanup job.
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaaaaaron authored and liyang-kylin committed Mar 11, 2018
1 parent 56e7333 commit 9ab6b52
Show file tree
Hide file tree
Showing 11 changed files with 544 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
Expand All @@ -45,11 +44,16 @@ public class StorageCleanJobHbaseUtil {
protected static final Logger logger = LoggerFactory.getLogger(StorageCleanJobHbaseUtil.class);

/**
 * Drops Kylin-owned HBase tables that are no longer referenced by any cube.
 * Opens a short-lived HBase admin connection and delegates to the
 * package-private overload (which is also exercised directly by unit tests).
 *
 * @param delete        true to actually drop tables; false presumably means a
 *                      dry run that only reports — confirm in the overload
 * @param deleteTimeout timeout passed to the table-level cleanup; assumed to be
 *                      in minutes per the related StorageCleanupJob constant — TODO confirm
 * @throws IOException if the HBase admin client cannot be created or closed
 */
public static void cleanUnusedHBaseTables(boolean delete, int deleteTimeout) throws IOException {
    // try-with-resources guarantees the admin connection is closed even if cleanup fails
    try (HBaseAdmin hbaseAdmin = new HBaseAdmin(HBaseConfiguration.create())) {
        cleanUnusedHBaseTables(hbaseAdmin, delete, deleteTimeout);
    }
}

static void cleanUnusedHBaseTables(HBaseAdmin hbaseAdmin, boolean delete, int deleteTimeout) throws IOException {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
// get all kylin hbase tables
try (HBaseAdmin hbaseAdmin = new HBaseAdmin(conf)) {
try {
String namespace = kylinConfig.getHBaseStorageNameSpace();
String tableNamePrefix = (namespace.equals("default") || namespace.equals(""))
? kylinConfig.getHBaseTableNamePrefix() : (namespace + ":" + kylinConfig.getHBaseTableNamePrefix());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -71,12 +70,27 @@ public class StorageCleanupJob extends AbstractApplication {

protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
public static final int deleteTimeout = 10; // Unit minute

protected FileSystem hbaseFs;
protected FileSystem defaultFs;
protected boolean delete = false;
protected boolean force = false;
protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig
.getInstanceFromEnv());

/**
 * Constructor for dependency injection (used by unit tests): accepts the
 * default working file system and the HBase cluster file system directly,
 * so no live Hadoop/HBase configuration is needed.
 */
public StorageCleanupJob(FileSystem defaultFs, FileSystem hbaseFs) {
this.defaultFs = defaultFs;
this.hbaseFs = hbaseFs;
}

/**
 * Default constructor for production use: resolves the Kylin working file
 * system from the current Hadoop configuration and the HBase file system
 * from a freshly created HBase configuration.
 *
 * @throws IOException if either file system cannot be obtained
 */
public StorageCleanupJob() throws IOException {
this.defaultFs = HadoopUtil.getWorkingFileSystem();
this.hbaseFs = HadoopUtil.getWorkingFileSystem(HBaseConfiguration.create());
}

/**
 * Sets the delete flag programmatically. Note that execute() also overwrites
 * this flag from the -delete command-line option, so a value set here only
 * matters for callers that invoke the cleanup methods directly.
 */
public void setDelete(boolean delete) {
this.delete = delete;
}

protected void cleanUnusedHBaseTables() throws IOException {
KylinConfig config = KylinConfig.getInstanceFromEnv();
if ("hbase".equals(config.getMetadataUrl().getScheme())) {
Expand Down Expand Up @@ -107,34 +121,44 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
logger.info("force option value: '" + optionsHelper.getOptionValue(OPTION_FORCE) + "'");
delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
force = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE));
cleanUnusedIntermediateHiveTable();

KylinConfig config = KylinConfig.getInstanceFromEnv();
cleanUnusedIntermediateHiveTable(getHiveTables(), getCliCommandExecutor());

if (StringUtils.isNotEmpty(config.getHBaseClusterFs())) {
cleanUnusedHdfsFiles(HBaseConfiguration.create());
cleanUnusedHdfsFiles(hbaseFs);
}
Configuration conf = HadoopUtil.getCurrentConfiguration();
cleanUnusedHdfsFiles(conf);
cleanUnusedHdfsFiles(defaultFs);
cleanUnusedHBaseTables();
}

private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
protected List<String> getHiveTables() throws Exception {
ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
return explr.listTables(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable());
}

/**
 * Obtains the CLI command executor used to issue Hive drop commands.
 * Protected so unit tests can override it with a mock executor.
 *
 * @throws IOException if the executor cannot be created
 */
protected CliCommandExecutor getCliCommandExecutor() throws IOException {
return KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
}

void cleanUnusedHdfsFiles(FileSystem fs) throws IOException {
JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
// GlobFilter filter = new
// GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
// + "/kylin-.*");
// TODO: when first use, /kylin/kylin_metadata does not exist.
try {
FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
for (FileStatus status : fStatus) {
String path = status.getPath().getName();
// System.out.println(path);
if (path.startsWith("kylin-")) {
String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
if (fStatus != null) {
for (FileStatus status : fStatus) {
String path = status.getPath().getName();
// System.out.println(path);
if (path.startsWith("kylin-")) {
String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
}
}
}
} catch (FileNotFoundException e) {
Expand Down Expand Up @@ -187,17 +211,13 @@ private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
}
}

private void cleanUnusedIntermediateHiveTable() throws Exception {
Configuration conf = HadoopUtil.getCurrentConfiguration();
void cleanUnusedIntermediateHiveTable(List<String> hiveTableNames, CliCommandExecutor cmdExec) throws Exception {
final KylinConfig config = KylinConfig.getInstanceFromEnv();
JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
final int uuidLength = 36;
final String preFix = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX;
final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";

ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
List<String> hiveTableNames = explr.listTables(config.getHiveDatabaseForIntermediateTable());
Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, new Predicate<String>() {
@Override
public boolean apply(@Nullable String input) {
Expand Down Expand Up @@ -294,9 +314,8 @@ public boolean apply(@Nullable String input) {
String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(),
segmentId2JobId.get(segmentId)) + "/" + tableToDelete;
Path externalDataPath = new Path(path);
FileSystem fs = HadoopUtil.getWorkingFileSystem();
if (fs.exists(externalDataPath)) {
fs.delete(externalDataPath, true);
if (defaultFs.exists(externalDataPath)) {
defaultFs.delete(externalDataPath, true);
logger.info("Hive table {}'s external path {} deleted", tableToDelete, path);
} else {
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ public void updateConfig(String key, String value) {

/**
 * Admin-only endpoint: runs the storage cleanup job with deletion enabled.
 * <p>
 * Bug fix: the previous version logged the constructor's IOException and then
 * proceeded to call {@code job.execute(args)} on a null reference, causing a
 * NullPointerException that masked the real failure. Fail fast instead,
 * preserving the original cause.
 */
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
public void cleanupStorage() {
    StorageCleanupJob job;
    try {
        job = new StorageCleanupJob();
    } catch (IOException e) {
        // Propagate with cause instead of continuing with a null job (would NPE below).
        throw new IllegalStateException("can not init StorageCleanupJob", e);
    }
    String[] args = new String[] { "-delete", "true" };
    job.execute(args);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.rest.job;

import static org.apache.kylin.common.util.LocalFileMetadataTestCase.cleanAfterClass;
import static org.apache.kylin.common.util.LocalFileMetadataTestCase.staticCreateTestMetadata;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.util.LocalFileMetadataTestCase.OverlayMetaHook;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import com.google.common.collect.Lists;

public class StorageCleanJobHbaseUtilTest {

    @Before
    public void setup() {
        staticCreateTestMetadata(true, new OverlayMetaHook("src/test/resources/ut_meta/hbase_storage_ut/"));
    }

    @After
    public void after() {
        cleanAfterClass();
    }

    /**
     * Two KYLIN_* tables exist in HBase; the test metadata references only one
     * of them. Cleanup with delete=true must drop exactly the orphaned table.
     */
    @Test
    public void test() throws IOException {
        String toBeDel = "to-be-del";

        // Table still referenced by a cube segment in the fixture metadata.
        HTableDescriptor inUse = mock(HTableDescriptor.class);
        when(inUse.getValue("KYLIN_HOST")).thenReturn("../examples/test_metadata/");
        when(inUse.getTableName()).thenReturn(TableName.valueOf("KYLIN_J9TE08D9IA"));

        // Orphaned table: same Kylin host tag, but no segment points at it.
        HTableDescriptor orphan = mock(HTableDescriptor.class);
        when(orphan.getValue("KYLIN_HOST")).thenReturn("../examples/test_metadata/");
        when(orphan.getTableName()).thenReturn(TableName.valueOf(toBeDel));

        HBaseAdmin hBaseAdmin = mock(HBaseAdmin.class);
        when(hBaseAdmin.listTables("KYLIN_.*")).thenReturn(new HTableDescriptor[] { inUse, orphan });
        when(hBaseAdmin.tableExists(toBeDel)).thenReturn(true);
        // Report the table as already disabled so cleanup goes straight to deleteTable.
        when(hBaseAdmin.isTableEnabled(toBeDel)).thenReturn(false);

        StorageCleanJobHbaseUtil.cleanUnusedHBaseTables(hBaseAdmin, true, 100000);

        // Exactly one deleteTable call, and only for the orphan.
        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
        verify(hBaseAdmin).deleteTable(captor.capture());
        assertEquals(Lists.newArrayList(toBeDel), captor.getAllValues());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.rest.job;

import static org.apache.kylin.common.util.LocalFileMetadataTestCase.cleanAfterClass;
import static org.apache.kylin.common.util.LocalFileMetadataTestCase.staticCreateTestMetadata;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.LocalFileMetadataTestCase.OverlayMetaHook;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import com.google.common.collect.Lists;

public class StorageCleanupJobTest {
// Load a local metadata fixture so cube/job lookups resolve without a cluster.
@Before
public void setup() {
staticCreateTestMetadata(true, new OverlayMetaHook("src/test/resources/ut_meta/storage_ut/"));
}

@After
public void after() {
cleanAfterClass();
}

// End-to-end run of StorageCleanupJob with a single mocked FileSystem serving
// as both the default and the HBase file system. Verifies that exactly two
// paths are deleted — the orphaned intermediate Hive table dir and the
// orphaned job working dir — in the order the job visits them.
@Test
public void test() throws Exception {
FileSystem mockFs = mock(FileSystem.class);
prepareUnusedIntermediateHiveTable(mockFs);
prepareUnusedHDFSFiles(mockFs);

MockStorageCleanupJob job = new MockStorageCleanupJob(mockFs, mockFs);
job.execute(new String[] { "--delete", "true" });

// Capture every recursive delete issued against the mock FS.
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
verify(mockFs, times(2)).delete(pathCaptor.capture(), eq(true));
ArrayList<Path> expected = Lists.newArrayList(
// verifyCleanUnusedIntermediateHiveTable
new Path("file:///tmp/examples/test_metadata/kylin-f8edd777-8756-40d5-be19-3159120e4f7b/kylin_intermediate_2838c7fc-722a-48fa-9d1a-8ab37837a952"),

// verifyCleanUnusedHdfsFiles
new Path("file:///tmp/examples/test_metadata/kylin-to-be-delete")
);
assertEquals(expected, pathCaptor.getAllValues());
}

// Stubs the working-dir listing with three kylin-* job dirs; only the last
// one should end up on the deletion list (see inline notes below).
private void prepareUnusedHDFSFiles(FileSystem mockFs) throws IOException {
Path p1 = new Path("file:///tmp/examples/test_metadata/");
FileStatus[] statuses = new FileStatus[3];
FileStatus f1 = mock(FileStatus.class);
FileStatus f2 = mock(FileStatus.class);
FileStatus f3 = mock(FileStatus.class);
// only remove FINISHED and DISCARDED job intermediate files, so this exclude.
when(f1.getPath()).thenReturn(new Path("kylin-091a0322-249c-43e7-91df-205603ab6883"));
// remove every segment working dir from deletion list, so this exclude.
when(f2.getPath()).thenReturn(new Path("kylin-bcf2f125-9b0b-40dd-9509-95ec59b31333"));
when(f3.getPath()).thenReturn(new Path("kylin-to-be-delete"));
statuses[0] = f1;
statuses[1] = f2;
statuses[2] = f3;

when(mockFs.listStatus(p1)).thenReturn(statuses);
Path p2 = new Path("file:///tmp/examples/test_metadata/kylin-to-be-delete");
when(mockFs.exists(p2)).thenReturn(true);
}

// Makes the orphaned intermediate table's external HDFS path "exist" so the
// job proceeds to delete it rather than just logging its absence.
private void prepareUnusedIntermediateHiveTable(FileSystem mockFs) throws IOException {
Path p1 = new Path(
"file:///tmp/examples/test_metadata/kylin-f8edd777-8756-40d5-be19-3159120e4f7b/kylin_intermediate_2838c7fc-722a-48fa-9d1a-8ab37837a952");
when(mockFs.exists(p1)).thenReturn(true);
}

// Test double: overrides the two external lookups (Hive metastore and CLI
// executor) so the job runs fully offline against injected file systems.
class MockStorageCleanupJob extends StorageCleanupJob {

MockStorageCleanupJob(FileSystem defaultFs, FileSystem hbaseFs) {
super(defaultFs, hbaseFs);
}

@Override
protected List<String> getHiveTables() throws Exception {
List<String> l = new ArrayList<>();
l.add("kylin_intermediate_2838c7fc-722a-48fa-9d1a-8ab37837a952");
// wrong prefix, so this is exclude.
l.add("wrong_prefix_6219a647-d8be-49bb-8562-3f4976922a96");
// intermediate table still in use, so this is exclude.
l.add("kylin_intermediate_091a0322-249c-43e7-91df-205603ab6883");
return l;
}

@Override
protected CliCommandExecutor getCliCommandExecutor() throws IOException {
// Returning null from execute() is treated as success by the job's caller —
// NOTE(review): confirm against CliCommandExecutor's contract.
CliCommandExecutor mockCli = mock(CliCommandExecutor.class);
when(mockCli.execute((String) notNull())).thenReturn(null);
return mockCli;
}
}
}
Loading

0 comments on commit 9ab6b52

Please sign in to comment.