Skip to content

Commit

Permalink
Store the exact carbondata file path as the file_name in carbonindex.
Browse files Browse the repository at this point in the history
  • Loading branch information
watermen committed Mar 24, 2017
1 parent b7c636f commit 4da2a70
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.apache.carbondata.spark.testsuite.dataload

import java.io.{File, FilenameFilter}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.reader.CarbonIndexFileReader
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.spark.sql.common.util.QueryTest
import org.scalatest.BeforeAndAfterAll

/**
 * Verifies the `file_name` recorded in the carbonindex files after a data load, for each
 * carbon data file format version (1, 2 and 3).
 *
 * Every test sets the data file version property, loads `sample.csv` into a fresh table, then
 * reads each carbonindex file of segment 0 and checks that the header carries the expected
 * version and that every block index entry's `file_name` starts with the store location.
 */
class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
  // Value of the data file version property before the suite ran; restored in afterAll.
  // May become null if the property was not set (getProperty result is stored as-is).
  var originVersion = ""

  /** Remembers the data file version configured before any test changes it. */
  override def beforeAll(): Unit = {
    originVersion =
      CarbonProperties.getInstance.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION)
  }

  test("Test the file_name in carbonindex with v1 format") {
    checkFileNameInCarbonIndex(1)
  }

  test("Test the file_name in carbonindex with v2 format") {
    checkFileNameInCarbonIndex(2)
  }

  test("Test the file_name in carbonindex with v3 format") {
    checkFileNameInCarbonIndex(3)
  }

  /**
   * Loads the sample data into `test_table_v<version>` using the given data file version and
   * asserts on the content of every carbonindex file written to segment 0.
   *
   * @param version carbon data file format version to write and to expect in the index header
   */
  private def checkFileNameInCarbonIndex(version: Int): Unit = {
    val tableName = s"test_table_v$version"
    CarbonProperties.getInstance.addProperty(
      CarbonCommonConstants.CARBON_DATA_FILE_VERSION, version.toString)
    sql(s"DROP TABLE IF EXISTS $tableName")
    sql(
      s"""
         | CREATE TABLE $tableName(id int, name string, city string, age int)
         | STORED BY 'org.apache.carbondata.format'
       """.stripMargin)
    val testData = s"$resourcesPath/sample.csv"
    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")

    val segmentDir = new File(s"$storeLocation/default/$tableName/Fact/Part0/Segment_0/")
    val indexFileFilter = new FilenameFilter {
      override def accept(dir: File, name: String): Boolean = {
        name.endsWith(CarbonTablePath.getCarbonIndexExtension)
      }
    }
    // File.listFiles returns null when the directory is missing or unreadable; treat that as
    // "no index files" so the assertion below reports it instead of a bare NPE.
    val carbonIndexFiles = Option(segmentDir.listFiles(indexFileFilter)).getOrElse(Array.empty[File])
    // Without at least one index file the checks below would pass without verifying anything.
    assert(carbonIndexFiles.nonEmpty, s"no carbonindex file found in $segmentDir")

    carbonIndexFiles.foreach { carbonIndexFile =>
      val indexReader = new CarbonIndexFileReader()
      indexReader.openThriftReader(carbonIndexFile.getCanonicalPath)
      try {
        assert(indexReader.readIndexHeader().getVersion === version)
        while (indexReader.hasNext) {
          val readBlockIndexInfo = indexReader.readBlockIndexInfo()
          assert(readBlockIndexInfo.getFile_name.startsWith(storeLocation))
        }
      } finally {
        // Release the underlying thrift reader even when an assertion fails.
        indexReader.closeThriftReader()
      }
    }
  }

  /** Drops the tables created by the tests and restores the original data file version. */
  override protected def afterAll(): Unit = {
    try {
      sql("DROP TABLE IF EXISTS test_table_v1")
      sql("DROP TABLE IF EXISTS test_table_v2")
      sql("DROP TABLE IF EXISTS test_table_v3")
    } finally {
      // originVersion is null when the property was unset before the suite; skip the restore
      // in that case rather than handing a null value to addProperty.
      Option(originVersion).foreach { version =>
        CarbonProperties.getInstance.addProperty(
          CarbonCommonConstants.CARBON_DATA_FILE_VERSION, version)
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* file name
*/
protected String fileName;

/**
* Full path of the carbondata file: the carbon data directory path plus the data file name
*/
protected String carbonDataFilePath;

/**
* Local cardinality for the segment
*/
Expand Down Expand Up @@ -295,6 +301,8 @@ public void initializeWriter() throws CarbonDataWriterException {
dataWriterVo.getFileManager().add(fileData);
this.fileName = dataWriterVo.getStoreLocation() + File.separator + carbonDataFileName
+ CarbonCommonConstants.FILE_INPROGRESS_STATUS;
this.carbonDataFilePath =
dataWriterVo.getCarbonDataDirectoryPath() + File.separator + carbonDataFileName;
this.fileCount++;
try {
// open channel for new data file
Expand Down Expand Up @@ -376,8 +384,7 @@ protected void fillBlockIndexInfoDetails(long numberOfRows,
minmax.setMaxValues(currentMaxValue);
BlockletIndex blockletIndex = new BlockletIndex(btree, minmax);
BlockIndexInfo blockIndexInfo =
new BlockIndexInfo(numberOfRows, filePath.substring(0, filePath.lastIndexOf('.')),
currentPosition, blockletIndex);
new BlockIndexInfo(numberOfRows, filePath, currentPosition, blockletIndex);
blockIndexInfoList.add(blockIndexInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ protected void writeBlockletInfoToFile(FileChannel channel, String filePath)
FileFooter convertFileMeta = CarbonMetadataUtil
.convertFileFooter(blockletInfoList, localCardinality.length, localCardinality,
thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), carbonDataFilePath, currentPosition);
writer.writeFooter(convertFileMeta, currentPosition);
} catch (IOException e) {
throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ protected void writeBlockletInfoToFile(FileChannel channel,
.convertFilterFooter2(blockletInfoList, localCardinality, thriftColumnSchemaList,
dataChunksOffsets, dataChunksLength);
// fill the carbon index details
fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), carbonDataFilePath, currentPosition);
// write the footer
writer.writeFooter(convertFileMeta, currentPosition);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ private byte[] getByteArray(short[] data) {
.convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
thriftColumnSchemaList.size(), dataWriterVo.getSegmentProperties());
// fill the carbon index details
fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), carbonDataFilePath, currentPosition);
// write the footer
byte[] byteArray = CarbonUtil.getByteArray(convertFileMeta);
ByteBuffer buffer =
Expand Down Expand Up @@ -528,8 +528,7 @@ protected void fillBlockIndexInfoDetails(long numberOfRows, String filePath,
org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex blockletIndex =
new org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex(btree, minmax);
BlockIndexInfo blockIndexInfo =
new BlockIndexInfo(numberOfRows, filePath.substring(0, filePath.lastIndexOf('.')),
currentPosition, blockletIndex);
new BlockIndexInfo(numberOfRows, filePath, currentPosition, blockletIndex);
blockIndexInfoList.add(blockIndexInfo);
}

Expand Down

0 comments on commit 4da2a70

Please sign in to comment.