Skip to content

[WIP] v0.1.0 #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Under construction
| auth_method (service_account is supported) | string | optional | "application\_default" | See [Authentication](#authentication) |
| json_keyfile | string | optional | | keyfile path or `content` |
| project (x) | string | required unless service\_account's `json_keyfile` is given. | | project\_id |
| destination_project | string | optional | `project` value | A destination project to which the data will be loaded. Use this if you want to separate a billing project (the `project` value) and a destination project (the `destination_project` value). |
| dataset | string | required | | dataset |
| location | string | optional | nil | geographic location of dataset. See [Location](#location) |
| table | string | required | | table name, or table name with a partition decorator such as `table_name$20160929`|
Expand Down
55 changes: 31 additions & 24 deletions src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
public class BigqueryClient {
private final Logger logger = LoggerFactory.getLogger(BigqueryClient.class);
private BigQuery bigquery;
private String destinationProject;
private String dataset;
private String location;
private String locationForLog;
Expand All @@ -53,16 +54,17 @@ public class BigqueryClient {
public BigqueryClient(PluginTask task, Schema schema) {
this.task = task;
this.schema = schema;
this.destinationProject = task.getDestinationProject().get();
this.dataset = task.getDataset();
if (task.getLocation().isPresent()){
if (task.getLocation().isPresent()) {
this.location = task.getLocation().get();
this.locationForLog = task.getLocation().get();
}else{
} else {
this.locationForLog = "us/eu";
}
this.columnOptions = this.task.getColumnOptions().orElse(Collections.emptyList());
try {
this.bigquery = getClientWithJsonKey(this.task.getJsonKeyfile());
this.bigquery = getClientWithJsonKey(this.task.getJsonKeyfile().get());
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -75,15 +77,17 @@ private static BigQuery getClientWithJsonKey(String key) throws IOException {
.getService();
}

public Dataset createDataset(String datasetId){
DatasetInfo.Builder builder = DatasetInfo.newBuilder(datasetId);
if (this.location != null){
public Dataset createDataset(String datasetId) {
logger.info(String.format("embulk-output-bigquery_java: Create dataset... %s:%s in %s", this.destinationProject, datasetId, this.locationForLog));
DatasetInfo.Builder builder = DatasetInfo.newBuilder(this.destinationProject, datasetId);
if (this.location != null) {
builder.setLocation(this.location);
}
return bigquery.create(builder.build());
}

public Dataset getDataset(String datasetId) {
logger.info(String.format("embulk-output-bigquery_java: Get dataset... %s:%s", this.destinationProject, datasetId));
return bigquery.getDataset(datasetId);
}

Expand All @@ -92,14 +96,14 @@ public Job getJob(JobId jobId) {
}

public Table getTable(String name) {
return getTable(TableId.of(this.dataset, name));
return getTable(TableId.of(this.destinationProject, this.dataset, name));
}

public Table getTable(TableId tableId) {
return this.bigquery.getTable(tableId);
}

public void createTableIfNotExist(String table) {
public void createTableIfNotExist(String table) {
createTableIfNotExist(table, dataset);
}

Expand All @@ -112,14 +116,15 @@ public void createTableIfNotExist(String table, String dataset) {
}
TableDefinition tableDefinition = tableDefinitionBuilder.build();

try{
bigquery.create(TableInfo.newBuilder(TableId.of(dataset, table), tableDefinition).build());
}catch (BigQueryException e){
if(e.getCode() == 409 && e.getMessage().contains("Already Exists:")){
logger.info(String.format("embulk-output-bigquery: Create table... %s:%s.%s", this.destinationProject, dataset, table));
try {
bigquery.create(TableInfo.newBuilder(TableId.of(this.destinationProject, dataset, table), tableDefinition).build());
} catch (BigQueryException e) {
if (e.getCode() == 409 && e.getMessage().contains("Already Exists:")) {
return;
}
logger.error(String.format("embulk-out_bigquery: insert_table(%s, %s)", dataset, table));
throw new BigqueryException(String.format("failed to create table %s.%s, response: %s", dataset, table, e));
logger.error(String.format("embulk-out_bigquery: insert_table(%s, %s, %s)", this.destinationProject, dataset, table));
throw new BigqueryException(String.format("failed to create table %s:%s.%s, response: %s", this.destinationProject, dataset, table, e));
}
}

Expand Down Expand Up @@ -147,6 +152,7 @@ public JobStatistics.LoadStatistics load(Path loadFile, String table, JobInfo.Wr
int retries = this.task.getRetries();
PluginTask task = this.task;
Schema schema = this.schema;
String destinationProject = this.task.getDestinationProject().get();
List<BigqueryColumnOption> columnOptions = this.columnOptions;

try {
Expand All @@ -171,7 +177,7 @@ public JobStatistics.LoadStatistics call() {
return null;
}

TableId tableId = TableId.of(dataset, table);
TableId tableId = TableId.of(destinationProject, dataset, table);
WriteChannelConfiguration writeChannelConfiguration =
WriteChannelConfiguration.newBuilder(tableId)
.setFormatOptions(FormatOptions.json())
Expand Down Expand Up @@ -234,6 +240,7 @@ public JobStatistics.CopyStatistics copy(String sourceTable,
JobInfo.WriteDisposition writeDestination) throws BigqueryException {
String dataset = this.dataset;
int retries = this.task.getRetries();
String destinationProject = this.task.getDestinationProject().get();

try {
return retryExecutor()
Expand All @@ -245,8 +252,8 @@ public JobStatistics.CopyStatistics copy(String sourceTable,
public JobStatistics.CopyStatistics call() {
UUID uuid = UUID.randomUUID();
String jobId = String.format("embulk_load_job_%s", uuid.toString());
TableId destTableId = TableId.of(destinationDataset, destinationTable);
TableId srcTableId = TableId.of(dataset, sourceTable);
TableId destTableId = TableId.of(destinationProject, destinationDataset, destinationTable);
TableId srcTableId = TableId.of(destinationProject, dataset, sourceTable);

CopyJobConfiguration copyJobConfiguration = CopyJobConfiguration.newBuilder(destTableId, srcTableId)
.setWriteDisposition(writeDestination)
Expand Down Expand Up @@ -312,7 +319,7 @@ public JobStatistics.QueryStatistics call() {
.setUseLegacySql(false)
.build();
JobId.Builder jobIdBuilder = JobId.newBuilder().setJob(jobId);
if (location != null){
if (location != null) {
jobIdBuilder.setLocation(location);
}

Expand Down Expand Up @@ -362,24 +369,24 @@ public boolean deleteTable(String table) {
}

public boolean deleteTable(String table, String dataset) {
if (dataset == null){
if (dataset == null) {
dataset = this.dataset;
}
String chompedTable = BigqueryUtil.chompPartitionDecorator(table);
return deleteTableOrPartition(chompedTable, dataset);
}

public boolean deleteTableOrPartition(String table){
public boolean deleteTableOrPartition(String table) {
return deleteTableOrPartition(table, null);
}

// if `table` with a partition decorator is given, a partition is deleted.
public boolean deleteTableOrPartition(String table, String dataset){
if (dataset == null){
public boolean deleteTableOrPartition(String table, String dataset) {
if (dataset == null) {
dataset = this.dataset;
}
logger.info(String.format("embulk-output-bigquery: Delete table... %s.%s", dataset, table));
return this.bigquery.delete(TableId.of(dataset, table));
logger.info(String.format("embulk-output-bigquery: Delete table... %s:%s.%s", this.destinationProject, dataset, table));
return this.bigquery.delete(TableId.of(this.destinationProject, dataset, table));
}

private JobStatistics waitForLoad(Job job) throws BigqueryException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public class BigqueryConfigValidator {
public static void validate(PluginTask task) {
validateMode(task);
validateModeAndAutoCreteTable(task);
validateTimePartitioning(task);
validateProject(task);
}

public static void validateMode(PluginTask task) throws ConfigException {
Expand All @@ -33,4 +35,14 @@ public static void validateTimePartitioning(PluginTask task) throws ConfigExcept
}
}
}

public static void validateProject(PluginTask task) throws ConfigException {
if (!task.getProject().isPresent()){
throw new ConfigException("project is empty");
}
String project = task.getProject().get();
if (project.isEmpty()){
throw new ConfigException("project is empty string");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
import java.util.UUID;
import java.util.Optional;

import com.fasterxml.jackson.annotation.JacksonAnnotation;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.embulk.config.ConfigException;

public class BigqueryTaskBuilder {
private static final String uniqueName = UUID.randomUUID().toString().replace("-", "_");
Expand All @@ -16,6 +20,7 @@ public static PluginTask build(PluginTask task) {
setFileExt(task);
setTempTable(task);
setAbortOnError(task);
setProject(task);
return task;
}

Expand Down Expand Up @@ -63,4 +68,22 @@ protected static void setAbortOnError(PluginTask task) {
task.setAbortOnError(Optional.of(task.getMaxBadRecords() == 0));
}
}

@VisibleForTesting
protected static void setProject(PluginTask task){
if (task.getJsonKeyfile().isPresent()){
JsonNode root;
try {
ObjectMapper mapper = new ObjectMapper();
root = mapper.readTree(new File(task.getJsonKeyfile().get()));
}catch (IOException e){
throw new ConfigException(String.format("Parsing 'json_keyfile' failed with error: %s %s", e.getClass(), e.getMessage()));
}
task.setProject(Optional.of(root.get("project_id").asText()));
}

if (!task.getDestinationProject().isPresent()){
task.setDestinationProject(Optional.of(task.getProject().get()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,20 @@ public interface PluginTask
String getAuthMethod();

@Config("json_keyfile")
String getJsonKeyfile();
@ConfigDefault("null")
Optional<String> getJsonKeyfile();

@Config("project")
@ConfigDefault("null")
Optional<String> getProject();

void setProject(Optional<String> project);

@Config("destination_project")
@ConfigDefault("null")
Optional<String> getDestinationProject();

void setDestinationProject(Optional<String> destinationProject);

@Config("dataset")
String getDataset();
Expand Down Expand Up @@ -154,4 +167,12 @@ public interface PluginTask
Optional<BigqueryTimePartitioning> getTimePartitioning();

void setTimePartitioning(Optional<BigqueryTimePartitioning> bigqueryTimePartitioning);

@Config("gcs_bucket")
@ConfigDefault("null")
Optional<String> getGcsBucket();

@Config("auto_create_gcs_bucket")
@ConfigDefault("false")
Optional<String> getAutoCreateGcsBucket();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.embulk.output.bigquery_java.config;

import org.embulk.config.Config;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.output.bigquery_java.BigqueryJavaOutputPlugin;
Expand All @@ -8,6 +9,8 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.Optional;

import static org.junit.Assert.*;

public class TestBigqueryConfigValidator {
Expand Down Expand Up @@ -57,4 +60,30 @@ public void validateModeAndAutoCreteTable_autoCreateTable_False_configException(
task.setAutoCreateTable(false);
BigqueryConfigValidator.validateModeAndAutoCreteTable(task);
}

@Test
public void validateProject() {
config = loadYamlResource(embulk, "base.yml");
PluginTask task = config.loadConfig(PluginTask.class);
task.setProject(Optional.of("project_id"));
BigqueryConfigValidator.validateProject(task);

assertEquals("project_id", task.getProject().get());
}

@Test(expected = ConfigException.class)
public void validateProject_project_null_configException() {
config = loadYamlResource(embulk, "base.yml");
PluginTask task = config.loadConfig(PluginTask.class);
BigqueryConfigValidator.validateProject(task);
}

@Test(expected = ConfigException.class)
public void validateProject_project_empty_string_configException() {
config = loadYamlResource(embulk, "base.yml");
PluginTask task = config.loadConfig(PluginTask.class);
task.setProject(Optional.empty());
BigqueryConfigValidator.validateProject(task);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.embulk.output.bigquery_java.config;

import com.google.common.io.Resources;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.output.bigquery_java.BigqueryJavaOutputPlugin;
import org.embulk.spi.OutputPlugin;
Expand Down Expand Up @@ -44,4 +46,21 @@ public void setFileExt_JSONL_GZIP_JSONL_GZ() {
assertEquals("GZIP", task.getCompression());
assertEquals(".jsonl.gz", task.getFileExt().get());
}

@Test
public void setProject() {
config = loadYamlResource(embulk, "base.yml");
config.set("json_keyfile", Resources.getResource(BASIC_RESOURCE_PATH+"json_key.json").getPath());
PluginTask task = config.loadConfig(PluginTask.class);
BigqueryTaskBuilder.setProject(task);

assertEquals("project_id", task.getProject().get());
}

@Test(expected = ConfigException.class)
public void setProject_config_exception() {
config = loadYamlResource(embulk, "base.yml");
PluginTask task = config.loadConfig(PluginTask.class);
BigqueryTaskBuilder.setProject(task);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"project_id": "project_id"
}