Skip to content

Commit

Permalink
Composed process termination differently.
Browse files Browse the repository at this point in the history
Now there is way to terminate process wrapper when data already passed
to backlog buffer.

Removed extra "close" calls and not-needed demuxing
"subscribe/unsubscribe" feature.
  • Loading branch information
Alex Sergeyev committed Oct 2, 2014
1 parent c61236b commit cb91431
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 60 deletions.
2 changes: 1 addition & 1 deletion libwebsocketd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (wsh *WebsocketdHandler) accept(ws *websocket.Conn, log *LogScope) {
}

/// we need to unsubscribe as soon as we done.
defer p.Unsubscribe()
defer p.Terminate()

// send websocket data to process
input := make(chan string)
Expand Down
104 changes: 45 additions & 59 deletions libwebsocketd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,26 @@ var ErrNoConsumers = errors.New("All consumers are gone")
var ErrProcessFinished = errors.New("Process already finished")
var ErrUnknownConsumer = errors.New("No consumer to unsubscribe")

// RcvrTimeout is a very short duration to determine if subscriber is unable to process data quickly enough.
// Zero is not practical because it would cause packet receiving to block while OS passes data via Pipe to process.
var RcvrTimeout = time.Second * 10

// StdoutBufSize is a size to limit max amount of data read from process and stored inside of Websocketd process
var StdoutBufSize = 10 * 1024 * 1024
var RcvrTimeout = time.Second * 5
var StdoutBufSize int64 = 1024 * 1024
var StdoutBufLines int64 = 10000

// ExternalProcess holds info about running process and sends info to subscribers using channels
type ExternalProcess struct {
cmd *exec.Cmd

in io.WriteCloser
inmux *sync.Mutex

terminating int32

log *LogScope
cmd *exec.Cmd
in io.WriteCloser
mux *sync.Mutex
terminating chan int
log *LogScope
}

func (p *ExternalProcess) wait() {
atomic.StoreInt32(&p.terminating, 1)
p.cmd.Wait()

// if l := len(p.consumers); l > 0 {
// p.log.Trace("process", "Closing %d consumer channels", l)
// for _, x := range p.consumers {
// close(x)
// }
// }
// p.consumers = nil

p.log.Debug("process", "Process completed, status: %s", p.cmd.ProcessState.String())
}

