Skip to content

Commit

Permalink
Removing idea of reusable process pool.
Browse files Browse the repository at this point in the history
That was implemented only for introducing mux/demux. This is now not a
good place to do that.
  • Loading branch information
Alex Sergeyev committed Sep 26, 2014
1 parent 93129e4 commit efc5ab0
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 72 deletions.
2 changes: 1 addition & 1 deletion libwebsocketd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"time"
)

var ScriptNotFoundError = errors.New("script not found")
var ScriptNotFoundError = errors.New("Script not found")

// WebsocketdHandler is a single request information and processing structure, it handles WS requests out of all that daemon can handle (static, cgi, devconsole)
type WebsocketdHandler struct {
Expand Down
21 changes: 12 additions & 9 deletions libwebsocketd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,27 @@ import (
"net/http/cgi"
"net/url"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"strings"
)

var ForkNotAllowedError = errors.New("too many forks active")
var ForkNotAllowedError = errors.New("Too many forks active")

// WebsocketdServer presents http.Handler interface for requests libwebsocketd is handling.
type WebsocketdServer struct {
Config *Config
Log *LogScope
forks chan byte
procPool *processPool
Config *Config
Log *LogScope
forks chan byte
}

// NewWebsocketdServer creates WebsocketdServer struct with pre-determined config, logscope and maxforks limit
func NewWebsocketdServer(config *Config, log *LogScope, maxforks int) *WebsocketdServer {
mux := &WebsocketdServer{
Config: config,
Log: log,
procPool: NewProcessPool(0),
Config: config,
Log: log,
}
if maxforks > 0 {
mux.forks = make(chan byte, maxforks)
Expand Down Expand Up @@ -184,7 +183,11 @@ func (h *WebsocketdServer) noteForkCompled() {
}

func (h *WebsocketdServer) launchServerProcess(command string, env []string, log *LogScope) (*ExternalProcess, <-chan string, error) {
return h.procPool.LaunchProcess(command, h.Config.CommandArgs, env, log)
cmd := exec.Command(command, h.Config.CommandArgs...)
cmd.Env = env

log.Debug("process", "Starting %s", command)
return LaunchProcess(cmd, log)
}

func checkOrigin(wsconf *websocket.Config, req *http.Request, config *Config, log *LogScope) (err error) {
Expand Down
65 changes: 3 additions & 62 deletions libwebsocketd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var ErrNoConsumers = errors.New("All consumers are gone")
var ErrProcessFinished = errors.New("Process already finished")
var ErrUnknownConsumer = errors.New("No consumer to unsubscribe")

// RcvrTimeout is a very short duration to determine if subscriber is unable to process data quickly enough
// RcvrTimeout is a very short duration to determine if subscriber is unable to process data quickly enough.
// Zero is not practical because it would cause packet receiving to block while OS passes data via Pipe to process.
var RcvrTimeout = time.Millisecond

// ExternalProcess holds info about running process and sends info to subscribers using channels
Expand All @@ -39,7 +40,7 @@ type ExternalProcess struct {

// LaunchProcess initializes ExternalProcess struct fields
func LaunchProcess(cmd *exec.Cmd, log *LogScope) (*ExternalProcess, <-chan string, error) {
// TODO: Investigate alternative approaches. exec.Cmd uses real OS pipes which spen a filehandler each.
// TODO: Investigate alternative approaches. exec.Cmd uses real OS pipes which spends new filehandler each.
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Debug("process", "Unable to create p")
Expand Down Expand Up @@ -300,63 +301,3 @@ func trimEOL(s string) string {
}
return s[0:lns]
}

type processPool struct {
d time.Duration
stopReuse chan interface{}
pool map[string]*ExternalProcess
poolmx *sync.Mutex
}

func NewProcessPool(sec int) *processPool {
d := time.Duration(sec) * time.Second
pp := &processPool{d, make(chan interface{}), make(map[string]*ExternalProcess), &sync.Mutex{}}
go func() {
for delcmd := range pp.stopReuse {
x := delcmd.(struct {
name string
pid int
})
pp.poolmx.Lock()
if pp.pool[x.name].Pid() == x.pid {
// otherwise the program was already ended and re-started
delete(pp.pool, x.name)
}
pp.poolmx.Unlock()
}
}()
return pp
}

func (pp *processPool) LaunchProcess(name string, args []string, env []string, log *LogScope) (*ExternalProcess, <-chan string, error) {
pp.poolmx.Lock()
defer pp.poolmx.Unlock()
if m, ok := pp.pool[name]; ok {
o, e := m.Subscribe()
// only if Subscribe worked the process is still out there...
if e == nil {
log.Debug("ddb", "Reusing for %s", name)
return m, o, nil
}
}
cmd := exec.Command(name, args...)
cmd.Env = env

log.Debug("ddb", "Starting %s %d", name, pp.d)
p, o, e := LaunchProcess(cmd, log)
if e == nil {
pp.pool[name] = p

if pp.d > 0 {
go func(pid int) {
time.Sleep(pp.d)
log.Debug("ddb", "no more reuse for %v", name)
pp.stopReuse <- struct {
name string
pid int
}{name, pid}
}(p.Pid())
}
}
return p, o, e
}

0 comments on commit efc5ab0

Please sign in to comment.