@@ -172,6 +172,9 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
// TODO: Storage API should provide a more-specific way of identifying this failure.
return RetryType.DONT_RETRY;
}
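// The stream is gone (e.g. it was already finalized and cleaned up), so
// retrying the flush can never succeed; fail fast instead of retrying forever.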
if (statusCode.equals(Code.NOT_FOUND)) {
return RetryType.DONT_RETRY;
}
}
return RetryType.RETRY_ALL_OPERATIONS;
},
@@ -207,6 +210,13 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
+ " failed with "
+ Iterables.getFirst(contexts, null).getError());
finalizeOperationsFailed.inc();
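// If the stream no longer exists, there is nothing left to finalize, so
// don't retry the finalize call.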
Throwable error = Iterables.getFirst(contexts, null).getError();
if (error instanceof ApiException) {
Code statusCode = ((ApiException) error).getStatusCode().getCode();
if (statusCode.equals(Code.NOT_FOUND)) {
return RetryType.DONT_RETRY;
}
}
return RetryType.RETRY_ALL_OPERATIONS;
},
r -> {
StorageApiWritesShardedRecords.java
@@ -22,6 +22,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.Descriptors.Descriptor;
@@ -56,13 +57,18 @@
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
@@ -92,12 +98,14 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
extends PTransform<
PCollection<KV<ShardedKey<DestinationT>, Iterable<byte[]>>>, PCollection<Void>> {
private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);
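// Streams that see no appends for this long are finalized by the idle timer.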
private static final Duration DEFAULT_STREAM_IDLE_TIME = Duration.standardHours(1);

private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final CreateDisposition createDisposition;
private final String kmsKey;
private final BigQueryServices bqServices;
private final Coder<DestinationT> destinationCoder;
private final Duration streamIdleTime = DEFAULT_STREAM_IDLE_TIME;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
@@ -148,7 +156,7 @@ public PCollection<Void> expand(
PCollection<KV<String, Operation>> written =
input.apply(
"Write Records",
ParDo.of(new WriteRecordsDoFn(operationName))
ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime))
.withSideInputs(dynamicDestinations.getSideInputs()));

SchemaCoder<Operation> operationCoder;
@@ -185,6 +193,8 @@ class WriteRecordsDoFn
Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
private final Counter streamsCreated =
Metrics.counter(WriteRecordsDoFn.class, "streamsCreated");
private final Counter streamsIdle =
Metrics.counter(WriteRecordsDoFn.class, "idleStreamsFinalized");
private final Counter appendFailures =
Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
private final Counter appendOffsetFailures =
@@ -212,8 +222,14 @@ class WriteRecordsDoFn
@StateId("streamOffset")
private final StateSpec<ValueState<Long>> streamOffsetSpec = StateSpecs.value();

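// Per-key processing-time timer that fires once the stream has been idle too long.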
@TimerId("idleTimer")
private final TimerSpec idleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

private final Duration streamIdleTime;

public WriteRecordsDoFn(String operationName, Duration streamIdleTime) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.streamIdleTime = streamIdleTime;
}

@StartBundle
@@ -229,6 +245,7 @@ String getOrCreateStream(
String tableId,
ValueState<String> streamName,
ValueState<Long> streamOffset,
Timer streamIdleTimer,
DatasetService datasetService)
throws IOException, InterruptedException {
String stream = streamName.read();
@@ -239,6 +256,12 @@
streamOffset.write(0L);
streamsCreated.inc();
}
// Reset the idle timer: each use of the stream pushes finalization out by another streamIdleTime.
streamIdleTimer
.offset(streamIdleTime)
.withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
.setRelative();

return stream;
}

@@ -270,6 +293,7 @@ public void process(
@Element KV<ShardedKey<DestinationT>, Iterable<byte[]>> element,
final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
@TimerId("idleTimer") Timer idleTimer,
final OutputReceiver<KV<String, Operation>> o)
throws Exception {
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
@@ -343,7 +367,8 @@ public String toString() {
// Clear the stream name, forcing a new one to be created.
streamName.write("");
}
String stream =
getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
StreamAppendClient appendClient =
APPEND_CLIENTS.get(
stream, () -> datasetService.getStreamAppendClient(stream, descriptor));
@@ -398,28 +423,39 @@ public String toString() {
// The first context is always the one that fails.
AppendRowsContext failedContext =
Preconditions.checkNotNull(Iterables.getFirst(failedContexts, null));
// Invalidate the StreamWriter and force a new one to be created.
LOG.error(
"Got error " + failedContext.getError() + " closing " + failedContext.streamName);
clearClients.accept(contexts);
appendFailures.inc();

boolean explicitStreamFinalized =
failedContext.getError() instanceof StreamFinalizedException;
Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
// This means that the offset we have stored does not match the current end of
// the stream in the Storage API. Usually this happens because a crash or a
// bundle failure happened after an append but before the worker could
// checkpoint its state. The records that were appended in a failed bundle
// will be retried, meaning that the unflushed tail of the stream must be
// discarded to prevent duplicates.
boolean offsetMismatch =
statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);
// This implies that the stream doesn't exist or has already been finalized. In this
// case we have no choice but to create a new stream.
boolean streamDoesNotExist =
explicitStreamFinalized
|| statusCode.equals(Code.INVALID_ARGUMENT)
|| statusCode.equals(Code.NOT_FOUND)
|| statusCode.equals(Code.FAILED_PRECONDITION);
if (offsetMismatch || streamDoesNotExist) {
appendOffsetFailures.inc();
LOG.warn(
"Append to "
+ failedContext
+ " failed with "
+ failedContext.getError()
+ " Will retry with a new stream");

// Finalize the stream and clear streamName so a new stream will be created.
o.output(
KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
@@ -466,24 +502,48 @@ public String toString() {

java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
appendLatencyDistribution.update(timeElapsed.toMillis());
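// A successful append also counts as stream activity, so push the idle timer
// out by another streamIdleTime.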
idleTimer
.offset(streamIdleTime)
.withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
.setRelative();
}

[Review thread on finalizeStream]
Contributor: Will there be a risk of some exceptions thrown here?
Contributor: Sorry, didn't send it out.
Author (Contributor): We shouldn't have exceptions here.

// Called by both the idle timer and the window-expiration handler.
private void finalizeStream(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
String stream = MoreObjects.firstNonNull(streamName.read(), "");

if (!Strings.isNullOrEmpty(stream)) {
// Finalize the stream, flushing up to the last written offset.
long nextOffset = MoreObjects.firstNonNull(streamOffset.read(), 0L);
o.output(KV.of(stream, new Operation(nextOffset - 1, true)));
streamName.clear();
streamOffset.clear();
// Make sure that the stream object is closed.
APPEND_CLIENTS.invalidate(stream);
}
}

@OnTimer("idleTimer")
public void onTimer(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
// The stream has been idle for streamIdleTime - finalize it so it is not leaked.
finalizeStream(streamName, streamOffset, o);
streamsIdle.inc();
}

@OnWindowExpiration
public void onWindowExpiration(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
// Window is done - usually because the pipeline has been drained. Make sure to clean up
// streams so that they are not leaked.
finalizeStream(streamName, streamOffset, o);
}
}
}
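For readers new to Beam's state and timer API, here is a minimal sketch (not part of this PR) of the reset-on-use idle-timer idiom the change relies on. The DoFn name and its key/value types are hypothetical; it assumes the same org.apache.beam.sdk.state, org.apache.beam.sdk.transforms, and joda-time imports used in the file above.

// A hypothetical DoFn showing the reset-on-use idle-timer idiom: every element
// for a key pushes the timer out again, so it only fires once the key has
// received no elements for a full IDLE_TIME.
class IdleKeyDoFn extends DoFn<KV<String, String>, String> {
  private static final Duration IDLE_TIME = Duration.standardMinutes(1);

  @StateId("lastSeen")
  private final StateSpec<ValueState<String>> lastSeenSpec = StateSpecs.value();

  @TimerId("idle")
  private final TimerSpec idleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("lastSeen") ValueState<String> lastSeen,
      @TimerId("idle") Timer idle,
      OutputReceiver<String> out) {
    lastSeen.write(element.getValue());
    // Re-set the timer relative to now; the previously scheduled firing for
    // this timer ID is replaced.
    idle.offset(IDLE_TIME).setRelative();
    out.output(element.getValue());
  }

  @OnTimer("idle")
  public void onIdle(@StateId("lastSeen") ValueState<String> lastSeen) {
    // No elements arrived for IDLE_TIME - release the per-key resources.
    lastSeen.clear();
  }
}

Setting a timer for a given @TimerId overwrites any previously scheduled firing time, which is what makes the reset-on-use pattern work: the timer only ever fires after a genuinely idle stretch.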