Skip to content

Commit

Permalink
KYLIN-3485 Make unloading table more flexible
Browse files Browse the repository at this point in the history
  • Loading branch information
Rongchuan Jin authored and shaofengshi committed Aug 7, 2018
1 parent e7bacd3 commit 050f1c1
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.kylin.source;

import java.io.Closeable;
import java.io.IOException;

import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
Expand Down Expand Up @@ -55,4 +56,9 @@ public interface ISource extends Closeable {
* For testing purpose.
*/
ISampleDataDeployer getSampleDataDeployer();

/**
* Unload table.
*/
void unloadTable(String tableName, String project) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeManager;
Expand All @@ -53,7 +54,6 @@
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.msg.Message;
import org.apache.kylin.rest.msg.MsgPicker;
Expand All @@ -62,11 +62,11 @@
import org.apache.kylin.rest.response.TableSnapshotResponse;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.IReadableTable.TableSignature;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -277,8 +277,6 @@ public boolean unloadHiveTable(String tableName, String project) throws IOExcept
return false;
}

tableType = desc.getSourceType();

if (!modelService.isTableInModel(desc, project)) {
removeTableFromProject(tableName, project);
rtn = true;
Expand All @@ -293,20 +291,9 @@ public boolean unloadHiveTable(String tableName, String project) throws IOExcept
metaMgr.removeSourceTable(tableName, project);

// remove streaming info
if (tableType == 1) {
StreamingConfig config = null;
KafkaConfig kafkaConfig = null;
try {
config = streamingService.getStreamingManager().getStreamingConfig(tableName);
kafkaConfig = kafkaConfigService.getKafkaConfig(tableName, project);
streamingService.dropStreamingConfig(config, project);
kafkaConfigService.dropKafkaConfig(kafkaConfig, project);
rtn = true;
} catch (Exception e) {
rtn = false;
logger.error(e.getLocalizedMessage(), e);
}
}
SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
ISource source = sourceManager.getCachedSource(desc);
source.unloadTable(tableName, project);
return rtn;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public ISampleDataDeployer getSampleDataDeployer() {
return new HiveMetadataExplorer();
}

@Override
public void unloadTable(String tableName, String project) throws IOException {

}

@Override
public void close() throws IOException {
// not needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public ISampleDataDeployer getSampleDataDeployer() {
return new JdbcExplorer();
}

@Override
public void unloadTable(String tableName, String project) throws IOException {

}

@Override
public void close() throws IOException {
// not needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.ISampleDataDeployer;
import org.apache.kylin.source.ISource;
Expand Down Expand Up @@ -246,6 +247,19 @@ public ISampleDataDeployer getSampleDataDeployer() {
throw new UnsupportedOperationException();
}

@Override
public void unloadTable(String tableName, String project) throws IOException {
StreamingConfig config;
KafkaConfig kafkaConfig;
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
config = streamingManager.getStreamingConfig(tableName);
kafkaConfig = kafkaConfigManager.getKafkaConfig(tableName);
streamingManager.removeStreamingConfig(config);
kafkaConfigManager.removeKafkaConfig(kafkaConfig);
}

@Override
public void close() throws IOException {
// not needed
Expand Down

0 comments on commit 050f1c1

Please sign in to comment.