Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ private[pinot] object PinotUtils {
case FieldSpec.DataType.DOUBLE => DoubleType
case FieldSpec.DataType.STRING => StringType
case FieldSpec.DataType.BYTES => ArrayType(ByteType)
case FieldSpec.DataType.TIMESTAMP => LongType
case FieldSpec.DataType.BOOLEAN => BooleanType
case _ =>
throw PinotException(s"Unsupported pinot data type '$dataType")
}
Expand Down Expand Up @@ -97,6 +99,10 @@ private[pinot] object PinotUtils {
dataTable.getFloat(rowIndex, colIndex)
case ColumnDataType.DOUBLE =>
dataTable.getDouble(rowIndex, colIndex)
case ColumnDataType.TIMESTAMP =>
dataTable.getLong(rowIndex, colIndex)
case ColumnDataType.BOOLEAN =>
dataTable.getInt(rowIndex, colIndex) == 1

// array column types
case ColumnDataType.STRING_ARRAY =>
Expand All @@ -113,6 +119,12 @@ private[pinot] object PinotUtils {
ArrayData.toArrayData(dataTable.getDoubleArray(rowIndex, colIndex).toSeq)
case ColumnDataType.BYTES =>
ArrayData.toArrayData(dataTable.getBytes(rowIndex, colIndex).getBytes)
case ColumnDataType.TIMESTAMP_ARRAY =>
ArrayData.toArrayData(dataTable.getLongArray(rowIndex, colIndex).toSeq)
case ColumnDataType.BOOLEAN_ARRAY =>
ArrayData.toArrayData(
dataTable.getIntArray(rowIndex, colIndex).map(i => i == 1).toSeq
)

case _ =>
throw PinotException(s"'$columnDataType' is not supported")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
}, {
"name" : "longDim",
"dataType" : "LONG"
}, {
"name": "boolDim",
"dataType": "BOOLEAN"
}, {
"name" : "stringArrayDim",
"dataType" : "STRING",
Expand All @@ -24,6 +27,10 @@
"name" : "floatArrayDim",
"dataType" : "FLOAT",
"singleValueField" : false
}, {
"name": "boolArrayDim",
"dataType": "BOOLEAN",
"singleValueField": false
} ],
"metricFieldSpecs" : [ {
"name" : "floatMetric",
Expand Down Expand Up @@ -56,5 +63,13 @@
"dataType" : "INT",
"timeType" : "DAYS"
}
}
},
"dateTimeFieldSpecs": [
{
"name": "timestampField",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:SECONDS"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
"type" : "long",
"nullable" : true,
"metadata" : { }
}, {
"name" : "boolDim",
"type" : "boolean",
"nullable" : true,
"metadata" : { }
}, {
"name" : "stringArrayDim",
"type" : {
Expand All @@ -68,6 +73,15 @@
},
"nullable" : true,
"metadata" : { }
}, {
"name" : "boolArrayDim",
"type" : {
"type" : "array",
"elementType" : "boolean",
"containsNull" : true
},
"nullable" : true,
"metadata" : { }
}, {
"name" : "byteDim",
"type" : {
Expand All @@ -82,5 +96,10 @@
"type" : "integer",
"nullable" : true,
"metadata" : { }
}, {
"name" : "timestampField",
"type" : "long",
"nullable" : true,
"metadata" : { }
} ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ class PinotUtilsTest extends BaseTest {
"longArrayCol",
"floatArrayCol",
"doubleArrayCol",
"byteType"
"byteType",
"timestampArrayCol",
"timestampCol",
"booleanArrayCol",
"booleanCol",
)
val columnTypes = Array(
ColumnDataType.STRING,
Expand All @@ -62,7 +66,11 @@ class PinotUtilsTest extends BaseTest {
ColumnDataType.LONG_ARRAY,
ColumnDataType.FLOAT_ARRAY,
ColumnDataType.DOUBLE_ARRAY,
ColumnDataType.BYTES
ColumnDataType.BYTES,
ColumnDataType.TIMESTAMP_ARRAY,
ColumnDataType.TIMESTAMP,
ColumnDataType.BOOLEAN_ARRAY,
ColumnDataType.BOOLEAN,
)
val dataSchema = new DataSchema(columnNames, columnTypes)

Expand All @@ -79,6 +87,11 @@ class PinotUtilsTest extends BaseTest {
dataTableBuilder.setColumn(8, Array[Float](0, 15.20f))
dataTableBuilder.setColumn(9, Array[Double](0, 10.3d))
dataTableBuilder.setColumn(10, new ByteArray("byte_test".getBytes))
dataTableBuilder.setColumn(11, Array[Long](123L,456L))
dataTableBuilder.setColumn(12, 123L)
dataTableBuilder.setColumn(13, Array[Int](1,0,1,0))
dataTableBuilder.setColumn(14, 1)

dataTableBuilder.finishRow()
val dataTable = dataTableBuilder.build()

Expand All @@ -94,7 +107,11 @@ class PinotUtilsTest extends BaseTest {
StructField("strCol", StringType),
StructField("floatArrayCol", ArrayType(FloatType)),
StructField("floatCol", FloatType),
StructField("byteType", ArrayType(ByteType))
StructField("byteType", ArrayType(ByteType)),
StructField("timestampArrayCol", ArrayType(LongType)),
StructField("timestampCol", LongType),
StructField("booleanArrayCol", ArrayType(BooleanType)),
StructField("booleanCol", BooleanType),
)
)

Expand All @@ -112,6 +129,10 @@ class PinotUtilsTest extends BaseTest {
result.getArray(8) shouldEqual ArrayData.toArrayData(Seq(0f, 15.20f))
result.getFloat(9) shouldEqual 10.05f
result.getArray(10) shouldEqual ArrayData.toArrayData("byte_test".getBytes)
result.getArray(11) shouldEqual ArrayData.toArrayData(Seq(123L,456L))
result.getLong(12) shouldEqual 123L
result.getArray(13) shouldEqual ArrayData.toArrayData(Seq(true, false, true, false))
result.getBoolean(14) shouldEqual true
}

test("Method should throw field not found exception while converting pinot data table") {
Expand Down Expand Up @@ -148,5 +169,4 @@ class PinotUtilsTest extends BaseTest {
val sparkSchema = DataType.fromJson(sparkSchemaAsString).asInstanceOf[StructType]
resultSchema.fields should contain theSameElementsAs sparkSchema.fields
}

}