TF2Estimator on PySpark supports TensorBoard #3959

Merged · 13 commits · Feb 9, 2022
45 changes: 45 additions & 0 deletions python/orca/src/bigdl/orca/data/file.py
@@ -16,9 +16,12 @@

import os
import subprocess
import logging

from bigdl.dllib.utils.file_utils import callZooFunc

logger = logging.getLogger(__name__)


def open_text(path):
"""
@@ -251,3 +254,45 @@ def put_local_dir_to_remote(local_dir, remote_dir):
remote_dir = remote_dir[len("file://"):]
from distutils.dir_util import copy_tree
copy_tree(local_dir, remote_dir)


def put_local_dir_tree_to_remote(local_dir, remote_dir):
if remote_dir.startswith("hdfs"): # hdfs://url:port/file_path
test_cmd = 'hdfs dfs -ls {}'.format(remote_dir)
process = subprocess.Popen(test_cmd, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = process.communicate()
if process.returncode != 0:
if 'No such file or directory' in err.decode('utf-8'):
mkdir_cmd = 'hdfs dfs -mkdir {}'.format(remote_dir)
mkdir_process = subprocess.Popen(mkdir_cmd, shell=True)
mkdir_process.wait()
else:
# ls remote dir error
logger.warning(err.decode('utf-8'))
return
cmd = 'hdfs dfs -put -f {}/* {}/'.format(local_dir, remote_dir)
process = subprocess.Popen(cmd, shell=True)
Contributor commented:
Is it possible to just use pyarrow or the hadoop command instead of interleaving them together?

Contributor Author replied:
Using pyarrow to copy an HDFS directory tree is a little complex; will change to use the hdfs command throughout.

(An illustrative pyarrow-based sketch of this alternative follows this file's diff.)

process.wait()
elif remote_dir.startswith("s3"): # s3://bucket/file_path
access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
import boto3
s3_client = boto3.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key).client('s3', verify=False)
path_parts = remote_dir.split("://")[1].split('/')
bucket = path_parts.pop(0)
prefix = "/".join(path_parts)
local_files = [os.path.join(dirpath, f)
for (dirpath, dirnames, filenames) in os.walk(local_dir)
for f in filenames]
for file in local_files:
with open(file, "rb") as f:
s3_client.upload_fileobj(f, Bucket=bucket, Key=prefix+'/'+file[len(local_dir)+1:])
else:
if remote_dir.startswith("file://"):
remote_dir = remote_dir[len("file://"):]
from distutils.dir_util import copy_tree
copy_tree(local_dir, remote_dir)
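
For reference on the review thread above: a minimal sketch of the pyarrow-based alternative that was discussed and ultimately not adopted. This is not part of the PR; the helper name is hypothetical, and it assumes pyarrow with libhdfs support is available on the workers.

import os
from pyarrow import fs

def put_local_dir_tree_to_hdfs_with_pyarrow(local_dir, remote_dir):
    # Hypothetical alternative to shelling out to `hdfs dfs -put`;
    # remote_dir is expected to look like hdfs://host:port/path.
    hdfs, remote_path = fs.FileSystem.from_uri(remote_dir)
    local = fs.LocalFileSystem()
    hdfs.create_dir(remote_path, recursive=True)
    for dirpath, _, filenames in os.walk(local_dir):
        for name in filenames:
            src = os.path.join(dirpath, name)
            dst = remote_path + "/" + os.path.relpath(src, local_dir)
            hdfs.create_dir(os.path.dirname(dst), recursive=True)
            # Stream each file from the local filesystem to HDFS.
            fs.copy_files(src, dst, source_filesystem=local,
                          destination_filesystem=hdfs)

The PR keeps the hdfs dfs commands instead, which avoids requiring a pyarrow/libhdfs setup on every worker.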
37 changes: 27 additions & 10 deletions python/orca/src/bigdl/orca/learn/tf2/spark_runner.py
@@ -26,7 +26,8 @@
from bigdl.orca.data.utils import ray_partition_get_data_label
from bigdl.orca.data.file import put_local_dir_to_remote
from bigdl.orca.learn.utils import save_pkl, duplicate_stdout_stderr_to_file,\
get_specific_object_from_callbacks, get_replaced_path, get_rank
get_specific_object_from_callbacks, get_replaced_path, get_rank, \
process_tensorboard_in_callbacks
from bigdl.orca.learn.log_monitor import LogMonitor

logger = logging.getLogger(__name__)
@@ -274,7 +275,6 @@ def distributed_train_func(self, data_creator, config, epochs=1, verbose=1,
config=config, epochs=epochs,
steps_per_epoch=steps_per_epoch,
validation_steps=validation_steps)
checkpoint = None
if callbacks:
checkpoint = get_specific_object_from_callbacks(tf.keras.callbacks.ModelCheckpoint,
callbacks)
@@ -283,6 +283,8 @@
replaced_checkpoint_path = get_replaced_path(checkpoint.filepath)
checkpoint.filepath = replaced_checkpoint_path

replaced_log_dir = process_tensorboard_in_callbacks(callbacks, "fit", self.rank)

history = model.fit(train_dataset,
epochs=epochs,
verbose=verbose,
@@ -294,14 +296,22 @@
validation_steps=validation_steps,
validation_freq=validation_freq)

if checkpoint:
try:
if self.rank == 0:
put_local_dir_to_remote(os.path.dirname(replaced_checkpoint_path),
original_checkpoint_dir)
finally:
shutil.rmtree(os.path.dirname(replaced_checkpoint_path))

if callbacks:
if checkpoint:
checkpoint_copied = False
try:
if self.rank == 0:
put_local_dir_to_remote(os.path.dirname(replaced_checkpoint_path),
original_checkpoint_dir)
checkpoint_copied = True
except Exception:
logger.warning("Error when copy local checkpoint {} to {}, "
"please get the local checkpoint manually"
.format(replaced_checkpoint_path, original_checkpoint_dir))
if checkpoint_copied:
shutil.rmtree(os.path.dirname(replaced_checkpoint_path))
Contributor commented:
If put_local_dir_to_remote fails, maybe we should not delete replaced_checkpoint_path; otherwise the user's training result will be lost.
How about printing a warning stating that there is an error and that the checkpoint is located at xxx, so users have a chance to get it manually?

Contributor Author replied:
ok

if replaced_log_dir and os.path.exists(replaced_log_dir):
shutil.rmtree(replaced_log_dir)
return (model, history)

def step(self, data_creator, epochs=1, batch_size=32, verbose=1,
@@ -374,6 +384,8 @@ def validate(self, data_creator, batch_size=32, verbose=1, sample_weight=None,
dataset = dataset_handler.handle_dataset_validation(data_creator,
config=config,
steps=steps)
if callbacks:
replaced_log_dir = process_tensorboard_in_callbacks(callbacks, "evaluate", self.rank)

params = dict(
verbose=verbose,
@@ -397,6 +409,11 @@
else:
stats = {"results": results}

# clean temporary dir for tensorboard
if callbacks:
if replaced_log_dir and os.path.exists(replaced_log_dir):
shutil.rmtree(replaced_log_dir)

if self.rank == 0:
if self.need_to_log_to_driver:
LogMonitor.stop_log_monitor(self.log_path, self.logger_thread, self.thread_stop)
70 changes: 70 additions & 0 deletions python/orca/src/bigdl/orca/learn/utils.py
@@ -22,6 +22,7 @@
from bigdl.dllib.utils.file_utils import get_file_list
from bigdl.orca.data import SparkXShards
from bigdl.orca.data.utils import get_size
from bigdl.orca.data.file import put_local_dir_tree_to_remote, exists, makedirs
from bigdl.dllib.utils.utils import convert_row_to_numpy
import numpy as np
import pickle
@@ -440,3 +441,72 @@ def get_replaced_path(original_filepath):
base_name = os.path.basename(original_filepath)
temp_dir = tempfile.mkdtemp()
return os.path.join(temp_dir, base_name)


def process_tensorboard_in_callbacks(callbacks, mode="train", rank=None):
import tensorflow as tf

class EpochCopyCallback(tf.keras.callbacks.Callback):
def __init__(self, local_dir, remote_dir, rank=None):
super(EpochCopyCallback, self).__init__()
self.local_dir = local_dir
self.remote_dir = remote_dir
self.rank = rank

def on_epoch_end(self, epoch, logs=None):
if self.rank is not None:
if self.rank == 0:
put_local_dir_tree_to_remote(self.local_dir, self.remote_dir)

class TrainBatchCopyCallback(tf.keras.callbacks.Callback):
def __init__(self, local_dir, remote_dir, freq, rank=None):
super(TrainBatchCopyCallback, self).__init__()
self.local_dir = local_dir
self.remote_dir = remote_dir
self.freq = freq
self.rank = rank

def on_train_batch_end(self, batch, logs=None):
if self.rank is not None:
if self.rank == 0:
if batch % self.freq == 0:
put_local_dir_tree_to_remote(self.local_dir, self.remote_dir)

class BatchCopyCallback(tf.keras.callbacks.Callback):
def __init__(self, local_dir, remote_dir, freq, rank=None):
super(BatchCopyCallback, self).__init__()
self.local_dir = local_dir
self.remote_dir = remote_dir
self.freq = freq
self.rank = rank

def on_test_batch_end(self, batch, logs=None):
if self.rank is not None:
if self.rank == 0:
if batch % self.freq == 0:
put_local_dir_tree_to_remote(self.local_dir, self.remote_dir)

tensorboard = get_specific_object_from_callbacks(tf.keras.callbacks.TensorBoard,
callbacks)
if tensorboard:
original_log_dir = tensorboard.log_dir
replaced_log_dir = get_replaced_path(original_log_dir)
tensorboard.log_dir = replaced_log_dir

if tensorboard.update_freq == 'epoch':
# create copy callback for epoch
copy_callback = EpochCopyCallback(replaced_log_dir, original_log_dir, rank)
else:
# to avoid frequent copy, set update freq > 10
update_freq = tensorboard.update_freq if tensorboard.update_freq > 10 \
else 10
if mode == "fit":
# create copy callback for batch
copy_callback = TrainBatchCopyCallback(replaced_log_dir, original_log_dir,
update_freq, rank)
else:
copy_callback = BatchCopyCallback(replaced_log_dir, original_log_dir,
update_freq, rank)
callbacks.append(copy_callback)
return replaced_log_dir
return None
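
A minimal illustration (not part of the diff) of how this helper is used on a worker, based on the spark_runner.py changes above; the HDFS path is a placeholder.

import tensorflow as tf
from bigdl.orca.learn.utils import process_tensorboard_in_callbacks

# The user-provided TensorBoard callback may point at a remote log_dir.
callbacks = [tf.keras.callbacks.TensorBoard(log_dir="hdfs://host:9000/logs",
                                            update_freq="epoch")]

# The helper redirects log_dir to a local temp dir and appends a copy
# callback; on rank 0 the temp dir is synced back to the original log_dir
# at the end of every epoch, and the caller removes the temp dir afterwards.
replaced_log_dir = process_tensorboard_in_callbacks(callbacks, "fit", rank=0)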
@@ -289,6 +289,67 @@ def test_dataframe_different_train_val(self):
finally:
shutil.rmtree(temp_dir)

def test_tensorboard(self):
sc = OrcaContext.get_spark_context()
rdd = sc.range(0, 100)
spark = OrcaContext.get_spark_session()

from pyspark.ml.linalg import DenseVector
df = rdd.map(lambda x: (DenseVector(np.random.randn(1, ).astype(np.float)),
int(np.random.randint(0, 2, size=())))).toDF(["feature", "label"])

config = {
"lr": 0.2
}

try:
temp_dir = tempfile.mkdtemp()

trainer = Estimator.from_keras(
model_creator=model_creator,
verbose=True,
config=config,
workers_per_node=2,
backend="spark",
model_dir=temp_dir)

callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=os.path.join(temp_dir, "train_log"),
update_freq='epoch')
]
res = trainer.fit(df, epochs=3, batch_size=4, steps_per_epoch=25,
callbacks=callbacks,
feature_cols=["feature"],
label_cols=["label"],
validation_data=df,
validation_steps=1)
assert len(os.listdir(os.path.join(temp_dir, "train_log"))) > 0

callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=os.path.join(temp_dir, "train_log_2"),
update_freq='batch')
]
res = trainer.fit(df, epochs=3, batch_size=4, steps_per_epoch=25,
callbacks=callbacks,
feature_cols=["feature"],
label_cols=["label"],
validation_data=df,
validation_steps=11)
assert len(os.listdir(os.path.join(temp_dir, "train_log_2"))) > 0

callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=os.path.join(temp_dir, "val_log"),
update_freq='batch')
]
res = trainer.evaluate(df, batch_size=4, num_steps=25,
callbacks=callbacks,
feature_cols=["feature"],
label_cols=["label"])
assert len(os.listdir(os.path.join(temp_dir, "val_log"))) > 0

finally:
shutil.rmtree(temp_dir)


if __name__ == "__main__":
pytest.main([__file__])