Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,28 @@ public interface JdbcToBigQueryOptions
optional = true,
description = "The name of a column of numeric type that will be used for partitioning.",
helpText =
"If this parameter is provided with the name of the `table` defined as an optional parameter, JdbcIO reads the table in parallel by executing multiple instances of the query on the same table (subquery) using ranges. Currently, only supports `Long` partition columns.")
"If `partitionColumn` is specified along with the `table`, JdbcIO reads the table in parallel by executing multiple instances of the query on the same table (subquery)"
+ " using ranges. Currently, supports `Long` and `DateTime` partition columns. Pass the column type through `partitionColumnType`.")
String getPartitionColumn();

void setPartitionColumn(String partitionColumn);

@TemplateParameter.Text(
// Declares the type of the partitioning column. Accepted values are the enum
// options below: "long" (the default, per @Default.String) or "datetime".
// This value steers how the pipeline parses lowerBound/upperBound — as Long
// values or as `yyyy-MM-dd HH:mm:ss.SSSZ` timestamps, per the bound helpText.
@TemplateParameter.Enum(
order = 14,
enumOptions = {
@TemplateParameter.TemplateEnumOption("long"),
@TemplateParameter.TemplateEnumOption("datetime")
},
optional = true,
description = "Partition column type.",
helpText = "The type of the `partitionColumn`, accepts either `long` or `datetime`.")
@Default.String("long")
String getPartitionColumnType();

// Setter counterpart used by the PipelineOptions framework to inject the
// user-supplied template parameter value.
void setPartitionColumnType(String partitionColumnType);

@TemplateParameter.Text(
order = 15,
optional = true,
description = "Name of the table in the external database.",
helpText =
Expand All @@ -186,7 +201,7 @@ public interface JdbcToBigQueryOptions
void setTable(String table);

@TemplateParameter.Integer(
order = 15,
order = 16,
optional = true,
description = "The number of partitions.",
helpText =
Expand All @@ -195,28 +210,30 @@ public interface JdbcToBigQueryOptions

void setNumPartitions(Integer numPartitions);

@TemplateParameter.Long(
order = 16,
@TemplateParameter.Text(
order = 17,
optional = true,
description = "Lower bound of partition column.",
helpText =
"The lower bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types.")
Long getLowerBound();
"The lower bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types. `datetime` "
+ "partitionColumnType accepts lower bound in the format `yyyy-MM-dd HH:mm:ss.SSSZ`. For example, `2024-02-20 07:55:45.000+03:30`.")
String getLowerBound();

void setLowerBound(Long lowerBound);
void setLowerBound(String lowerBound);

@TemplateParameter.Long(
order = 17,
@TemplateParameter.Text(
order = 18,
optional = true,
description = "Upper bound of partition column",
helpText =
"The upper bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types.")
Long getUpperBound();
"The upper bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types. `datetime` "
+ "partitionColumnType accepts upper bound in the format `yyyy-MM-dd HH:mm:ss.SSSZ`. For example, `2024-02-20 07:55:45.000+03:30`.")
String getUpperBound();

void setUpperBound(Long lowerBound);
void setUpperBound(String upperBound);

@TemplateParameter.Integer(
order = 18,
order = 19,
optional = true,
description = "Fetch Size",
// TODO: remove the "Not used for partitioned reads" once
Expand All @@ -229,7 +246,7 @@ public interface JdbcToBigQueryOptions
void setFetchSize(Integer fetchSize);

@TemplateParameter.Enum(
order = 19,
order = 20,
enumOptions = {
@TemplateParameter.TemplateEnumOption("CREATE_IF_NEEDED"),
@TemplateParameter.TemplateEnumOption("CREATE_NEVER")
Expand All @@ -244,7 +261,7 @@ public interface JdbcToBigQueryOptions
void setCreateDisposition(String createDisposition);

@TemplateParameter.GcsReadFile(
order = 20,
order = 21,
optional = true,
description = "Cloud Storage path to BigQuery JSON schema",
helpText =
Expand All @@ -255,7 +272,7 @@ public interface JdbcToBigQueryOptions
void setBigQuerySchemaPath(String path);

@TemplateParameter.BigQueryTable(
order = 21,
order = 22,
optional = true,
description =
"Table for messages that failed to reach the output table (i.e., Deadletter table) when using Storage Write API",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
* A template that copies data from a relational database using JDBC to an existing BigQuery table.
Expand Down Expand Up @@ -160,21 +165,57 @@ static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeTo
PCollection<TableRow> rows;
if (options.getPartitionColumn() != null && options.getTable() != null) {
// Read with Partitions
// TODO(pranavbhandari): Support readWithPartitions for other data types.
JdbcIO.ReadWithPartitions<TableRow, Long> readIO =
JdbcIO.<TableRow>readWithPartitions()
.withDataSourceConfiguration(dataSourceConfiguration)
.withTable(options.getTable())
.withPartitionColumn(options.getPartitionColumn())
.withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
JdbcIO.ReadWithPartitions<TableRow, ?> readIO = null;
final String partitionColumnType = options.getPartitionColumnType();
if (partitionColumnType == null || "long".equals(partitionColumnType)) {
JdbcIO.ReadWithPartitions<TableRow, Long> longTypeReadIO =
JdbcIO.<TableRow, Long>readWithPartitions(TypeDescriptors.longs())
.withDataSourceConfiguration(dataSourceConfiguration)
.withTable(options.getTable())
.withPartitionColumn(options.getPartitionColumn())
.withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
if (options.getLowerBound() != null && options.getUpperBound() != null) {
// Check if lower bound and upper bound are long type.
try {
longTypeReadIO =
longTypeReadIO
.withLowerBound(Long.valueOf(options.getLowerBound()))
.withUpperBound(Long.valueOf(options.getUpperBound()));
} catch (NumberFormatException e) {
throw new NumberFormatException(
"Expected Long values for lowerBound and upperBound, received : " + e.getMessage());
}
}
readIO = longTypeReadIO;
} else if ("datetime".equals(partitionColumnType)) {
JdbcIO.ReadWithPartitions<TableRow, DateTime> dateTimeReadIO =
JdbcIO.<TableRow, DateTime>readWithPartitions(TypeDescriptor.of(DateTime.class))
.withDataSourceConfiguration(dataSourceConfiguration)
.withTable(options.getTable())
.withPartitionColumn(options.getPartitionColumn())
.withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
if (options.getLowerBound() != null && options.getUpperBound() != null) {
DateTimeFormatter dateFormatter =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withOffsetParsed();
// Check if lowerBound and upperBound are DateTime type.
try {
dateTimeReadIO =
dateTimeReadIO
.withLowerBound(dateFormatter.parseDateTime(options.getLowerBound()))
.withUpperBound(dateFormatter.parseDateTime(options.getUpperBound()));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Expected DateTime values in the format for lowerBound and upperBound, received : "
+ e.getMessage());
}
}
readIO = dateTimeReadIO;
} else {
throw new IllegalStateException("Received unsupported partitionColumnType.");
}
if (options.getNumPartitions() != null) {
readIO = readIO.withNumPartitions(options.getNumPartitions());
}
if (options.getLowerBound() != null && options.getUpperBound() != null) {
readIO =
readIO.withLowerBound(options.getLowerBound()).withUpperBound(options.getUpperBound());
}

if (options.getFetchSize() != null && options.getFetchSize() > 0) {
readIO = readIO.withFetchSize(options.getFetchSize());
}
Expand Down
Loading