Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ public InternalRow constructEngineRecord(Schema schema,
return new GenericInternalRow(values);
}

@Override
public InternalRow constructEngineRecord(Schema schema, List<Object> values) {
List<Schema.Field> fields = schema.getFields();
if (fields.size() != values.size()) {
throw new IllegalArgumentException(
"Value count (" + values.size() + ") does not match field count (" + fields.size() + ")");
}
return toBinaryRow(schema, new GenericInternalRow(values.toArray()));
}

@Override
public InternalRow seal(InternalRow internalRow) {
return internalRow.copy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,120 @@ void testConstructEngineRecordWithNullValueFromBase() {
assertTrue(result.getBoolean(2));
}

@Test
void testConstructEngineRecordWithListOfValues() {
List<Object> values = Arrays.asList(1, UTF8String.fromString("Alice"), true);
InternalRow result = readerContext.constructEngineRecord(SCHEMA, values);

assertEquals(1, result.getInt(0));
assertEquals("Alice", result.getString(1));
assertTrue(result.getBoolean(2));
}

@Test
void testConstructEngineRecordWithNullValues() {
List<Object> values = Arrays.asList(null, UTF8String.fromString("Bob"), null);
InternalRow result = readerContext.constructEngineRecord(SCHEMA, values);

assertTrue(result.isNullAt(0));
assertEquals("Bob", result.getString(1));
assertTrue(result.isNullAt(2));
}

@Test
void testConstructEngineRecordWithMixedTypes() {
List<Object> values = Arrays.asList(42, UTF8String.fromString("Carol"), false);
InternalRow result = readerContext.constructEngineRecord(SCHEMA, values);

assertEquals(42, result.getInt(0));
assertEquals("Carol", result.getString(1));
assertFalse(result.getBoolean(2));
}

@Test
void testConstructEngineRecordWithEmptyValues() {
List<Object> values = Arrays.asList(0, UTF8String.fromString(""), false);
InternalRow result = readerContext.constructEngineRecord(SCHEMA, values);

assertEquals(0, result.getInt(0));
assertEquals("", result.getString(1));
assertFalse(result.getBoolean(2));
}

@Test
void testConstructEngineRecordWithValueCountMismatch() {
List<Object> values = Arrays.asList(1, UTF8String.fromString("Alice")); // Missing boolean value

try {
readerContext.constructEngineRecord(SCHEMA, values);
// Should not reach here
assertTrue(false, "Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Value count (2) does not match field count (3)"));
}
}

@Test
void testConstructEngineRecordWithExtraValues() {
List<Object> values = Arrays.asList(1, UTF8String.fromString("Alice"), true, "extra");

try {
readerContext.constructEngineRecord(SCHEMA, values);
// Should not reach here
assertTrue(false, "Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Value count (4) does not match field count (3)"));
}
}

@Test
void testConstructEngineRecordWithComplexSchema() {
// Create a more complex schema with nested fields
Schema complexSchema = SchemaBuilder.record("ComplexRecord").fields()
.requiredInt("id")
.requiredString("name")
.requiredBoolean("active")
.requiredLong("timestamp")
.requiredDouble("score")
.endRecord();

List<Object> values = Arrays.asList(
123,
UTF8String.fromString("ComplexName"),
true,
1234567890L,
95.5
);

InternalRow result = readerContext.constructEngineRecord(complexSchema, values);

assertEquals(123, result.getInt(0));
assertEquals("ComplexName", result.getString(1));
assertTrue(result.getBoolean(2));
assertEquals(1234567890L, result.getLong(3));
assertEquals(95.5, result.getDouble(4), 0.001);
}

@Test
void testConstructEngineRecordWithAllNullValues() {
List<Object> values = Arrays.asList(null, null, null);
InternalRow result = readerContext.constructEngineRecord(SCHEMA, values);

assertTrue(result.isNullAt(0));
assertTrue(result.isNullAt(1));
assertTrue(result.isNullAt(2));
}

@Test
void testConstructEngineRecordWithZeroValues() {
List<Object> values = Arrays.asList(0, UTF8String.fromString("Zero"), false);
InternalRow result = readerContext.constructEngineRecord(SCHEMA, values);

assertEquals(0, result.getInt(0));
assertEquals("Zero", result.getString(1));
assertFalse(result.getBoolean(2));
}

static class DummySparkReaderContext extends BaseSparkInternalRowReaderContext {
public DummySparkReaderContext(StorageConfiguration<?> config,
HoodieTableConfig tableConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,21 @@ public IndexedRecord constructEngineRecord(Schema schema,
return engineRecord;
}

@Override
public IndexedRecord constructEngineRecord(Schema schema, List<Object> values) {
List<Schema.Field> fields = schema.getFields();
if (fields.size() != values.size()) {
throw new IllegalArgumentException(
"Value count (" + values.size() + ") does not match field count (" + fields.size() + ")");
}

GenericData.Record record = new GenericData.Record(schema);
for (int i = 0; i < fields.size(); i++) {
record.put(i, values.get(i));
}
return record;
}

@Override
public IndexedRecord seal(IndexedRecord record) {
return record;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,15 @@ public abstract T constructEngineRecord(Schema schema,
Map<Integer, Object> updateValues,
BufferedRecord<T> baseRecord);

/**
* Constructs a new Engine based record based on a given schema and its field values.
*
* @param schema The schema of the new record.
* @param values The list of values.
* @return A new instance of engine record type {@link T}.
*/
public abstract T constructEngineRecord(Schema schema, List<Object> values);

/**
* Seals the engine-specific record to make sure the data referenced in memory do not change.
*
Expand Down
Loading
Loading