Commit e5fac57

Adds AVRO_PARSER_NUM_MINIBATCH to override num_minibatches and logs the parsing time (#1283)
* Exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset and `make_avro_record_dataset`; adds parameter constraints; fixes lint issues; adds a test method for the `_require()` function, checking that ValueErrors are raised when given an invalid input for num_parallel_calls.

* Bump Apache Arrow to 2.0.0 (#1231). Also bumps Apache Thrift to 0.13.0, updates code to match Arrow, bumps pyarrow to 2.0.0, stays with version=1 for write_feather to pass tests, bumps flatbuffers to 1.12.0, fixes Windows issues and tests, removes -std=c++11 and leaves the default -std=c++14 for the arrow build, and updates the sha256 of libapr1 as the hash was changed by the repo. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add emulator for gcs (#1234). Bumps com_github_googleapis_google_cloud_cpp to `1.21.0`, adds a gcs testbench, and bumps `libcurl` to `7.69.1`.

* Remove the CI build for CentOS 8 (#1237). Building shared libraries on CentOS 8 is pretty much the same as on Ubuntu 20.04, except that `apt` should be changed to `yum`, so our CentOS 8 CI test is not adding much value. Furthermore, with the upcoming CentOS 8 change (https://www.phoronix.com/scan.php?page=news_item&px=CentOS-8-Ending-For-Stream), CentOS 8 is effectively EOLed in 2021, so we may want to drop the CentOS 8 build (leaving only a comment in README.md). Note we keep the CentOS 7 build for now, as there are still many users on CentOS 7, which will only be EOLed in 2024. We might drop the CentOS 7 build in the future as well if there are similar changes to CentOS 7. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* add tf-c-header rule (#1244)

* Skip tf-nightly:tensorflow-io==0.17.0 on API compatibility test (#1247). Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* [s3] add support for testing on macOS (#1253); modify docker-compose cmd.

* add notebook formatting instruction in README (#1256)

* [docs] Restructure README.md content (#1257); bump to run CI jobs.

* Update libtiff/libgeotiff dependency to the latest version (#1258). Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* remove unstable elasticsearch test setup on macOS (#1263)

* Exposes num_parallel_reads and num_parallel_calls (#1232): exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset and `make_avro_record_dataset`; adds parameter constraints; fixes lint issues; removes Optional typing for a method parameter; adds a test method for the `_require()` function, checking that ValueErrors are raised when given an invalid input for num_parallel_calls; uncomments the skip for macOS pytests. Co-authored-by: Abin Shahab <ashahab@linkedin.com>

* Added AVRO_PARSER_NUM_MINIBATCH to override num_minibatches. This is recommended to be set equal to the vcore request.

* add avro tutorial testing data (#1267). Co-authored-by: Cheng Ren <1428327+chengren311@users.noreply.github.com>

* Update Kafka tutorial to work with Apache Kafka (#1266). Minor update to the Kafka tutorial to remove the dependency on Confluent's distribution of Kafka and instead work with vanilla Apache Kafka; also removes redundant pip install commands per review. Signed-off-by: Dale Lane <dale.lane@gmail.com>

* add github workflow for performance benchmarking (#1269); adds a github-action-benchmark step.

* handle missing dependencies while benchmarking (#1271). Sets up test_sql and iterates on pushing benchmark results to gh-pages: auto-push settings, personal access token vs. GITHUB_TOKEN credentials, the push-changes action, github.head_ref, fetch-depth, branch naming, and finally setting origin and pushing results after merging with the forked master.

* Disable s3 macOS tests for now, as docker is not working on GitHub Actions for macOS (#1277). Reverts "[s3] add support for testing on macOS (#1253)" (commit 81789bd). Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* rename testing data files (#1278)

* Add tutorial for avro dataset API (#1250)

* remove docker based mongodb tests in macos (#1279)

* trigger benchmarks workflow only on commits (#1282)

* Bump Apache Arrow to 3.0.0 (#1285). Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add bazel cache (#1287). Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add initial bigtable stub test (#1286); fix kokoro test. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add reference to github-pages benchmarks in README (#1289). Co-authored-by: Yuan Tang <terrytangyuan@gmail.com>

* Clear outputs (#1292)

* fix kafka online-learning section in tutorial notebook (#1274): kafka notebook fix for the colab env; change timeout from 30 to 20 seconds and reduce stream_timeout.

* Only enable bazel cache writes for tensorflow/io GitHub Actions (#1293). This updates the workflows so that only GitHub Actions runs on the tensorflow/io repo are enabled with bazel cache writes; without this, actions on a forked repo would cause errors. Note that once bazel cache read permissions are enabled on gcs, forked repos will be able to access the bazel cache read-only. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Enable read-only bazel cache (#1294). Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Rename tests (#1297)

* Combine Ubuntu 20.04 and CentOS 7 tests into one GitHub job (#1299). When GitHub Actions runs, there appears to be an implicit limit on concurrent jobs, so the CentOS 7 test is normally scheduled after other jobs complete. However, the CentOS 7 test often hangs (e.g., https://github.com/tensorflow/io/runs/1825943449), likely because it sits on the GitHub Actions queue for too long. This PR moves CentOS 7 to run after the Ubuntu 20.04 test completes, to try to avoid the hangs. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Update names of api tests (#1300). We renamed the tests to remove the "_eager" part; this updates the api test to use the correct filenames. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Fix wrong benchmark test names caused by the last commit (#1301). Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Patch arrow to temporarily resolve the ARROW-11518 issue (#1304). See #1281 for details; credit to diggerk. We will update arrow after the upstream PR is merged. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Remove AWS headers from tensorflow, and use headers from third_party … (#1241). Removes external headers from tensorflow and uses third_party headers instead; addresses review comments. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Switch to use github to download libgeotiff (#1307). Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add @com_google_absl//absl/strings:cord (#1308); fix read/STDIN_FILENO. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Switch to modular file system for hdfs (#1309). Part of the effort to switch to the modular file system for hdfs; when TF_ENABLE_LEGACY_FILESYSTEM=1 is provided, the old behavior is preserved. Also builds against tf-nightly, updates tests, and adjusts the if/else logic per review comments. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Disable test_write_kafka test for now (#1310). With the tensorflow upgrade to tf-nightly, the test_write_kafka test is failing and blocking the plan for the modular file system migration. This disables the test temporarily so that CI can continue to push the tensorflow-io-nightly image (needed for the migration). Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Switch to modular file system for s3 (#1312). Part of the effort to switch to the modular file system for s3; when TF_ENABLE_LEGACY_FILESYSTEM=1 is provided, the old behavior is preserved. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Add python 3.9 on Windows (#1316)

* Updates the PR to use an attribute instead of an env variable. Originally AVRO_PARSER_NUM_MINIBATCH was set as an environment variable; because tensorflow-io rarely uses env vars to fine-tune kernel ops, this was changed to an attribute. See comment here: #1283 (comment)

* Adds additional comments in the source code for understandability.

Co-authored-by: Abin Shahab <ashahab@linkedin.com>
Co-authored-by: Yong Tang <yong.tang.github@outlook.com>
Co-authored-by: Vo Van Nghia <vovannghia2409@gmail.com>
Co-authored-by: Vignesh Kothapalli <vikoth18@in.ibm.com>
Co-authored-by: Cheng Ren <chren@linkedin.com>
Co-authored-by: Cheng Ren <1428327+chengren311@users.noreply.github.com>
Co-authored-by: Dale Lane <dale.lane@gmail.com>
Co-authored-by: Yuan Tang <terrytangyuan@gmail.com>
Co-authored-by: Mark Daoust <markdaoust@google.com>
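The last two items above describe moving the override from a process-wide environment variable to a per-op attribute. The difference can be sketched in Python with purely illustrative names (this is not the tensorflow-io implementation):

```python
import os


def parse(records, num_minibatches):
    """Stand-in for the real Avro parser kernel (illustrative only)."""
    return f"parsed {len(records)} records in {num_minibatches or 'auto'} minibatches"


# Env-var style (what the PR moved away from): one global knob, invisible
# at the call site and shared by every parser in the same process.
def parse_avro_env_style(records):
    num_minibatches = int(os.environ.get("AVRO_PARSER_NUM_MINIBATCH", "0"))
    return parse(records, num_minibatches)


# Attribute style (what the PR adopted): an explicit per-op argument, so
# each dataset in the same process can be tuned independently and the
# setting is visible at the call site.
def parse_avro_attr_style(records, avro_num_minibatches=0):
    return parse(records, avro_num_minibatches)
```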
1 parent 5345316 commit e5fac57

File tree

5 files changed: +76 -10 lines changed

.github/workflows/build.yml

Lines changed: 15 additions & 0 deletions
```diff
@@ -88,6 +88,21 @@ jobs:
           docker run -i --rm -v $PWD:/v -w /v --net=host \
             -e BAZEL_OPTIMIZATION="${BAZEL_OPTIMIZATION}" \
             ubuntu:20.04 bash -x -e source.sh
+      - name: CentOS 7
+        run: |
+          if [[ "${EVENT_NAME}" == "push" && "${REPO_NAME}" == "tensorflow/io" ]]; then
+            printf '%s\n' "${GCP_CREDS}" >service_account_creds.json
+            export BAZEL_OPTIMIZATION="--remote_cache=https://storage.googleapis.com/tensorflow-sigs-io --remote_upload_local_results=true --google_credentials=service_account_creds.json"
+          else
+            export BAZEL_OPTIMIZATION="--remote_cache=https://storage.googleapis.com/tensorflow-sigs-io --remote_upload_local_results=false"
+          fi
+          set -x -e
+          bash -x -e .github/workflows/build.space.sh
+          python3 .github/workflows/build.instruction.py docs/development.md "##### CentOS 7" > source.sh
+          cat source.sh
+          docker run -i --rm -v $PWD:/v -w /v --net=host \
+            -e BAZEL_OPTIMIZATION="${BAZEL_OPTIMIZATION}" \
+            centos:7 bash -x -e source.sh
 
   macos-bazel:
     name: Bazel macOS
```

tensorflow_io/core/kernels/avro/parse_avro_kernels.cc

Lines changed: 37 additions & 8 deletions
```diff
@@ -172,7 +172,9 @@ Status ParseAvro(const AvroParserConfig& config,
                  const gtl::ArraySlice<tstring>& serialized,
                  thread::ThreadPool* thread_pool, AvroResult* result) {
   DCHECK(result != nullptr);
-
+  using clock = std::chrono::system_clock;
+  using ms = std::chrono::duration<double, std::milli>;
+  const auto before = clock::now();
   // Allocate dense output for fixed length dense values
   // (variable-length dense and sparse and ragged have to be buffered).
   /* std::vector<Tensor> fixed_len_dense_values(config.dense.size());
@@ -189,6 +191,10 @@ Status ParseAvro(const AvroParserConfig& config,
   // This parameter affects performance in a big and data-dependent way.
   const size_t kMiniBatchSizeBytes = 50000;
 
+  // avro_num_minibatches_ is int64 in the op interface. If not set
+  // the default value is 0.
+  size_t avro_num_minibatches_;
+
   // Calculate number of minibatches.
   // In main regime make each minibatch around kMiniBatchSizeBytes bytes.
   // Apply 'special logic' below for small and big regimes.
@@ -204,8 +210,13 @@ Status ParseAvro(const AvroParserConfig& config,
       minibatch_bytes = 0;
     }
   }
-  // 'special logic'
-  const size_t min_minibatches = std::min<size_t>(8, serialized.size());
+  if (avro_num_minibatches_) {
+    VLOG(5) << "Overriding num_minibatches with " << avro_num_minibatches_;
+    result = avro_num_minibatches_;
+  }
+  // This is to ensure users can control the num minibatches all the way down
+  // to size of 1(no parallelism).
+  const size_t min_minibatches = std::min<size_t>(1, serialized.size());
   const size_t max_minibatches = 64;
   return std::max<size_t>(min_minibatches,
                           std::min<size_t>(max_minibatches, result));
@@ -245,13 +256,16 @@ Status ParseAvro(const AvroParserConfig& config,
     auto read_value = [&](avro::GenericDatum& d) {
       return range_reader.read(d);
     };
-
+    VLOG(5) << "Processing minibatch " << minibatch;
     status_of_minibatch[minibatch] = parser_tree.ParseValues(
         &buffers[minibatch], read_value, reader_schema, defaults);
   };
-
+  const auto before_parse = clock::now();
   ParallelFor(ProcessMiniBatch, num_minibatches, thread_pool);
-
+  const auto after_parse = clock::now();
+  const ms parse_read_duration = after_parse - before_parse;
+  VLOG(5) << "PARSER_TIMING: Time spend reading and parsing "
+          << parse_read_duration.count() << " ms ";
   for (Status& status : status_of_minibatch) {
     TF_RETURN_IF_ERROR(status);
   }
@@ -367,15 +381,22 @@ Status ParseAvro(const AvroParserConfig& config,
 
     return Status::OK();
   };
-
+  const auto before_sparse_merge = clock::now();
   for (size_t d = 0; d < config.sparse.size(); ++d) {
     TF_RETURN_IF_ERROR(MergeSparseMinibatches(d));
   }
-
+  const auto after_sparse_merge = clock::now();
+  const ms s_merge_duration = after_sparse_merge - before_sparse_merge;
   for (size_t d = 0; d < config.dense.size(); ++d) {
     TF_RETURN_IF_ERROR(MergeDenseMinibatches(d));
  }
+  const auto after_dense_merge = clock::now();
+  const ms d_merge_duration = after_dense_merge - after_sparse_merge;
+  VLOG(5) << "PARSER_TIMING: Sparse merge duration" << s_merge_duration.count()
+          << " ms ";
 
+  VLOG(5) << "PARSER_TIMING: Dense merge duration" << d_merge_duration.count()
+          << " ms ";
   return Status::OK();
 }
 
@@ -388,6 +409,8 @@ class ParseAvroOp : public OpKernel {
     OP_REQUIRES_OK(ctx, ctx->GetAttr("sparse_types", &sparse_types_));
     OP_REQUIRES_OK(ctx, ctx->GetAttr("dense_types", &dense_types_));
     OP_REQUIRES_OK(ctx, ctx->GetAttr("dense_shapes", &dense_shapes_));
+    OP_REQUIRES_OK(
+        ctx, ctx->GetAttr("avro_num_minibatches", &avro_num_minibatches_));
 
     OP_REQUIRES_OK(ctx, ctx->GetAttr("sparse_keys", &sparse_keys_));
     OP_REQUIRES_OK(ctx, ctx->GetAttr("dense_keys", &dense_keys_));
@@ -401,6 +424,11 @@ class ParseAvroOp : public OpKernel {
           dense_shapes_[d].dims() > 1 && dense_shapes_[d].dim_size(0) == -1;
     }
 
+    // Check that avro_num_minibatches is not negative
+    OP_REQUIRES(ctx, avro_num_minibatches_ >= 0,
+                errors::InvalidArgument("Need avro_num_minibatches >= 0, got ",
+                                        avro_num_minibatches_));
+
     string reader_schema_str;
     OP_REQUIRES_OK(ctx, ctx->GetAttr("reader_schema", &reader_schema_str));
 
@@ -495,6 +523,7 @@ class ParseAvroOp : public OpKernel {
   avro::ValidSchema reader_schema_;
   size_t num_dense_;
   size_t num_sparse_;
+  int64 avro_num_minibatches_;
 
 private:
  std::vector<std::pair<string, DataType>> CreateKeysAndTypes() {
```
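Putting the second and third hunks above together: a nonzero `avro_num_minibatches` replaces the size-based estimate, and the result is then clamped. A Python sketch of that logic (illustrative only; the byte-accumulation loop is inferred from the surrounding context, and `record_sizes` is a hypothetical stand-in for the serialized input):

```python
def compute_num_minibatches(record_sizes, avro_num_minibatches=0):
    """Sketch of the C++ minibatch-count logic above (not the real kernel)."""
    K_MINIBATCH_SIZE_BYTES = 50000  # mirrors kMiniBatchSizeBytes
    # Main regime: aim for roughly 50 kB per minibatch.
    result = 0
    minibatch_bytes = 0
    for size in record_sizes:
        minibatch_bytes += size
        if minibatch_bytes > K_MINIBATCH_SIZE_BYTES:
            result += 1
            minibatch_bytes = 0
    # New in this commit: a nonzero attribute overrides the computed count.
    if avro_num_minibatches:
        result = avro_num_minibatches
    # Clamp to at most 64; the lower bound dropped from 8 to 1 so users can
    # force a single minibatch, i.e. no parallelism.
    min_minibatches = min(1, len(record_sizes))
    max_minibatches = 64
    return max(min_minibatches, min(max_minibatches, result))


# ~300 kB of records gives ~5 minibatches by default; the override pins it.
print(compute_num_minibatches([30000] * 10))                          # auto
print(compute_num_minibatches([30000] * 10, avro_num_minibatches=2))  # pinned
```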

tensorflow_io/core/kernels/avro/utils/avro_parser_tree.cc

Lines changed: 18 additions & 2 deletions
```diff
@@ -81,6 +81,9 @@ Status AvroParserTree::ParseValues(
     const std::function<bool(avro::GenericDatum&)> read_value,
     const avro::ValidSchema& reader_schema,
     const std::map<string, Tensor>& defaults) const {
+  using clock = std::chrono::system_clock;
+  using ms = std::chrono::duration<double, std::milli>;
+
   // new assignment of all buffers
   TF_RETURN_IF_ERROR(InitializeValueBuffers(key_to_value));
 
@@ -90,11 +93,24 @@ Status AvroParserTree::ParseValues(
   avro::GenericDatum datum(reader_schema);
 
   bool has_value = false;
-
-  while ((has_value = read_value(datum))) {
+  ms parse_duration;
+  ms read_duration;
+  while (true) {
+    const auto before_read = clock::now();
+    if (!(has_value = read_value(datum))) {
+      break;
+    }
+    const auto after_read = clock::now();
     TF_RETURN_IF_ERROR((*root_).Parse(key_to_value, datum, defaults));
+    const auto after_parse = clock::now();
+    parse_duration += after_parse - after_read;
+    read_duration += after_read - before_read;
   }
 
+  VLOG(5) << "PARSER_TIMING: Avro Read times " << read_duration.count()
+          << " ms ";
+  VLOG(5) << "PARSER_TIMING: Avro Parse times " << parse_duration.count()
+          << " ms ";
   // add end marks to all buffers for batch
   TF_RETURN_IF_ERROR(AddFinishMarks(key_to_value));
```
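The instrumentation pattern above accumulates per-iteration read and parse durations inside the loop and logs the totals once after it, so the timing adds no per-record logging overhead. A minimal Python rendering of the same pattern (illustrative, not tensorflow-io code):

```python
import time


def parse_values(read_value, parse):
    """Accumulate read/parse timings around a read-parse loop, log once."""
    read_duration = 0.0
    parse_duration = 0.0
    while True:
        before_read = time.perf_counter()
        datum = read_value()  # returns None when the input is exhausted
        if datum is None:
            break
        after_read = time.perf_counter()
        parse(datum)
        after_parse = time.perf_counter()
        read_duration += after_read - before_read
        parse_duration += after_parse - after_read
    print(f"PARSER_TIMING: Avro Read times {read_duration * 1000:.3f} ms")
    print(f"PARSER_TIMING: Avro Parse times {parse_duration * 1000:.3f} ms")


# Example with stub callables:
data = iter(range(1000))
parse_values(lambda: next(data, None), lambda d: d * d)
```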

tensorflow_io/core/ops/avro_ops.cc

Lines changed: 4 additions & 0 deletions
```diff
@@ -83,6 +83,7 @@ REGISTER_OP("IO>ParseAvro")
     .Output("sparse_values: sparse_types")
     .Output("sparse_shapes: num_sparse * int64")
     .Output("dense_values: dense_types")
+    .Attr("avro_num_minibatches: int >= 0")
     .Attr("num_sparse: int >= 0")
     .Attr("reader_schema: string")
     .Attr("sparse_keys: list(string) >= 0")
@@ -94,6 +95,7 @@ REGISTER_OP("IO>ParseAvro")
     .SetShapeFn([](shape_inference::InferenceContext* c) {
       size_t num_dense;
       size_t num_sparse;
+      int64 avro_num_minibatches;
       int64 num_sparse_from_user;
       std::vector<DataType> sparse_types;
       std::vector<DataType> dense_types;
@@ -106,6 +108,8 @@ REGISTER_OP("IO>ParseAvro")
       TF_RETURN_IF_ERROR(c->GetAttr("sparse_types", &sparse_types));
       TF_RETURN_IF_ERROR(c->GetAttr("dense_types", &dense_types));
       TF_RETURN_IF_ERROR(c->GetAttr("dense_shapes", &dense_shapes));
+      TF_RETURN_IF_ERROR(
+          c->GetAttr("avro_num_minibatches", &avro_num_minibatches));
 
       TF_RETURN_IF_ERROR(c->GetAttr("sparse_keys", &sparse_keys));
       TF_RETURN_IF_ERROR(c->GetAttr("sparse_ranks", &sparse_ranks));
```

tensorflow_io/core/python/experimental/parse_avro_ops.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -130,6 +130,7 @@ def _parse_avro(
     dense_defaults=None,
     dense_shapes=None,
     name=None,
+    avro_num_minibatches=0,
 ):
     """Parses Avro records.
 
@@ -196,6 +197,7 @@ def _parse_avro(
         dense_keys=dense_keys,
         dense_shapes=dense_shapes,
         name=name,
+        avro_num_minibatches=avro_num_minibatches,
     )
 
     (sparse_indices, sparse_values, sparse_shapes, dense_values) = outputs
```
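A small usage sketch of the new keyword; the stub below mirrors only the parameters visible in this diff (the real `_parse_avro` takes more arguments):

```python
def _parse_avro(serialized, dense_defaults=None, dense_shapes=None,
                name=None, avro_num_minibatches=0):
    """Stub mirroring the signature change above; real body omitted."""
    # The op declares the attr as `int >= 0`, so negatives are rejected.
    assert avro_num_minibatches >= 0
    return {"avro_num_minibatches": avro_num_minibatches}


# Default (0): the kernel auto-sizes minibatches to roughly 50 kB each.
print(_parse_avro(serialized=[b"..."]))
# Pinned: e.g. set it to the vcore request, as the commit message recommends.
print(_parse_avro(serialized=[b"..."], avro_num_minibatches=4))
```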
