Skip to content

Commit

Permalink
Improved error state handling (#37)
Browse files Browse the repository at this point in the history
* Skip unique check when doing an eval

* Detect if the worker has exited

* Add subshell in _async_job for atomic print

This partially reverts d5e007b and will allow the output to be written
in a single print statement, allowing the output to be captured in one
go on the other end (usually).

* Signal when the worker exits

* Fix eval command

* Restore removed comment

* Use is-at-least and disable HUP trap on newer Zsh versions

* Improve zpty worker termination and killjobs

* Clean up requests that contain garbage carriage returns

* Silence zpty errors when there is a callback handler

* Bump dev version

* Update readme

* Fix the check for older Zsh versions in worker

* travis: Add Zsh 5.7, 5.7.1 and 5.8 as test targets

* test: Fix exit status of test runner

* travis: Disable 5.0.2 tests due to missing signals on Travis
  • Loading branch information
mafredri authored Apr 13, 2020
1 parent 95c2b15 commit 32548d3
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 50 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ env:
matrix:
# Use _ZSH_VERSION since if ZSH_VERSION is present, travis cacher thinks it
# is running in zsh and tries to use zsh specific functions.
- _ZSH_VERSION=5.8
- _ZSH_VERSION=5.7.1
- _ZSH_VERSION=5.7
- _ZSH_VERSION=5.6.2
- _ZSH_VERSION=5.5.1
- _ZSH_VERSION=5.4.2
Expand All @@ -19,7 +22,8 @@ env:
- _ZSH_VERSION=5.2
- _ZSH_VERSION=5.1.1
- _ZSH_VERSION=5.0.8
- _ZSH_VERSION=5.0.2
# Zsh 5.0.2 has issues on Travis, missing signals.
# - _ZSH_VERSION=5.0.2

cache:
directories:
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ The `callback_function` is called with the following parameters:
* `$6` has next result in buffer (0 = buffer empty, 1 = yes)
* This means another async job has completed and is pending in the buffer, it's very likely that your callback function will be called a second time (or more) in this execution. It's generally a good idea to e.g. delay prompt updates (`zle reset-prompt`) until the buffer is empty to prevent strange states in ZLE.

Possible error return codes for the job name `[async]`:

* `1` Corrupt worker output.
* `2` ZLE watcher detected an error on the worker fd.
* `3` Response from async_job when worker is missing.
* `130` Async worker crashed, this should not happen but it can mean the file descriptor has become corrupt. This must be followed by a `async_stop_worker [name]` and then the worker and tasks should be restarted. It is unknown why this happens.

#### `async_register_callback <worker_name> <callback_function>`

Register a callback for completed jobs. As soon as a job is finished, `async_process_results` will be called with the specified callback function. This requires that a worker is initialized with the -n (notify) option.
Expand Down
132 changes: 98 additions & 34 deletions async.zsh
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
#
# zsh-async
#
# version: 1.7.2
# version: 1.8.0-dev2
# author: Mathias Fredriksson
# url: https://github.com/mafredri/zsh-async
#

typeset -g ASYNC_VERSION=1.7.2
typeset -g ASYNC_VERSION=1.8.0-dev2
# Produce debug output from zsh-async when set to 1.
typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0}

Expand Down Expand Up @@ -37,19 +37,27 @@ _async_job() {
# block, after the command block has completed, the stdin for `cat` is
# closed, causing stderr to be appended with a $'\0' at the end to mark the
# end of output from this job.
local jobname=${ASYNC_JOB_NAME:-$1}
local stdout stderr ret tok
{
stdout=$(eval "$@")
ret=$?
duration=$(( EPOCHREALTIME - duration )) # Calculate duration.
local jobname=${ASYNC_JOB_NAME:-$1} out
out="$(
local stdout stderr ret tok
{
stdout=$(eval "$@")
ret=$?
duration=$(( EPOCHREALTIME - duration )) # Calculate duration.
print -r -n - $'\0'${(q)jobname} $ret ${(q)stdout} $duration
} 2> >(stderr=$(cat) && print -r -n - " "${(q)stderr}$'\0')
)"
if [[ $out != $'\0'*$'\0' ]]; then
# Corrupted output (aborted job?), skipping.
return
fi

# Grab mutex lock, stalls until token is available.
read -r -k 1 -p tok || exit 1
# Grab mutex lock, stalls until token is available.
read -r -k 1 -p tok || return 1

# Return output (<job_name> <return_code> <stdout> <duration> <stderr>).
print -r -n - $'\0'${(q)jobname} $ret ${(q)stdout} $duration
} 2> >(stderr=$(cat) && print -r -n - " "${(q)stderr}$'\0')
# Return output (<job_name> <return_code> <stdout> <duration> <stderr>).
print -r -n - "$out"

