76 changes: 68 additions & 8 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -19,8 +19,8 @@

import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
import static org.apache.beam.sdk.transforms.Contextful.fn;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import java.io.IOException;
@@ -74,10 +74,10 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -548,8 +548,9 @@ public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment)
public MatchConfiguration continuously(
Duration interval, TerminationCondition<String, ?> condition, boolean matchUpdatedFiles) {
LOG.warn(
"Matching Continuously is stateful, and can scale poorly. Consider using Pub/Sub "
+ "Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) if possible");
"Matching Continuously is stateful, and can scale poorly. Consider using Pub/Sub"
+ " Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) if"
+ " possible");
return toBuilder()
.setWatchInterval(interval)
.setWatchTerminationCondition(condition)
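The warning above concerns FileIO.match().continuously(...), which keeps per-filepattern watch state. A minimal sketch of that API under illustrative settings (the bucket name, poll interval, and termination condition are assumptions, not part of this change):

```java
// Sketch: poll a filepattern every 30 seconds and stop watching once no
// new files have appeared for one hour, bounding the watch state.
PCollection<MatchResult.Metadata> matches =
    pipeline.apply(
        FileIO.match()
            .filepattern("gs://example-bucket/input/*.csv")
            .continuously(
                Duration.standardSeconds(30),
                Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
```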
@@ -1059,6 +1060,12 @@ public static FileNaming relativeFileNaming(

abstract @Nullable Integer getMaxNumWritersPerBundle();

abstract @Nullable Integer getBatchSize();

abstract @Nullable Integer getBatchSizeBytes();

abstract @Nullable Duration getBatchMaxBufferingDuration();

abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract Builder<DestinationT, UserT> toBuilder();
@@ -1112,6 +1119,13 @@ abstract Builder<DestinationT, UserT> setSharding(
abstract Builder<DestinationT, UserT> setMaxNumWritersPerBundle(
@Nullable Integer maxNumWritersPerBundle);

abstract Builder<DestinationT, UserT> setBatchSize(@Nullable Integer batchSize);

abstract Builder<DestinationT, UserT> setBatchSizeBytes(@Nullable Integer batchSizeBytes);

abstract Builder<DestinationT, UserT> setBatchMaxBufferingDuration(
@Nullable Duration batchMaxBufferingDuration);

abstract Builder<DestinationT, UserT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);

@@ -1301,6 +1315,7 @@ public Write<DestinationT, UserT> withDestinationCoder(Coder<DestinationT> desti
*/
public Write<DestinationT, UserT> withNumShards(int numShards) {
checkArgument(numShards >= 0, "numShards must be non-negative, but was: %s", numShards);
checkArgument(!getAutoSharding(), "Cannot set numShards when withAutoSharding() is used");
if (numShards == 0) {
return withNumShards(null);
}
@@ -1311,6 +1326,7 @@ public Write<DestinationT, UserT> withNumShards(int numShards) {
* Like {@link #withNumShards(int)}. Specifying {@code null} means runner-determined sharding.
*/
public Write<DestinationT, UserT> withNumShards(@Nullable ValueProvider<Integer> numShards) {
checkArgument(!getAutoSharding(), "Cannot set numShards when withAutoSharding() is used");
return toBuilder().setNumShards(numShards).build();
}

@@ -1321,6 +1337,7 @@ public Write<DestinationT, UserT> withNumShards(@Nullable ValueProvider<Integer>
public Write<DestinationT, UserT> withSharding(
PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
checkArgument(sharding != null, "sharding can not be null");
checkArgument(!getAutoSharding(), "Cannot set sharding when withAutoSharding() is used");
return toBuilder().setSharding(sharding).build();
}

@@ -1337,6 +1354,9 @@ public Write<DestinationT, UserT> withIgnoreWindowing() {
}

public Write<DestinationT, UserT> withAutoSharding() {
checkArgument(
getNumShards() == null && getSharding() == null,
"Cannot use withAutoSharding() when withNumShards() or withSharding() is set");
return toBuilder().setAutoSharding(true).build();
}
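The guards added in withNumShards, withSharding, and withAutoSharding make fixed sharding and auto-sharding mutually exclusive in both orders of invocation. A hedged sketch of the new failure mode (sink and destination are illustrative):

```java
// Sketch: combining auto-sharding with a fixed shard count now fails fast
// at pipeline-construction time instead of producing an ambiguous write.
FileIO.Write<Void, String> autoSharded =
    FileIO.<String>write()
        .via(TextIO.sink())
        .to("/tmp/output")  // illustrative destination
        .withAutoSharding();

// Throws IllegalArgumentException:
// "Cannot set numShards when withAutoSharding() is used"
autoSharded.withNumShards(3);
```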

@@ -1366,6 +1386,37 @@ public Write<DestinationT, UserT> withBadRecordErrorHandler(
return toBuilder().setBadRecordErrorHandler(errorHandler).build();
}

/**
* Returns a new {@link Write} that will batch the input records using the specified batch
* size. The default value is {@link WriteFiles#FILE_TRIGGERING_RECORD_COUNT}.
*
* <p>This option is used only for writing unbounded data with auto-sharding.
*/
public Write<DestinationT, UserT> withBatchSize(@Nullable Integer batchSize) {
return toBuilder().setBatchSize(batchSize).build();
}

/**
* Returns a new {@link Write} that will batch the input records using the specified batch
* size in bytes. The default value is {@link WriteFiles#FILE_TRIGGERING_BYTE_COUNT}.
*
* <p>This option is used only for writing unbounded data with auto-sharding.
*/
public Write<DestinationT, UserT> withBatchSizeBytes(@Nullable Integer batchSizeBytes) {
return toBuilder().setBatchSizeBytes(batchSizeBytes).build();
}

/**
* Returns a new {@link Write} that will batch the input records using the specified maximum
* buffering duration. The default value is
* {@link WriteFiles#FILE_TRIGGERING_RECORD_BUFFERING_DURATION}.
*
* <p>This option is used only for writing unbounded data with auto-sharding.
*/
public Write<DestinationT, UserT> withBatchMaxBufferingDuration(
@Nullable Duration batchMaxBufferingDuration) {
return toBuilder().setBatchMaxBufferingDuration(batchMaxBufferingDuration).build();
}

@VisibleForTesting
Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
if (getDynamic()) {
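Taken together, the three withBatch* setters above surface the file-triggering thresholds of WriteFiles for unbounded, auto-sharded writes: a batch is flushed when any of the record-count, byte-size, or buffering-duration limits is reached. A usage sketch with illustrative values (the destination and thresholds are assumptions, not recommendations):

```java
// Sketch: flush buffered records after 50,000 elements, ~32 MiB, or
// 5 seconds of buffering, whichever limit is hit first.
FileIO.Write<Void, String> write =
    FileIO.<String>write()
        .via(TextIO.sink())
        .to("gs://example-bucket/output")  // illustrative destination
        .withPrefix("events")
        .withSuffix(".txt")
        .withAutoSharding()
        .withBatchSize(50_000)
        .withBatchSizeBytes(32 * 1024 * 1024)
        .withBatchMaxBufferingDuration(Duration.standardSeconds(5));
```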
@@ -1482,6 +1533,15 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getBadRecordErrorHandler() != null) {
writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
if (getBatchSize() != null) {
writeFiles = writeFiles.withBatchSize(getBatchSize());
}
if (getBatchSizeBytes() != null) {
writeFiles = writeFiles.withBatchSizeBytes(getBatchSizeBytes());
}
if (getBatchMaxBufferingDuration() != null) {
writeFiles = writeFiles.withBatchMaxBufferingDuration(getBatchMaxBufferingDuration());
}
return input.apply(writeFiles);
}

sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -17,13 +17,17 @@
*/
package org.apache.beam.sdk.io;

import static com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -38,35 +42,44 @@
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.zip.GZIPOutputStream;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
@@ -91,6 +104,11 @@ public class FileIOTest implements Serializable {

@Rule public transient Timeout globalTimeout = Timeout.seconds(1200);

private static final int CUSTOM_FILE_TRIGGERING_RECORD_COUNT = 50000;
private static final int CUSTOM_FILE_TRIGGERING_BYTE_COUNT = 32 * 1024 * 1024; // 32MiB
private static final Duration CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION =
Duration.standardSeconds(4);

@Test
@Category(NeedsRunner.class)
public void testMatchAndMatchAll() throws IOException {
@@ -547,4 +565,77 @@ public void testFileIoDynamicNaming() throws IOException {
"Output file shard 0 exists after pipeline completes",
new File(outputFileName + "-0").exists());
}

@Test
@Category({NeedsRunner.class, UsesUnboundedPCollections.class})
public void testWriteUnboundedWithCustomBatchParameters() throws IOException {
File root = tmpFolder.getRoot();
List<String> inputs = Arrays.asList("one", "two", "three", "four", "five", "six");

PTransform<PCollection<String>, PCollection<String>> transform =
Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes();

FileIO.Write<Void, String> write =
FileIO.<String>write()
.via(TextIO.sink())
.to(root.getAbsolutePath())
.withPrefix("output")
.withSuffix(".txt")
.withAutoSharding()
.withBatchSize(CUSTOM_FILE_TRIGGERING_RECORD_COUNT)
.withBatchSizeBytes(CUSTOM_FILE_TRIGGERING_BYTE_COUNT)
.withBatchMaxBufferingDuration(CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION);

// Prepare timestamps for the elements.
List<Long> timestamps = new ArrayList<>();
for (long i = 0; i < inputs.size(); i++) {
timestamps.add(i + 1);
}

p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
.setIsBoundedInternal(IsBounded.UNBOUNDED)
.apply(transform)
.apply(write);
p.run().waitUntilFinish();

// Verify that the custom batch parameters are set.
assertEquals(CUSTOM_FILE_TRIGGERING_RECORD_COUNT, write.getBatchSize().intValue());
assertEquals(CUSTOM_FILE_TRIGGERING_BYTE_COUNT, write.getBatchSizeBytes().intValue());
assertEquals(
CUSTOM_FILE_TRIGGERING_RECORD_BUFFERING_DURATION, write.getBatchMaxBufferingDuration());

checkFileContents(root, "output", inputs);
}

static void checkFileContents(File rootDir, String prefix, List<String> inputs)
throws IOException {
List<File> outputFiles = Lists.newArrayList();
final String pattern = new File(rootDir, prefix).getAbsolutePath() + "*";
List<Metadata> metadata =
FileSystems.match(Collections.singletonList(pattern)).get(0).metadata();
for (Metadata meta : metadata) {
outputFiles.add(new File(meta.resourceId().toString()));
}
assertFalse("Should have produced at least 1 output file", outputFiles.isEmpty());

List<String> actual = Lists.newArrayList();
for (File outputFile : outputFiles) {
List<String> actualShard = Lists.newArrayList();
try (BufferedReader reader =
Files.newBufferedReader(outputFile.toPath(), StandardCharsets.UTF_8)) {
for (; ; ) {
String line = reader.readLine();
if (line == null) {
break;
}
actualShard.add(line);
}
}
actual.addAll(actualShard);
}
assertThat(actual, containsInAnyOrder(inputs.toArray()));
}
}