Skip to content

Commit

Permalink
Merge pull request joewalnes#90 from asergeyev/master
Browse files Browse the repository at this point in the history
Follow up on joewalnes#88, introducing ways to handle big messages and slow network.
  • Loading branch information
joewalnes committed Oct 2, 2014
2 parents 139717d + 6524f12 commit 2376a79
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 257 deletions.
27 changes: 22 additions & 5 deletions libwebsocketd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,29 @@ func NewWebsocketdHandler(s *WebsocketdServer, req *http.Request, log *LogScope)
// wshandler returns function that executes code with given log context
func (wsh *WebsocketdHandler) wshandler(log *LogScope) websocket.Handler {
return websocket.Handler(func(ws *websocket.Conn) {
wsh.accept(ws, log)
wsh.accept(&WebsocketWrapper{ws}, log)
})
}

type wsConn interface {
Close() error
Receive(*string) error
Send(string) error
}

type WebsocketWrapper struct {
*websocket.Conn
}

func (ww *WebsocketWrapper) Receive(ptr *string) error {
return websocket.Message.Receive(ww.Conn, ptr)
}
func (ww *WebsocketWrapper) Send(s string) error {
return websocket.Message.Send(ww.Conn, s)
}

// accept connects process and websocket.
func (wsh *WebsocketdHandler) accept(ws *websocket.Conn, log *LogScope) {
func (wsh *WebsocketdHandler) accept(ws wsConn, log *LogScope) {
defer ws.Close()

log.Access("handler", "CONNECT")
Expand All @@ -78,14 +95,14 @@ func (wsh *WebsocketdHandler) accept(ws *websocket.Conn, log *LogScope) {
}

/// we need to unsubscribe as soon as we done.
defer p.Unsubscribe(output)
defer p.Terminate()

// send websocket data to process
input := make(chan string)
go func() {
for {
var msg string
err := websocket.Message.Receive(ws, &msg)
err := ws.Receive(&msg)
if err != nil {
close(input)
return
Expand All @@ -112,7 +129,7 @@ func (wsh *WebsocketdHandler) accept(ws *websocket.Conn, log *LogScope) {
log.Trace("handler", "Process stopped producing results")
return
}
err = websocket.Message.Send(ws, str)
err = ws.Send(str)
if err != nil {
log.Trace("handler", "Process data cannot be passed to websocket due to %s", err)
return
Expand Down
12 changes: 12 additions & 0 deletions libwebsocketd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,15 @@ func TestParsePathExplicitScript(t *testing.T) {
t.Error("filePath")
}
}

func TestHandlerBasics(t *testing.T) {
wh := WebsocketdHandler{
server: nil,
Id: "",
RemoteInfo: &RemoteInfo{"", "", ""},
URLInfo: &URLInfo{"", "", ""},
Env: []string{},
command: "/bin/echo",
}
logger_helper(t.Log)
}
218 changes: 67 additions & 151 deletions libwebsocketd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,26 @@ var ErrNoConsumers = errors.New("All consumers are gone")
var ErrProcessFinished = errors.New("Process already finished")
var ErrUnknownConsumer = errors.New("No consumer to unsubscribe")

// RcvrTimeout is a very short duration to determine if subscriber is unable to process data quickly enough.
// Zero is not practical because it would cause packet receiving to block while OS passes data via Pipe to process.
var RcvrTimeout = time.Millisecond
var RcvrTimeout = time.Second * 5
var StdoutBufSize int64 = 1024 * 1024
var StdoutBufLines int64 = 10000

// ExternalProcess holds info about running process and sends info to subscribers using channels
type ExternalProcess struct {
cmd *exec.Cmd

consumers []chan string
cmux *sync.Mutex

in io.WriteCloser
inmux *sync.Mutex

terminating int32

log *LogScope
cmd *exec.Cmd
in io.WriteCloser
mux *sync.Mutex
terminating chan int
log *LogScope
}

func (p *ExternalProcess) wait() {
atomic.StoreInt32(&p.terminating, 1)
p.cmd.Wait()

if l := len(p.consumers); l > 0 {
p.log.Trace("process", "Closing %d consumer channels", l)
for _, x := range p.consumers {
close(x)
}
}
p.consumers = nil

p.log.Debug("process", "Process completed, status: %s", p.cmd.ProcessState.String())
}

// LaunchProcess initializes ExternalProcess struct fields. Command pipes for standard input/output are established and first consumer channel is returned.
func LaunchProcess(cmd *exec.Cmd, log *LogScope) (*ExternalProcess, <-chan string, error) {
// TODO: Investigate alternative approaches. exec.Cmd uses real OS pipes which spends new filehandler each.
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Debug("process", "Unable to create p")
Expand All @@ -78,43 +61,49 @@ func LaunchProcess(cmd *exec.Cmd, log *LogScope) (*ExternalProcess, <-chan strin
return nil, nil, err
}

firstconsumer := make(chan string)
p := &ExternalProcess{
cmd: cmd,
consumers: []chan string{firstconsumer},
cmux: new(sync.Mutex),
in: stdin,
inmux: new(sync.Mutex),
terminating: 0,
mux: new(sync.Mutex),
terminating: make(chan int),
log: log,
}
log.Associate("pid", strconv.Itoa(p.Pid()))
p.log.Trace("process", "Command started, first consumer channel created")

consumer := make(chan string)

// Run output listeners
go p.process_stdout(stdout)
go p.process_stdout(stdout, consumer)
go p.process_stderr(stderr)

return p, firstconsumer, nil
return p, consumer, nil
}

// Terminate tries to stop process forcefully using interrupt and kill signals with a second of waiting time between them. If the kill is unsuccessful, it might be repeated
// again and again while system accepts those attempts.
func (p *ExternalProcess) Terminate() {
if p.cmd.ProcessState == nil {
p.log.Debug("process", "Sending SIGINT to %d", p.Pid())

// wait for process completion in background and report to channel
term := make(chan int)
go func() { p.wait(); close(term) }()
// prevent double entrance to this subroutine...
p.mux.Lock()
defer p.mux.Unlock()

err := p.cmd.Process.Signal(os.Interrupt)
if err != nil {
p.log.Error("process", "could not send SIGINT %s", err)
if p.cmd.ProcessState == nil {
go func() { p.wait(); close(p.terminating) }()

select {
case <-p.terminating:
return
case <-time.After(time.Millisecond * 10):
p.log.Debug("process", "Sending SIGINT to %d", p.Pid())
err := p.cmd.Process.Signal(os.Interrupt)
if err != nil {
p.log.Error("process", "could not send SIGINT %s", err)
}
}

for {
select {
case <-term:
case <-p.terminating:
return
case <-time.After(time.Second):
p.log.Error("process", "process did not react to SIGINT, sending SIGKILL")
Expand All @@ -125,7 +114,6 @@ func (p *ExternalProcess) Terminate() {
}
}
}

}
}

Expand All @@ -134,109 +122,13 @@ func (e *ExternalProcess) Pid() int {
return e.cmd.Process.Pid
}

// Subscribe allows someone to open channel that contains process's stdout messages.
func (p *ExternalProcess) Subscribe() (<-chan string, error) {
p.cmux.Lock()
defer p.cmux.Unlock()
if p.consumers == nil {
return nil, ErrProcessFinished
}
p.log.Trace("process", "New consumer added")
c := make(chan string)
p.consumers = append(p.consumers, c)
return c, nil
}

// Unubscribe signals back from the consumer and helps to finish process if output is quiet and all subscribers disconnected
func (p *ExternalProcess) Unsubscribe(x <-chan string) (err error) {
p.cmux.Lock()
defer p.cmux.Unlock()

if p.consumers != nil {
// we did not terminate consumers yet
ln := len(p.consumers)
if ln == 1 {
// simple choice!
if p.consumers[0] == x {
p.log.Debug("process", "No receivers listen, last one unsubscribed")
p.Terminate()
} else {
err = ErrUnknownConsumer
}
} else {
for i, m := range p.consumers {
if m == x {
p.log.Trace("process", "Process subscriber unsubscribed leaving %d to listen", ln-1)
close(m)
copy(p.consumers[i:], p.consumers[i+1:])
p.consumers = p.consumers[:ln-1]
break
}
}
// error if nothing changed
if len(p.consumers) == ln {
err = ErrUnknownConsumer
}
}
} else {
err = ErrNoConsumers
}
return err
}

// demux_content delivers particular string to all consumers
func (p *ExternalProcess) demux_content(s string) error {
p.cmux.Lock()
defer p.cmux.Unlock()

ln := len(p.consumers)
alive := make([]bool, ln)

// Idea here is to run parallel send to consumers with same timeout.
// All of those sends will put their result into same pre-allocated slice.
// This could be changed to a channel later to avoid blocking.
wg := sync.WaitGroup{}
for i := range p.consumers {
wg.Add(1)
go func(i int) {
select {
case p.consumers[i] <- s:
p.log.Trace("process", "Sent process output %#v to %v", s, i)
alive[i] = true
case <-time.After(RcvrTimeout):
// consumer cannot receive data, removing it (note, sometimes it's ok to have small delay)
p.log.Debug("process", "Dropped message '%s' to consumer %d, closing it", s, i)
}
wg.Done()
}(i)
}
wg.Wait()

d := 0 // counter for deletions
for j := 0; j < ln; j++ {
if !alive[j] {
i := j - d
close(p.consumers[i])
copy(p.consumers[i:], p.consumers[i+1:])
d++
p.consumers = p.consumers[:ln-d]
}
}

if d == ln { // all consumers gone
return ErrNoConsumers
} else {
return nil
}
}

// PassInput delivers particular string to the process, involves locking input channel
func (p *ExternalProcess) PassInput(s string) error {
if p.cmd.ProcessState != nil {
return ErrProcessFinished
}
p.inmux.Lock()
defer p.inmux.Unlock()
p.mux.Lock()
defer p.mux.Unlock()
_, err := io.WriteString(p.in, s+"\n")
if err != nil {
p.log.Info("process", "Unable to write string to a process: %s", err)
Expand All @@ -247,15 +139,41 @@ func (p *ExternalProcess) PassInput(s string) error {
}

// process_stdout is a background function that reads from process and muxes output to all subscribed channels
func (p *ExternalProcess) process_stdout(r io.ReadCloser) {
func (p *ExternalProcess) process_stdout(r io.ReadCloser, c chan string) {
bsize, backlog := int64(0), make(chan string, StdoutBufLines)

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
LOOP:
for s := range backlog {
select {
case c <- s:
l := len(s)
p.log.Trace("process", "Sent %d bytes to websocket handler", l)
atomic.AddInt64(&bsize, int64(-l))
case <-p.terminating:
p.log.Trace("process", "Websocket handler connection was terminated...")
break LOOP
case <-time.After(RcvrTimeout):
p.log.Trace("process", "Websocket handler timed out with %d messages in queue (%d bytes), terminating...", len(backlog), bsize)
r.Close()
break LOOP
}
}
close(c)
wg.Done()
}()

buf := bufio.NewReader(r)
for {
str, err := buf.ReadString('\n')

str = trimEOL(str)
if str != "" {
snderr := p.demux_content(str)
if snderr != nil {
str = trimEOL(str)
backlog <- str
if sz := atomic.AddInt64(&bsize, int64(len(str))); sz > StdoutBufSize {
p.log.Trace("process", "Websocket handler did not process %d messages (%d bytes), terminating...", len(backlog), bsize)
break
}
}
Expand All @@ -264,10 +182,9 @@ func (p *ExternalProcess) process_stdout(r io.ReadCloser) {
break
}
}
r.Close()
if p.terminating == 0 {
p.wait()
}
close(backlog)
wg.Wait()
p.Terminate()
}

// process_stderr is a function to log process output to STDERR
Expand All @@ -284,7 +201,6 @@ func (p *ExternalProcess) process_stderr(r io.ReadCloser) {
break
}
}
r.Close()
}

// trimEOL cuts unixy style \n and windowsy style \r\n suffix from the string
Expand Down
Loading

0 comments on commit 2376a79

Please sign in to comment.