// LaunchProcess initializes ExternalProcess struct fields. Command pipes for standard input/output are established and first consumer channel is returned.
func LaunchProcess(cmd *exec.Cmd, log *LogScope) (*ExternalProcess, <-chan string, error) {
// TODO: Investigate alternative approaches. exec.Cmd uses real OS pipes which spends new filehandler each.
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Debug("process", "Unable to create p")
Expand All @@ -81,8 +64,8 @@ func LaunchProcess(cmd *exec.Cmd, log *LogScope) (*ExternalProcess, <-chan strin
p := &ExternalProcess{
cmd: cmd,
in: stdin,
inmux: new(sync.Mutex),
terminating: 0,
mux: new(sync.Mutex),
terminating: make(chan int),
log: log,
}
log.Associate("pid", strconv.Itoa(p.Pid()))
Expand All @@ -100,20 +83,27 @@ func LaunchProcess(cmd *exec.Cmd, log *LogScope) (*ExternalProcess, <-chan strin
// Terminate tries to stop process forcefully using interrupt and kill signals with a second of waiting time between them. If the kill is unsuccessful, it might be repeated
// again and again while system accepts those attempts.
func (p *ExternalProcess) Terminate() {
if p.cmd.ProcessState == nil {
p.log.Debug("process", "Sending SIGINT to %d", p.Pid())

// wait for process completion in background and report to channel
term := make(chan int)
go func() { p.wait(); close(term) }()
// prevent double entrance to this subroutine...
p.mux.Lock()
defer p.mux.Unlock()

err := p.cmd.Process.Signal(os.Interrupt)
if err != nil {
p.log.Error("process", "could not send SIGINT %s", err)
if p.cmd.ProcessState == nil {
go func() { p.wait(); close(p.terminating) }()

select {
case <-p.terminating:
return
case <-time.After(time.Millisecond * 10):
p.log.Debug("process", "Sending SIGINT to %d", p.Pid())
err := p.cmd.Process.Signal(os.Interrupt)
if err != nil {
p.log.Error("process", "could not send SIGINT %s", err)
}
}

for {
select {
case <-term:
case <-p.terminating:
return
case <-time.After(time.Second):
p.log.Error("process", "process did not react to SIGINT, sending SIGKILL")
Expand All @@ -124,7 +114,6 @@ func (p *ExternalProcess) Terminate() {
}
}
}

}
}

Expand All @@ -133,21 +122,13 @@ func (e *ExternalProcess) Pid() int {
return e.cmd.Process.Pid
}

// Unubscribe signals back from the consumer and helps to finish process if output is quiet and all subscribers disconnected
func (p *ExternalProcess) Unsubscribe() (err error) {
p.log.Debug("process", "Receiver finished listening to process")
p.Terminate()

return err
}

// PassInput delivers particular string to the process, involves locking input channel
func (p *ExternalProcess) PassInput(s string) error {
if p.cmd.ProcessState != nil {
return ErrProcessFinished
}
p.inmux.Lock()
defer p.inmux.Unlock()
p.mux.Lock()
defer p.mux.Unlock()
_, err := io.WriteString(p.in, s+"\n")
if err != nil {
p.log.Info("process", "Unable to write string to a process: %s", err)
Expand All @@ -159,19 +140,26 @@ func (p *ExternalProcess) PassInput(s string) error {

// process_stdout is a background function that reads from process and muxes output to all subscribed channels
func (p *ExternalProcess) process_stdout(r io.ReadCloser, c chan string) {
bsize, backlog := int64(0), make(chan string, StdoutBufSize/100)
bsize, backlog := int64(0), make(chan string, StdoutBufLines)

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
LOOP:
for s := range backlog {
select {
case c <- s:
atomic.AddInt64(&bsize, int64(-len(s)))
l := len(s)
p.log.Trace("process", "Sent %d bytes to websocket handler", l)
atomic.AddInt64(&bsize, int64(-l))
case <-p.terminating:
p.log.Trace("process", "Websocket handler connection was terminated...")
break LOOP
case <-time.After(RcvrTimeout):
p.Terminate()
break
p.log.Trace("process", "Websocket handler timed out with %d messages in queue (%d bytes), terminating...", len(backlog), bsize)
r.Close()
break LOOP
}
}
close(c)
Expand All @@ -184,20 +172,19 @@ func (p *ExternalProcess) process_stdout(r io.ReadCloser, c chan string) {
if str != "" {
str = trimEOL(str)
backlog <- str
atomic.AddInt64(&bsize, int64(len(str)))
if sz := atomic.AddInt64(&bsize, int64(len(str))); sz > StdoutBufSize {
p.log.Trace("process", "Websocket handler did not process %d messages (%d bytes), terminating...", len(backlog), bsize)
break
}
}
if err != nil {
p.log.Debug("process", "STDOUT stream ended: %s", err)
break
}
}
close(backlog)
r.Close()
wg.Wait()

if p.terminating == 0 {
p.wait()
}
p.Terminate()
}

// process_stderr is a function to log process output to STDERR
Expand All @@ -214,7 +201,6 @@ func (p *ExternalProcess) process_stderr(r io.ReadCloser) {
break
}
}
r.Close()
}

// trimEOL cuts unixy style \n and windowsy style \r\n suffix from the string
Expand Down
2 changes: 2 additions & 0 deletions libwebsocketd/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func TestSimpleEcho(t *testing.T) {
}

time.Sleep(10 * time.Millisecond)
ep.inmux.Lock()
defer ep.inmux.Unlock()
if ep.cmd.ProcessState == nil {
t.Error("Echo did not stop after sending the line")
}
Expand Down

0 comments on commit cb91431

Please sign in to comment.