|
39 | 39 | import org.apache.arrow.dataset.source.DatasetFactory; |
40 | 40 | import org.apache.arrow.memory.ReservationListener; |
41 | 41 | import org.apache.arrow.memory.RootAllocator; |
| 42 | +import org.apache.arrow.util.AutoCloseables; |
42 | 43 | import org.apache.arrow.vector.VectorSchemaRoot; |
43 | 44 | import org.apache.arrow.vector.dictionary.Dictionary; |
| 45 | +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; |
| 46 | +import org.apache.arrow.vector.types.Types; |
44 | 47 | import org.apache.arrow.vector.types.pojo.Schema; |
45 | 48 | import org.junit.Assert; |
46 | 49 | import org.junit.Ignore; |
47 | 50 | import org.junit.Test; |
48 | 51 |
|
| 52 | +import static org.junit.Assert.assertEquals; |
| 53 | + |
49 | 54 | public class NativeDatasetTest { |
50 | 55 |
|
51 | 56 | private String sampleParquetLocal() { |
52 | | - return "file://" + NativeDatasetTest.class.getResource(File.separator + "userdata1.parquet").getPath(); |
| 57 | + return "file://" + resourcePath("userdata1.parquet"); |
| 58 | + } |
| 59 | + |
| 60 | + private String resourcePath(String resource) { |
| 61 | + return NativeDatasetTest.class.getResource(File.separator + resource).getPath(); |
53 | 62 | } |
54 | 63 |
|
55 | 64 | private void testDatasetFactoryEndToEnd(DatasetFactory factory, int taskCount, int vectorCount, int rowCount) { |
@@ -319,6 +328,42 @@ public void testScannerWithEmptyProjector() { |
319 | 328 | allocator.close(); |
320 | 329 | } |
321 | 330 |
|
| 331 | + @Test |
| 332 | + public void testCsvRead() throws Exception { |
| 333 | + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); |
| 334 | + SingleFileDatasetFactory factory = new SingleFileDatasetFactory(allocator, |
| 335 | + NativeMemoryPool.getDefault(), FileFormat.CSV, "file://" + resourcePath("data/people.csv")); |
| 336 | + ScanOptions options = new ScanOptions(new String[]{}, Filter.EMPTY, 100); |
| 337 | + Schema schema = factory.inspect(); |
| 338 | + NativeDataset dataset = factory.finish(schema); |
| 339 | + NativeScanner nativeScanner = dataset.newScan(options); |
| 340 | + List<? extends ScanTask> scanTasks = collect(nativeScanner.scan()); |
| 341 | + Assert.assertEquals(1, scanTasks.size()); |
| 342 | + ScanTask scanTask = scanTasks.get(0); |
| 343 | + ScanTask.Itr itr = scanTask.scan(); |
| 344 | + |
| 345 | + VectorSchemaRoot vsr = null; |
| 346 | + int rowCount = 0; |
| 347 | + while (itr.hasNext()) { |
| 348 | + // FIXME VectorSchemaRoot is not actually something ITERABLE. Using a reader convention instead. |
| 349 | + vsr = itr.next().valueVectors; |
| 350 | + rowCount += vsr.getRowCount(); |
| 351 | + |
| 352 | + // check if projector is applied |
| 353 | + Assert.assertEquals("Schema<name: Utf8, age: Int(64, true), job: Utf8>", |
| 354 | + vsr.getSchema().toString()); |
| 355 | + } |
| 356 | + Assert.assertEquals(2, rowCount); |
| 357 | + assertEquals(3, schema.getFields().size()); |
| 358 | + assertEquals("name", schema.getFields().get(0).getName()); |
| 359 | + assertEquals("age", schema.getFields().get(1).getName()); |
| 360 | + assertEquals("job", schema.getFields().get(2).getName()); |
| 361 | + if (vsr != null) { |
| 362 | + vsr.close(); |
| 363 | + } |
| 364 | + allocator.close(); |
| 365 | + } |
| 366 | + |
322 | 367 | @Ignore |
323 | 368 | public void testFilter() { |
324 | 369 | // todo |
|
0 commit comments