Skip to content

Commit

Permalink
New attack on joewalnes#88. Added buffering.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Sergeyev committed Oct 2, 2014
1 parent 95558d4 commit c61236b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 199 deletions.
2 changes: 1 addition & 1 deletion libwebsocketd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (wsh *WebsocketdHandler) accept(ws *websocket.Conn, log *LogScope) {
}

/// we need to unsubscribe as soon as we done.
defer p.Unsubscribe(output)
defer p.Unsubscribe()

// send websocket data to process
input := make(chan string)
Expand Down
160 changes: 45 additions & 115 deletions libwebsocketd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ var ErrUnknownConsumer = errors.New("No consumer to unsubscribe")

// RcvrTimeout is a very short duration to determine if subscriber is unable to process data quickly enough.
// Zero is not practical because it would cause packet receiving to block while OS passes data via Pipe to process.
var RcvrTimeout = time.Millisecond
var RcvrTimeout = time.Second * 10

// StdoutBufSize is a size to limit max amount of data read from process and stored inside of Websocketd process
var StdoutBufSize = 10 * 1024 * 1024

// ExternalProcess holds info about running process and sends info to subscribers using channels
type ExternalProcess struct {
cmd *exec.Cmd

consumers []chan string
cmux *sync.Mutex

in io.WriteCloser
inmux *sync.Mutex

Expand All @@ -44,13 +44,13 @@ func (p *ExternalProcess) wait() {
atomic.StoreInt32(&p.terminating, 1)
p.cmd.Wait()

if l := len(p.consumers); l > 0 {
p.log.Trace("process", "Closing %d consumer channels", l)
for _, x := range p.consumers {
close(x)
}
}
p.consumers = nil
// if l := len(p.consumers); l > 0 {
// p.log.Trace("process", "Closing %d consumer channels", l)
// for _, x := range p.consumers {
// close(x)
// }
// }
// p.consumers = nil

p.log.Debug("process", "Process completed, status: %s", p.cmd.ProcessState.String())
}
Expand Down Expand Up @@ -78,11 +78,8 @@ func LaunchProcess(cmd *exec.Cmd, log *LogScope) (*ExternalProcess, <-chan strin
return nil, nil, err
}

firstconsumer := make(chan string)
p := &ExternalProcess{
cmd: cmd,
consumers: []chan string{firstconsumer},
cmux: new(sync.Mutex),
in: stdin,
inmux: new(sync.Mutex),
terminating: 0,
Expand All @@ -91,11 +88,13 @@ func LaunchProcess(cmd *exec.Cmd, log *LogScope) (*ExternalProcess, <-chan strin
log.Associate("pid", strconv.Itoa(p.Pid()))
p.log.Trace("process", "Command started, first consumer channel created")

consumer := make(chan string)

// Run output listeners
go p.process_stdout(stdout)
go p.process_stdout(stdout, consumer)
go p.process_stderr(stderr)

return p, firstconsumer, nil
return p, consumer, nil
}

// Terminate tries to stop process forcefully using interrupt and kill signals with a second of waiting time between them. If the kill is unsuccessful, it might be repeated
Expand Down Expand Up @@ -134,100 +133,12 @@ func (e *ExternalProcess) Pid() int {
return e.cmd.Process.Pid
}

// Subscribe allows someone to open channel that contains process's stdout messages.
func (p *ExternalProcess) Subscribe() (<-chan string, error) {
p.cmux.Lock()
defer p.cmux.Unlock()
if p.consumers == nil {
return nil, ErrProcessFinished
}
p.log.Trace("process", "New consumer added")
c := make(chan string)
p.consumers = append(p.consumers, c)
return c, nil
}

// Unubscribe signals back from the consumer and helps to finish process if output is quiet and all subscribers disconnected
func (p *ExternalProcess) Unsubscribe(x <-chan string) (err error) {
p.cmux.Lock()
defer p.cmux.Unlock()

if p.consumers != nil {
// we did not terminate consumers yet
ln := len(p.consumers)
if ln == 1 {
// simple choice!
if p.consumers[0] == x {
p.log.Debug("process", "No receivers listen, last one unsubscribed")
p.Terminate()
} else {
err = ErrUnknownConsumer
}
} else {
for i, m := range p.consumers {
if m == x {
p.log.Trace("process", "Process subscriber unsubscribed leaving %d to listen", ln-1)
close(m)
copy(p.consumers[i:], p.consumers[i+1:])
p.consumers = p.consumers[:ln-1]
break
}
}
// error if nothing changed
if len(p.consumers) == ln {
err = ErrUnknownConsumer
}
}
} else {
err = ErrNoConsumers
}
return err
}

// demux_content delivers particular string to all consumers
func (p *ExternalProcess) demux_content(s string) error {
p.cmux.Lock()
defer p.cmux.Unlock()
func (p *ExternalProcess) Unsubscribe() (err error) {
p.log.Debug("process", "Receiver finished listening to process")
p.Terminate()

ln := len(p.consumers)
alive := make([]bool, ln)

// Idea here is to run parallel send to consumers with same timeout.
// All of those sends will put their result into same pre-allocated slice.
// This could be changed to a channel later to avoid blocking.
wg := sync.WaitGroup{}
for i := range p.consumers {
wg.Add(1)
go func(i int) {
select {
case p.consumers[i] <- s:
p.log.Trace("process", "Sent process output %#v to %v", s, i)
alive[i] = true
case <-time.After(RcvrTimeout):
// consumer cannot receive data, removing it (note, sometimes it's ok to have small delay)
p.log.Debug("process", "Dropped message '%s' to consumer %d, closing it", s, i)
}
wg.Done()
}(i)
}
wg.Wait()

d := 0 // counter for deletions
for j := 0; j < ln; j++ {
if !alive[j] {
i := j - d
close(p.consumers[i])
copy(p.consumers[i:], p.consumers[i+1:])
d++
p.consumers = p.consumers[:ln-d]
}
}

if d == ln { // all consumers gone
return ErrNoConsumers
} else {
return nil
}
return err
}

// PassInput delivers particular string to the process, involves locking input channel
Expand All @@ -247,24 +158,43 @@ func (p *ExternalProcess) PassInput(s string) error {
}

// process_stdout is a background function that reads from process and muxes output to all subscribed channels
func (p *ExternalProcess) process_stdout(r io.ReadCloser) {
func (p *ExternalProcess) process_stdout(r io.ReadCloser, c chan string) {
bsize, backlog := int64(0), make(chan string, StdoutBufSize/100)

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
for s := range backlog {
select {
case c <- s:
atomic.AddInt64(&bsize, int64(-len(s)))
case <-time.After(RcvrTimeout):
p.Terminate()
break
}
}
close(c)
wg.Done()
}()

buf := bufio.NewReader(r)
for {
str, err := buf.ReadString('\n')

str = trimEOL(str)
if str != "" {
snderr := p.demux_content(str)
if snderr != nil {
break
}
str = trimEOL(str)
backlog <- str
atomic.AddInt64(&bsize, int64(len(str)))
}
if err != nil {
p.log.Debug("process", "STDOUT stream ended: %s", err)
break
}
}
close(backlog)
r.Close()
wg.Wait()

if p.terminating == 0 {
p.wait()
}
Expand Down
97 changes: 14 additions & 83 deletions libwebsocketd/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,6 @@ func launchHelper(t *testing.T, args ...string) (*ExternalProcess, <-chan string
return ep, ch
}

func launchHelpers(t *testing.T, args ...string) (*ExternalProcess, <-chan string, <-chan string) {
ep, c1 := launchHelper(t, args...)
if ep != nil {
c2, err := ep.Subscribe()
if err == nil {
return ep, c1, c2
}
t.Fatal("Cannot join second subscriber")
}
return nil, nil, nil
}

func TestEarlyTerminate(t *testing.T) {
ep, _ := launchHelper(t, "cat")
ep.Terminate()
Expand Down Expand Up @@ -73,56 +61,6 @@ func TestSimpleEcho(t *testing.T) {
}
}

func TestTwoReceiversEcho(t *testing.T) {
ep, c1, c2 := launchHelpers(t, "echo", "foo bar", "baz")

var wg sync.WaitGroup
wg.Add(2)

comp := func(c <-chan string) {
if s := chanEq(c, "foo bar baz"); s != nil {
t.Errorf("Invalid echo result in one of the two receivers %s", s)
}
wg.Done()
}
go comp(c1)
go comp(c2)
wg.Wait()

time.Sleep(10 * time.Millisecond)
if ep.cmd.ProcessState == nil {
t.Error("Echo did not stop after sending the line")
}
}

func TestTwoReceiversOneStoppedEcho(t *testing.T) {
ep, c1, c2 := launchHelpers(t, "echo", "foo bar", "baz")

ep.Unsubscribe(c1)

if s := chanEq(c2, "foo bar baz"); s != nil {
t.Errorf("Invalid echo result in one of the two receivers %s", s)
}

time.Sleep(10 * time.Millisecond)
if ep.cmd.ProcessState == nil {
t.Error("Echo did not stop after sending the line")
}

}

func TestTwoReceiversBothStoppedEcho(t *testing.T) {
ep, c1, c2 := launchHelpers(t, "echo", "foo bar", "baz")

ep.Unsubscribe(c1)
ep.Unsubscribe(c2)

time.Sleep(10 * time.Millisecond)
if ep.cmd.ProcessState == nil {
t.Error("Echo did not stop after both receivers unsubscribed")
}
}

func TestSimpleCat(t *testing.T) {
ep, c := launchHelper(t, "cat")

Expand Down Expand Up @@ -155,34 +93,27 @@ func TestSimpleCat(t *testing.T) {
}
}

func TestConcurrentCat(t *testing.T) {
ep, c1 := launchHelper(t, "cat")
func TestSlowCat(t *testing.T) {
ep, c := launchHelper(t, "cat")

var wg sync.WaitGroup
wg.Add(2)
go func() {
if s := chanEq(c1, "foo bar", "foo baz", "foo bam"); s != nil {
t.Errorf("Invalid cat result in one of the two receivers %s", s)
}
wg.Done()
}()

ep.PassInput("foo bar")
time.Sleep(10 * time.Millisecond)
wg.Add(1)

c2, err := ep.Subscribe()
if err != nil {
t.Fatal("Cannot join second subscriber to cat")
}
go func() {
if s := chanEq(c2, "foo baz", "foo bam"); s != nil {
t.Errorf("Invalid cat result in one of the two receivers %s", s)
defer wg.Done()
var check string
for i := 0; i < 3; i++ {
s, ok := <-c
if ok {
check += s + "\n"
}
}
if check != "foo bar\nfoo baz\nfoo bam\n" {
t.Errorf("Invalid cat result %#v", check)
}
wg.Done()
}()

go ep.PassInput("foo baz")
go ep.PassInput("foo bam")
ep.PassInput("foo bar\nfoo baz\nfoo bam")

wg.Wait()

Expand Down

0 comments on commit c61236b

Please sign in to comment.