# Unlock mutex by inserting a token.
print -n -p $tok
Expand All @@ -73,10 +81,13 @@ _async_worker() {
# When a zpty is deleted (using -d) all the zpty instances created before
# the one being deleted receive a SIGHUP, unless we catch it, the async
# worker would simply exit (stop working) even though visible in the list
# of zpty's (zpty -L).
TRAPHUP() {
return 0 # Return 0, indicating signal was handled.
}
# of zpty's (zpty -L). This has been fixed around the time of Zsh 5.4
# (not released).
if ! is-at-least 5.4.1; then
TRAPHUP() {
return 0 # Return 0, indicating signal was handled.
}
fi

local -A storage
local unique=0
Expand Down Expand Up @@ -130,6 +141,23 @@ _async_worker() {
esac
done

# Terminate all running jobs, note that this function does not
# reinstall the child trap.
terminate_jobs() {
trap - CHLD # Ignore child exits during kill.
coproc : # Quit coproc.
coproc_pid=0 # Reset pid.

if is-at-least 5.4.1; then
trap '' HUP # Catch the HUP sent to this process.
kill -HUP -$$ # Send to entire process group.
trap - HUP # Disable HUP trap.
else
# We already handle HUP for Zsh < 5.4.1.
kill -HUP -$$ # Send to entire process group.
fi
}

killjobs() {
local tok
local -a pids
Expand All @@ -143,24 +171,34 @@ _async_worker() {
# process is in the middle of writing to stdin during kill.
(( coproc_pid )) && read -r -k 1 -p tok

kill -HUP -$$ # Send to entire process group.
coproc : # Quit coproc.
coproc_pid=0 # Reset pid.
terminate_jobs
trap child_exit CHLD # Reinstall child trap.
}

local request do_eval=0
local -a cmd
while :; do
# Wait for jobs sent by async_job.
read -r -d $'\0' request || {
# Since we handle SIGHUP above (and thus do not know when `zpty -d`)
# occurs, a failure to read probably indicates that stdin has
# closed. This is why we propagate the signal to all children and
# exit manually.
kill -HUP -$$ # Send SIGHUP to all jobs.
exit 0
# Unknown error occurred while reading from stdin, the zpty
# worker is likely in a broken state, so we shut down.
terminate_jobs

# Stdin is broken and in case this was an unintended
# crash, we try to report it as a last hurrah.
print -r -n $'\0'"'[async]'" $(( 127 + 3 )) "''" 0 "'$0:$LINENO: zpty fd died, exiting'"$'\0'

# We use `return` to abort here because using `exit` may
# result in an infinite loop that never exits and, as a
# result, high CPU utilization.
return $(( 127 + 1 ))
}

# We need to clean the input here because sometimes when a zpty
# has died and been respawned, messages will be prefixed with a
# carraige return (\r, or \C-M).
request=${request#$'\C-M'}

# Check for non-job commands sent to worker
case $request in
_unset_trap) notify_parent=0; continue;;
Expand All @@ -175,9 +213,11 @@ _async_worker() {
# Name of the job (first argument).
local job=$cmd[1]

# If worker should perform unique jobs
if (( unique )); then
# Check if a previous job is still running, if yes, let it finnish
# Check if a worker should perform unique jobs, unless
# this is an eval since they run synchronously.
if (( !do_eval )) && (( unique )); then
# Check if a previous job is still running, if yes,
# skip this job and let the previous one finish.
for pid in ${${(v)jobstates##*:*:}%\=*}; do
if [[ ${storage[$job]} == $pid ]]; then
continue 2
Expand Down Expand Up @@ -317,7 +357,7 @@ _async_zle_watcher() {
async_stop_worker $worker

if [[ -n $callback ]]; then
$callback '[async]' 2 "" 0 "$worker:zle -F $1 returned error $2" 0
$callback '[async]' 2 "" 0 "$0:$LINENO: error: fd for $worker failed: zle -F $1 returned error $2" 0
fi
return
fi;
Expand All @@ -327,6 +367,28 @@ _async_zle_watcher() {
fi
}

_async_send_job() {
setopt localoptions noshwordsplit noksharrays noposixidentifiers noposixstrings

local caller=$1
local worker=$2
shift 2

zpty -t $worker &>/dev/null || {
typeset -gA ASYNC_PTYS ASYNC_CALLBACKS
local callback=$ASYNC_CALLBACKS[$worker]

if [[ -n $callback ]]; then
$callback '[async]' 3 "" 0 "$0:$LINENO: error: no such worker: $worker" 0
else
print -u2 "$caller: no such async worker: $worker"
fi
return 1
}

zpty -w $worker "$@"$'\0'
}

#
# Start a new asynchronous job on specified worker, assumes the worker is running.
#
Expand All @@ -344,8 +406,7 @@ async_job() {
cmd=(${(q)cmd}) # Quote special characters in multi argument commands.
fi

# Quote the cmd in case RC_EXPAND_PARAM is set.
zpty -w $worker "$cmd"$'\0'
_async_send_job $0 $worker "$cmd"
}

#
Expand All @@ -369,7 +430,7 @@ async_worker_eval() {
fi

# Quote the cmd in case RC_EXPAND_PARAM is set.
zpty -w $worker "_async_eval $cmd"$'\0'
_async_send_job $0 $worker "_async_eval $cmd"
}

# This function traps notification signals and calls all registered callbacks
Expand Down Expand Up @@ -494,7 +555,7 @@ async_start_worker() {
# Re-enable it if it was enabled, for debugging.
(( has_xtrace )) && setopt xtrace

if [[ $ZSH_VERSION < 5.0.8 ]]; then
if ! is-at-least 5.0.8; then
# For ZSH versions older than 5.0.8 we delay a bit to give
# time for the worker to start before issuing commands,
# otherwise it will not be ready to receive them.
Expand Down Expand Up @@ -556,6 +617,9 @@ async_init() {
zmodload zsh/zpty
zmodload zsh/datetime

# Load is-at-least for reliable version check.
autoload -Uz is-at-least

# Check if zsh/zpty returns a file descriptor or not,
# shell must also be interactive with zle enabled.
typeset -g ASYNC_ZPTY_RETURNS_FD=0
Expand Down
14 changes: 0 additions & 14 deletions async_test.zsh
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,6 @@ test_async_process_results_stress() {
integer iter=40 timeout=5
for i in {1..$iter}; do
async_job test "print -n $i"

# TODO: Figure out how we can remove sleep & process here.

# If we do not sleep here, we end up losing some of the commands sent to
# async_job (~90 get sent). This could possibly be due to the zpty
# buffer being full (see below).
sleep 0.00001
# Without processing resuls we occasionally run into 'print -n 39'
# failing due to the command name and exit status missing. Sample output
# from processing for 39 (stdout, time, stderr):
# $'39 0.0056798458 '
# This is again, probably due to the zpty buffer being full, we only
# need to ensure that not too many commands are run before we process.
(( iter % 6 == 0 )) && async_process_results test cb
done

float start=$EPOCHSECONDS
Expand Down
6 changes: 5 additions & 1 deletion test.zsh
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,10 @@ run_test_module() {
}

cleanup() {
trap '' HUP
kill -HUP -$$ 2>/dev/null
trap - HUP
kill -HUP $$ 2>/dev/null
kill -HUP -$$ 2>/dev/null
}

trap cleanup EXIT INT HUP QUIT TERM USR1
Expand All @@ -260,4 +261,7 @@ for tf in ${~TEST_GLOB}/*_test.(zsh|sh); do
(( $? )) && failed=1
done

trap - EXIT
trap '' HUP
kill -HUP -$$ 2>/dev/null
exit $failed

0 comments on commit 32548d3

Please sign in to comment.