[HUDI-6825] Use UTF_8 to encode String to byte array in all places (#9634)

Unify the encoding of Java `String` to byte arrays in Hudi,
especially when writing bytes to storage,
by using `UTF_8` encoding only.

---------

Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
yihua and codope committed Feb 26, 2024
1 parent 7b5b6c7 commit 6f25f41
Showing 103 changed files with 396 additions and 322 deletions.
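Every change in this commit follows the same pattern: replace charset-less or ad-hoc String-to-bytes conversions with the shared helper org.apache.hudi.common.util.StringUtils.getUTF8Bytes. The helper's body is not part of this diff, so the sketch below is an assumption of what it does; the class name Utf8EncodingSketch and the sample JSON string are illustrative only.

    import java.nio.charset.StandardCharsets;

    public final class Utf8EncodingSketch {

      // Stand-in for StringUtils.getUTF8Bytes: always encode with UTF-8,
      // never with the JVM's platform-default charset.
      static byte[] getUTF8Bytes(String str) {
        return str.getBytes(StandardCharsets.UTF_8);
      }

      public static void main(String[] args) {
        // Stand-in for something like commitMetadata.toJsonString().
        String json = "{\"operationType\":\"ALTER_SCHEMA\"}";

        byte[] platformDefault = json.getBytes(); // old call sites: platform-default charset
        byte[] utf8 = getUTF8Bytes(json);         // new call sites: explicit UTF-8

        System.out.println(platformDefault.length + " bytes vs " + utf8.length + " bytes");
      }
    }

String.getBytes() with no charset argument uses the JVM's default charset, so metadata written from machines with different locale settings could end up with different byte representations; funnelling every call site through a single UTF-8 helper removes that variability.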
@@ -52,6 +52,7 @@
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* CLI command to display hudi table options.
@@ -261,7 +262,7 @@ private static void writeToFile(String filePath, String data) throws IOException
OutputStream os = null;
try {
os = new FileOutputStream(outFile);
os.write(data.getBytes(), 0, data.length());
os.write(getUTF8Bytes(data), 0, data.length());
} finally {
os.close();
}
@@ -18,9 +18,6 @@

package org.apache.hudi.cli.integ;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.HoodieCLIIntegrationTestBase;
@@ -33,6 +30,10 @@
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.hudi.utilities.functional.TestHDFSParquetImporter;
import org.apache.hudi.utilities.functional.TestHDFSParquetImporter.HoodieTripModel;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
@@ -49,6 +50,7 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -81,7 +83,7 @@ public void init() throws IOException, ParseException {

// create schema file
try (FSDataOutputStream schemaFileOS = fs.create(new Path(schemaFile))) {
schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
schemaFileOS.write(getUTF8Bytes(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
}

importer = new TestHDFSParquetImporter();
@@ -32,7 +32,6 @@
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -42,6 +41,7 @@

import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* Class to be used in tests to keep generating test inserts and updates against a corpus.
@@ -114,7 +114,7 @@ public static void createCommitFileWithMetadata(String basePath, String commitTi
static void createFileWithMetadata(String basePath, Configuration configuration, String name, String content) throws IOException {
Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + name);
try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) {
os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8)));
os.writeBytes(new String(getUTF8Bytes(content)));
}
}

@@ -72,7 +72,6 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -85,6 +84,7 @@
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit;

@@ -500,7 +500,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,

table.getActiveTimeline().transitionReplaceInflightToComplete(
clusteringInstant,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Option.of(getUTF8Bytes(metadata.toJsonString())));
} catch (Exception e) {
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
} finally {
@@ -93,7 +93,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -106,6 +105,7 @@

import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;

/**
@@ -285,7 +285,7 @@ protected void commit(HoodieTable table, String commitActionType, String instant
// update Metadata table
writeTableMetadata(table, instantTime, metadata, writeStatuses);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Option.of(getUTF8Bytes(metadata.toJsonString())));
}

// Save internal schema
@@ -1542,7 +1542,7 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
metadata.setOperationType(WriteOperationType.ALTER_SCHEMA);
try {
timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
timeLine.transitionRequestedToInflight(requested, Option.of(getUTF8Bytes(metadata.toJsonString())));
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
}
@@ -53,6 +53,7 @@
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX;
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* Utilities class for consistent bucket index metadata management.
@@ -208,7 +209,7 @@ private static void createCommitMarker(HoodieTable table, Path fileStatus, Path
if (fs.exists(fullPath)) {
return;
}
FileIOUtils.createFileInPath(fs, fullPath, Option.of(StringUtils.EMPTY_STRING.getBytes()));
FileIOUtils.createFileInPath(fs, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
}

/***
@@ -60,7 +60,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
@@ -71,6 +70,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;

public abstract class BaseCommitActionExecutor<T, I, K, O, R>
@@ -154,7 +154,7 @@ void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String insta
String commitActionType = getCommitActionType();
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
activeTimeline.transitionRequestedToInflight(requested,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
Option.of(getUTF8Bytes(metadata.toJsonString())),
config.shouldAllowMultiWriteOnSameInstant());
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
@@ -36,10 +36,11 @@
import org.apache.hudi.table.HoodieTable;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;

import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* Base class helps to perform compact.
*
@@ -83,7 +84,7 @@ public void completeInflightCompaction(HoodieTable table, String compactionCommi
try {
activeTimeline.transitionCompactionInflightToComplete(
HoodieTimeline.getCompactionInflightInstant(compactionCommitTime),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
} catch (IOException e) {
throw new HoodieCompactionException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + compactionCommitTime, e);
@@ -95,7 +96,7 @@ public void completeInflightLogCompaction(HoodieTable table, String logCompactio
try {
activeTimeline.transitionLogCompactionInflightToComplete(
HoodieTimeline.getLogCompactionInflightInstant(logCompactionCommitTime),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
} catch (IOException e) {
throw new HoodieCompactionException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + logCompactionCommitTime, e);
@@ -37,7 +37,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -46,6 +45,7 @@
import java.util.UUID;

import static org.apache.hudi.common.table.log.HoodieLogFormat.DEFAULT_WRITE_TOKEN;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

public class HoodieTestCommitGenerator {
public static final String BASE_FILE_WRITE_TOKEN = "1-0-1";
@@ -163,7 +163,7 @@ public static void createCommitFileWithMetadata(
String filename, String content) throws IOException {
Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + filename);
try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) {
os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8)));
os.writeBytes(new String(getUTF8Bytes(content)));
}
}

@@ -18,16 +18,6 @@

package org.apache.hudi.io.storage;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -40,6 +30,16 @@
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -70,8 +70,9 @@
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.io.storage.HoodieAvroHFileReader.SCHEMA_KEY;
import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -130,7 +131,7 @@ protected void verifySchema(Configuration conf, String schemaPath) throws IOExce
FileSystem fs = getFilePath().getFileSystem(conf);
HFile.Reader hfileReader = HoodieHFileUtils.createHFileReader(fs, getFilePath(), new CacheConfig(conf), conf);
assertEquals(getSchemaFromResource(TestHoodieHFileReaderWriter.class, schemaPath),
new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(SCHEMA_KEY.getBytes()))));
new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(getUTF8Bytes(SCHEMA_KEY)))));
}

private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
@@ -51,11 +51,12 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClient<List<HoodieRecord<T>>, List<WriteStatus>, List<WriteStatus>> {

private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkTableServiceClient.class);
@@ -137,7 +138,7 @@ protected void completeClustering(
LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
table.getActiveTimeline().transitionReplaceInflightToComplete(
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Option.of(getUTF8Bytes(metadata.toJsonString())));
} catch (IOException e) {
throw new HoodieClusteringException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
@@ -21,13 +21,13 @@
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

@@ -71,7 +71,7 @@ public HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) {

@Override
protected byte[] getUTF8Bytes(String key) {
return key.getBytes(StandardCharsets.UTF_8);
return StringUtils.getUTF8Bytes(key);
}
}
}
@@ -46,7 +46,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
@@ -55,6 +54,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
* With {@code org.apache.hudi.operator.partitioner.BucketAssigner}, each hoodie record
* is tagged with a bucket ID (partition path + fileID) in streaming way. All the records consumed by this
@@ -156,7 +157,7 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieData<Writ
writeTableMetadata(metadata, writeStatuses, actionType);

activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Option.of(getUTF8Bytes(metadata.toJsonString())));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
} catch (IOException e) {
@@ -54,7 +54,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
@@ -66,6 +65,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

public abstract class BaseJavaCommitActionExecutor<T> extends
BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {

@@ -215,7 +216,7 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
writeTableMetadata(metadata, HoodieListData.eager(result.getWriteStatuses()), actionType);

activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Option.of(getUTF8Bytes(metadata.toJsonString())));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
} catch (IOException e) {
(Diffs for the remaining changed files are not shown.)