Skip to content

Commit

Permalink
创建record writer前先更新字典
Browse files Browse the repository at this point in the history
  • Loading branch information
demotto committed Dec 27, 2018
1 parent 9fdf53a commit fb56abb
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 61 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ private CarbonDictionaryUtil() {
// hehe
}


public static void generateEmptyDictionaryIfNotExists(CarbonLoadModel carbonLoadModel) throws IOException {
String[] headers = carbonLoadModel.getCsvHeaderColumns();
String[] empty = new String[headers.length];
List<String[]> data = new ArrayList<>();
data.add(empty);
generateGlobalDictionary(carbonLoadModel, data);
}

/**
* generate global dictionary with SQLContext and CarbonLoadModel
* 入口
Expand All @@ -84,6 +93,11 @@ public static void generateGlobalDictionary(CarbonLoadModel carbonLoadModel, Lis
Tuple2<CarbonDimension[],String[]> tuple2 = pruneDimensions(dimensions, headers,headers);
CarbonDimension[] requireDimension = tuple2.getField(0);
String[] requireColumnNames = tuple2.getField(1);

if(requireColumnNames == null || requireColumnNames.length == 0) {
LOG.info("no dictionary table");
return;
}
DictionaryLoadModel model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier, requireDimension, dictfolderPath, false);

int[] dictColIndices = new int[requireColumnNames.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public abstract class AbstractRecordWriterAssemble {

protected List<String[]> data = new ArrayList<>();

protected boolean dictionaryCreated = false;

public AbstractRecordWriterAssemble(CarbonTable carbonTable) {
this.carbonTable = carbonTable;
}
Expand All @@ -77,16 +79,9 @@ protected TaskAttemptContext createTaskContext() {

public void write(String[] record) throws IOException, InterruptedException {
data.add(record);
int writerNo = getRecordWriterNumber(record);
ObjectArrayWritable writable = new ObjectArrayWritable();
writable.set(record);
recordWriterList.get(writerNo).write(NullWritable.get(), writable);
counter[writerNo]++;
}

protected void closeRecordWriter(int writerNo) throws IOException, InterruptedException {
CarbonDictionaryUtil.generateGlobalDictionary(carbonLoadModelList.get(0), data);
data.clear();
RecordWriter recordWriter = recordWriterList.get(writerNo);
if(recordWriter != null) {
recordWriter.close(taskAttemptContextList.get(writerNo));
Expand Down Expand Up @@ -137,17 +132,31 @@ protected void postCloseRecordWriter(int writerNo) throws IOException {

}

private void writeDictionary() {
public void close() throws IOException, InterruptedException {
CarbonDictionaryUtil.generateGlobalDictionary(carbonLoadModelList.get(0), data);

}
createRecordWriterList();

for(String[] record : data) {
int writerNo = getRecordWriterNumber(record);
ObjectArrayWritable writable = new ObjectArrayWritable();
writable.set(record);
recordWriterList.get(writerNo).write(NullWritable.get(), writable);
counter[writerNo]++;
}

data.clear();

public void close() throws IOException, InterruptedException {
for(int i = 0; i < recordWriterList.size(); ++i) {
closeRecordWriter(i);
}
}

protected abstract void createRecordWriterList();


protected RecordWriter createRecordWriter(CarbonLoadModel model, TaskAttemptContext context) throws IOException {

CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), model);
CarbonTableOutputFormat.setCarbonTable(context.getConfiguration(), model.getCarbonDataLoadSchema().getCarbonTable());
CarbonTableOutputFormat carbonTableOutputFormat = new CarbonTableOutputFormat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ public CarbonPartitionRecordWriterAssemble(CarbonTable carbonTable) {
TaskAttemptContext context = createTaskContext();
context.getConfiguration().set("carbon.outputformat.taskno", String.valueOf(partitionId));
taskAttemptContextList.add(context);
RecordWriter recordWriter = null;
try {
recordWriter = createRecordWriter(carbonLoadModel, context);
recordWriterList.add(recordWriter);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

List<ColumnSchema> columnSchemaList = partitionInfo.getColumnSchemaList();
Expand Down Expand Up @@ -116,4 +109,17 @@ protected int getRecordWriterNumber(String[] record) {
return partitionIds.indexOf(partitionId);
}

@Override
protected void createRecordWriterList() {
for(int i = 0; i < partitionIds.size(); ++i) {
RecordWriter recordWriter = null;
try {
recordWriter = createRecordWriter(carbonLoadModelList.get(i), taskAttemptContextList.get(i));
recordWriterList.add(recordWriter);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ public HivePartitionRecordWriterAssemble(CarbonTable carbonTable, String partiti
carbonLoadModelList.add(carbonLoadModel);
context = createTaskContext();
taskAttemptContextList.add(context);
RecordWriter recordWriter = null;
try {
recordWriter = createRecordWriter(carbonLoadModel, context);
recordWriterList.add(recordWriter);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down Expand Up @@ -127,6 +120,17 @@ protected void postCloseRecordWriter(int writerNo) throws IOException {

}

@Override
protected void createRecordWriterList() {
RecordWriter recordWriter = null;
try {
recordWriter = createRecordWriter(carbonLoadModel, context);
recordWriterList.add(recordWriter);
} catch (IOException e) {
throw new RuntimeException(e);
}
}


@Override
protected TaskAttemptContext createTaskContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@ public SimpleRecordWriterAssemble(CarbonTable carbonTable) {
carbonLoadModelList.add(carbonLoadModel);
TaskAttemptContext context = createTaskContext();
taskAttemptContextList.add(context);

}

@Override
protected int getRecordWriterNumber(String[] record) {
return 0;
}

@Override
protected void createRecordWriterList() {
RecordWriter recordWriter = null;
try {
recordWriter = createRecordWriter(carbonLoadModel, context);
recordWriter = createRecordWriter(carbonLoadModelList.get(0), taskAttemptContextList.get(0));
recordWriterList.add(recordWriter);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
protected int getRecordWriterNumber(String[] record) {
return 0;
}

}

0 comments on commit fb56abb

Please sign in to comment.