  * limitations under the License.
  */
 
-package org.apache.xtable.service.it;
+package org.apache.xtable.service;
 
 import static org.apache.xtable.GenericTable.getTableName;
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.xtable.GenericTable;
 import org.apache.xtable.hudi.HudiTestUtil;
 import org.apache.xtable.model.storage.TableFormat;
-import org.apache.xtable.service.ConversionService;
 import org.apache.xtable.service.models.ConvertTableRequest;
 import org.apache.xtable.service.models.ConvertTableResponse;
 import org.apache.xtable.service.models.ConvertedTable;
 
 import io.quarkus.test.junit.QuarkusTest;
-import io.quarkus.test.junit.TestProfile;
 import jakarta.inject.Inject;
 
-@TestProfile(ConversionTestProfile.class)
 @QuarkusTest
 public class ITConversionService {
 
   @Inject ConversionService conversionService;
   private static Path tempDir;
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   protected static JavaSparkContext jsc;
   protected static SparkSession sparkSession;
 
@@ -139,11 +128,11 @@ public void testVariousOperations(String sourceTableFormat, boolean isPartitioned)
             sourceTableFormat,
             tableName,
             table.getBasePath(),
+            table.getDataPath(),
             targetTableFormats,
             partitionConfig);
     ConvertTableResponse response = conversionService.convertTable(request);
     assertConversionResponse(response, targetTableFormats);
-
     checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100);
 
     // Make multiple commits and then sync with service
@@ -160,11 +149,55 @@ public void testVariousOperations(String sourceTableFormat, boolean isPartitioned)
       checkDatasetEquivalenceWithFilter(
           sourceTableFormat, table, targetTableFormats, table.getFilterQuery());
     }
+
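+    // Reopen the same table with additional columns (schema evolution) and keep syncing through the service.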
+    try (GenericTable tableWithUpdatedSchema =
+        GenericTable.getInstanceWithAdditionalColumns(
+            tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) {
+      ConvertTableRequest request =
+          createConvertTableRequest(
+              sourceTableFormat,
+              tableName,
+              tableWithUpdatedSchema.getBasePath(),
+              tableWithUpdatedSchema.getDataPath(),
+              targetTableFormats,
+              partitionConfig);
+
+      List<Row> insertsAfterSchemaUpdate = tableWithUpdatedSchema.insertRows(100);
+      tableWithUpdatedSchema.reload();
+      ConvertTableResponse response = conversionService.convertTable(request);
+      assertConversionResponse(response, targetTableFormats);
+      checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 280);
+
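+      // Delete a slice of the rows inserted after the schema change, then sync and re-validate.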
+      tableWithUpdatedSchema.deleteRows(insertsAfterSchemaUpdate.subList(60, 90));
+      response = conversionService.convertTable(request);
+      assertConversionResponse(response, targetTableFormats);
+      checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 250);
+
+      if (isPartitioned) {
+        // Adds new partition.
+        tableWithUpdatedSchema.insertRecordsForSpecialPartition(50);
+        response = conversionService.convertTable(request);
+        assertConversionResponse(response, targetTableFormats);
+        checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 300);
+
+        // Drops partition.
+        tableWithUpdatedSchema.deleteSpecialPartition();
+        response = conversionService.convertTable(request);
+        assertConversionResponse(response, targetTableFormats);
+        checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 250);
+
+        // Insert records to the dropped partition again.
+        tableWithUpdatedSchema.insertRecordsForSpecialPartition(50);
+        response = conversionService.convertTable(request);
+        assertConversionResponse(response, targetTableFormats);
+        checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 300);
+      }
+    }
   }
 
   private static Stream<Arguments> generateTestParametersFormatsAndPartitioning() {
     List<Arguments> arguments = new ArrayList<>();
-    for (String sourceTableFormat : Arrays.asList(DELTA, ICEBERG)) {
+    for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
       for (boolean isPartitioned : new boolean[] {true, false}) {
         arguments.add(Arguments.of(sourceTableFormat, isPartitioned));
       }
@@ -178,60 +211,6 @@ protected static List<String> getOtherFormats(String sourceTableFormat) {
         .collect(Collectors.toList());
   }
 
-  private void compareDatasetWithUUID(List<String> dataset1Rows, List<String> dataset2Rows) {
-    for (int i = 0; i < dataset1Rows.size(); i++) {
-      String row1 = dataset1Rows.get(i);
-      String row2 = dataset2Rows.get(i);
-      if (row1.contains("uuid_field") && row2.contains("uuid_field")) {
-        try {
-          JsonNode node1 = OBJECT_MAPPER.readTree(row1);
-          JsonNode node2 = OBJECT_MAPPER.readTree(row2);
-
-          // check uuid field
-          String uuidStr1 = node1.get("uuid_field").asText();
-          byte[] bytes = Base64.getDecoder().decode(node2.get("uuid_field").asText());
-          ByteBuffer bb = ByteBuffer.wrap(bytes);
-          UUID uuid2 = new UUID(bb.getLong(), bb.getLong());
-          String uuidStr2 = uuid2.toString();
-          assertEquals(
-              uuidStr1,
-              uuidStr2,
-              String.format(
-                  "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
-                  uuidStr1, uuidStr2));
-
-          // check other fields
-          ((ObjectNode) node1).remove("uuid_field");
-          ((ObjectNode) node2).remove("uuid_field");
-          assertEquals(
-              node1.toString(),
-              node2.toString(),
-              String.format(
-                  "Datasets are not equivalent when comparing other fields. Source: %s, Target: %s",
-                  node1, node2));
-        } catch (JsonProcessingException e) {
-          throw new RuntimeException(e);
-        }
-      } else {
-        assertEquals(
-            row1,
-            row2,
-            String.format(
-                "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
-                row1, row2));
-      }
-    }
-  }
-
-  private boolean containsUUIDFields(List<String> rows) {
-    for (String row : rows) {
-      if (row.contains("\"uuid_field\"")) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   protected void checkDatasetEquivalenceWithFilter(
       String sourceFormat,
       GenericTable<?, ?> sourceTable,
@@ -296,7 +275,7 @@ private void checkDatasetEquivalence(
                           .read()
                           .options(finalTargetOptions)
                           .format(targetFormat.toLowerCase())
-                          .load(sourceTable.getBasePath())
+                          .load(sourceTable.getDataPath())
                           .orderBy(sourceTable.getOrderByColumn())
                           .filter(filterCondition);
                     }));
@@ -320,24 +299,20 @@ private void checkDatasetEquivalence(
             // if count is not known ahead of time, ensure datasets are non-empty
             assertFalse(dataset1Rows.isEmpty());
           }
-
-          if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) {
-            compareDatasetWithUUID(dataset1Rows, dataset2Rows);
-          } else {
-            assertEquals(
-                dataset1Rows,
-                dataset2Rows,
-                String.format(
-                    "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
-                    sourceFormat, format));
-          }
+          assertEquals(
+              dataset1Rows,
+              dataset2Rows,
+              String.format(
+                  "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
+                  sourceFormat, format));
         });
   }
 
   private ConvertTableRequest createConvertTableRequest(
       String sourceFormat,
       String tableName,
       String tablePath,
+      String dataPath,
       List<String> targetFormats,
       String partitionConfig) {
     Map<String, String> configs = new HashMap<>();
@@ -348,6 +323,7 @@ private ConvertTableRequest createConvertTableRequest(
         .sourceFormat(sourceFormat)
         .sourceTableName(tableName)
         .sourceTablePath(tablePath)
+        .sourceDataPath(dataPath)
         .targetFormats(targetFormats)
         .configurations(configs)
         .build();