Some datasets are too big to be processed on a single machine. `tfds` supports generating data across many machines by using Apache Beam.
This doc has two sections:
- For users who want to generate an existing Beam dataset
- For developers who want to create a new Beam dataset
Below are different examples of generating a Beam dataset, both on the cloud and locally.
Warning: When generating a dataset with the `tfds build` CLI, make sure to specify the dataset config you want to generate, or it will default to generating all existing configs. For example, for wikipedia, use `tfds build wikipedia/20200301.en` instead of `tfds build wikipedia`.
To run the pipeline using Google Cloud Dataflow and take advantage of distributed computation, first follow the Quickstart instructions.
Once your environment is set up, you can run the `tfds build` CLI using a data directory on GCS and specifying the required options for the `--beam_pipeline_options` flag.
To make it easier to launch the script, it's helpful to define the following variables using the actual values for your GCP/GCS setup and the dataset you want to generate:
```sh
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
```
You will then need to create a file to tell Dataflow to install `tfds` on the workers:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
If you're using `tfds-nightly`, make sure to echo from `tfds-nightly` in case the dataset has been updated since the last release.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Finally, you can launch the job using the command below:
```sh
tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
```
To run your script locally using the default Apache Beam runner, the command is the same as for other datasets:
```sh
tfds build my_dataset
```
Warning: Beam datasets can be huge (terabytes or larger) and take a significant amount of resources to generate (it can take weeks on a local computer). It is recommended to generate the datasets using a distributed environment. Have a look at the Apache Beam documentation for a list of supported runtimes.
To generate the dataset on Beam, the API is the same as for other datasets. You can customize the `beam.Pipeline` using the `beam_options` (and `beam_runner`) arguments of `DownloadConfig`.
```python
import apache_beam as beam
import tensorflow_datasets as tfds

# If you are running on Dataflow, Spark,..., you may have to set up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
```
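You can also pass a runner instance directly. A minimal sketch, assuming you want Beam's local `DirectRunner` (swap in the runner matching your setup):

```python
import apache_beam as beam
import tensorflow_datasets as tfds

# Like `beam_options`, `beam_runner` is forwarded to `beam.Pipeline`.
dl_config = tfds.download.DownloadConfig(
    beam_runner=beam.runners.DirectRunner(),
)
```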
To write Apache Beam datasets, you should be familiar with the following concepts:
- Be familiar with the `tfds` dataset creation guide, as most of its content still applies to Beam datasets.
- Get an introduction to Apache Beam with the Beam programming guide.
- If you want to generate your dataset using Cloud Dataflow, read the Google Cloud documentation and the Apache Beam dependency guide.
If you are familiar with the dataset creation guide, adding a Beam dataset only requires modifying the `_generate_examples` function. The function should return a Beam object, rather than a generator:
Non-beam dataset:

```python
def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)
```
Beam dataset:

```python
def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )
```
All the rest can be 100% identical, including tests.
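For instance, the standard test harness works unchanged. A minimal sketch, assuming a hypothetical `MyBeamDataset` builder with the usual dummy data checked in:

```python
import tensorflow_datasets as tfds

from my_project import my_beam_dataset  # Hypothetical module defining the builder.


class MyBeamDatasetTest(tfds.testing.DatasetBuilderTestCase):
  DATASET_CLASS = my_beam_dataset.MyBeamDataset
  SPLITS = {
      'train': 3,  # Expected number of fake examples in the dummy train split.
      'test': 1,   # Expected number of fake examples in the dummy test split.
  }


if __name__ == '__main__':
  tfds.testing.test_main()
```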
Some additional considerations:
- Use `tfds.core.lazy_imports` to import Apache Beam. By using a lazy dependency, users can still read the dataset after it has been generated without having to install Beam.
- Be careful with Python closures. When running the pipeline, the `beam.Map` and `beam.DoFn` functions are serialized using `pickle` and sent to all workers. Do not mutate objects inside a `beam.PTransform` if the state has to be shared across workers.
- Due to the way `tfds.core.DatasetBuilder` is serialized with pickle, mutating `tfds.core.DatasetBuilder` during data creation will be ignored on the workers (e.g. it's not possible to set `self.info.metadata['offset'] = 123` in `_split_generators` and access it from the workers like `beam.Map(lambda x: x + self.info.metadata['offset'])`).
- If you need to share some pipeline steps between the splits, you can add an extra `pipeline: beam.Pipeline` kwarg to `_split_generators` and control the full generation pipeline (see the sketch after this list and the `_generate_examples` documentation of `tfds.core.GeneratorBasedBuilder`).
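A minimal sketch of that shared-pipeline pattern, inside your builder class; `_load_all_rows`, `_row_to_example`, and `_is_train` are hypothetical helpers:

```python
import apache_beam as beam


def _split_generators(self, dl_manager, pipeline: beam.Pipeline):
  # Build the expensive shared step once.
  examples = (
      pipeline
      | beam.Create(_load_all_rows())  # Hypothetical: list all source rows.
      | beam.Map(_row_to_example)      # Hypothetical: returns (key, example).
  )
  # Each split branches off the shared collection (labels keep steps unique).
  return {
      'train': examples | 'TrainFilter' >> beam.Filter(lambda kv: _is_train(kv[0])),
      'test': examples | 'TestFilter' >> beam.Filter(lambda kv: not _is_train(kv[0])),
  }
```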
Here is a complete example of a Beam dataset.
```python
import os

import tensorflow as tf
import tensorflow_datasets as tfds


class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )
```
To run the pipeline, have a look at the above section.
Note: Like for non-beam datasets, do not forget to register download checksums with `--register_checksums` (only the first time, to register the downloads).
```sh
tfds build my_dataset --register_checksums
```
If you want to create a Beam pipeline which takes a TFDS dataset as a source, you can use `tfds.beam.ReadFromTFDS`:
```python
builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)
```
It will process each shard of the dataset in parallel.
Note: This requires the dataset to already be generated. To generate datasets using Beam, see the other sections.
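The snippet above assumes an existing `pipeline` object; a minimal self-contained sketch, where the final step is a placeholder for your own processing:

```python
import apache_beam as beam
import tensorflow_datasets as tfds

builder = tfds.builder('my_dataset')  # The dataset must already be generated.

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | tfds.beam.ReadFromTFDS(builder, split='train')
      | beam.Map(tfds.as_numpy)
      | beam.Map(print)  # Placeholder: replace with your processing.
  )
```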