Skip to content

Commit

Permalink
Poll for subprocess output while the process is running. (kubeflow#70)
Browse files Browse the repository at this point in the history
* Poll for subprocess output while the process is running.

* Don't wait for the process to finish to log the output.
* This is more convenient for streaming the output of long running commands.
* We no longer use Airflow so redirecting to a file and then reading that
  file should no longer be necessary.

* Fix some bugs.

* Address code review comments. Rewrite run_and_output to use run.
  • Loading branch information
jlewi authored and k8s-ci-robot committed Mar 22, 2018
1 parent cad1d3b commit 3d58796
Showing 1 changed file with 32 additions and 38 deletions.
70 changes: 32 additions & 38 deletions py/kubeflow/testing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
MASTER_REPO_OWNER = "tensorflow"
MASTER_REPO_NAME = "k8s"

# TODO(jlewi): Should we stream the output by polling the subprocess?
# look at run_and_stream in build_and_push.
def run(command, cwd=None, env=None, dryrun=False):
def run(command, cwd=None, env=None, polling_interval=datetime.timedelta(seconds=1)):
"""Run a subprocess.
Any subprocess output is emitted through the logging modules.
Returns:
output: A string containing the output.
"""
logging.info("Running: %s \ncwd=%s", " ".join(command), cwd)

Expand All @@ -47,42 +48,35 @@ def run(command, cwd=None, env=None, dryrun=False):
logging.info("Running: Environment:\n%s", "\n".join(lines))

log_file = None
try:
if dryrun:
command_str = ("Dryrun: Command:\n{0}\nCWD:\n{1}\n"
"Environment:\n{2}").format(" ".join(command), cwd, env)
logging.info(command_str)

# We write stderr/stdout to a file and then read it and process it.
# We do this because if we just inherit the handles from the parent the
# subprocess output doesn't show up in Airflow. This might be because
# we had multiple levels of processes invoking python processes.
with tempfile.NamedTemporaryFile(prefix="tmpRunLogs", delete=False,
mode="w") as hf:
log_file = hf.name
subprocess.check_call(command, cwd=cwd, env=env,
stdout=hf,
stderr=hf)
finally:
with open(log_file, "r") as hf:
output = hf.read()
logging.info("Subprocess output:\n%s", output)

def run_and_output(command, cwd=None, env=None):
logging.info("Running: %s \ncwd=%s", " ".join(command), cwd)

if not env:
env = os.environ
# The output won't be available until the command completes.
# So prefer using run if we don't need to return the output.
try:
output = subprocess.check_output(command, cwd=cwd, env=env,
stderr=subprocess.STDOUT).decode("utf-8")
logging.info("Subprocess output:\n%s", output)
except subprocess.CalledProcessError as e:
logging.info("Subprocess output:\n%s", e.output)
raise
return output
process = subprocess.Popen(
command, cwd=cwd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

logging.info("Subprocess output:\n")
output = []
while process.poll() is None:
process.stdout.flush()
for line in iter(process.stdout.readline, ''):
output.append(line.strip())
logging.info(line.strip())

time.sleep(polling_interval.total_seconds())

process.stdout.flush()
for line in iter(process.stdout.readline, ''):
output.append(line.strip())
logging.info(line.strip())

if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode,
"cmd: {0} exited with code {1}".format(
" ".join(command), process.returncode), "\n".join(output))

return "\n".join(output)

# TODO(jlewi): We should update callers to use run and just delete this function.
def run_and_output(*args, **kwargs):
    """Run a command through ``run`` and hand back its result.

    Thin compatibility wrapper: all positional and keyword arguments are
    forwarded unchanged to ``run``, and whatever ``run`` returns (or
    raises) is returned (or propagated) to the caller.
    """
    result = run(*args, **kwargs)
    return result


def clone_repo(dest, repo_owner=MASTER_REPO_OWNER, repo_name=MASTER_REPO_NAME,
Expand Down

0 comments on commit 3d58796

Please sign in to comment.