Skip to content

Commit

Permalink
feat: add support for missing value interpretation (#428)
Browse files Browse the repository at this point in the history
Fixes #426  🦕
  • Loading branch information
alvarowolfx authored Mar 21, 2024
1 parent e6f9323 commit 1a3e4ba
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 21 deletions.
45 changes: 34 additions & 11 deletions src/managedwriter/json_writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@

import * as protos from '../../protos/protos';
import {PendingWrite} from './pending_write';
import {StreamConnection, RemoveListener} from './stream_connection';
import {RemoveListener} from './stream_connection';
import * as adapt from '../adapt';
import {Writer} from './writer';
import {Writer, WriterOptions} from './writer';
import {JSONEncoder} from './encoder';

type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
type MissingValueInterpretation =
protos.google.cloud.bigquery.storage.v1.AppendRowsRequest['defaultMissingValueInterpretation'];
type MissingValueInterpretationMap = {
[column: string]: MissingValueInterpretation;
};
type IInt64Value = protos.google.protobuf.IInt64Value;
type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
export type JSONPrimitive = string | number | boolean | Date | null;
Expand Down Expand Up @@ -49,16 +54,10 @@ export class JSONWriter {
/**
* Creates a new JSONWriter instance.
*
* @param {Object} params - The parameters for the JSONWriter.
* @param {StreamConnection} params.connection - The stream connection
* to the BigQuery streaming insert operation.
* @param {IDescriptorProto} params.protoDescriptor - The proto descriptor
* for the JSON rows.
* @param {WriterOptions} params - The parameters for the JSONWriter.
* See WriterOptions docs for more information.
*/
constructor(params: {
connection: StreamConnection;
protoDescriptor: IDescriptorProto;
}) {
constructor(params: WriterOptions) {
const {connection, protoDescriptor} = params;
this._writer = new Writer(params);
this._encoder = new JSONEncoder({
Expand Down Expand Up @@ -88,6 +87,30 @@ export class JSONWriter {
this._encoder.setProtoDescriptor(protoDescriptor);
}

/**
* Update how missing values are interpreted for the given stream.
*
* @param {MissingValueInterpretation} defaultMissingValueInterpretation
*/
setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation: MissingValueInterpretation
) {
this._writer.setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation
);
}

/**
* Update how missing values are interpreted for individual columns.
*
* @param {MissingValueInterpretationMap} missingValueInterpretations
*/
setMissingValueInterpretations(
missingValueInterpretations: MissingValueInterpretationMap
) {
this._writer.setMissingValueInterpretations(missingValueInterpretations);
}

/**
* Writes a JSONList that contains objects to be written to the BigQuery table by first converting
* the JSON data to protobuf messages, then using Writer's appendRows() to write the data at current end
Expand Down
100 changes: 90 additions & 10 deletions src/managedwriter/writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,60 @@ type ProtoData =
protos.google.cloud.bigquery.storage.v1.AppendRowsRequest.IProtoData;
type IDescriptorProto = protos.google.protobuf.IDescriptorProto;
type DescriptorProto = protos.google.protobuf.DescriptorProto;
type MissingValueInterpretation =
AppendRowRequest['defaultMissingValueInterpretation'];
type MissingValueInterpretationMap = {
[column: string]: MissingValueInterpretation;
};

const DescriptorProto = protos.google.protobuf.DescriptorProto;

export interface WriterOptions {
/** The stream connection to the BigQuery streaming insert operation. */
connection: StreamConnection;

/** The proto descriptor for the stream. */
protoDescriptor: IDescriptorProto;

/**
* Controls how missing values are interpreted for a given stream.
* `missingValueInterpretations` set for individual columns can override the default chosen
* with this option.
*
* For example, if you want to write
* `NULL` instead of using default values for some columns, you can set
* `defaultMissingValueInterpretation` to `DEFAULT_VALUE` and at the same
* time, set `missingValueInterpretations` to `NULL_VALUE` on those columns.
*/
defaultMissingValueInterpretation?: MissingValueInterpretation;

/**
* Control how missing values are interpreted for individual columns.
*
* You must provide an object to indicate how to interpret missing value for some fields. Missing
* values are fields present in user schema but missing in rows. The key is
* the field name. The value is the interpretation of missing values for the
* field.
*
* For example, the following option would indicate that missing values in the "foo"
* column are interpreted as null, whereas missing values in the "bar" column are
* treated as the default value:
*
* {
* "foo": 'DEFAULT_VALUE',
* "bar": 'NULL_VALUE',
* }
*
* If a field is not in this object and has missing values, the missing values
* in this field are interpreted as NULL unless overridden with a default missing
* value interpretation.
*
* Currently, field name can only be top-level column name, can't be a struct
* field path like 'foo.bar'.
*/
missingValueInterpretations?: MissingValueInterpretationMap;
}

/**
* A BigQuery Storage API Writer that can be used to write data into BigQuery Table
* using the Storage API.
Expand All @@ -37,23 +88,26 @@ const DescriptorProto = protos.google.protobuf.DescriptorProto;
export class Writer {
private _protoDescriptor: DescriptorProto;
private _streamConnection: StreamConnection;
private _defaultMissingValueInterpretation?: MissingValueInterpretation;
private _missingValueInterpretations?: MissingValueInterpretationMap;

/**
* Creates a new Writer instance.
*
* @param {Object} params - The parameters for the JSONWriter.
* @param {StreamConnection} params.connection - The stream connection
* to the BigQuery streaming insert operation.
* @param {IDescriptorProto} params.protoDescriptor - The proto descriptor
* for the JSON rows.
* @param {WriterOptions} params - The parameters for the Writer.
* See WriterOptions docs for more information.
*/
constructor(params: {
connection: StreamConnection;
protoDescriptor: IDescriptorProto;
}) {
const {connection, protoDescriptor} = params;
constructor(params: WriterOptions) {
const {
connection,
protoDescriptor,
missingValueInterpretations,
defaultMissingValueInterpretation,
} = params;
this._streamConnection = connection;
this._protoDescriptor = new DescriptorProto(protoDescriptor);
this._defaultMissingValueInterpretation = defaultMissingValueInterpretation;
this._missingValueInterpretations = missingValueInterpretations;
}

/**
Expand All @@ -72,6 +126,28 @@ export class Writer {
}
}

/**
* Update how missing values are interpreted for the given stream.
*
* @param {MissingValueInterpretation} defaultMissingValueInterpretation
*/
setDefaultMissingValueInterpretation(
defaultMissingValueInterpretation: MissingValueInterpretation
) {
this._defaultMissingValueInterpretation = defaultMissingValueInterpretation;
}

/**
* Update how missing values are interpreted for individual columns.
*
* @param {MissingValueInterpretationMap} missingValueInterpretations
*/
setMissingValueInterpretations(
missingValueInterpretations: MissingValueInterpretationMap
) {
this._missingValueInterpretations = missingValueInterpretations;
}

/**
* Schedules the writing of rows at given offset.
*
Expand All @@ -97,6 +173,10 @@ export class Writer {
protoDescriptor: this._protoDescriptor.toJSON(),
},
},
defaultMissingValueInterpretation:
this._defaultMissingValueInterpretation,
missingValueInterpretations: this
._missingValueInterpretations as AppendRowRequest['missingValueInterpretations'],
offset,
};

Expand Down
136 changes: 136 additions & 0 deletions system-test/managed_writer_client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,142 @@ describe('managedwriter.WriterClient', () => {
}).timeout(30 * 1000);
});

it('should fill default values when MissingValuesInterpretation is set', async () => {
bqWriteClient.initialize();
const client = new WriterClient();
client.setClient(bqWriteClient);

const schema: TableSchema = {
fields: [
{
name: 'customer_name',
type: 'STRING',
mode: 'REQUIRED',
},
{
name: 'row_num',
type: 'INTEGER',
mode: 'REQUIRED',
},
{
name: 'id',
type: 'STRING',
defaultValueExpression: 'GENERATE_UUID()',
},
{
name: 'created_at',
type: 'TIMESTAMP',
defaultValueExpression: 'CURRENT_TIMESTAMP()',
},
{
name: 'updated_at',
type: 'TIMESTAMP',
defaultValueExpression: 'CURRENT_TIMESTAMP()',
},
],
};
const [table] = await bigquery
.dataset(datasetId)
.createTable(tableId + '_default_values', {schema});
const parent = `projects/${projectId}/datasets/${datasetId}/tables/${table.id}`;

const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor: DescriptorProto =
adapt.convertStorageSchemaToProto2Descriptor(storageSchema, 'root');

const row1 = {
customer_name: 'Ada Lovelace',
row_num: 1,
};

const row2 = {
customer_name: 'Alan Turing',
row_num: 2,
};

try {
const connection = await client.createStreamConnection({
streamType: managedwriter.PendingStream,
destinationTable: parent,
});

const streamId = connection.getStreamId();
const writer = new JSONWriter({
connection,
protoDescriptor,
defaultMissingValueInterpretation: 'DEFAULT_VALUE',
missingValueInterpretations: {
updated_at: 'NULL_VALUE',
},
});

let pw = writer.appendRows([row1, row2], 0);
let result = await pw.getResult();

// change MVI config
writer.setDefaultMissingValueInterpretation('NULL_VALUE');
writer.setMissingValueInterpretations({
updated_at: 'DEFAULT_VALUE',
});

const row3 = {
customer_name: 'Charles Babbage',
row_num: 3,
};

const row4 = {
customer_name: 'Lord Byron',
row_num: 4,
};

pw = writer.appendRows([row3, row4], 2);
result = await pw.getResult();

assert.equal(result.error, null);

const res = await connection.finalize();
connection.close();
assert.equal(res?.rowCount, 4);

const commitResponse = await client.batchCommitWriteStream({
parent,
writeStreams: [streamId],
});
assert.equal(commitResponse.streamErrors?.length, 0);

const [rows] = await bigquery.query(
`SELECT * FROM \`${projectId}.${datasetId}.${table.id}\` order by row_num`
);
assert.strictEqual(rows.length, 4);

const first = rows[0];
assert.notEqual(first.id, null);
assert.notEqual(first.created_at, null);
assert.equal(first.updated_at, null);

const second = rows[1];
assert.notEqual(second.id, null);
assert.notEqual(second.created_at, null);
assert.equal(second.updated_at, null);

// After change on MVI config
const third = rows[2];
assert.equal(third.id, null);
assert.equal(third.created_at, null);
assert.notEqual(third.updated_at, null);

const forth = rows[3];
assert.equal(forth.id, null);
assert.equal(forth.created_at, null);
assert.notEqual(forth.updated_at, null);

writer.close();
} finally {
client.close();
}
});

describe('Error Scenarios', () => {
it('send request with mismatched proto descriptor', async () => {
bqWriteClient.initialize();
Expand Down

0 comments on commit 1a3e4ba

Please sign in to comment.