From 98d35c049511a2dc0ff6e2dcc9ffb41c9347cf68 Mon Sep 17 00:00:00 2001 From: Joe Walnes Date: Thu, 2 Oct 2014 13:51:36 -0500 Subject: [PATCH] Rolled back to stable --- config.go | 1 - libwebsocketd/endpoint.go | 33 +++ libwebsocketd/endpoint_test.go | 73 +++++++ libwebsocketd/handler.go | 93 ++------- libwebsocketd/handler_test.go | 16 +- libwebsocketd/http.go | 15 +- libwebsocketd/http_test.go | 4 +- libwebsocketd/launcher.go | 45 +++++ libwebsocketd/logscope.go | 15 +- libwebsocketd/logscope_test.go | 21 -- libwebsocketd/process.go | 216 -------------------- libwebsocketd/process_endpoint.go | 106 ++++++++++ libwebsocketd/process_test.go | 302 ---------------------------- libwebsocketd/websocket_endpoint.go | 60 ++++++ main.go | 3 +- 15 files changed, 352 insertions(+), 651 deletions(-) create mode 100644 libwebsocketd/endpoint.go create mode 100644 libwebsocketd/endpoint_test.go create mode 100644 libwebsocketd/launcher.go delete mode 100644 libwebsocketd/logscope_test.go delete mode 100644 libwebsocketd/process.go create mode 100644 libwebsocketd/process_endpoint.go delete mode 100644 libwebsocketd/process_test.go create mode 100644 libwebsocketd/websocket_endpoint.go diff --git a/config.go b/config.go index 88476569..f135330a 100644 --- a/config.go +++ b/config.go @@ -112,7 +112,6 @@ func parseCommandLine() *Config { mainConfig.Addr = []string{fmt.Sprintf(":%d", port)} } mainConfig.MaxForks = *maxForksFlag - mainConfig.BasePath = *basePathFlag mainConfig.LogLevel = libwebsocketd.LevelFromString(*logLevelFlag) if mainConfig.LogLevel == libwebsocketd.LogUnknown { diff --git a/libwebsocketd/endpoint.go b/libwebsocketd/endpoint.go new file mode 100644 index 00000000..4e418805 --- /dev/null +++ b/libwebsocketd/endpoint.go @@ -0,0 +1,33 @@ +// Copyright 2013 Joe Walnes and the websocketd team. +// All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package libwebsocketd + +type Endpoint interface { + StartReading() + Terminate() + Output() chan string + Send(string) bool +} + +func PipeEndpoints(e1, e2 Endpoint) { + e1.StartReading() + e2.StartReading() + + defer e1.Terminate() + defer e2.Terminate() + for { + select { + case msgOne, ok := <-e1.Output(): + if !ok || !e2.Send(msgOne) { + return + } + case msgTwo, ok := <-e2.Output(): + if !ok || !e1.Send(msgTwo) { + return + } + } + } +} diff --git a/libwebsocketd/endpoint_test.go b/libwebsocketd/endpoint_test.go new file mode 100644 index 00000000..142f2953 --- /dev/null +++ b/libwebsocketd/endpoint_test.go @@ -0,0 +1,73 @@ +package libwebsocketd + +import ( + "strconv" + "testing" + "time" +) + +var eol_tests = []string{ + "", "\n", "\r\n", "ok\n", "ok\n", + "quite long string for our test\n", + "quite long string for our test\r\n", +} + +var eol_answers = []string{ + "", "", "", "ok", "ok", + "quite long string for our test", "quite long string for our test", +} + +func TestTrimEOL(t *testing.T) { + for n := 0; n < len(eol_tests); n++ { + answ := trimEOL(eol_tests[n]) + if answ != eol_answers[n] { + t.Errorf("Answer '%s' did not match predicted '%s'", answ, eol_answers[n]) + } + } +} + +func BenchmarkTrimEOL(b *testing.B) { + for n := 0; n < b.N; n++ { + trimEOL(eol_tests[n%len(eol_tests)]) + } +} + +type TestEndpoint struct { + limit int + prefix string + c chan string + result []string +} + +func (e *TestEndpoint) StartReading() { + go func() { + for i := 0; i < e.limit; i++ { + e.c <- e.prefix + strconv.Itoa(i) + } + time.Sleep(time.Millisecond) // should be enough for smaller channel to catch up with long one + close(e.c) + }() +} + +func (e *TestEndpoint) Terminate() { +} + +func (e *TestEndpoint) Output() chan string { + return e.c +} + +func (e *TestEndpoint) Send(msg string) bool { + e.result = append(e.result, msg) + return true +} + +func TestEndpointPipe(t *testing.T) { + one := &TestEndpoint{2, "one:", make(chan string), make([]string, 0)} + two := &TestEndpoint{4, "two:", make(chan string), make([]string, 0)} + PipeEndpoints(one, two) + if len(one.result) != 4 || len(two.result) != 2 { + t.Errorf("Invalid lengths, should be 4 and 2: %v %v", one.result, two.result) + } else if one.result[0] != "two:0" || two.result[0] != "one:0" { + t.Errorf("Invalid first results, should be two:0 and one:0: %#v %#v", one.result[0], two.result[0]) + } +} diff --git a/libwebsocketd/handler.go b/libwebsocketd/handler.go index 0b1fd96d..79afe223 100644 --- a/libwebsocketd/handler.go +++ b/libwebsocketd/handler.go @@ -13,7 +13,7 @@ import ( "time" ) -var ErrScriptNotFound = errors.New("Script not found") +var ScriptNotFoundError = errors.New("script not found") // WebsocketdHandler is a single request information and processing structure, it handles WS requests out of all that daemon can handle (static, cgi, devconsole) type WebsocketdHandler struct { @@ -27,7 +27,7 @@ type WebsocketdHandler struct { command string } -// NewWebsocketdHandler constructs the handler struct. It also prepares *Info elements and generates process environment. +// NewWebsocketdHandler constructs the struct and parses all required things in it... func NewWebsocketdHandler(s *WebsocketdServer, req *http.Request, log *LogScope) (wsh *WebsocketdHandler, err error) { wsh = &WebsocketdHandler{server: s, Id: generateId()} log.Associate("id", wsh.Id) @@ -49,7 +49,6 @@ func NewWebsocketdHandler(s *WebsocketdServer, req *http.Request, log *LogScope) if s.Config.UsingScriptDir { wsh.command = wsh.URLInfo.FilePath } - log.Associate("command", wsh.command) wsh.Env = createEnv(wsh, req, log) @@ -60,88 +59,28 @@ func NewWebsocketdHandler(s *WebsocketdServer, req *http.Request, log *LogScope) // wshandler returns function that executes code with given log context func (wsh *WebsocketdHandler) wshandler(log *LogScope) websocket.Handler { return websocket.Handler(func(ws *websocket.Conn) { - wsh.accept(&WebsocketWrapper{ws}, log) + wsh.accept(ws, log) }) } -type wsConn interface { - Close() error - Receive(*string) error - Send(string) error -} - -type WebsocketWrapper struct { - *websocket.Conn -} - -func (ww *WebsocketWrapper) Receive(ptr *string) error { - return websocket.Message.Receive(ww.Conn, ptr) -} -func (ww *WebsocketWrapper) Send(s string) error { - return websocket.Message.Send(ww.Conn, s) -} - -// accept connects process and websocket. -func (wsh *WebsocketdHandler) accept(ws wsConn, log *LogScope) { +func (wsh *WebsocketdHandler) accept(ws *websocket.Conn, log *LogScope) { defer ws.Close() - log.Access("handler", "CONNECT") - defer log.Access("handler", "DISCONNECT") + log.Access("session", "CONNECT") + defer log.Access("session", "DISCONNECT") - p, output, err := wsh.server.launchServerProcess(wsh.command, wsh.Env, log) + launched, err := launchCmd(wsh.command, wsh.server.Config.CommandArgs, wsh.Env) if err != nil { - log.Error("handler", "Could not launch process %s %s (%s)", wsh.command, strings.Join(wsh.server.Config.CommandArgs, " "), err) + log.Error("process", "Could not launch process %s %s (%s)", wsh.command, strings.Join(wsh.server.Config.CommandArgs, " "), err) return } - /// we need to unsubscribe as soon as we done. - defer p.Terminate() - - // send websocket data to process - input := make(chan string) - go func() { - for { - var msg string - err := ws.Receive(&msg) - if err != nil { - close(input) - return - } - input <- msg - } - }() - - for { - select { - case msg, ok := <-input: - if ok { - err = p.PassInput(msg) - if err != nil { - log.Info("handler", "Dropping input message, process input returned %s", err) - } - } else { - log.Info("handler", "Websocket client closed connection....") - return - } - case str, ok := <-output: - if !ok { - // we might still be able to pass input from websocket to process - log.Trace("handler", "Process stopped producing results") - return - } - err = ws.Send(str) - if err != nil { - log.Trace("handler", "Process data cannot be passed to websocket due to %s", err) - return - } - case <-time.After(time.Millisecond * 100): - // check every 100ms if process has been finished - if p.cmd.ProcessState != nil { - log.Trace("handler", "Process ended") - return - } - } - } + log.Associate("pid", strconv.Itoa(launched.cmd.Process.Pid)) + + process := NewProcessEndpoint(launched, log) + wsEndpoint := NewWebSocketEndpoint(ws, log) + + PipeEndpoints(process, wsEndpoint) } // RemoteInfo holds information about remote http client @@ -195,12 +134,12 @@ func GetURLInfo(path string, config *Config) (*URLInfo, error) { // not a valid path if err != nil { - return nil, ErrScriptNotFound + return nil, ScriptNotFoundError } // at the end of url but is a dir if isLastPart && statInfo.IsDir() { - return nil, ErrScriptNotFound + return nil, ScriptNotFoundError } // we've hit a dir, carry on looking diff --git a/libwebsocketd/handler_test.go b/libwebsocketd/handler_test.go index 3cc9df15..557bfa4a 100644 --- a/libwebsocketd/handler_test.go +++ b/libwebsocketd/handler_test.go @@ -68,7 +68,7 @@ func TestParsePathWithScriptDir(t *testing.T) { if err == nil { t.Error("non-existing file should fail") } - if err != ErrScriptNotFound { + if err != ScriptNotFoundError { t.Error("should fail with script not found") } @@ -77,7 +77,7 @@ func TestParsePathWithScriptDir(t *testing.T) { if err == nil { t.Error("non-existing dir should fail") } - if err != ErrScriptNotFound { + if err != ScriptNotFoundError { t.Error("should fail with script not found") } } @@ -100,15 +100,3 @@ func TestParsePathExplicitScript(t *testing.T) { t.Error("filePath") } } - -func TestHandlerBasics(t *testing.T) { - wh := WebsocketdHandler{ - server: nil, - Id: "", - RemoteInfo: &RemoteInfo{"", "", ""}, - URLInfo: &URLInfo{"", "", ""}, - Env: []string{}, - command: "/bin/echo", - } - logger_helper(t.Log) -} diff --git a/libwebsocketd/http.go b/libwebsocketd/http.go index b5b061c8..259c392a 100644 --- a/libwebsocketd/http.go +++ b/libwebsocketd/http.go @@ -14,14 +14,13 @@ import ( "net/http/cgi" "net/url" "os" - "os/exec" "path" "path/filepath" "regexp" "strings" ) -var ErrForkNotAllowed = errors.New("Too many forks active") +var ForkNotAllowedError = errors.New("too many forks active") // WebsocketdServer presents http.Handler interface for requests libwebsocketd is handling. type WebsocketdServer struct { @@ -65,7 +64,7 @@ func (h *WebsocketdServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { handler, err := NewWebsocketdHandler(h, req, log) if err != nil { - if err == ErrScriptNotFound { + if err == ScriptNotFoundError { log.Access("session", "NOT FOUND: %s", err) http.Error(w, "404 Not Found", 404) } else { @@ -160,7 +159,7 @@ func (h *WebsocketdServer) noteForkCreated() error { case h.forks <- 1: return nil default: - return ErrForkNotAllowed + return ForkNotAllowedError } } else { return nil @@ -182,14 +181,6 @@ func (h *WebsocketdServer) noteForkCompled() { return } -func (h *WebsocketdServer) launchServerProcess(command string, env []string, log *LogScope) (*ExternalProcess, <-chan string, error) { - cmd := exec.Command(command, h.Config.CommandArgs...) - cmd.Env = env - - log.Debug("process", "Starting %s", command) - return LaunchProcess(cmd, log) -} - func checkOrigin(wsconf *websocket.Config, req *http.Request, config *Config, log *LogScope) (err error) { // check for origin to be correct in future // handshaker triggers answering with 403 if error was returned diff --git a/libwebsocketd/http_test.go b/libwebsocketd/http_test.go index fb6e7b31..ae60b1a2 100644 --- a/libwebsocketd/http_test.go +++ b/libwebsocketd/http_test.go @@ -91,7 +91,9 @@ Sec-WebSocket-Version: 13 t.Fatal("request", err) } - log := logger_helper(t.Log) + log := new(LogScope) + log.LogFunc = func(*LogScope, LogLevel, string, string, string, ...interface{}) {} + wsconf := &websocket.Config{Version: websocket.ProtocolVersionHybi13} config := new(Config) diff --git a/libwebsocketd/launcher.go b/libwebsocketd/launcher.go new file mode 100644 index 00000000..ba685d0f --- /dev/null +++ b/libwebsocketd/launcher.go @@ -0,0 +1,45 @@ +// Copyright 2013 Joe Walnes and the websocketd team. +// All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package libwebsocketd + +import ( + "io" + "os/exec" +) + +type LaunchedProcess struct { + cmd *exec.Cmd + stdin io.WriteCloser + stdout io.ReadCloser + stderr io.ReadCloser +} + +func launchCmd(commandName string, commandArgs []string, env []string) (*LaunchedProcess, error) { + cmd := exec.Command(commandName, commandArgs...) + cmd.Env = env + + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + + err = cmd.Start() + if err != nil { + return nil, err + } + + return &LaunchedProcess{cmd, stdin, stdout, stderr}, err +} diff --git a/libwebsocketd/logscope.go b/libwebsocketd/logscope.go index a95de965..9ebf1536 100644 --- a/libwebsocketd/logscope.go +++ b/libwebsocketd/logscope.go @@ -7,6 +7,7 @@ package libwebsocketd import ( "sync" + "time" ) type LogLevel int @@ -29,17 +30,17 @@ type LogScope struct { Parent *LogScope // Parent scope MinLevel LogLevel // Minimum log level to write out. Mutex *sync.Mutex // Should be shared across all LogScopes that write to the same destination. - Associated []assocPair // Additional data associated with scope + Associated []AssocPair // Additional data associated with scope LogFunc LogFunc } -type assocPair struct { +type AssocPair struct { Key string Value string } func (l *LogScope) Associate(key string, value string) { - l.Associated = append(l.Associated, assocPair{key, value}) + l.Associated = append(l.Associated, AssocPair{key, value}) } func (l *LogScope) Debug(category string, msg string, args ...interface{}) { @@ -71,7 +72,7 @@ func (parent *LogScope) NewLevel(logFunc LogFunc) *LogScope { Parent: parent, MinLevel: parent.MinLevel, Mutex: parent.Mutex, - Associated: make([]assocPair, 0), + Associated: make([]AssocPair, 0), LogFunc: logFunc} } @@ -80,10 +81,14 @@ func RootLogScope(minLevel LogLevel, logFunc LogFunc) *LogScope { Parent: nil, MinLevel: minLevel, Mutex: &sync.Mutex{}, - Associated: make([]assocPair, 0), + Associated: make([]AssocPair, 0), LogFunc: logFunc} } +func Timestamp() string { + return time.Now().Format(time.RFC1123Z) +} + func LevelFromString(s string) LogLevel { switch s { case "debug": diff --git a/libwebsocketd/logscope_test.go b/libwebsocketd/logscope_test.go deleted file mode 100644 index a45f4b50..00000000 --- a/libwebsocketd/logscope_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package libwebsocketd - -import ( - "fmt" - "os" -) - -// no real tests here so far, just a helper func for others - -func logger_helper(logfunc func(args ...interface{})) *LogScope { - log := new(LogScope) - log.LogFunc = func(_ *LogScope, _ LogLevel, level string, cat string, f string, attr ...interface{}) { - if v := os.Getenv("LOGALL"); v != "" { - fmt.Printf("LOG-%s [%s] %s\n", level, cat, fmt.Sprintf(f, attr...)) - } else { - logfunc(fmt.Sprintf("LOG-%s [%s]", level, cat), fmt.Sprintf(f, attr...)) - } - } - - return log -} diff --git a/libwebsocketd/process.go b/libwebsocketd/process.go deleted file mode 100644 index eea33d5f..00000000 --- a/libwebsocketd/process.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2013 Joe Walnes and the websocketd team. -// All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package libwebsocketd - -import ( - "bufio" - "errors" - "io" - "os" - "os/exec" - "strconv" - "sync" - "sync/atomic" - "time" -) - -var ErrNoConsumers = errors.New("All consumers are gone") -var ErrProcessFinished = errors.New("Process already finished") -var ErrUnknownConsumer = errors.New("No consumer to unsubscribe") - -var RcvrTimeout = time.Second * 5 -var StdoutBufSize int64 = 1024 * 1024 -var StdoutBufLines int64 = 10000 - -// ExternalProcess holds info about running process and sends info to subscribers using channels -type ExternalProcess struct { - cmd *exec.Cmd - in io.WriteCloser - mux *sync.Mutex - terminating chan int - log *LogScope -} - -func (p *ExternalProcess) wait() { - p.cmd.Wait() - p.log.Debug("process", "Process completed, status: %s", p.cmd.ProcessState.String()) -} - -// LaunchProcess initializes ExternalProcess struct fields. Command pipes for standard input/output are established and first consumer channel is returned. -func LaunchProcess(cmd *exec.Cmd, log *LogScope) (*ExternalProcess, <-chan string, error) { - stdout, err := cmd.StdoutPipe() - if err != nil { - log.Debug("process", "Unable to create p") - return nil, nil, err - } - - stderr, err := cmd.StderrPipe() - if err != nil { - return nil, nil, err - } - - stdin, err := cmd.StdinPipe() - if err != nil { - return nil, nil, err - } - - if err = cmd.Start(); err != nil { - return nil, nil, err - } - - p := &ExternalProcess{ - cmd: cmd, - in: stdin, - mux: new(sync.Mutex), - terminating: make(chan int), - log: log, - } - log.Associate("pid", strconv.Itoa(p.Pid())) - p.log.Trace("process", "Command started, first consumer channel created") - - consumer := make(chan string) - - // Run output listeners - go p.process_stdout(stdout, consumer) - go p.process_stderr(stderr) - - return p, consumer, nil -} - -// Terminate tries to stop process forcefully using interrupt and kill signals with a second of waiting time between them. If the kill is unsuccessful, it might be repeated -// again and again while system accepts those attempts. -func (p *ExternalProcess) Terminate() { - // prevent double entrance to this subroutine... - p.mux.Lock() - defer p.mux.Unlock() - - if p.cmd.ProcessState == nil { - go func() { p.wait(); close(p.terminating) }() - - select { - case <-p.terminating: - return - case <-time.After(time.Millisecond * 10): - p.log.Debug("process", "Sending SIGINT to %d", p.Pid()) - err := p.cmd.Process.Signal(os.Interrupt) - if err != nil { - p.log.Error("process", "could not send SIGINT %s", err) - } - } - - for { - select { - case <-p.terminating: - return - case <-time.After(time.Second): - p.log.Error("process", "process did not react to SIGINT, sending SIGKILL") - err := p.cmd.Process.Signal(os.Kill) - if err != nil { - p.log.Error("process", "could not send SIGKILL %s (process got lost?)", err) - return - } - } - } - } -} - -// Pid is a helper function to return Pid from OS Process -func (e *ExternalProcess) Pid() int { - return e.cmd.Process.Pid -} - -// PassInput delivers particular string to the process, involves locking input channel -func (p *ExternalProcess) PassInput(s string) error { - if p.cmd.ProcessState != nil { - return ErrProcessFinished - } - p.mux.Lock() - defer p.mux.Unlock() - _, err := io.WriteString(p.in, s+"\n") - if err != nil { - p.log.Info("process", "Unable to write string to a process: %s", err) - return err - } - p.log.Debug("process", "Passed input string %#v", s) - return nil -} - -// process_stdout is a background function that reads from process and muxes output to all subscribed channels -func (p *ExternalProcess) process_stdout(r io.ReadCloser, c chan string) { - bsize, backlog := int64(0), make(chan string, StdoutBufLines) - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - LOOP: - for s := range backlog { - select { - case c <- s: - l := len(s) - p.log.Trace("process", "Sent %d bytes to websocket handler", l) - atomic.AddInt64(&bsize, int64(-l)) - case <-p.terminating: - p.log.Trace("process", "Websocket handler connection was terminated...") - break LOOP - case <-time.After(RcvrTimeout): - p.log.Trace("process", "Websocket handler timed out with %d messages in queue (%d bytes), terminating...", len(backlog), bsize) - r.Close() - break LOOP - } - } - close(c) - wg.Done() - }() - - buf := bufio.NewReader(r) - for { - str, err := buf.ReadString('\n') - if str != "" { - str = trimEOL(str) - backlog <- str - if sz := atomic.AddInt64(&bsize, int64(len(str))); sz > StdoutBufSize { - p.log.Trace("process", "Websocket handler did not process %d messages (%d bytes), terminating...", len(backlog), bsize) - break - } - } - if err != nil { - p.log.Debug("process", "STDOUT stream ended: %s", err) - break - } - } - close(backlog) - wg.Wait() - p.Terminate() -} - -// process_stderr is a function to log process output to STDERR -func (p *ExternalProcess) process_stderr(r io.ReadCloser) { - buf := bufio.NewReader(r) - for { - str, err := buf.ReadString('\n') - str = trimEOL(str) - if str != "" { - p.log.Error("stderr", "%s", str) - } - if err != nil { - p.log.Debug("process", "STDERR stream ended: %s", err) - break - } - } -} - -// trimEOL cuts unixy style \n and windowsy style \r\n suffix from the string -func trimEOL(s string) string { - lns := len(s) - if lns > 0 && s[lns-1] == '\n' { - lns-- - if lns > 0 && s[lns-1] == '\r' { - lns-- - } - } - return s[0:lns] -} diff --git a/libwebsocketd/process_endpoint.go b/libwebsocketd/process_endpoint.go new file mode 100644 index 00000000..31437e34 --- /dev/null +++ b/libwebsocketd/process_endpoint.go @@ -0,0 +1,106 @@ +// Copyright 2013 Joe Walnes and the websocketd team. +// All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package libwebsocketd + +import ( + "bufio" + "io" + "syscall" +) + +type ProcessEndpoint struct { + process *LaunchedProcess + bufferedIn *bufio.Writer + output chan string + log *LogScope +} + +func NewProcessEndpoint(process *LaunchedProcess, log *LogScope) *ProcessEndpoint { + return &ProcessEndpoint{ + process: process, + bufferedIn: bufio.NewWriter(process.stdin), + output: make(chan string), + log: log} +} + +func (pe *ProcessEndpoint) Terminate() { + pe.process.stdin.Close() + + err := pe.process.cmd.Process.Signal(syscall.SIGINT) + if err != nil { + pe.log.Debug("process", "Failed to Interrupt process %v: %s, attempting to kill", pe.process.cmd.Process.Pid, err) + err = pe.process.cmd.Process.Kill() + if err != nil { + pe.log.Debug("process", "Failed to Kill process %v: %s", pe.process.cmd.Process.Pid, err) + } + } + + pe.process.cmd.Wait() + if err != nil { + pe.log.Debug("process", "Failed to reap process %v: %s", pe.process.cmd.Process.Pid, err) + } +} + +func (pe *ProcessEndpoint) Output() chan string { + return pe.output +} + +func (pe *ProcessEndpoint) Send(msg string) bool { + pe.bufferedIn.WriteString(msg) + pe.bufferedIn.WriteString("\n") + pe.bufferedIn.Flush() + return true +} + +func (pe *ProcessEndpoint) StartReading() { + go pe.log_stderr() + go pe.process_stdout() +} + +func (pe *ProcessEndpoint) process_stdout() { + bufin := bufio.NewReader(pe.process.stdout) + for { + str, err := bufin.ReadString('\n') + if err != nil { + if err != io.EOF { + pe.log.Error("process", "Unexpected error while reading STDOUT from process: %s", err) + } else { + pe.log.Debug("process", "Process STDOUT closed") + } + break + } + pe.output <- trimEOL(str) + } + close(pe.output) +} + +func (pe *ProcessEndpoint) log_stderr() { + bufstderr := bufio.NewReader(pe.process.stderr) + for { + str, err := bufstderr.ReadString('\n') + if err != nil { + if err != io.EOF { + pe.log.Error("process", "Unexpected error while reading STDERR from process: %s", err) + } else { + pe.log.Debug("process", "Process STDERR closed") + } + break + } + pe.log.Error("stderr", "%s", trimEOL(str)) + } +} + +// trimEOL cuts unixy style \n and windowsy style \r\n suffix from the string +func trimEOL(s string) string { + lns := len(s) + if lns > 0 && s[lns-1] == '\n' { + lns-- + if lns > 0 && s[lns-1] == '\r' { + lns-- + } + } + return s[0:lns] +} diff --git a/libwebsocketd/process_test.go b/libwebsocketd/process_test.go deleted file mode 100644 index 138db6fb..00000000 --- a/libwebsocketd/process_test.go +++ /dev/null @@ -1,302 +0,0 @@ -package libwebsocketd - -import ( - "bufio" - "bytes" - "errors" - "fmt" - "io" - "io/ioutil" - "net" - "os" - "os/exec" - "runtime" - "strconv" - "sync" - "testing" - "time" -) - -func launchHelper(t *testing.T, args ...string) (*ExternalProcess, <-chan string) { - cmd := helperCommand(args...) - - ep, ch, err := LaunchProcess(cmd, logger_helper(t.Log)) - if err != nil { - t.Fatal(err.Error()) - return nil, nil - } - return ep, ch -} - -func TestEarlyTerminate(t *testing.T) { - ep, _ := launchHelper(t, "cat") - ep.Terminate() -} - -func chanEq(c <-chan string, data ...string) error { - for _, m := range data { - s, ok := <-c - if !ok || s != m { - return errors.New(s) - } - } - return nil -} - -func TestSimpleEcho(t *testing.T) { - ep, c := launchHelper(t, "echo", "foo bar", "baz") - - if s := chanEq(c, "foo bar baz"); s != nil { - t.Errorf("Invalid echo result %#v", s) - } - - s, ok := <-c - if ok || s != "" { - t.Error("Echo returned more than one line") - } - - time.Sleep(10 * time.Millisecond) - ep.inmux.Lock() - defer ep.inmux.Unlock() - if ep.cmd.ProcessState == nil { - t.Error("Echo did not stop after sending the line") - } -} - -func TestSimpleCat(t *testing.T) { - ep, c := launchHelper(t, "cat") - - var wg sync.WaitGroup - wg.Add(1) - - go func() { - defer wg.Done() - var check string - for i := 0; i < 3; i++ { - s, ok := <-c - if ok { - check += s + "\n" - } - } - if check != "foo bar\nfoo baz\nfoo bam\n" { - t.Errorf("Invalid cat result %#v", check) - } - }() - - ep.PassInput("foo bar\nfoo baz\nfoo bam") - - wg.Wait() - - ep.Terminate() // this forces termination... Other way to finish is calling ep.Unsuscribe(c) - - time.Sleep(10 * time.Millisecond) - if ep.cmd.ProcessState == nil { - t.Error("Cat did not stop after termination") - } -} - -// --- -// -// following is copied from standard lib see http://golang.org/src/pkg/os/exec/exec_test.go -// (c) 2009 The Go Authors. All rights reserved. For more information see http://golang.org/LICENSE -// -func helperCommand(s ...string) *exec.Cmd { - cs := []string{"-test.run=TestHelperProcess", "--"} - cs = append(cs, s...) - cmd := exec.Command(os.Args[0], cs...) - cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"} - return cmd -} - -const stdinCloseTestString = "Some test string." - -// TestHelperProcess isn't a real test. It's used as a helper process -// for TestParameterRun. -func TestHelperProcess(*testing.T) { - if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { - return - } - defer os.Exit(0) - - // Determine which command to use to display open files. - ofcmd := "lsof" - switch runtime.GOOS { - case "dragonfly", "freebsd", "netbsd", "openbsd": - ofcmd = "fstat" - } - - args := os.Args - for len(args) > 0 { - if args[0] == "--" { - args = args[1:] - break - } - args = args[1:] - } - if len(args) == 0 { - fmt.Fprintf(os.Stderr, "No command\n") - os.Exit(2) - } - - cmd, args := args[0], args[1:] - switch cmd { - case "echo": - iargs := []interface{}{} - for _, s := range args { - iargs = append(iargs, s) - } - fmt.Println(iargs...) - case "cat": - if len(args) == 0 { - io.Copy(os.Stdout, os.Stdin) - return - } - exit := 0 - for _, fn := range args { - f, err := os.Open(fn) - if err != nil { - fmt.Fprintf(os.Stderr, "Error: %v\n", err) - exit = 2 - } else { - defer f.Close() - io.Copy(os.Stdout, f) - } - } - os.Exit(exit) - case "pipetest": - bufr := bufio.NewReader(os.Stdin) - for { - line, _, err := bufr.ReadLine() - if err == io.EOF { - break - } else if err != nil { - os.Exit(1) - } - if bytes.HasPrefix(line, []byte("O:")) { - os.Stdout.Write(line) - os.Stdout.Write([]byte{'\n'}) - } else if bytes.HasPrefix(line, []byte("E:")) { - os.Stderr.Write(line) - os.Stderr.Write([]byte{'\n'}) - } else { - os.Exit(1) - } - } - case "stdinClose": - b, err := ioutil.ReadAll(os.Stdin) - if err != nil { - fmt.Fprintf(os.Stderr, "Error: %v\n", err) - os.Exit(1) - } - if s := string(b); s != stdinCloseTestString { - fmt.Fprintf(os.Stderr, "Error: Read %q, want %q", s, stdinCloseTestString) - os.Exit(1) - } - os.Exit(0) - case "read3": // read fd 3 - fd3 := os.NewFile(3, "fd3") - bs, err := ioutil.ReadAll(fd3) - if err != nil { - fmt.Printf("ReadAll from fd 3: %v", err) - os.Exit(1) - } - switch runtime.GOOS { - case "dragonfly": - // TODO(jsing): Determine why DragonFly is leaking - // file descriptors... - case "darwin": - // TODO(bradfitz): broken? Sometimes. - // http://golang.org/issue/2603 - // Skip this additional part of the test for now. - case "netbsd": - // TODO(jsing): This currently fails on NetBSD due to - // the cloned file descriptors that result from opening - // /dev/urandom. - // http://golang.org/issue/3955 - default: - // Now verify that there are no other open fds. - var files []*os.File - for wantfd := basefds() + 1; wantfd <= 100; wantfd++ { - f, err := os.Open(os.Args[0]) - if err != nil { - fmt.Printf("error opening file with expected fd %d: %v", wantfd, err) - os.Exit(1) - } - if got := f.Fd(); got != wantfd { - fmt.Printf("leaked parent file. fd = %d; want %d\n", got, wantfd) - out, _ := exec.Command(ofcmd, "-p", fmt.Sprint(os.Getpid())).CombinedOutput() - fmt.Print(string(out)) - os.Exit(1) - } - files = append(files, f) - } - for _, f := range files { - f.Close() - } - } - // Referring to fd3 here ensures that it is not - // garbage collected, and therefore closed, while - // executing the wantfd loop above. It doesn't matter - // what we do with fd3 as long as we refer to it; - // closing it is the easy choice. - fd3.Close() - os.Stdout.Write(bs) - case "exit": - n, _ := strconv.Atoi(args[0]) - os.Exit(n) - case "describefiles": - f := os.NewFile(3, fmt.Sprintf("fd3")) - ln, err := net.FileListener(f) - if err == nil { - fmt.Printf("fd3: listener %s\n", ln.Addr()) - ln.Close() - } - os.Exit(0) - case "extraFilesAndPipes": - n, _ := strconv.Atoi(args[0]) - pipes := make([]*os.File, n) - for i := 0; i < n; i++ { - pipes[i] = os.NewFile(uintptr(3+i), strconv.Itoa(i)) - } - response := "" - for i, r := range pipes { - ch := make(chan string, 1) - go func(c chan string) { - buf := make([]byte, 10) - n, err := r.Read(buf) - if err != nil { - fmt.Fprintf(os.Stderr, "Child: read error: %v on pipe %d\n", err, i) - os.Exit(1) - } - c <- string(buf[:n]) - close(c) - }(ch) - select { - case m := <-ch: - response = response + m - case <-time.After(5 * time.Second): - fmt.Fprintf(os.Stderr, "Child: Timeout reading from pipe: %d\n", i) - os.Exit(1) - } - } - fmt.Fprintf(os.Stderr, "child: %s", response) - os.Exit(0) - default: - fmt.Fprintf(os.Stderr, "Unknown command %q\n", cmd) - os.Exit(2) - } -} - -// basefds returns the number of expected file descriptors -// to be present in a process at start. -func basefds() uintptr { - n := os.Stderr.Fd() + 1 - - // Go runtime for 32-bit Plan 9 requires that /dev/bintime - // be kept open. - // See ../../runtime/time_plan9_386.c:/^runtime·nanotime - if runtime.GOOS == "plan9" && runtime.GOARCH == "386" { - n++ - } - return n -} diff --git a/libwebsocketd/websocket_endpoint.go b/libwebsocketd/websocket_endpoint.go new file mode 100644 index 00000000..0b133409 --- /dev/null +++ b/libwebsocketd/websocket_endpoint.go @@ -0,0 +1,60 @@ +// Copyright 2013 Joe Walnes and the websocketd team. +// All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package libwebsocketd + +import ( + "io" + + "code.google.com/p/go.net/websocket" +) + +type WebSocketEndpoint struct { + ws *websocket.Conn + output chan string + log *LogScope +} + +func NewWebSocketEndpoint(ws *websocket.Conn, log *LogScope) *WebSocketEndpoint { + return &WebSocketEndpoint{ + ws: ws, + output: make(chan string), + log: log} +} + +func (we *WebSocketEndpoint) Terminate() { +} + +func (we *WebSocketEndpoint) Output() chan string { + return we.output +} + +func (we *WebSocketEndpoint) Send(msg string) bool { + err := websocket.Message.Send(we.ws, msg) + if err != nil { + we.log.Trace("websocket", "Cannot send: %s", err) + return false + } + return true +} + +func (we *WebSocketEndpoint) StartReading() { + go we.read_client() +} + +func (we *WebSocketEndpoint) read_client() { + for { + var msg string + err := websocket.Message.Receive(we.ws, &msg) + if err != nil { + if err != io.EOF { + we.log.Debug("websocket", "Cannot receive: %s", err) + } + break + } + we.output <- msg + } + close(we.output) +} diff --git a/main.go b/main.go index 29ab3c8e..7d51ab70 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,6 @@ import ( "net/http" "os" "strings" - "time" "github.com/joewalnes/websocketd/libwebsocketd" ) @@ -30,7 +29,7 @@ func log(l *libwebsocketd.LogScope, level libwebsocketd.LogLevel, levelName stri } l.Mutex.Lock() - fmt.Printf("%s | %-6s | %-10s | %s | %s\n", time.Now().Format(time.RFC1123Z), levelName, category, assocDump, fullMsg) + fmt.Printf("%s | %-6s | %-10s | %s | %s\n", libwebsocketd.Timestamp(), levelName, category, assocDump, fullMsg) l.Mutex.Unlock() }