Rename files written by HivePageSink
Nikhil Collooru authored and highker committed Oct 1, 2020
1 parent a74b034 commit c2b2b0d
Showing 6 changed files with 94 additions and 2 deletions.
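
This commit adds a renameFileAsync method to ExtendedFileSystem and its implementations (the default HadoopExtendedFileSystem version renames synchronously and returns an immediate future). HivePageSink uses it in doFinish: when a metadata update request was sent for a writer, the sink waits for the coordinator-assigned file name, renames the written file to it, and returns partition updates that reference the renamed files. The change also fixes createPageSink to pass handle.getTableName() for the table name (it previously passed handle.getSchemaName() twice) and names bucketed files by their bucket number.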
@@ -16,6 +16,7 @@
import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.HiveFileInfo;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
@@ -498,4 +499,11 @@ public RemoteIterator<HiveFileInfo> listFiles(Path path)
{
return dataTier.listFiles(path);
}

@Override
public ListenableFuture<Void> renameFileAsync(Path source, Path destination)
throws IOException
{
return dataTier.renameFileAsync(source, destination);
}
}
@@ -15,6 +15,7 @@

import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.HiveFileInfo;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -43,4 +44,10 @@ public RemoteIterator<LocatedFileStatus> listDirectory(Path path)
{
throw new UnsupportedOperationException();
}

public ListenableFuture<Void> renameFileAsync(Path source, Path destination)
throws IOException
{
throw new UnsupportedOperationException();
}
}
@@ -15,6 +15,7 @@

import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
@@ -29,6 +30,8 @@
import java.util.List;
import java.util.Map;

import static com.google.common.util.concurrent.Futures.immediateFuture;

public class HadoopExtendedFileSystem
extends ExtendedFileSystem
{
@@ -189,6 +192,14 @@ public boolean rename(Path src, Path dst)
return fs.rename(src, dst);
}

@Override
public ListenableFuture<Void> renameFileAsync(Path src, Path dst)
throws IOException
{
fs.rename(src, dst);
return immediateFuture(null);
}

@Override
protected void rename(Path src, Path dst, Options.Rename... options)
throws IOException
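
For context (not part of the commit): a minimal caller-side sketch of the new API, assuming an ExtendedFileSystem instance and hypothetical paths. With HadoopExtendedFileSystem the returned future is already complete, since the rename runs synchronously and the method returns immediateFuture(null); other implementations may complete it later.

import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

// Hypothetical caller, for illustration only.
public class RenameFileAsyncExample
{
    public static ListenableFuture<Void> renameAndLog(ExtendedFileSystem fs, Path from, Path to)
            throws IOException
    {
        ListenableFuture<Void> rename = fs.renameFileAsync(from, to);
        Futures.addCallback(rename, new FutureCallback<Void>()
        {
            @Override
            public void onSuccess(Void result)
            {
                System.out.printf("renamed %s -> %s%n", from, to);
            }

            @Override
            public void onFailure(Throwable t)
            {
                System.err.printf("rename %s -> %s failed: %s%n", from, to, t);
            }
        }, directExecutor());
        return rename;
    }
}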
@@ -21,6 +21,7 @@
import com.facebook.presto.common.block.IntArrayBlockBuilder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PageIndexer;
@@ -31,10 +32,13 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.slice.Slice;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -46,11 +50,16 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

import static com.facebook.airlift.concurrent.MoreFutures.addSuccessCallback;
import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.hive.HiveBucketFunction.createHiveCompatibleBucketFunction;
import static com.facebook.presto.hive.HiveBucketFunction.createPrestoNativeBucketFunction;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static com.facebook.presto.hive.PartitionUpdate.FileWriteInfo;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@@ -94,6 +103,8 @@ public class HivePageSink
private long systemMemoryUsage;
private long validationCpuNanos;

private boolean waitForFileRenaming;

public HivePageSink(
HiveWriterFactory writerFactory,
List<HiveColumnHandle> inputColumns,
@@ -228,6 +239,19 @@ private ListenableFuture<Collection<Slice>> doFinish()
.mapToLong(HiveWriter::getValidationCpuNanos)
.sum();

// Rename the written files to their coordinator-assigned names before
// returning the serialized partition updates (see renameFiles below)
if (waitForFileRenaming && verificationTasks.isEmpty()) {
ImmutableList.Builder<Slice> partitionUpdatesWithRenamedFileNames = ImmutableList.builder();
List<ListenableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < writers.size(); i++) {
int writerIndex = i;
// Completes with the new file name assigned by the coordinator
ListenableFuture<?> fileNameFuture = toListenableFuture(hiveMetadataUpdater.getMetadataResult(writerIndex));
SettableFuture<?> renamingFuture = SettableFuture.create();
futures.add(renamingFuture);
addSuccessCallback(fileNameFuture, obj -> renameFiles((String) obj, writerIndex, renamingFuture, partitionUpdatesWithRenamedFileNames));
}
return Futures.transform(Futures.allAsList(futures), input -> partitionUpdatesWithRenamedFileNames.build(), directExecutor());
}

if (verificationTasks.isEmpty()) {
return Futures.immediateFuture(result);
}
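
The block above uses a common Guava coordination idiom: one SettableFuture per writer, completed only after that writer's file has been renamed, joined with allAsList and transformed into the final result. A self-contained sketch of the same pattern, with hypothetical names (renameAll, rename) that are not Presto code:

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

public final class RenameCoordinationSketch
{
    private RenameCoordinationSketch() {}

    // For each future file name, run `rename` when the name arrives, signal a
    // per-writer SettableFuture once the rename is done, and resolve the overall
    // future with the accumulated results after every writer has finished.
    public static ListenableFuture<List<String>> renameAll(
            List<ListenableFuture<String>> fileNameFutures,
            Function<String, String> rename)
    {
        List<String> results = new CopyOnWriteArrayList<>();
        List<ListenableFuture<?>> renamingFutures = new ArrayList<>();
        for (ListenableFuture<String> fileNameFuture : fileNameFutures) {
            SettableFuture<Void> renamingFuture = SettableFuture.create();
            renamingFutures.add(renamingFuture);
            Futures.addCallback(fileNameFuture, new FutureCallback<String>()
            {
                @Override
                public void onSuccess(String fileName)
                {
                    results.add(rename.apply(fileName));
                    renamingFuture.set(null);
                }

                @Override
                public void onFailure(Throwable t)
                {
                    renamingFuture.setException(t);
                }
            }, directExecutor());
        }
        return Futures.transform(
                Futures.allAsList(renamingFutures),
                ignored -> ImmutableList.copyOf(results),
                directExecutor());
    }
}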
@@ -354,6 +378,47 @@ private void sendMetadataUpdateRequest(Optional<String> partitionName, int writerIndex)
return;
}
hiveMetadataUpdater.addMetadataUpdateRequest(schemaName, tableName, partitionName, writerIndex);
waitForFileRenaming = true;
}

private void renameFiles(String fileName, int writerIndex, SettableFuture<?> renamingFuture, ImmutableList.Builder<Slice> partitionUpdatesWithRenamedFileNames)
{
HdfsContext context = new HdfsContext(session, schemaName, tableName);
HiveWriter writer = writers.get(writerIndex);
PartitionUpdate partitionUpdate = writer.getPartitionUpdate();

// Check that only one file is written by a writer
checkArgument(partitionUpdate.getFileWriteInfos().size() == 1, "HiveWriter wrote data to more than one file");

FileWriteInfo fileWriteInfo = partitionUpdate.getFileWriteInfos().get(0);
Path fromPath = new Path(partitionUpdate.getWritePath(), fileWriteInfo.getWriteFileName());
Path toPath = new Path(partitionUpdate.getWritePath(), fileName);
try {
ExtendedFileSystem fileSystem = hdfsEnvironment.getFileSystem(context, fromPath);
ListenableFuture<Void> asyncFuture = fileSystem.renameFileAsync(fromPath, toPath);
addSuccessCallback(asyncFuture, () -> updateFileInfo(partitionUpdatesWithRenamedFileNames, renamingFuture, partitionUpdate, fileName, fileWriteInfo, writerIndex));
}
catch (IOException e) {
throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Error renaming file. fromPath: %s toPath: %s", fromPath, toPath), e);
}
}

private void updateFileInfo(ImmutableList.Builder<Slice> partitionUpdatesWithRenamedFileNames, SettableFuture<?> renamingFuture, PartitionUpdate partitionUpdate, String fileName, FileWriteInfo fileWriteInfo, int writerIndex)
{
// Update the file info in partitionUpdate with new filename
FileWriteInfo fileInfoWithRenamedFileName = new FileWriteInfo(fileName, fileName, fileWriteInfo.getFileSize());
PartitionUpdate partitionUpdateWithRenamedFileName = new PartitionUpdate(partitionUpdate.getName(),
partitionUpdate.getUpdateMode(),
partitionUpdate.getWritePath(),
partitionUpdate.getTargetPath(),
ImmutableList.of(fileInfoWithRenamedFileName),
partitionUpdate.getRowCount(),
partitionUpdate.getInMemoryDataSizeInBytes(),
partitionUpdate.getOnDiskDataSizeInBytes());
partitionUpdatesWithRenamedFileNames.add(wrappedBuffer(partitionUpdateCodec.toJsonBytes(partitionUpdateWithRenamedFileName)));

hiveMetadataUpdater.removeResultFuture(writerIndex);
renamingFuture.set(null);
}

private int[] getWriterIndexes(Page page)
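
A side note on the serialization step in updateFileInfo above: the Slice added to partitionUpdatesWithRenamedFileNames is just the JSON-encoded PartitionUpdate. A minimal sketch, assuming airlift's JsonCodec (HivePageSink receives the codec as the injected partitionUpdateCodec rather than constructing it):

import com.facebook.presto.hive.PartitionUpdate;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

// Hypothetical standalone equivalent of the call in updateFileInfo.
public class PartitionUpdateSerializationSketch
{
    private static final JsonCodec<PartitionUpdate> CODEC = JsonCodec.jsonCodec(PartitionUpdate.class);

    public static Slice serialize(PartitionUpdate update)
    {
        return Slices.wrappedBuffer(CODEC.toJsonBytes(update));
    }
}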
@@ -180,7 +180,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
handle.getInputColumns(),
handle.getBucketProperty(),
handle.getSchemaName(),
-handle.getSchemaName(),
+handle.getTableName(),
pageIndexerFactory,
typeManager,
hdfsEnvironment,
@@ -325,7 +325,8 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt bucketNumber)
String extension = getFileExtension(writerParameters.getOutputStorageFormat(), compressionCodec);
String targetFileName;
if (bucketNumber.isPresent()) {
-targetFileName = computeBucketedFileName(filePrefix, bucketNumber.getAsInt()) + extension;
+// Use the bucket number for file name.
+targetFileName = bucketNumber.getAsInt() + extension;
}
else {
targetFileName = filePrefix + "_" + randomUUID() + extension;
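
Illustration (not in the commit): with this change a bucketed writer derives the target file name from the bucket number alone, while unbucketed files keep the prefix-plus-UUID scheme. A sketch mirroring the two branches, assuming getFileExtension returns a dot-prefixed suffix such as ".orc":

import java.util.OptionalInt;
import java.util.UUID;

// Hypothetical helper, not Presto code.
public class TargetFileNameExample
{
    public static String targetFileName(OptionalInt bucketNumber, String filePrefix, String extension)
    {
        if (bucketNumber.isPresent()) {
            // After this change: targetFileName(OptionalInt.of(7), "prefix", ".orc") -> "7.orc"
            return bucketNumber.getAsInt() + extension;
        }
        // Unbucketed files keep the prefix + random UUID scheme
        return filePrefix + "_" + UUID.randomUUID() + extension;
    }
}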
