 import java.io.File;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.orc.GenericOrcWriter;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.Reader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OrcIndex;
+import org.apache.orc.impl.RecordReaderImpl;
 import org.apache.orc.impl.WriterImpl;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -48,14 +60,16 @@ public void testWriteOption() throws Exception {
     File testFile = temp.newFile();
     Assert.assertTrue("Delete should succeed", testFile.delete());
 
+    OutputFile outFile = Files.localOutput(testFile);
     try (FileAppender<Record> writer =
-        ORC.write(Files.localOutput(testFile))
+        ORC.write(outFile)
             .createWriterFunc(GenericOrcWriter::buildWriter)
             .schema(DATA_SCHEMA)
             .set("write.orc.bloom.filter.columns", "id,name")
             .set("write.orc.bloom.filter.fpp", "0.04")
             .build()) {
 
+      // Validate whether the bloom filter options are set on the underlying ORC writer
       Class clazzOrcFileAppender = Class.forName("org.apache.iceberg.orc.OrcFileAppender");
       Field writerField = clazzOrcFileAppender.getDeclaredField("writer");
       writerField.setAccessible(true);
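For reference, the two .set(...) write properties above are what Iceberg's ORC appender translates into bloom filter settings on the underlying org.apache.orc writer. A minimal standalone sketch of the equivalent direct ORC configuration, not part of this change; the schema string and output path are illustrative and assume DATA_SCHEMA's id/name/price fields:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class BloomFilterWriteSketch {
  public static void main(String[] args) throws Exception {
    // Schema mirroring the test's DATA_SCHEMA fields: id (long), name (string), price (double)
    TypeDescription schema =
        TypeDescription.fromString("struct<id:bigint,name:string,price:double>");
    Writer writer =
        OrcFile.createWriter(
            new Path("/tmp/bloom-sketch.orc"), // illustrative output path
            OrcFile.writerOptions(new Configuration())
                .setSchema(schema)
                .bloomFilterColumns("id,name") // counterpart of write.orc.bloom.filter.columns
                .bloomFilterFpp(0.04)); // counterpart of write.orc.bloom.filter.fpp
    writer.close();
  }
}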
@@ -72,7 +86,45 @@ public void testWriteOption() throws Exception {
       Assert.assertTrue(bloomFilterColumns[1]);
       Assert.assertTrue(bloomFilterColumns[2]);
       Assert.assertEquals(0.04, bloomFilterFpp, 1e-15);
+
+      Record recordTemplate = GenericRecord.create(DATA_SCHEMA);
+      Record record1 = recordTemplate.copy("id", 1L, "name", "foo", "price", 1.0);
+      Record record2 = recordTemplate.copy("id", 2L, "name", "bar", "price", 2.0);
+      writer.add(record1);
+      writer.add(record2);
     }
+
+    // Validate whether the bloom filters are written to the ORC file
+    Class clazzFileDump = Class.forName("org.apache.orc.tools.FileDump");
+    Method getFormattedBloomFilters =
+        clazzFileDump.getDeclaredMethod(
+            "getFormattedBloomFilters",
+            int.class,
+            OrcIndex.class,
+            OrcFile.WriterVersion.class,
+            TypeDescription.Category.class,
+            OrcProto.ColumnEncoding.class);
+    getFormattedBloomFilters.setAccessible(true);
+
+    Reader reader =
+        OrcFile.createReader(
+            new Path(outFile.location()), new OrcFile.ReaderOptions(new Configuration()));
+    boolean[] readCols = new boolean[] {false, true, true, false};
+    RecordReaderImpl rows = (RecordReaderImpl) reader.rows();
+    OrcIndex indices = rows.readRowIndex(0, null, readCols);
+    StripeInformation stripe = reader.getStripes().get(0);
+    OrcProto.StripeFooter footer = rows.readStripeFooter(stripe);
+    String bloomFilterString =
+        (String)
+            getFormattedBloomFilters.invoke(
+                null,
+                1,
+                indices,
+                reader.getWriterVersion(),
+                reader.getSchema().findSubtype(1).getCategory(),
+                footer.getColumns(1));
+
+    Assert.assertTrue(bloomFilterString.contains("Bloom filters for column"));
   }
 
   @Test
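The test reaches getFormattedBloomFilters through reflection, presumably because that FileDump helper is not exposed as public API (hence the setAccessible call). A rough alternative sketch that instead checks each stripe footer for bloom filter streams, using the same reader classes the test already imports; not part of this change, and the file path is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.Reader;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.RecordReaderImpl;

public class BloomFilterCheckSketch {
  public static void main(String[] args) throws Exception {
    Reader reader =
        OrcFile.createReader(
            new Path("/tmp/bloom-sketch.orc"), // illustrative path
            new OrcFile.ReaderOptions(new Configuration()));
    RecordReaderImpl rows = (RecordReaderImpl) reader.rows();
    for (StripeInformation stripe : reader.getStripes()) {
      OrcProto.StripeFooter footer = rows.readStripeFooter(stripe);
      // A stripe carries bloom filters when its streams include a
      // BLOOM_FILTER_UTF8 (or, for older writer versions, BLOOM_FILTER) stream
      boolean hasBloomFilter =
          footer.getStreamsList().stream()
              .anyMatch(
                  s ->
                      s.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER_UTF8
                          || s.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER);
      System.out.println("Stripe at offset " + stripe.getOffset() + ": " + hasBloomFilter);
    }
    rows.close();
  }
}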