Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions v1/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,8 @@
<exclude>**/*InformationSchemaScanner.*</exclude>
<!-- Excluding auto-generated classes. -->
<exclude>**/*AutoValue_*</exclude>
<!-- Excluding LocalSpannerIO changestream tests -->
<exclude>**/*com/google/cloud/teleport/spanner/spannerio/changestreams/**/*</exclude>
</excludes>
<rules>
<rule>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.teleport.spanner.ddl.Ddl;
import com.google.cloud.teleport.spanner.spannerio.SpannerAccessor;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -66,11 +66,11 @@ public PCollection<Ddl> expand(PCollection<Ddl> input) {
ParDo.of(
new DoFn<Ddl, Ddl>() {

private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;

@Setup
public void setup() {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import com.google.cloud.teleport.spanner.ddl.Column;
import com.google.cloud.teleport.spanner.ddl.Ddl;
import com.google.cloud.teleport.spanner.ddl.Table;
import com.google.cloud.teleport.spanner.spannerio.ReadOperation;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.spanner.ExportPipeline.ExportPipelineOptions;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
import com.google.cloud.teleport.spanner.proto.ExportProtos.Export;
import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect;
import com.google.cloud.teleport.spanner.proto.ExportProtos.TableManifest;
import com.google.cloud.teleport.spanner.spannerio.ReadOperation;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import com.google.cloud.teleport.spanner.spannerio.SpannerIO;
import com.google.cloud.teleport.spanner.spannerio.Transaction;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -80,10 +84,6 @@
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
Expand Down Expand Up @@ -185,7 +185,7 @@ public WriteFilesResult<String> expand(PBegin begin) {

/*
* Allow users to specify read timestamp.
* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
* CreateTransaction and CreateTransactionFn classes in SpannerIO
* only take a timestamp object for exact staleness which works when
* parameters are provided during template compile time. They do not work with
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
Expand Down Expand Up @@ -457,7 +457,7 @@ public void processElement(ProcessContext c) {
PCollection<Struct> rows =
tableReadOperations.apply(
"Read all rows from Spanner",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));

ValueProvider<ResourceId> resource =
ValueProvider.NestedValueProvider.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.spanner.ImportPipeline.Options;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
import com.google.cloud.teleport.spanner.proto.ExportProtos.Export;
import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect;
import com.google.cloud.teleport.spanner.proto.ExportProtos.TableManifest;
import com.google.cloud.teleport.spanner.spannerio.SpannerAccessor;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import com.google.cloud.teleport.spanner.spannerio.SpannerIO;
import com.google.cloud.teleport.spanner.spannerio.SpannerWriteResult;
import com.google.cloud.teleport.spanner.spannerio.Transaction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Verify;
Expand Down Expand Up @@ -66,11 +71,6 @@
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.Combine;
Expand Down Expand Up @@ -173,7 +173,7 @@ public void processElement(ProcessContext c) {
schemas.apply("Build avro DDL", Combine.globally(AsList.fn()));

PCollectionView<Transaction> tx =
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));

PCollection<Ddl> informationSchemaDdl =
begin.apply(
Expand Down Expand Up @@ -273,7 +273,7 @@ public void processElement(ProcessContext c) {
SpannerWriteResult result =
mutations.apply(
"Write mutations " + depth,
LocalSpannerIO.write()
SpannerIO.write()
.withSchemaReadySignal(ddl)
.withSpannerConfig(spannerConfig)
.withCommitDeadline(Duration.standardMinutes(1))
Expand Down Expand Up @@ -408,7 +408,7 @@ private static class CreateTables extends PTransform<PBegin, PCollectionTuple> {
private final ValueProvider<Boolean> earlyIndexCreateFlag;
private final ValueProvider<Integer> ddlCreationTimeoutInMinutes;

private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;
private static final Logger LOG = LoggerFactory.getLogger(CreateTables.class);

/* If the schema has a lot of DDL changes after data load, it's preferable to create
Expand Down Expand Up @@ -464,7 +464,7 @@ public PCollectionTuple expand(PBegin begin) {

@Setup
public void setup() {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import com.google.cloud.teleport.spanner.spannerio.SpannerAccessor;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -43,15 +43,15 @@ public PCollection<Dialect> expand(PBegin p) {

private static class ReadDialectFn extends DoFn<Void, Dialect> {
private final SpannerConfig spannerConfig;
private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;

public ReadDialectFn(SpannerConfig spannerConfig) {
this.spannerConfig = spannerConfig;
}

@Setup
public void setup() throws Exception {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.google.cloud.spanner.Dialect;
import com.google.cloud.teleport.spanner.ddl.Ddl;
import com.google.cloud.teleport.spanner.ddl.InformationSchemaScanner;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import com.google.cloud.teleport.spanner.spannerio.SpannerAccessor;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import com.google.cloud.teleport.spanner.spannerio.Transaction;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -59,7 +59,7 @@ public PCollection<Ddl> expand(PBegin p) {

private static class ReadInformationSchemaFn extends DoFn<Void, Ddl> {
private final SpannerConfig spannerConfig;
private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;
private final PCollectionView<Transaction> tx;
private final PCollectionView<Dialect> dialectView;

Expand All @@ -74,7 +74,7 @@ public ReadInformationSchemaFn(

@Setup
public void setup() throws Exception {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.spanner.TextImportPipeline.Options;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import com.google.cloud.teleport.spanner.proto.ExportProtos.ProtoDialect;
import com.google.cloud.teleport.spanner.proto.TextImportProtos.ImportManifest;
import com.google.cloud.teleport.spanner.proto.TextImportProtos.ImportManifest.TableManifest;
import com.google.cloud.teleport.spanner.spannerio.SpannerConfig;
import com.google.cloud.teleport.spanner.spannerio.SpannerIO;
import com.google.cloud.teleport.spanner.spannerio.SpannerWriteResult;
import com.google.cloud.teleport.spanner.spannerio.Transaction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.protobuf.util.JsonFormat;
Expand Down Expand Up @@ -53,10 +57,6 @@
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
Expand Down Expand Up @@ -105,7 +105,7 @@ public TextImportTransform(
@Override
public PDone expand(PBegin begin) {
PCollectionView<Transaction> tx =
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));

PCollectionView<Dialect> dialectView =
begin
Expand Down Expand Up @@ -203,7 +203,7 @@ public void processElement(ProcessContext c) {
.apply("Wait for previous depth " + depth, Wait.on(previousComputation))
.apply(
"Write mutations " + depth,
LocalSpannerIO.write()
SpannerIO.write()
.withSpannerConfig(spannerConfig)
.withCommitDeadline(Duration.standardMinutes(1))
.withMaxCumulativeBackoff(Duration.standardHours(2))
Expand Down
Loading
Loading