@@ -84,12 +84,9 @@ void setJavascriptTextTransformFunctionName(

private static final Logger LOG = LoggerFactory.getLogger(TextIOToBigQuery.class);

private static final String BIGQUERY_SCHEMA = "BigQuery Schema";
private static final String NAME = "name";
private static final String TYPE = "type";
private static final String MODE = "mode";


public static void main(String[] args) {

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);

@@ -107,49 +104,79 @@ public static void main(String[] args) {
.withSchema(
NestedValueProvider.of(
options.getJSONPath(),
new SerializableFunction<String, TableSchema>() {

@Override
public TableSchema apply(String jsonPath) {

TableSchema tableSchema = new TableSchema();
List<TableFieldSchema> fields = new ArrayList<>();
SchemaParser schemaParser = new SchemaParser();
JSONObject jsonSchema;

try {

jsonSchema = schemaParser.parseSchema(jsonPath);

JSONArray bqSchemaJsonArray =
jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
TableFieldSchema field =
new TableFieldSchema()
.setName(inputField.getString(NAME))
.setType(inputField.getString(TYPE));

if (inputField.has(MODE)) {
field.setMode(inputField.getString(MODE));
}

fields.add(field);
}
tableSchema.setFields(fields);

} catch (Exception e) {
throw new RuntimeException(e);
}
return tableSchema;
}
}))
new SchemaMapper()))
.to(options.getOutputTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()));

pipeline.run();
}

private static class SchemaMapper implements SerializableFunction<String, TableSchema> {
private static final long serialVersionUID = 7389953698875401789L;
private static final String BIGQUERY_SCHEMA = "BigQuery Schema";
private static final String NAME = "name";
private static final String TYPE = "type";
private static final String MODE = "mode";
private static final String RECORD = "RECORD";


@Override
public TableSchema apply(String jsonPath) {

TableSchema tableSchema = new TableSchema();
List<TableFieldSchema> fields = new ArrayList<>();
SchemaParser schemaParser = new SchemaParser();
JSONObject jsonSchema;

try {

jsonSchema = schemaParser.parseSchema(jsonPath);

JSONArray bqSchemaJsonArray =
jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
fields.add(mapJSONObjectToField(inputField));
}

tableSchema.setFields(fields);

} catch (Exception e) {
throw new RuntimeException(e);
}
return tableSchema;
}
/**
* Maps a single field definition from the BigQuery schema JSON into a {@link TableFieldSchema}.
* Recurses into the nested "fields" array when the field type is RECORD.
*
* @param inputField JSON object describing one field (name, type, optional mode, nested fields)
* @return the mapped {@link TableFieldSchema}, including any nested fields
*/
private static TableFieldSchema mapJSONObjectToField(JSONObject inputField) {
TableFieldSchema field = new TableFieldSchema()
.setName(inputField.getString(NAME))
.setType(inputField.getString(TYPE));

if (inputField.has(MODE)) {
field.setMode(inputField.getString(MODE));
}
if (RECORD.equals(inputField.getString(TYPE))) {
List<TableFieldSchema> nestedFields = new ArrayList<>();
field.setFields(nestedFields);
JSONArray fieldsArr = inputField.getJSONArray("fields");
for (int i = 0; i < fieldsArr.length(); i++) {
JSONObject nestedJSON = fieldsArr.getJSONObject(i);
nestedFields.add(mapJSONObjectToField(nestedJSON));
}
}
return field;
}
}

}
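
For reference, a minimal standalone sketch of the nested-RECORD mapping the new SchemaMapper performs. The "BigQuery Schema" key and the name/type/mode/fields structure come from the code above; the class name SchemaMapperSketch, the helper name mapField, and the sample schema values are illustrative assumptions, and the sketch parses an inline JSON string instead of loading the schema through SchemaParser from a GCS path.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import org.json.JSONArray;
import org.json.JSONObject;

/** Standalone sketch (not part of the template): maps a sample "BigQuery Schema" JSON string. */
public class SchemaMapperSketch {

  public static void main(String[] args) {
    // Illustrative schema: one flat field plus one nested RECORD field.
    String json =
        "{\"BigQuery Schema\": ["
            + "  {\"name\": \"id\", \"type\": \"STRING\", \"mode\": \"REQUIRED\"},"
            + "  {\"name\": \"address\", \"type\": \"RECORD\", \"fields\": ["
            + "    {\"name\": \"city\", \"type\": \"STRING\"},"
            + "    {\"name\": \"zip\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"}"
            + "  ]}"
            + "]}";

    JSONArray bqSchema = new JSONObject(json).getJSONArray("BigQuery Schema");
    List<TableFieldSchema> fields = new ArrayList<>();
    for (int i = 0; i < bqSchema.length(); i++) {
      fields.add(mapField(bqSchema.getJSONObject(i)));
    }
    TableSchema tableSchema = new TableSchema().setFields(fields);
    System.out.println(tableSchema);
  }

  /** Same recursion as SchemaMapper#mapJSONObjectToField: a RECORD carries its own "fields" array. */
  private static TableFieldSchema mapField(JSONObject inputField) {
    TableFieldSchema field =
        new TableFieldSchema()
            .setName(inputField.getString("name"))
            .setType(inputField.getString("type"));
    if (inputField.has("mode")) {
      field.setMode(inputField.getString("mode"));
    }
    if ("RECORD".equals(inputField.getString("type"))) {
      List<TableFieldSchema> nested = new ArrayList<>();
      JSONArray nestedArr = inputField.getJSONArray("fields");
      for (int i = 0; i < nestedArr.length(); i++) {
        nested.add(mapField(nestedArr.getJSONObject(i)));
      }
      field.setFields(nested);
    }
    return field;
  }
}

The recursion mirrors the schema's own shape: each RECORD field owns a list of child TableFieldSchema objects, so arbitrarily deep nesting falls out of the same single mapping method.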