diff --git a/.gitignore b/.gitignore index d1738070c3f..e8db0d8f76b 100644 --- a/.gitignore +++ b/.gitignore @@ -21,7 +21,7 @@ config/ # Compiled source # ################### -*.com +!*.com/ *.class *.dll *.exe diff --git a/assembly/assembly-main/pom.xml b/assembly/assembly-main/pom.xml index c446491c365..edaafef072a 100644 --- a/assembly/assembly-main/pom.xml +++ b/assembly/assembly-main/pom.xml @@ -37,6 +37,18 @@ assembly-wsmaster-war war + + org.eclipse.che + exec-agent + tar.gz + linux_amd64 + + + org.eclipse.che + exec-agent + tar.gz + linux_arm7 + org.eclipse.che.core che-core-ide-stacks @@ -62,18 +74,6 @@ che-tomcat8-slf4j-logback zip - - org.eclipse.che.lib - che-websocket-terminal - tar.gz - linux_amd64 - - - org.eclipse.che.lib - che-websocket-terminal - tar.gz - linux_arm7 - org.eclipse.che.plugin che-plugin-sdk-tools diff --git a/assembly/assembly-main/src/assembly/assembly.xml b/assembly/assembly-main/src/assembly/assembly.xml index 4676e15a942..422cd3ec63a 100644 --- a/assembly/assembly-main/src/assembly/assembly.xml +++ b/assembly/assembly-main/src/assembly/assembly.xml @@ -70,7 +70,7 @@ lib/linux_amd64/terminal websocket-terminal-linux_amd64.tar.gz - org.eclipse.che.lib:che-websocket-terminal:tar.gz:linux_amd64 + org.eclipse.che:exec-agent:tar.gz:linux_amd64 @@ -79,7 +79,7 @@ lib/linux_arm7/terminal websocket-terminal-linux_arm7.tar.gz - org.eclipse.che.lib:che-websocket-terminal:tar.gz:linux_arm7 + org.eclipse.che:exec-agent:tar.gz:linux_arm7 diff --git a/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/WsMasterModule.java b/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/WsMasterModule.java index c2a8c4a09cb..3e694abc56f 100644 --- a/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/WsMasterModule.java +++ b/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/WsMasterModule.java @@ -104,6 +104,8 @@ protected void configure() { bindConstant().annotatedWith(Names.named("machine.ws_agent.run_command")) .to("export JPDA_ADDRESS=\"4403\" && ~/che/ws-agent/bin/catalina.sh jpda run"); + bindConstant().annotatedWith(Names.named("machine.terminal_agent.run_command")) + .to("$HOME/che/terminal/che-websocket-terminal -addr :4411 -cmd ${SHELL_INTERPRETER} -static $HOME/che/terminal/"); bind(org.eclipse.che.api.workspace.server.WorkspaceValidator.class) .to(org.eclipse.che.api.workspace.server.DefaultWorkspaceValidator.class); diff --git a/exec-agent/.gitignore b/exec-agent/.gitignore new file mode 100644 index 00000000000..7d245c33e26 --- /dev/null +++ b/exec-agent/.gitignore @@ -0,0 +1,2 @@ +logs/ +exec-agent diff --git a/exec-agent/README.md b/exec-agent/README.md new file mode 100644 index 00000000000..bc56c341fee --- /dev/null +++ b/exec-agent/README.md @@ -0,0 +1,59 @@ +Summary +--- +Golang based server for executing commands and streaming process output logs, +also websocket-terminal. + + +Requirements +-- +- golang 1.6+ + + +Docs +--- +- jsonrpc2.0 based [Webscoket API](docs/ws_api.md) +- jsonrpc2.0 based [Events](docs/events.md) +- [REST API](docs/rest_api.md) + +Development +--- + +##### Link the sources to standard go workspace + +```bash +export CHE_PATH=~/code/che +mkdir $GOPATH/src/github.com/eclipse/che -p +ln -s $CHE_PATH/exec-agent/src $GOPATH/src/github.com/eclipse/che/exec-agent +``` + +##### Install godep +```bash +go get github.com/tools/godep +``` + +##### Get all dependencies + +```bash +cd $GOPATH/src/github.com/eclipse/che/exec-agent +$GOPATH/bin/godep restore +``` + +That's it, `$GOPATH/src/github.com/eclipse/che/exec-agent` project is ready. + +##### Building linked project + +```bash +cd $GOPATH/src/github.com/eclipse/che/exec-agent && go build +``` + +##### Running linked project tests + +```bash +cd $GOPATH/src/github.com/eclipse/che/exec-agent && go test ./... +``` + +##### Formatting linked project sources + +```bash +cd $GOPATH/src/github.com/eclipse/che/exec-agent && go fmt ./... +``` diff --git a/exec-agent/docs/events.md b/exec-agent/docs/events.md new file mode 100644 index 00000000000..186e7dfaf07 --- /dev/null +++ b/exec-agent/docs/events.md @@ -0,0 +1,98 @@ +Events +=== +Messages sent via websocket connections to clients + +Channel Events +--- + +#### Connected + +The first event in the channel, published when client successfully connected to the exec-agent. + +```json +{ + "jsonrpc": "2.0", + "method": "connected", + "params": { + "time": "2016-09-24T16:40:05.098478609+03:00", + "channel": "channel-1", + "text": "Hello!" + } +} +``` + +Process Events +--- + +#### Process started + +Published when process is successfully started. +This is the first event from all the events produced by process, +it appears only once for one process + +```json +{ + "jsonrpc": "2.0", + "method": "process_started", + "params": { + "time": "2016-09-24T16:40:55.930743249+03:00", + "pid": 1, + "nativePid": 22164, + "name": "print", + "commandLine": "printf \"\n1\n2\n3\"" + } +} +``` + +#### STDOUT event + +Published when process writes to stdout. +One stdout event describes one output line + +```json +{ + "jsonrpc": "2.0", + "method": "process_stdout", + "params": { + "time": "2016-09-24T16:40:55.933255297+03:00", + "pid": 1, + "text": "Starting server..." + } +} +``` + +#### STDERR event + +Published when process writes to stderr. +One stderr event describes one output line + +```json +{ + "jsonrpc": "2.0", + "method": "process_stderr", + "params": { + "time": "2016-09-24T16:40:55.933255297+03:00", + "pid": 1, + "text": "sh: ifconfig: command not found" + } +} +``` + +#### Process died + +Published when process is done, or killed. This is the last event from the process, +it appears only once for one process + +```json +{ + "jsonrpc": "2.0", + "method": "process_died", + "params": { + "time": "2016-09-24T16:40:55.93354086+03:00", + "pid": 1, + "nativePid": 22164, + "name": "print", + "commandLine": "printf \"\n1\n2\n3\"" + } +} +``` diff --git a/exec-agent/docs/notes.md b/exec-agent/docs/notes.md new file mode 100644 index 00000000000..906e8ee2bc3 --- /dev/null +++ b/exec-agent/docs/notes.md @@ -0,0 +1,10 @@ +##### Websocket messages order + +The order is respected +``` +Message fragments MUST be delivered to the recipient in the order sent by the sender. +``` +Helpful Sources +* https://tools.ietf.org/html/rfc6455 (search the sentence above) +* http://stackoverflow.com/questions/11804721/can-websocket-messages-arrive-out-of-order +* http://stackoverflow.com/questions/14287224/processing-websockets-messages-in-order-of-receiving diff --git a/exec-agent/docs/rest_api.md b/exec-agent/docs/rest_api.md new file mode 100644 index 00000000000..b4566fc516e --- /dev/null +++ b/exec-agent/docs/rest_api.md @@ -0,0 +1,233 @@ +REST API +=== + +Process API +--- + +### Start a new process + +#### Request + +_POST /process_ + +- `channel`(optional) - the id of the channel which should be subscribed to the process events +- `types`(optional) - comma separated types works only in couple with specified `channel`, defines +the events which will be sent by the process to the `channel`. Several values may be specified, +e.g. `channel=channel-1&types=stderr,stdout`. By default channel will be subscribed to +all the existing types(listed below). Possible type values: + - `stderr` - output from the process stderr + - `stdout` - output from the process stdout + - `process_status` - the process status events(_started, died_) + + +```json +{ + "name" : "build", + "commandLine" : "mvn clean install", + "type" : "maven" +} +``` + +#### Response + +```json +{ + "pid": 1, + "name": "build", + "commandLine": "mvn clean install", + "type" : "maven", + "alive": true, + "nativePid": 9186 +} +``` +- `200` if successfully started +- `400` if incoming data is not valid e.g. name is empty +- `404` if specified `channel` doesn't exist +- `500` if any other error occurs + + +### Get a process + +#### Request + +_GET /process/{pid}_ + +- `pid` - the id of the process to get + +#### Response + +```json +{ + "pid": 1, + "name": "build", + "commandLine": "mvn clean install", + "type" : "maven", + "alive": false, + "nativePid": 9186, +} +``` + +- `200` if response contains requested process +- `400` if `pid` is not valid, unsigned int required +- `404` if there is no such process +- `500` if any other error occurs + +### Kill a process + +#### Request + +_DELETE /process/{pid}_ + +- `pid` - the id of the process to kill + +#### Response + +```json +{ + "pid": 1, + "name": "build", + "commandLine": "mvn clean install", + "type" : "maven", + "alive": true, + "nativePid": 9186, +} +``` +- `200` if successfully killed +- `400` if `pid` is not valid, unsigned int required +- `404` if there is no such process +- `500` if any other error occurs + + +### Get process logs + +#### Request + +_GET /process/{pid}/logs_ + +- `pid` - the id of the process to get logs +- `from`(optional) - time to get logs from e.g. _2016-07-12T01:48:04.097980475+03:00_ the format is _RFC3339Nano_ +don't forget to encode this query parameter +- `till`(optional) - time to get logs till e.g. _2016-07-12T01:49:04.097980475+03:00_ the format is _RFC3339Nano_ +don't forget to encode this query parameter +- `format`(optional) - the format of the response, default is `json`, possible values are: `text`, `json` +- `limit`(optional) - the limit of logs in result, the default value is _50_, logs are limited from the +latest to the earliest +- `skip` (optional) - the logs to skip, default value is `0` + +#### Response + +The result logs of the process with the command line `printf "Hello\nWorld\n"` + +Text: +```text +[STDOUT] 2016-07-04 08:37:56.315082296 +0300 EEST Hello +[STDOUT] 2016-07-04 08:37:56.315128242 +0300 EEST World +``` + +Json: +```json +[ + { + "Kind" : "STDOUT", + "Time" : "2016-07-16T19:51:32.313368463+03:00", + "Text" : "Hello" + }, + { + "Kind" : "STDOUT", + "Time" : "2016-07-16T19:51:32.313603625+03:00", + "Text" : "World" + } +] +``` + +- `200` if logs are successfully fetched +- `400` if `from` or `till` format is invalid +- `404` if there is no such process +- `500` if any other error occurs + +### Get processes + +#### Request + +_GET /process_ + +- `all`(optional) - if `true` then all the processes including _dead_ ones will be returned(respecting paging ofc), +otherwise only _alive_ processes will be returnedg + +#### Response + +The result of the request _GET /process?all=true_ +```json +[ + { + "pid": 1, + "name": "build", + "commandLine": "mvn clean install", + "type" : "maven", + "alive": true, + "nativePid": 9186, + }, + { + "pid": 2, + "name": "build", + "commandLine": "printf \"Hello World\"", + "alive": false, + "nativePid": 9588 + } +] +``` +- `200` if processes are successfully retrieved +- `500` if any error occurs + +### Subscribe to the process events + +#### Request + +_POST /process/{pid}/events/{channel}_ + +- `pid` - the id of the process to subscribe to +- `channel` - the id of the webscoket channel which is subscriber +- `types`(optional) - the types of the events separated by comma e.g. `?types=stderr,stdout` +- `after`(optional) - process logs which appeared after given time will +be republished to the channel. This method may be useful in the reconnect process + +#### Response + +- `200` if successfully subscribed +- `400` if any of the parameters is not valid +- `404` if there is no such process or channel +- `500` if any other error occurs + +### Unsubscribe from the process events + +#### Request + +_DELETE /process/{pid}/events/{channel}_ + +- `pid` - the id of the process to unsubscribe from +- `channel` - the id of the webscoket channel which currenly subscribed +to the process events + +#### Response + +- `200` if successfully unsubsribed +- `400` if any of the parameters is not valid +- `404` if there is no such process or channel +- `500` if any other error occurs + +### Update the process events subscriber + +#### Request + +_PUT /process/{pid}/events/{channel}_ + +- `pid` - the id of the process +- `channel` - the id of the websocket channel which is subscriber +- `types` - the types of the events separated with comma e.g. `?types=stderr,stdout` + +#### Response + +- `200` if successfully updated +- `400` if any of the parameters is not valid +- `404` if there is no such process or channel +- `500` if any other error occurs diff --git a/exec-agent/docs/ws_api.md b/exec-agent/docs/ws_api.md new file mode 100644 index 00000000000..81e1be8dfca --- /dev/null +++ b/exec-agent/docs/ws_api.md @@ -0,0 +1,564 @@ +Websocket API +--- +[JSON RPC 2.0](http://www.jsonrpc.org/specification) protocol is used for client-server +communication, but: +- `params` is always json object(never array) +- server to client notifications are treated as [Events](events.md) + +the apis described below include some of the following fields: +```json +{ + "jsonrpc" : "2.0", + "method": "...", + "id": "...", + "params": { }, + "error" : { }, + "result" : { } +} +``` + these fields are part of the protocol so they are not documented. + +## Process API + + +### Start process + +##### Request + +- __name__ - the name of the command +- __commandLine__ - command line to execute +- __type__(optional) - command type +- __eventTypes__(optional) - comma separated types of events which will be + received by this channel. By default all the process events will be received. +Possible values are: `stderr`, `stdout`, `process_status` + +```json +{ + "method": "process.start", + "id": "id1234567", + "params": { + "name": "print", + "commandLine": "printf \"1\n2\n3\"", + "type": "test" + } +} +``` + +##### Response + +```json +{ + "jsonrpc": "2.0", + "id": "id1234567", + "result": { + "pid": 1, + "name": "print", + "commandLine": "printf \"1\n2\n3\"", + "type": "test", + "alive": true, + "nativePid": 19920 + } +} +``` + +#### Errors + +- when either `name` or `commandLine` is missing, e.g: + +```json +{ + "jsonrpc": "2.0", + "id": "id1234567", + "error": { + "code": -32602, + "message": "Command line required" + } +} +``` + +### Kill process + +##### Request + +- __pid__ - the id of the process to kill + +```json +{ + "method": "process.kill", + "id": "id1234567", + "params": { + "pid": 2 + } +} +``` + +##### Response + +```json +{ + "jsonrpc": "2.0", + "id": "id1234567", + "result": { + "pid": 2, + "text": "Successfully killed" + } +} +``` + +##### Errors + +- when there is no such process + +```json +{ + "jsonrpc": "2.0", + "id": "id1234567", + "error": { + "code": -32000, + "message": "Process with id '2' does not exist" + } +} +``` + +- when process with given id is not alive + +```json +{ + "jsonrpc": "2.0", + "id": "id1234567", + "error": { + "code": -32001, + "message": "Process with id '2' is not alive" + } +} +``` + +### Subscribe to process events + +##### Request + +- __pid__ - the id of the process to subscribe to +- __eventTypes__(optional) - comma separated types of events which will be +received by this channel. By default all the process events will be received, +not supported even types are ignored. Possible values are: `stdout`, `stderr`, `process_status`. +- __after__(optional) - process logs which appeared after given time will +be republished to the channel. This parameter may be useful when reconnecting to the exec-agent + +```json +{ + "method": "process.subscribe", + "id": "0x12345", + "params": { + "pid": 2, + "eventTypes": "stdout,stderr", + "after" : "2016-07-26T09:36:44.920890113+03:00" + } +} +``` + + +##### Response + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "result": { + "pid": 2, + "eventTypes": "stdout,stderr", + "text": "Successfully subscribed" + } +} +``` + +##### Errors + +- when there is no a single valid event type provided + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32602, + "message": "Required at least 1 valid event type" + } +} +``` + +- when `after` is not in valid format + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32602, + "message": "Bad format of 'after', parsing time \"2016-07-26\" as \"2006-01-02T15:04:05.999999999Z07:00\": cannot parse \"\" as \"T\"" + } +} +``` + +- when this channel is already subscribed on process events + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32603, + "message": "Already subscribed" + } +} +``` + +- when there is no such process + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32000, + "message": "Process with id '2' does not exist" + } +} +``` + +- when process with given id is not alive + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32001, + "message": "Process with id '2' is not alive" + } +} +``` + + + +### Unsubscribe from process events + +##### Request + +- __pid__ - the id of the process to unsubscribe from + +```json +{ + "method": "process.unsubscribe", + "id": "0x12345", + "params": { + "pid": 2 + } +} +``` + +##### Response + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "result": { + "pid": 2, + "text": "Successfully unsubscribed" + } +} +``` + +##### Errors + +- when there is no such process + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32000, + "message": "Process with id '2' does not exist" + } +} +``` + +- when process with given id is not alive + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32001, + "message": "Process with id '2' is not alive" + } +} +``` + + + +### Update process subscriber + +##### Request + +- __pid__ - the id of the process which subscriber should be updated +- __eventTypes__ - comma separated types of events which will be +received by this channel. Not supported even types are ignored. +Possible values are: `stdout`, `stderr`, `process_status`. + +```json +{ + "method": "process.updateSubscriber", + "id": "0x12345", + "params": { + "pid": 2, + "eventTypes": "process_status" + } +} +``` + +##### Response + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "result": { + "pid": 2, + "eventTypes": "process_status", + "text": "Subscriber successfully updated" + } +} +``` + +##### Errors + +- when there is no such process + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32000, + "message": "Process with id '2' does not exist" + } +} +``` + +- when process with given id is not alive + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32001, + "message": "Process with id '2' is not alive" + } +} +``` + +- when this channel is not subscribed on process events + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32603, + "message": "No subscriber with id 'channel-1'" + } +} +``` + + +### Get process logs + +##### Request + +- __pid__ - the id of the process to get logs +- __from__(optional) - time to get logs from e.g. _2016-07-12T01:48:04.097980475+03:00_ the format is _RFC3339Nano_ +- __till__(optional) - time to get logs till e.g. _2016-07-12T01:49:04.097980475+03:00_ the format is _RFC3339Nano_ +- __limit__(optional) - the limit of logs in result, the default value is _50_, logs are limited from the +latest to the earliest +- __skip__ (optional) - the logs to skip, default value is `0` + +```json +{ + "method": "process.getLogs", + "id": "0x12345", + "params": { + "pid": 3, + "limit": 5, + "skip": 5 + } +} +``` + +##### Response + +For the command `printf "1\n2\n3\n4\n5\n6\n7\n8\n9\n10`, the result will look like + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "result": [ + { + "kind": "STDOUT", + "time": "2016-09-24T17:18:30.757623274+03:00", + "text": "1" + }, + { + "kind": "STDOUT", + "time": "2016-09-24T17:18:30.757701555+03:00", + "text": "2" + }, + { + "kind": "STDOUT", + "time": "2016-09-24T17:18:30.757721423+03:00", + "text": "3" + }, + { + "kind": "STDOUT", + "time": "2016-09-24T17:18:30.757841518+03:00", + "text": "4" + }, + { + "kind": "STDOUT", + "time": "2016-09-24T17:18:30.757851622+03:00", + "text": "5" + } + ] +} +``` + +##### Errors + + +- when there is no such process + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32000, + "message": "Process with id '2' does not exist" + } +} +``` + +- when one of the time parameters is invalid, e.g: + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32602, + "message": "Bad format of 'till', parsing time \"date\" as \"2006-01-02T15:04:05.999999999Z07:00\": cannot parse \"date\" as \"2006\"" + } +} +``` + + + +### Get process + +##### Request + +- __pid__ - the id of the process to get + +```json +{ + "method": "process.getProcess", + "id": "0x12345", + "params": { + "pid": 3 + } +} +``` + +##### Response + +When everything is okay + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "result": { + "pid": 1, + "name": "print", + "commandLine": "printf \n1\n2\n3\"", + "type": "test", + "alive": false, + "nativePid": 13158 + } +} +``` + +##### Errors + + +- when there is no such process + +```json +{ + "jsonrpc": "2.0", + "id": "0x12345", + "error": { + "code": -32000, + "message": "Process with id '2' does not exist" + } +} +``` + + +### Get processes + +##### Request + +- __all__(optional) - if `true` then all the processes including _dead_ ones will be returned, +otherwise only _alive_ processes will be returned + +```json +{ + "method": "process.getProcesses", + "id": "id1234567", + "params": { + "all": true + } +} +``` + +##### Response + +```json +{ + "jsonrpc": "2.0", + "id": "id1234567", + "result": [ + { + "pid": 1, + "name": "print", + "commandLine": "printf \"1\n2\n3\"", + "type": "test", + "alive": false, + "nativePid": 13553 + }, + { + "pid": 2, + "name": "print2", + "commandLine": "printf \"\n3\n2\n1\"", + "type": "test2", + "alive": false, + "nativePid": 13561 + } + ] +} +``` diff --git a/exec-agent/pom.xml b/exec-agent/pom.xml new file mode 100644 index 00000000000..e2e945e5a40 --- /dev/null +++ b/exec-agent/pom.xml @@ -0,0 +1,215 @@ + + + + 4.0.0 + + che-parent + org.eclipse.che + 5.0.0-M9-SNAPSHOT + + exec-agent + Exec Agent + + go-workspace + + + + + com.mycila + license-maven-plugin + + + src/term/server.go + src/docs/** + src/vendor/** + src/Godeps/** + src/static/term.js + src/.idea/** + src/exec-agent + src/exec-agent.iml + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + copy-sources + compile + + run + + + + + + + + + + + + + + + + com.soebes.maven.plugins + iterator-maven-plugin + 0.4 + + + compile-exec-agent + compile + + iterator + + + + + linux_arm5 + + linux + arm + 5 + + + + linux_arm6 + + linux + arm + 6 + + + + linux_arm7 + + linux + arm + 7 + + + + linux_amd64 + + linux + amd64 + + + + linux_i386 + + linux + 386 + + + + + + + org.codehaus.mojo + exec-maven-plugin + + exec + + go + ${project.build.directory}/${go.workspace.name}/src/github.com/eclipse/che/exec-agent + + build + -a + -installsuffix + cgo + -o + ${project.build.directory}/${item}/che-websocket-terminal + + + ${project.build.directory}/${go.workspace.name} + ${terminal.target.os} + ${terminal.target.architecture} + ${terminal.target.arm.version} + + + + + + + + assembly + package + + iterator + + + + linux_arm5 + linux_arm6 + linux_arm7 + linux_amd64 + linux_i386 + + + + + maven-assembly-plugin + + single + + + ${basedir}/src/assembly/assembly.xml + + + + + + + + + + + + + integration + + false + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + run-tests + test + + run + + + + + + + + + + + + + + + + + + diff --git a/exec-agent/src/Godeps/Godeps.json b/exec-agent/src/Godeps/Godeps.json new file mode 100644 index 00000000000..17b175c95ef --- /dev/null +++ b/exec-agent/src/Godeps/Godeps.json @@ -0,0 +1,22 @@ +{ + "ImportPath": "github.com/eclipse/che/exec-agent", + "GoVersion": "go1.6", + "GodepVersion": "v74", + "Deps": [ + { + "ImportPath": "github.com/eclipse/che-lib/pty", + "Comment": "4.6.0-2-g69a9cee", + "Rev": "69a9cee57914086c88c8400f5f66cd25422f1fa7" + }, + { + "ImportPath": "github.com/eclipse/che-lib/websocket", + "Comment": "4.6.0-2-g69a9cee", + "Rev": "69a9cee57914086c88c8400f5f66cd25422f1fa7" + }, + { + "ImportPath": "github.com/julienschmidt/httprouter", + "Comment": "v1.1-38-g4563b0b", + "Rev": "4563b0ba73e4db6c6423b60a26f3cadd2e9a1ec9" + } + ] +} diff --git a/exec-agent/src/Godeps/Readme b/exec-agent/src/Godeps/Readme new file mode 100644 index 00000000000..4cdaa53d56d --- /dev/null +++ b/exec-agent/src/Godeps/Readme @@ -0,0 +1,5 @@ +This directory tree is generated automatically by godep. + +Please do not edit. + +See https://github.com/tools/godep for more information. diff --git a/exec-agent/src/assembly/assembly.xml b/exec-agent/src/assembly/assembly.xml new file mode 100644 index 00000000000..956a8dac74a --- /dev/null +++ b/exec-agent/src/assembly/assembly.xml @@ -0,0 +1,36 @@ + + + ${item} + true + terminal + + zip + tar.gz + + + + ${project.build.directory}/${item} + + che-websocket-terminal + + + + + ${project.build.directory}/${go.workspace.name}/src/github.com/eclipse/che/exec-agent/static + + + + diff --git a/exec-agent/src/auth/auth.go b/exec-agent/src/auth/auth.go new file mode 100644 index 00000000000..bf83bc9df3f --- /dev/null +++ b/exec-agent/src/auth/auth.go @@ -0,0 +1,128 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package auth + +import ( + "errors" + "fmt" + "net/http" + "sync" + "time" + + "github.com/eclipse/che/exec-agent/rest" +) + +const ( + DefaultTokensExpirationTimeoutInMinutes = 10 +) + +// Authenticates all the http calls on workspace master +// checking if provided by request token is valid, if authentication is successful +// then calls ServerHTTP on delegate, otherwise if UnauthorizedHandler is configured +// then uses it to handle the request if not then returns 401 with appropriate error message +type Handler struct { + Delegate http.Handler + ApiEndpoint string + Cache *TokenCache + UnauthorizedHandler func(w http.ResponseWriter, req *http.Request) +} + +func (handler Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + token := req.URL.Query().Get("token") + if handler.Cache.Contains(token) { + handler.Delegate.ServeHTTP(w, req) + } else if err := authenticateOnMaster(handler.ApiEndpoint, token); err == nil { + handler.Cache.Put(token) + handler.Delegate.ServeHTTP(w, req) + } else if handler.UnauthorizedHandler != nil { + handler.UnauthorizedHandler(w, req) + } else { + http.Error(w, err.Error(), http.StatusUnauthorized) + } +} + +// Authentication tokens cache. +type TokenCache struct { + sync.RWMutex + tokens map[string]time.Time + ticker *time.Ticker + expireTimeout time.Duration +} + +func NewCache(expireDuration time.Duration, period time.Duration) *TokenCache { + cache := &TokenCache{ + tokens: make(map[string]time.Time), + expireTimeout: expireDuration, + } + if period > 0 { + go cache.expirePeriodically(period) + } + return cache +} + +// Puts token into the cache. +func (cache *TokenCache) Put(token string) { + cache.Lock() + defer cache.Unlock() + cache.tokens[token] = time.Now().Add(cache.expireTimeout) +} + +// Removes the token from the cache. +func (cache *TokenCache) Expire(token string) { + cache.Lock() + defer cache.Unlock() + delete(cache.tokens, token) +} + +// Returns true if token is present in the cache and false otherwise. +func (cache *TokenCache) Contains(token string) bool { + cache.RLock() + defer cache.RUnlock() + _, ok := cache.tokens[token] + return ok +} + +func (cache *TokenCache) expirePeriodically(period time.Duration) { + cache.ticker = time.NewTicker(period) + for range cache.ticker.C { + cache.expireAllBefore(time.Now()) + } +} + +func (cache *TokenCache) expireAllBefore(expirationPoint time.Time) { + cache.Lock() + defer cache.Unlock() + for token, expTime := range cache.tokens { + if expTime.Before(expirationPoint) { + delete(cache.tokens, token) + } + } +} + +func authenticateOnMaster(apiEndpoint string, tokenParam string) error { + if tokenParam == "" { + return rest.Unauthorized(errors.New("Authentication failed: missing 'token' query parameter")) + } + req, err := http.NewRequest("GET", apiEndpoint+"/machine/token/user/"+tokenParam, nil) + if err != nil { + return rest.Unauthorized(err) + } + req.Header.Add("Authorization", tokenParam) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return rest.Unauthorized(err) + } + if resp.StatusCode != 200 { + return rest.Unauthorized(errors.New(fmt.Sprintf("Authentication failed, token: %s is invalid", tokenParam))) + } + return nil +} diff --git a/exec-agent/src/auth/cache_test.go b/exec-agent/src/auth/cache_test.go new file mode 100644 index 00000000000..c24a1918c28 --- /dev/null +++ b/exec-agent/src/auth/cache_test.go @@ -0,0 +1,56 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package auth + +import ( + "testing" + "time" +) + +func TestTokenCache(t *testing.T) { + cache := &TokenCache{ + tokens: make(map[string]time.Time), + expireTimeout: 0, + } + + token := "my-token" + + cache.Put(token) + if !cache.Contains(token) { + t.Fatalf("Cache must contain token %s", token) + } + + cache.Expire(token) + if cache.Contains(token) { + t.Fatalf("Cache must not contain token %s", token) + } +} + +func TestExpiresTokensCreatedBeforeGivenPointOfTime(t *testing.T) { + cache := &TokenCache{ + tokens: make(map[string]time.Time), + expireTimeout: 0, + } + + cache.Put("token1") + afterToken1Put := time.Now() + cache.Put("token2") + + cache.expireAllBefore(afterToken1Put) + + if cache.Contains("token1") { + t.Fatal("Cache must not contain token1") + } + if !cache.Contains("token2") { + t.Fatal("Cache must contain token2") + } +} diff --git a/exec-agent/src/main.go b/exec-agent/src/main.go new file mode 100644 index 00000000000..144bdfb15e2 --- /dev/null +++ b/exec-agent/src/main.go @@ -0,0 +1,300 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package main + +import ( + "flag" + "fmt" + "log" + "net/http" + "net/url" + "os" + "time" + + "regexp" + + "github.com/eclipse/che/exec-agent/auth" + "github.com/eclipse/che/exec-agent/process" + "github.com/eclipse/che/exec-agent/rest" + "github.com/eclipse/che/exec-agent/rpc" + "github.com/eclipse/che/exec-agent/term" + "github.com/julienschmidt/httprouter" +) + +var ( + AppHttpRoutes = []rest.RoutesGroup{ + process.HttpRoutes, + rpc.HttpRoutes, + term.HttpRoutes, + } + + AppOpRoutes = []rpc.RoutesGroup{ + process.RpcRoutes, + } + + serverAddress string + staticDir string + basePath string + apiEndpoint string + + authEnabled bool + tokensExpirationTimeoutInMinutes uint + + processCleanupThresholdInMinutes int + processCleanupPeriodInMinutes int +) + +func init() { + // server configuration + flag.StringVar( + &serverAddress, + "addr", + ":9000", + "IP:PORT or :PORT the address to start the server on", + ) + flag.StringVar( + &staticDir, + "static", + "./static/", + "path to the directory where static content is located", + ) + flag.StringVar( + &basePath, + "path", + "", + `the base path for all the rpc & rest routes, so route paths are treated not + as 'server_address + route_path' but 'server_address + path + route_path'. + For example for the server address 'localhost:9000', route path '/connect' and + configured path '/api/' exec-agent server will serve the following route: + 'localhost:9000/api/connect'. + Regexp syntax is supported`, + ) + + // terminal configuration + flag.StringVar( + &term.Cmd, + "cmd", + "/bin/bash", + "command to execute on slave side of the pty", + ) + + // workspace master server configuration + flag.StringVar( + &apiEndpoint, + "api-endpoint", + os.Getenv("CHE_API"), + `api-endpoint used by exec-agent modules(such as activity checker or authentication) + to request workspace master. By default the value from 'CHE_API' environment variable is used`, + ) + + // auth configuration + flag.BoolVar( + &authEnabled, + "enable-auth", + false, + "whether authenicate requests on workspace master before allowing them to proceed", + ) + flag.UintVar( + &tokensExpirationTimeoutInMinutes, + "tokens-expiration-timeout", + auth.DefaultTokensExpirationTimeoutInMinutes, + "how much time machine tokens stay in cache(if auth is enabled)", + ) + + // terminal configuration + flag.BoolVar( + &term.ActivityTrackingEnabled, + "enable-activity-tracking", + false, + "whether workspace master will be notified about terminal activity", + ) + + // process executor configuration + flag.IntVar( + &processCleanupPeriodInMinutes, + "process-cleanup-period", + -1, + "how often processs cleanup job will be executed(in minutes)", + ) + flag.IntVar(&processCleanupThresholdInMinutes, + "process-cleanup-threshold", + -1, + `how much time will dead and unused process stay(in minutes), + if -1 passed then processes won't be cleaned at all. Please note that the time + of real cleanup is between configured threshold and threshold + process-cleanup-period.`, + ) + curDir, _ := os.Getwd() + curDir += string(os.PathSeparator) + "logs" + flag.StringVar( + &process.LogsDir, + "logs-dir", + curDir, + "base directory for process logs", + ) +} + +func main() { + flag.Parse() + + log.SetOutput(os.Stdout) + + // print configuration + fmt.Println("Exec-agent configuration") + fmt.Println(" Server") + fmt.Printf(" - Address: %s\n", serverAddress) + fmt.Printf(" - Static content: %s\n", staticDir) + fmt.Printf(" - Base path: '%s'\n", basePath) + fmt.Println(" Terminal") + fmt.Printf(" - Slave command: '%s'\n", term.Cmd) + fmt.Printf(" - Activity tracking enabled: %t\n", term.ActivityTrackingEnabled) + if authEnabled { + fmt.Println(" Authentication") + fmt.Printf(" - Enabled: %t\n", authEnabled) + fmt.Printf(" - Tokens expiration timeout: %dm\n", tokensExpirationTimeoutInMinutes) + } + fmt.Println(" Process executor") + fmt.Printf(" - Logs dir: %s\n", process.LogsDir) + if processCleanupPeriodInMinutes > 0 { + fmt.Printf(" - Cleanup job period: %dm\n", processCleanupPeriodInMinutes) + fmt.Printf(" - Not used & dead processes stay for: %dm\n", processCleanupThresholdInMinutes) + } + if authEnabled || term.ActivityTrackingEnabled { + fmt.Println(" Workspace master server") + fmt.Printf(" - API endpoint: %s\n", apiEndpoint) + } + fmt.Println() + + term.ApiEndpoint = apiEndpoint + + // process configuration + if err := os.RemoveAll(process.LogsDir); err != nil { + log.Fatal(err) + } + + if processCleanupPeriodInMinutes > 0 { + if processCleanupThresholdInMinutes < 0 { + log.Fatal("Expected process cleanup threshold to be non negative value") + } + cleaner := process.NewCleaner(processCleanupPeriodInMinutes, processCleanupThresholdInMinutes) + cleaner.CleanPeriodically() + } + + // terminal configuration + if term.ActivityTrackingEnabled { + go term.Activity.StartTracking() + } + + // register routes and http handlers + router := httprouter.New() + router.NotFound = http.FileServer(http.Dir(staticDir)) + + fmt.Print("ā‡© Registered HttpRoutes:\n\n") + for _, routesGroup := range AppHttpRoutes { + fmt.Printf("%s:\n", routesGroup.Name) + for _, route := range routesGroup.Items { + router.Handle( + route.Method, + route.Path, + toHandle(route.HandleFunc), + ) + fmt.Printf("āœ“ %s\n", &route) + } + fmt.Println() + } + + fmt.Print("\nā‡© Registered RpcRoutes:\n\n") + for _, routesGroup := range AppOpRoutes { + fmt.Printf("%s:\n", routesGroup.Name) + for _, route := range routesGroup.Items { + fmt.Printf("āœ“ %s\n", route.Method) + rpc.RegisterRoute(route) + } + } + + var handler http.Handler = router + + // required authentication for all the requests, if it is configured + if authEnabled { + cache := auth.NewCache(time.Minute*time.Duration(tokensExpirationTimeoutInMinutes), time.Minute*5) + + handler = auth.Handler{ + Delegate: handler, + ApiEndpoint: apiEndpoint, + Cache: cache, + UnauthorizedHandler: func(w http.ResponseWriter, req *http.Request) { + dropChannelsWithExpiredToken(req.URL.Query().Get("token")) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + }, + } + } + + // cut base path on requests, if it is configured + if basePath != "" { + if rx, err := regexp.Compile(basePath); err == nil { + handler = basePathChopper{rx, handler} + } else { + log.Fatal(err) + } + } + + http.Handle("/", handler) + + server := &http.Server{ + Handler: handler, + Addr: serverAddress, + WriteTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, + } + log.Fatal(server.ListenAndServe()) +} + +func dropChannelsWithExpiredToken(token string) { + for _, c := range rpc.GetChannels() { + u, err := url.ParseRequestURI(c.RequestURI) + if err != nil { + log.Printf("Couldn't parse the RequestURI '%s' of channel '%s'", c.RequestURI, c.Id) + } else if u.Query().Get("token") == token { + log.Printf("Token for channel '%s' is expired, trying to drop the channel", c.Id) + rpc.DropChannel(c.Id) + } + } +} + +type routerParamsAdapter struct { + params httprouter.Params +} + +func (pa routerParamsAdapter) Get(param string) string { + return pa.params.ByName(param) +} + +func toHandle(f rest.HttpRouteHandlerFunc) httprouter.Handle { + return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + if err := f(w, r, routerParamsAdapter{params: p}); err != nil { + rest.WriteError(w, err) + } + } +} + +type basePathChopper struct { + pattern *regexp.Regexp + delegate http.Handler +} + +func (c basePathChopper) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // if request path starts with given base path + if idx := c.pattern.FindStringSubmatchIndex(r.URL.Path); len(idx) != 0 && idx[0] == 0 { + r.URL.Path = r.URL.Path[idx[1]:] + r.RequestURI = r.RequestURI[idx[1]:] + } + c.delegate.ServeHTTP(w, r) +} diff --git a/exec-agent/src/process/common.go b/exec-agent/src/process/common.go new file mode 100644 index 00000000000..7ef333e27ef --- /dev/null +++ b/exec-agent/src/process/common.go @@ -0,0 +1,79 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "errors" + "strconv" + "strings" + "time" +) + +const ( + DefaultLogsPerPageLimit = 50 +) + +func maskFromTypes(types string) uint64 { + var mask uint64 + for _, t := range strings.Split(types, ",") { + switch strings.ToLower(strings.TrimSpace(t)) { + case "stderr": + mask |= StderrBit + case "stdout": + mask |= StdoutBit + case "process_status": + mask |= ProcessStatusBit + } + } + return mask +} + +func parseTypes(types string) uint64 { + var mask uint64 = DefaultMask + if types != "" { + mask = maskFromTypes(types) + } + return mask +} + +// Checks whether pid is valid and converts it to the uint64 +func parsePid(strPid string) (uint64, error) { + intPid, err := strconv.Atoi(strPid) + if err != nil { + return 0, errors.New("Pid value must be unsigned integer") + } + if intPid <= 0 { + return 0, errors.New("Pid value must be unsigned integer") + } + return uint64(intPid), nil +} + +// Checks whether command is valid +func checkCommand(command *Command) error { + if command.Name == "" { + return errors.New("Command name required") + } + if command.CommandLine == "" { + return errors.New("Command line required") + } + return nil +} + +// If time string is empty, then default time is returned +// If time string is invalid, then appropriate error is returned +// If time string is valid then parsed time is returned +func parseTime(timeStr string, defTime time.Time) (time.Time, error) { + if timeStr == "" { + return defTime, nil + } + return time.Parse(DateTimeFormat, timeStr) +} diff --git a/exec-agent/src/process/events.go b/exec-agent/src/process/events.go new file mode 100644 index 00000000000..8bf1e1cb981 --- /dev/null +++ b/exec-agent/src/process/events.go @@ -0,0 +1,72 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "github.com/eclipse/che/exec-agent/rpc" + "time" +) + +const ( + StartedEventType = "process_started" + DiedEventType = "process_died" + StdoutEventType = "process_stdout" + StderrEventType = "process_stderr" +) + +type ProcessStatusEventBody struct { + rpc.Timed + Pid uint64 `json:"pid"` + NativePid int `json:"nativePid"` + Name string `json:"name"` + CommandLine string `json:"commandLine"` +} + +type ProcessOutputEventBody struct { + rpc.Timed + Pid uint64 `json:"pid"` + Text string `json:"text"` +} + +func newStderrEvent(pid uint64, text string, when time.Time) *rpc.Event { + return rpc.NewEvent(StderrEventType, &ProcessOutputEventBody{ + Timed: rpc.Timed{Time: when}, + Pid: pid, + Text: text, + }) +} + +func newStdoutEvent(pid uint64, text string, when time.Time) *rpc.Event { + return rpc.NewEvent(StdoutEventType, &ProcessOutputEventBody{ + Timed: rpc.Timed{Time: when}, + Pid: pid, + Text: text, + }) +} + +func newStatusEvent(mp MachineProcess, status string) *rpc.Event { + return rpc.NewEvent(status, &ProcessStatusEventBody{ + Timed: rpc.Timed{Time: time.Now()}, + Pid: mp.Pid, + NativePid: mp.NativePid, + Name: mp.Name, + CommandLine: mp.CommandLine, + }) +} + +func newStartedEvent(mp MachineProcess) *rpc.Event { + return newStatusEvent(mp, StartedEventType) +} + +func newDiedEvent(mp MachineProcess) *rpc.Event { + return newStatusEvent(mp, DiedEventType) +} diff --git a/exec-agent/src/process/file_logger.go b/exec-agent/src/process/file_logger.go new file mode 100644 index 00000000000..fc2be5f779a --- /dev/null +++ b/exec-agent/src/process/file_logger.go @@ -0,0 +1,84 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "bytes" + "encoding/json" + "log" + "os" + "sync" + "time" +) + +const ( + flushThreshold = 8192 +) + +type FileLogger struct { + sync.RWMutex + filename string + buffer *bytes.Buffer + encoder *json.Encoder +} + +func NewLogger(filename string) (*FileLogger, error) { + fl := &FileLogger{filename: filename} + fl.buffer = &bytes.Buffer{} + fl.encoder = json.NewEncoder(fl.buffer) + + // Trying to create logs file + file, err := os.Create(filename) + if err != nil { + return nil, err + } + defer file.Close() + + return fl, nil +} + +func (fl *FileLogger) Flush() { + fl.Lock() + fl.doFlush() + fl.Unlock() +} + +func (fl *FileLogger) OnStdout(line string, time time.Time) { + fl.writeLine(&LogMessage{StdoutKind, time, line}) +} + +func (fl *FileLogger) OnStderr(line string, time time.Time) { + fl.writeLine(&LogMessage{StderrKind, time, line}) +} + +func (fl *FileLogger) Close() { + fl.Flush() +} + +func (fl *FileLogger) writeLine(message *LogMessage) { + fl.Lock() + fl.encoder.Encode(message) + if flushThreshold < fl.buffer.Len() { + fl.doFlush() + } + fl.Unlock() +} + +func (fl *FileLogger) doFlush() { + f, err := os.OpenFile(fl.filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if err != nil { + log.Printf("Couldn't open file '%s' for flushing the buffer. %s \n", fl.filename, err.Error()) + } else { + defer f.Close() + fl.buffer.WriteTo(f) + } +} diff --git a/exec-agent/src/process/file_logger_test.go b/exec-agent/src/process/file_logger_test.go new file mode 100644 index 00000000000..0970a60d16e --- /dev/null +++ b/exec-agent/src/process/file_logger_test.go @@ -0,0 +1,125 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process_test + +import ( + "encoding/json" + "github.com/eclipse/che/exec-agent/process" + "io/ioutil" + "math/rand" + "os" + "testing" + "time" +) + +var alphabet = []byte("abcdefgh123456789") + +func TestFileLoggerCreatesFileWhenFileDoesNotExist(t *testing.T) { + filename := os.TempDir() + string(os.PathSeparator) + randomName(10) + defer os.Remove(filename) + + if _, err := os.Stat(filename); err == nil { + t.Fatalf("File '%s' already exists", filename) + } + + if _, err := process.NewLogger(filename); err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(filename); os.IsNotExist(err) { + t.Fatalf("Expected file '%s' was created, but it wasn't", filename) + } +} + +func TestFileLoggerTruncatesFileIfFileExistsOnCreate(t *testing.T) { + filename := os.TempDir() + string(os.PathSeparator) + randomName(10) + defer os.Remove(filename) + + if _, err := os.Create(filename); err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(filename, []byte("file-content"), 0666); err != nil { + t.Fatal(err) + } + + if _, err := process.NewLogger(filename); err != nil { + t.Fatal(err) + } + + content, err := ioutil.ReadFile(filename) + if err != nil { + t.Fatal(err) + } + if len(content) != 0 { + t.Errorf("Expected file '%s' content is empty", filename) + } +} + +func TestLogsAreFlushedOnClose(t *testing.T) { + filename := os.TempDir() + string(os.PathSeparator) + randomName(10) + defer os.Remove(filename) + + fl, err := process.NewLogger(filename) + if err != nil { + t.Fatal(err) + } + + // Write something to the log + now := time.Now() + fl.OnStdout("stdout", now) + fl.OnStderr("stderr", now) + fl.Close() + + // Read file content + f, err := os.Open(filename) + if err != nil { + t.Fatal(err) + } + + // Read log messages + stdout := process.LogMessage{} + stderr := process.LogMessage{} + decoder := json.NewDecoder(f) + if err := decoder.Decode(&stdout); err != nil { + t.Fatal(err) + } + if err := decoder.Decode(&stderr); err != nil { + t.Fatal(err) + } + + // Check logs are okay + expectedStdout := process.LogMessage{ + Kind: process.StdoutKind, + Time: now, + Text: "stdout", + } + if stdout != expectedStdout { + t.Fatalf("Expected %v but found %v", expectedStdout, stdout) + } + expectedStderr := process.LogMessage{ + Kind: process.StderrKind, + Time: now, + Text: "stderr", + } + if stdout != expectedStdout { + t.Fatalf("Expected %v but found %v", expectedStderr, stderr) + } +} + +func randomName(length int) string { + rand.Seed(time.Now().UnixNano()) + bytes := make([]byte, length) + for i := 0; i < length; i++ { + bytes[i] = alphabet[rand.Intn(len(alphabet))] + } + return string(bytes) +} diff --git a/exec-agent/src/process/logs_distributor.go b/exec-agent/src/process/logs_distributor.go new file mode 100644 index 00000000000..9dd1c2e7909 --- /dev/null +++ b/exec-agent/src/process/logs_distributor.go @@ -0,0 +1,63 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "errors" + "fmt" + "os" +) + +const ( + DefaultMaxDirsCount = 16 +) + +// Distributes the logs between different directories +type LogsDistributor interface { + + // The implementor must guarantee that returned file name + // is always the same for the same pid. + // Returns an error if it is impossible to create hierarchy of + // logs file parent folders, otherwise returns file path + DirForPid(baseDir string, pid uint64) (string, error) +} + +type DefaultLogsDistributor struct { + MaxDirsCount uint +} + +func NewLogsDistributor() LogsDistributor { + return &DefaultLogsDistributor{ + MaxDirsCount: DefaultMaxDirsCount, + } +} + +func (ld *DefaultLogsDistributor) DirForPid(baseDir string, pid uint64) (string, error) { + // directories from 1 to maxDirsCount inclusive + subDirName := (pid % uint64(ld.MaxDirsCount)) + + // {baseLogsDir}/{subDirName} + pidLogsDir := fmt.Sprintf("%s%c%d", baseDir, os.PathSeparator, subDirName) + + // Create subdirectory + if info, err := os.Stat(pidLogsDir); os.IsNotExist(err) { + if err := os.MkdirAll(pidLogsDir, os.ModePerm); err != nil { + return "", err + } + } else if err != nil { + return "", err + } else if !info.IsDir() { + m := fmt.Sprintf("Couldn't create a directory '%s', the name is taken by file", pidLogsDir) + return "", errors.New(m) + } + return pidLogsDir, nil +} diff --git a/exec-agent/src/process/logs_distributor_test.go b/exec-agent/src/process/logs_distributor_test.go new file mode 100644 index 00000000000..ccfb39d0ca7 --- /dev/null +++ b/exec-agent/src/process/logs_distributor_test.go @@ -0,0 +1,72 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process_test + +import ( + "fmt" + "github.com/eclipse/che/exec-agent/process" + "io/ioutil" + "os" + "testing" +) + +func TestLogsDistributorCreatesSubdirectories(t *testing.T) { + baseDir := os.TempDir() + string(os.PathSeparator) + randomName(10) + defer os.RemoveAll(baseDir) + + distributor := process.DefaultLogsDistributor{ + MaxDirsCount: 4, + } + + dir, err := distributor.DirForPid(baseDir, 1) + if err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(dir); os.IsNotExist(err) { + t.Fatal("Expected that logs file subdirectory was created") + } else if err != nil { + t.Fatal(err) + } +} + +func TestLogsDistribution(t *testing.T) { + baseDir := os.TempDir() + string(os.PathSeparator) + randomName(10) + defer os.RemoveAll(baseDir) + + distributor := process.DefaultLogsDistributor{ + MaxDirsCount: 4, + } + + // Those files should be evenly distributed in 4 directories + for pid := 1; pid <= 16; pid++ { + dir, err := distributor.DirForPid(baseDir, uint64(pid)) + if err != nil { + t.Fatal(err) + } + filename := fmt.Sprintf("%s%cpid-%d", dir, os.PathSeparator, pid) + if _, err := os.Create(filename); err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 4; i++ { + dir := fmt.Sprintf("%s%c%d", baseDir, os.PathSeparator, i) + fi, err := ioutil.ReadDir(dir) + if err != nil { + t.Fatal(err) + } + if len(fi) != 4 { + t.Fatalf("Expected directory '%s' to contain 4 files", dir) + } + } +} diff --git a/exec-agent/src/process/logs_reader.go b/exec-agent/src/process/logs_reader.go new file mode 100644 index 00000000000..73065461649 --- /dev/null +++ b/exec-agent/src/process/logs_reader.go @@ -0,0 +1,87 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "bufio" + "encoding/json" + "io" + "os" + "time" +) + +type LogsReader struct { + filename string + readFrom *time.Time + readTill *time.Time +} + +func NewLogsReader(filename string) *LogsReader { + return &LogsReader{filename: filename} +} + +// Skip all the logs before the given time. +// If the log message appeared at the given time, it won't be skipped. +func (lr *LogsReader) From(time time.Time) *LogsReader { + lr.readFrom = &time + return lr +} + +// Read logs which appeared before and right at a given time +func (lr *LogsReader) Till(time time.Time) *LogsReader { + lr.readTill = &time + return lr +} + +// Reads logs between [from, till] inclusive. +// Returns an error if logs file is missing, or +// decoding of file content failed. +// If no logs matched time frame, an empty slice will be returned. +func (lr *LogsReader) ReadLogs() ([]*LogMessage, error) { + // Open logs file for reading logs + logsFile, err := os.Open(lr.filename) + if err != nil { + return nil, err + } + defer logsFile.Close() + + from := time.Time{} + if lr.readFrom != nil { + from = *lr.readFrom + } + till := time.Now() + if lr.readTill != nil { + till = *lr.readTill + } + + // Read logs + logs := []*LogMessage{} + decoder := json.NewDecoder(bufio.NewReader(logsFile)) + for { + message := &LogMessage{} + err = decoder.Decode(message) + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + if message.Time.Before(from) { + continue + } + if message.Time.After(till) { + break + } + logs = append(logs, message) + } + return logs, nil +} diff --git a/exec-agent/src/process/logs_reader_test.go b/exec-agent/src/process/logs_reader_test.go new file mode 100644 index 00000000000..1ee7cb4ca94 --- /dev/null +++ b/exec-agent/src/process/logs_reader_test.go @@ -0,0 +1,60 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process_test + +import ( + "github.com/eclipse/che/exec-agent/process" + "os" + "testing" + "time" +) + +func TestReadLogs(t *testing.T) { + filename := os.TempDir() + string(os.PathSeparator) + randomName(10) + defer os.Remove(filename) + + fl, err := process.NewLogger(filename) + if err != nil { + t.Fatal(err) + } + + // Write something to the log + now := time.Now() + fl.OnStdout("line1", now.Add(time.Second)) + fl.OnStdout("line2", now.Add(time.Second*2)) + fl.OnStdout("line3", now.Add(time.Second*3)) + fl.OnStderr("line4", now.Add(time.Second*4)) + fl.OnStderr("line5", now.Add(time.Second*5)) + fl.Close() + + // Read logs [2, 4] + logs, err := + process.NewLogsReader(filename). + From(now.Add(time.Second * 2)). + Till(now.Add(time.Second * 4)). + ReadLogs() + if err != nil { + t.Fatal(err) + } + + // Check everything is okay + expected := []process.LogMessage{ + {Kind: process.StdoutKind, Time: now.Add(time.Second * 2), Text: "line2"}, + {Kind: process.StdoutKind, Time: now.Add(time.Second * 3), Text: "line3"}, + {Kind: process.StderrKind, Time: now.Add(time.Second * 4), Text: "line4"}, + } + for i := 0; i < len(logs); i++ { + if *logs[i] != expected[i] { + t.Fatalf("Expected: '%v' Found '%v'", expected[i], *logs[i]) + } + } +} diff --git a/exec-agent/src/process/process.go b/exec-agent/src/process/process.go new file mode 100644 index 00000000000..25075916bae --- /dev/null +++ b/exec-agent/src/process/process.go @@ -0,0 +1,483 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "errors" + "fmt" + "os" + "os/exec" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/eclipse/che/exec-agent/rpc" +) + +const ( + StdoutBit = 1 << iota + StderrBit = 1 << iota + ProcessStatusBit = 1 << iota + DefaultMask = StderrBit | StdoutBit | ProcessStatusBit + + DateTimeFormat = time.RFC3339Nano + + StdoutKind = "STDOUT" + StderrKind = "STDERR" +) + +var ( + prevPid uint64 = 0 + processes = &processesMap{items: make(map[uint64]*MachineProcess)} + logsDist = NewLogsDistributor() + LogsDir string +) + +type Command struct { + Name string `json:"name"` + CommandLine string `json:"commandLine"` + Type string `json:"type"` +} + +// Defines machine process model +type MachineProcess struct { + + // The virtual id of the process, it is guaranteed that pid + // is always unique, while NativePid may occur twice or more(when including dead processes) + Pid uint64 `json:"pid"` + + // The name of the process, it is equal to the Command.Name which this process created from. + // It doesn't have to be unique, at least machine agent doesn't need such constraint, + // as pid is used for identifying process + Name string `json:"name"` + + // The command line executed by this process. + // It is equal to the Command.CommandLine which this process created from + CommandLine string `json:"commandLine"` + + // The type of the command line, this field is rather useful meta + // information than something used for functioning. It is equal + // to the Command.Type which this process created from + Type string `json:"type"` + + // Whether this process is alive or dead + Alive bool `json:"alive"` + + // The native(OS) pid, it is unique per alive processes, + // but those which are not alive, may have the same NativePid + NativePid int `json:"nativePid"` + + // Process log filename + logfileName string + + // Command executed by this process. + // If process is not alive then the command value is set to nil + command *exec.Cmd + + // Stdout/stderr pumper. + // If process is not alive then the pumper value is set to nil + pumper *LogsPumper + + // Process subscribers, all the outgoing events are go through those subscribers. + // If process is not alive then the subscribers value is set to nil + subs []*Subscriber + + // Process file logger + fileLogger *FileLogger + + // Process mutex should be used to sync process data + // or block on process related operations such as events publications + mutex sync.RWMutex + + // When the process was last time used by client + lastUsed time.Time + lastUsedLock sync.RWMutex + + // Called once before any of process events is published + // and after process is started + beforeEventsHook func(process MachineProcess) +} + +type Subscriber struct { + Id string + Mask uint64 + Channel chan *rpc.Event +} + +type LogMessage struct { + Kind string `json:"kind"` + Time time.Time `json:"time"` + Text string `json:"text"` +} + +type NoProcessError struct { + error + Pid uint64 +} + +type NotAliveError struct { + error + Pid uint64 +} + +// Lockable map for storing processes +type processesMap struct { + sync.RWMutex + items map[uint64]*MachineProcess +} + +func Start(process MachineProcess) (MachineProcess, error) { + // wrap command to be able to kill child processes see https://github.com/golang/go/issues/8854 + cmd := exec.Command("setsid", "sh", "-c", process.CommandLine) + + // getting stdout pipe + stdout, err := cmd.StdoutPipe() + if err != nil { + return process, err + } + + // getting stderr pipe + stderr, err := cmd.StderrPipe() + if err != nil { + return process, err + } + + // starting a new process + err = cmd.Start() + if err != nil { + return process, err + } + + // increment current pid & assign it to the value + pid := atomic.AddUint64(&prevPid, 1) + + // Figure out the place for logs file + dir, err := logsDist.DirForPid(LogsDir, pid) + if err != nil { + return process, err + } + filename := fmt.Sprintf("%s%cpid-%d", dir, os.PathSeparator, pid) + + fileLogger, err := NewLogger(filename) + if err != nil { + return process, err + } + + // save process + process.Pid = pid + process.Alive = true + process.NativePid = cmd.Process.Pid + process.command = cmd + process.pumper = NewPumper(stdout, stderr) + process.logfileName = filename + process.fileLogger = fileLogger + process.updateLastUsedTime() + + processes.Lock() + processes.items[pid] = &process + processes.Unlock() + + // register logs consumers + process.pumper.AddConsumer(fileLogger) + process.pumper.AddConsumer(&process) + + if process.beforeEventsHook != nil { + process.beforeEventsHook(process) + } + + // before pumping is started publish process_started event + startPublished := make(chan bool) + go func() { + process.notifySubs(newStartedEvent(process), ProcessStatusBit) + startPublished <- true + }() + + // start pumping after start event is published 'pumper.Pump' is blocking + go func() { + <-startPublished + process.pumper.Pump() + }() + + return process, nil +} + +// Gets process by pid. +// If process doesn't exist then error of type NoProcessError is returned. +func Get(pid uint64) (MachineProcess, error) { + p, ok := directGet(pid) + if ok { + return *p, nil + } + return MachineProcess{}, noProcess(pid) +} + +func GetProcesses(all bool) []MachineProcess { + processes.RLock() + defer processes.RUnlock() + + pArr := make([]MachineProcess, 0, len(processes.items)) + for _, p := range processes.items { + if all { + pArr = append(pArr, *p) + } else { + p.mutex.RLock() + if p.Alive { + pArr = append(pArr, *p) + } + p.mutex.RUnlock() + } + } + return pArr +} + +// Kills process by given pid. +// Returns an error when any error occurs during process kill. +// If process doesn't exist error of type NoProcessError is returned. +func Kill(pid uint64) error { + p, ok := directGet(pid) + if !ok { + return noProcess(pid) + } + if !p.Alive { + return notAlive(pid) + } + // workaround for killing child processes see https://github.com/golang/go/issues/8854 + return syscall.Kill(-p.NativePid, syscall.SIGKILL) +} + +// Reads process logs between [from, till] inclusive. +// Returns an error if any error occurs during logs reading. +// If process doesn't exist error of type NoProcessError is returned. +func ReadLogs(pid uint64, from time.Time, till time.Time) ([]*LogMessage, error) { + p, ok := directGet(pid) + if !ok { + return nil, noProcess(pid) + } + fl := p.fileLogger + if p.Alive { + fl.Flush() + } + return NewLogsReader(p.logfileName).From(from).Till(till).ReadLogs() +} + +// Reads all process logs. +// Returns an error if any error occurs during logs reading. +// If process doesn't exist error of type NoProcessError is returned. +func ReadAllLogs(pid uint64) ([]*LogMessage, error) { + return ReadLogs(pid, time.Time{}, time.Now()) +} + +// Unsubscribe subscriber with given id from process events. +// If process doesn't exist then error of type NoProcessError is returned. +func RemoveSubscriber(pid uint64, id string) error { + p, ok := directGet(pid) + if !ok { + return noProcess(pid) + } + if !p.Alive { + return notAlive(pid) + } + p.mutex.Lock() + defer p.mutex.Unlock() + for idx, sub := range p.subs { + if sub.Id == id { + p.subs = append(p.subs[0:idx], p.subs[idx+1:]...) + break + } + } + return nil +} + +// Subscribe to the process output. +// An error of type NoProcessError is returned when process +// with given pid doesn't exist, a regular error is returned +// if the process is dead or subscriber with such id already subscribed +// to the process output. +func AddSubscriber(pid uint64, subscriber Subscriber) error { + p, ok := directGet(pid) + if !ok { + return noProcess(pid) + } + p.mutex.Lock() + defer p.mutex.Unlock() + if !p.Alive && p.NativePid != 0 { + return errors.New("Can't subscribe to the events of dead process") + } + for _, sub := range p.subs { + if sub.Id == subscriber.Id { + return errors.New("Already subscribed") + } + } + p.subs = append(p.subs, &subscriber) + return nil +} + +// Adds a new process subscriber by reading all the logs between +// given 'after' and now and publishing them to the channel. +// Returns an error of type NoProcessError if process with given id doesn't exist, +// returns a regular error if process is alive an subscriber with such id +// already subscribed. +func RestoreSubscriber(pid uint64, subscriber Subscriber, after time.Time) error { + p, ok := directGet(pid) + if !ok { + return noProcess(pid) + } + p.mutex.Lock() + defer p.mutex.Unlock() + + // Read logs between after and now + logs, err := ReadLogs(pid, after, time.Now()) + if err != nil { + return err + } + + // If process is dead there is no need to subscribe to it + // as it is impossible to get it alive again, but it is still + // may be useful for client to get missed logs, that's why this + // function doesn't throw any errors in the case of dead process + if p.Alive { + for _, sub := range p.subs { + if sub.Id == subscriber.Id { + return errors.New("Already subscribed") + } + } + p.subs = append(p.subs, &subscriber) + } + + // Publish all the logs between (after, now] + for i := 0; i < len(logs); i++ { + message := logs[i] + if message.Time.After(after) { + if message.Kind == StdoutKind { + subscriber.Channel <- newStdoutEvent(p.Pid, message.Text, message.Time) + } else { + subscriber.Channel <- newStderrEvent(p.Pid, message.Text, message.Time) + } + } + } + + // Publish died event after logs are published and process is dead + if !p.Alive { + subscriber.Channel <- newDiedEvent(*p) + } + + return nil +} + +// Updates subscriber with given id. +// An error of type NoProcessError is returned when process +// with given pid doesn't exist, a regular error is returned +// if the process is dead. +func UpdateSubscriber(pid uint64, id string, newMask uint64) error { + p, ok := directGet(pid) + if !ok { + return noProcess(pid) + } + if !p.Alive { + return notAlive(pid) + } + p.mutex.Lock() + defer p.mutex.Unlock() + for _, sub := range p.subs { + if sub.Id == id { + sub.Mask = newMask + return nil + } + } + return errors.New(fmt.Sprintf("No subscriber with id '%s'", id)) +} + +func (process *MachineProcess) OnStdout(line string, time time.Time) { + process.notifySubs(newStdoutEvent(process.Pid, line, time), StdoutBit) +} + +func (process *MachineProcess) OnStderr(line string, time time.Time) { + process.notifySubs(newStderrEvent(process.Pid, line, time), StderrBit) +} + +func (mp *MachineProcess) Close() { + // Cleanup command resources + mp.command.Wait() + // Cleanup machine process resources before dead event is sent + mp.mutex.Lock() + mp.Alive = false + mp.command = nil + mp.pumper = nil + mp.mutex.Unlock() + + mp.notifySubs(newDiedEvent(*mp), ProcessStatusBit) + + mp.mutex.Lock() + mp.subs = nil + mp.mutex.Unlock() + + mp.updateLastUsedTime() +} + +func (p *MachineProcess) notifySubs(event *rpc.Event, typeBit uint64) { + p.mutex.RLock() + subs := p.subs + for _, subscriber := range subs { + // Check whether subscriber needs such kind of event and then try to notify it + if subscriber.Mask&typeBit == typeBit && !tryWrite(subscriber.Channel, event) { + // Impossible to write to the channel, remove the channel from the subscribers list. + // It may happen when writing to the closed channel + defer RemoveSubscriber(p.Pid, subscriber.Id) + } + } + p.mutex.RUnlock() +} + +func (mp *MachineProcess) updateLastUsedTime() { + mp.lastUsedLock.Lock() + mp.lastUsed = time.Now() + mp.lastUsedLock.Unlock() +} + +// Writes to a channel and returns true if write is successful, +// otherwise if write to the channel failed e.g. channel is closed then returns false +func tryWrite(eventsChan chan *rpc.Event, event *rpc.Event) (ok bool) { + defer func() { + if r := recover(); r != nil { + ok = false + } + }() + eventsChan <- event + return true +} + +func directGet(pid uint64) (*MachineProcess, bool) { + processes.RLock() + defer processes.RUnlock() + item, ok := processes.items[pid] + if ok { + item.updateLastUsedTime() + } + return item, ok +} + +// Returns an error indicating that process with given pid doesn't exist +func noProcess(pid uint64) *NoProcessError { + return &NoProcessError{ + error: errors.New(fmt.Sprintf("Process with id '%d' does not exist", pid)), + Pid: pid, + } +} + +// Returns an error indicating that process with given pid is not alive +func notAlive(pid uint64) *NotAliveError { + return &NotAliveError{ + error: errors.New(fmt.Sprintf("Process with id '%d' is not alive", pid)), + Pid: pid, + } +} diff --git a/exec-agent/src/process/process_builder.go b/exec-agent/src/process/process_builder.go new file mode 100644 index 00000000000..3d18110df41 --- /dev/null +++ b/exec-agent/src/process/process_builder.go @@ -0,0 +1,72 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +type ProcessBuilder struct { + command Command + beforeEventsHook func(p MachineProcess) + firstSubscriber *Subscriber +} + +func NewBuilder() *ProcessBuilder { + return &ProcessBuilder{} +} + +func (pb *ProcessBuilder) Cmd(command Command) *ProcessBuilder { + pb.command = command + return pb +} + +func (pb *ProcessBuilder) CmdLine(cmdLine string) *ProcessBuilder { + pb.command.CommandLine = cmdLine + return pb +} + +func (pb *ProcessBuilder) CmdType(cmdType string) *ProcessBuilder { + pb.command.Type = cmdType + return pb +} + +func (pb *ProcessBuilder) CmdName(cmdName string) *ProcessBuilder { + pb.command.Name = cmdName + return pb +} + +// Sets the hook which will be called once before +// process subscribers notified with any of the process events, +// and after process is started. +func (pb *ProcessBuilder) BeforeEventsHook(hook func(p MachineProcess)) *ProcessBuilder { + pb.beforeEventsHook = hook + return pb +} + +func (pb *ProcessBuilder) FirstSubscriber(subscriber Subscriber) *ProcessBuilder { + pb.firstSubscriber = &subscriber + return pb +} + +func (pb *ProcessBuilder) Build() MachineProcess { + p := MachineProcess{ + Name: pb.command.Name, + CommandLine: pb.command.CommandLine, + Type: pb.command.Type, + beforeEventsHook: pb.beforeEventsHook, + } + if pb.firstSubscriber != nil { + p.subs = []*Subscriber{pb.firstSubscriber} + } + return p +} + +func (pb *ProcessBuilder) Start() (MachineProcess, error) { + return Start(pb.Build()) +} diff --git a/exec-agent/src/process/process_cleaner.go b/exec-agent/src/process/process_cleaner.go new file mode 100644 index 00000000000..7068618fbc2 --- /dev/null +++ b/exec-agent/src/process/process_cleaner.go @@ -0,0 +1,56 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "log" + "os" + "time" +) + +type Cleaner struct { + CleanupPeriod time.Duration + CleanupThreshold time.Duration +} + +func NewCleaner(period int, threshold int) *Cleaner { + return &Cleaner{ + time.Duration(period) * time.Minute, + time.Duration(threshold) * time.Minute, + } +} + +func (c *Cleaner) CleanPeriodically() { + ticker := time.NewTicker(c.CleanupPeriod) + defer ticker.Stop() + for range ticker.C { + c.CleanOnce() + } +} + +func (pc *Cleaner) CleanOnce() { + deadPoint := time.Now().Add(-pc.CleanupThreshold) + processes.Lock() + for _, mp := range processes.items { + mp.lastUsedLock.RLock() + if !mp.Alive && mp.lastUsed.Before(deadPoint) { + delete(processes.items, mp.Pid) + if err := os.Remove(mp.logfileName); err != nil { + if !os.IsNotExist(err) { + log.Printf("Couldn't remove process logs file, '%s'", mp.logfileName) + } + } + } + mp.lastUsedLock.RUnlock() + } + processes.Unlock() +} diff --git a/exec-agent/src/process/process_cleaner_test.go b/exec-agent/src/process/process_cleaner_test.go new file mode 100644 index 00000000000..42f8fc1d12a --- /dev/null +++ b/exec-agent/src/process/process_cleaner_test.go @@ -0,0 +1,60 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process_test + +import ( + "testing" + + "github.com/eclipse/che/exec-agent/process" + "time" +) + +func TestCleanWithZeroThreshold(t *testing.T) { + p := startAndWaitTestProcess(testCmd, t) + defer cleanupLogsDir() + + process.NewCleaner(0, 0).CleanOnce() + + _, err := process.Get(p.Pid) + if err == nil { + t.Fatal("Must not exist") + } + if _, ok := err.(*process.NoProcessError); !ok { + t.Fatal(err) + } +} + +func TestCleansOnlyUnusedProcesses(t *testing.T) { + p1 := startAndWaitTestProcess(testCmd, t) + p2 := startAndWaitTestProcess(testCmd, t) + + time.Sleep(500 * time.Millisecond) + + // use one of the processes, so it is used now + process.Get(p1.Pid) + + // cleanup immediately + (&process.Cleaner{CleanupPeriod: 0, CleanupThreshold: 500 * time.Millisecond}).CleanOnce() + + _, err1 := process.Get(p1.Pid) + _, err2 := process.Get(p2.Pid) + + // process 1 must be cleaned + if err1 != nil { + t.Fatalf("Expected process 2 to exist, but got an error: %s", err1.Error()) + } + + // process 2 must exist + if _, ok := err2.(*process.NoProcessError); !ok { + t.Fatal("Expected process 2 to be cleaned") + } +} diff --git a/exec-agent/src/process/process_test.go b/exec-agent/src/process/process_test.go new file mode 100644 index 00000000000..a535789922a --- /dev/null +++ b/exec-agent/src/process/process_test.go @@ -0,0 +1,255 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process_test + +import ( + "github.com/eclipse/che/exec-agent/process" + "github.com/eclipse/che/exec-agent/rpc" + "os" + "strings" + "testing" + "time" +) + +const ( + testCmd = "printf \"1\n2\n3\n4\n5\n6\n7\n8\n9\n10\"" +) + +func TestOneLineOutput(t *testing.T) { + defer cleanupLogsDir() + // create and start a process + p := startAndWaitTestProcess("echo test", t) + + logs, _ := process.ReadAllLogs(p.Pid) + + if len(logs) != 1 { + t.Fatalf("Expected logs size to be 1, but got %d", len(logs)) + } + + if logs[0].Text != "test" { + t.Fatalf("Expected to get 'test' output but got %s", logs[0].Text) + } +} + +func TestEmptyLinesOutput(t *testing.T) { + defer cleanupLogsDir() + p := startAndWaitTestProcess("printf \"\n\n\n\n\n\"", t) + + logs, _ := process.ReadAllLogs(p.Pid) + + if len(logs) != 5 { + t.Fatal("Expected logs to be 4 sized") + } + + for _, value := range logs { + if value.Text != "" { + t.Fatal("Expected all the logs to be empty files") + } + } +} + +func TestAddSubscriber(t *testing.T) { + process.LogsDir = TmpFile() + defer cleanupLogsDir() + + outputLines := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"} + + // create and start a process + pb := process.NewBuilder(). + CmdName("test"). + CmdType("test"). + CmdLine("printf \"" + strings.Join(outputLines, "\n") + "\"") + + // add a new subscriber + eventsChan := make(chan *rpc.Event) + pb.FirstSubscriber(process.Subscriber{ + Id: "test", + Mask: process.DefaultMask, + Channel: eventsChan, + }) + + // start a new process + if _, err := pb.Start(); err != nil { + t.Fatal(err) + } + + // read all the process output events + done := make(chan bool) + var received []string + go func() { + event := <-eventsChan + for event.EventType != process.DiedEventType { + if event.EventType == process.StdoutEventType { + out := event.Body.(*process.ProcessOutputEventBody) + received = append(received, out.Text) + } + event = <-eventsChan + } + done <- true + }() + + // wait until process is done + <-done + + if len(outputLines) != len(received) { + t.Fatalf("Expected the same size but got %d != %d", len(outputLines), len(received)) + } + + for idx, value := range outputLines { + if value != received[idx] { + t.Fatalf("Expected %s but got %s", value, received[idx]) + } + } +} + +func TestRestoreSubscriberForDeadProcess(t *testing.T) { + process.LogsDir = TmpFile() + defer cleanupLogsDir() + beforeStart := time.Now() + p := startAndWaitTestProcess("echo test", t) + + // Read all the data from channel + channel := make(chan *rpc.Event) + done := make(chan bool) + var received []*rpc.Event + go func() { + statusReceived := false + timeoutReached := false + for !statusReceived && !timeoutReached { + select { + case v := <-channel: + received = append(received, v) + if v.EventType == process.DiedEventType { + statusReceived = true + } + case <-time.After(time.Second): + timeoutReached = true + } + } + done <- true + }() + + process.RestoreSubscriber(p.Pid, process.Subscriber{ + "test", + process.DefaultMask, + channel, + }, beforeStart) + + <-done + + if len(received) != 2 { + t.Fatalf("Expected to recieve 2 events but got %d", len(received)) + } + e1Type := received[0].EventType + e1Text := received[0].Body.(*process.ProcessOutputEventBody).Text + if received[0].EventType != process.StdoutEventType || e1Text != "test" { + t.Fatalf("Expected to receieve output event with text 'test', but got '%s' event with text %s", + e1Type, + e1Text) + } + if received[1].EventType != process.DiedEventType { + t.Fatal("Expected to get 'process_died' event") + } +} + +func TestMachineProcessIsNotAliveAfterItIsDead(t *testing.T) { + p := startAndWaitTestProcess(testCmd, t) + defer cleanupLogsDir() + if p.Alive { + t.Fatal("Process should not be alive") + } +} + +func TestItIsNotPossibleToAddSubscriberToDeadProcess(t *testing.T) { + p := startAndWaitTestProcess(testCmd, t) + defer cleanupLogsDir() + if err := process.AddSubscriber(p.Pid, process.Subscriber{}); err == nil { + t.Fatal("Should not be able to add subscriber") + } +} + +func TestReadProcessLogs(t *testing.T) { + p := startAndWaitTestProcess(testCmd, t) + defer cleanupLogsDir() + logs, err := process.ReadLogs(p.Pid, time.Time{}, time.Now()) + if err != nil { + t.Fatal(err) + } + expected := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"} + + for idx := range expected { + if process.StdoutKind != logs[idx].Kind { + t.Fatalf("Expected log message kind to be '%s', while got '%s'", process.StdoutKind, logs[idx].Kind) + } + if expected[idx] != logs[idx].Text { + t.Fatalf("Expected log message to be '%s', but got '%s'", expected[idx], logs[idx].Text) + } + } +} + +func startAndWaitTestProcess(cmd string, t *testing.T) process.MachineProcess { + process.LogsDir = TmpFile() + events := make(chan *rpc.Event) + done := make(chan bool) + + // Create and start process + pb := process.NewBuilder(). + CmdName("test"). + CmdType("test"). + CmdLine(cmd). + FirstSubscriber(process.Subscriber{ + Id: "test", + Mask: process.DefaultMask, + Channel: events, + }) + + go func() { + statusReceived := false + timeoutReached := false + for !statusReceived && !timeoutReached { + select { + case event := <-events: + if event.EventType == process.DiedEventType { + statusReceived = true + } + case <-time.After(time.Second): + timeoutReached = true + } + } + done <- true + }() + + p, err := pb.Start() + if err != nil { + t.Fatal(err) + } + + // Wait until process is finished or timeout is reached + if ok := <-done; !ok { + t.Fatalf("Expected to receive %s process event", process.DiedEventType) + } + + // Check process state after it is finished + result, err := process.Get(p.Pid) + if err != nil { + t.Fatal(err) + } + return result +} + +func TmpFile() string { + return os.TempDir() + string(os.PathSeparator) + randomName(10) +} + +func cleanupLogsDir() { + os.RemoveAll(process.LogsDir) +} diff --git a/exec-agent/src/process/pumper.go b/exec-agent/src/process/pumper.go new file mode 100644 index 00000000000..86c01656ecd --- /dev/null +++ b/exec-agent/src/process/pumper.go @@ -0,0 +1,108 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "bufio" + "io" + "log" + "sync" + "time" +) + +type acceptLine func(line string) + +// LogsPumper client consumes a message read by pumper +type LogsConsumer interface { + + // called on each line pumped from process stdout + OnStdout(line string, time time.Time) + + // called on each line pumped from process stderr + OnStderr(line string, time time.Time) + + // called when pumping is finished either by normal return or by error + Close() +} + +// Pumps lines from the stdout and stderr +type LogsPumper struct { + stdout io.Reader + stderr io.Reader + clients []LogsConsumer + waitGroup sync.WaitGroup +} + +func NewPumper(stdout io.Reader, stderr io.Reader) *LogsPumper { + return &LogsPumper{ + stdout: stdout, + stderr: stderr, + } +} + +func (pumper *LogsPumper) AddConsumer(consumer LogsConsumer) { + pumper.clients = append(pumper.clients, consumer) +} + +// Start 'pumping' logs from the stdout and stderr +// The method execution is synchronous and waits for +// both stderr and stdout to complete closing all the clients after +func (pumper *LogsPumper) Pump() { + pumper.waitGroup.Add(2) + + // reading from stdout & stderr + go pump(pumper.stdout, pumper.notifyStdout, &pumper.waitGroup) + go pump(pumper.stderr, pumper.notifyStderr, &pumper.waitGroup) + + // cleanup after pumping is complete + pumper.waitGroup.Wait() + pumper.notifyClose() +} + +func pump(r io.Reader, lineConsumer acceptLine, wg *sync.WaitGroup) { + defer wg.Done() + br := bufio.NewReader(r) + for { + line, err := br.ReadBytes('\n') + + if err != nil { + if err != io.EOF { + log.Println("Error pumping: " + err.Error()) + } else if len(line) != 0 { + lineConsumer(string(line)) + } + return + } + + lineConsumer(string(line[:len(line)-1])) + } +} + +func (pumper *LogsPumper) notifyStdout(line string) { + t := time.Now() + for _, client := range pumper.clients { + client.OnStdout(line, t) + } +} + +func (pumper *LogsPumper) notifyStderr(line string) { + t := time.Now() + for _, client := range pumper.clients { + client.OnStderr(line, t) + } +} + +func (pumper *LogsPumper) notifyClose() { + for _, client := range pumper.clients { + client.Close() + } +} diff --git a/exec-agent/src/process/rest_service.go b/exec-agent/src/process/rest_service.go new file mode 100644 index 00000000000..c2c4b497a06 --- /dev/null +++ b/exec-agent/src/process/rest_service.go @@ -0,0 +1,194 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "errors" + "fmt" + "github.com/eclipse/che/exec-agent/rest" + "github.com/eclipse/che/exec-agent/rest/restutil" + "github.com/eclipse/che/exec-agent/rpc" + "io" + "math" + "net/http" + "strconv" + "strings" + "time" +) + +var HttpRoutes = rest.RoutesGroup{ + "Process Routes", + []rest.Route{ + { + "POST", + "Start Process", + "/process", + startProcessHF, + }, + { + "GET", + "Get Process", + "/process/:pid", + getProcessHF, + }, + { + "DELETE", + "Kill Process", + "/process/:pid", + killProcessHF, + }, + { + "GET", + "Get Process Logs", + "/process/:pid/logs", + getProcessLogsHF, + }, + { + "GET", + "Get Processes", + "/process", + getProcessesHF, + }, + }, +} + +func startProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error { + command := Command{} + if err := restutil.ReadJson(r, &command); err != nil { + return err + } + if err := checkCommand(&command); err != nil { + return rest.BadRequest(err) + } + + // If channel is provided then check whether it is ready to be + // first process subscriber and use it if it is + var subscriber *Subscriber + channelId := r.URL.Query().Get("channel") + if channelId != "" { + channel, ok := rpc.GetChannel(channelId) + if !ok { + m := fmt.Sprintf("Channel with id '%s' doesn't exist. Process won't be started", channelId) + return rest.NotFound(errors.New(m)) + } + subscriber = &Subscriber{ + Id: channelId, + Mask: parseTypes(r.URL.Query().Get("types")), + Channel: channel.Events, + } + } + + pb := NewBuilder().Cmd(command) + + if subscriber != nil { + pb.FirstSubscriber(*subscriber) + } + + process, err := pb.Start() + if err != nil { + return err + } + return restutil.WriteJson(w, process) +} + +func getProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error { + pid, err := parsePid(p.Get("pid")) + if err != nil { + return rest.BadRequest(err) + } + + process, err := Get(pid) + if err != nil { + return asHttpError(err) + } + return restutil.WriteJson(w, process) +} + +func killProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error { + pid, err := parsePid(p.Get("pid")) + if err != nil { + return rest.BadRequest(err) + } + if err := Kill(pid); err != nil { + return asHttpError(err) + } + return nil +} + +func getProcessLogsHF(w http.ResponseWriter, r *http.Request, p rest.Params) error { + pid, err := parsePid(p.Get("pid")) + if err != nil { + return rest.BadRequest(err) + } + + // Parse 'from', if 'from' is not specified then read all the logs from the start + // if 'from' format is different from the DATE_TIME_FORMAT then return 400 + from, err := parseTime(r.URL.Query().Get("from"), time.Time{}) + if err != nil { + return rest.BadRequest(errors.New("Bad format of 'from', " + err.Error())) + } + + // Parse 'till', if 'till' is not specified then 'now' is used for it + // if 'till' format is different from the DATE_TIME_FORMAT then return 400 + till, err := parseTime(r.URL.Query().Get("till"), time.Now()) + if err != nil { + return rest.BadRequest(errors.New("Bad format of 'till', " + err.Error())) + } + + logs, err := ReadLogs(pid, from, till) + if err != nil { + return asHttpError(err) + } + + // limit logs from the latest to the earliest + // limit - how many the latest logs will be present + // skip - how many log lines should be skipped from the end + limit := restutil.IntQueryParam(r, "limit", DefaultLogsPerPageLimit) + skip := restutil.IntQueryParam(r, "skip", 0) + if limit < 1 { + return rest.BadRequest(errors.New("Required 'limit' to be > 0")) + } + if skip < 0 { + return rest.BadRequest(errors.New("Required 'skip' to be >= 0")) + } + len := len(logs) + fromIdx := int(math.Max(float64(len-limit-skip), 0)) + toIdx := len - int(math.Min(float64(skip), float64(len))) + + // Respond with an appropriate logs format, default json + format := r.URL.Query().Get("format") + switch strings.ToLower(format) { + case "text": + for _, item := range logs[fromIdx:toIdx] { + line := fmt.Sprintf("[%s] %s \t %s\n", item.Kind, item.Time.Format(DateTimeFormat), item.Text) + io.WriteString(w, line) + } + default: + return restutil.WriteJson(w, logs[fromIdx:toIdx]) + } + return nil +} + +func getProcessesHF(w http.ResponseWriter, r *http.Request, _ rest.Params) error { + all, err := strconv.ParseBool(r.URL.Query().Get("all")) + if err != nil { + all = false + } + return restutil.WriteJson(w, GetProcesses(all)) +} + +func asHttpError(err error) error { + if npErr, ok := err.(*NoProcessError); ok { + return rest.NotFound(npErr.error) + } + return err +} diff --git a/exec-agent/src/process/ws_service.go b/exec-agent/src/process/ws_service.go new file mode 100644 index 00000000000..a75ea8bd1d8 --- /dev/null +++ b/exec-agent/src/process/ws_service.go @@ -0,0 +1,348 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "encoding/json" + "errors" + "github.com/eclipse/che/exec-agent/rpc" + "math" + "time" +) + +const ( + StartMethod = "process.start" + KillMethod = "process.kill" + SubscribeMethod = "process.subscribe" + UnsubscribeMethod = "process.unsubscribe" + UpdateSubscriberMethod = "process.updateSubscriber" + GetLogsMethod = "process.getLogs" + GetProcessMethod = "process.getProcess" + GetProcessesMethod = "process.getProcesses" + + NoSuchProcessErrorCode = -32000 + ProcessNotAliveErrorCode = -32001 +) + +var RpcRoutes = rpc.RoutesGroup{ + "Process Routes", + []rpc.Route{ + { + StartMethod, + func(body []byte) (interface{}, error) { + b := StartParams{} + err := json.Unmarshal(body, &b) + return b, err + }, + startProcessReqHF, + }, + { + KillMethod, + func(body []byte) (interface{}, error) { + b := KillParams{} + err := json.Unmarshal(body, &b) + return b, err + }, + killProcessReqHF, + }, + { + SubscribeMethod, + func(body []byte) (interface{}, error) { + b := SubscribeParams{} + err := json.Unmarshal(body, &b) + return b, err + }, + subscribeReqHF, + }, + { + UnsubscribeMethod, + func(body []byte) (interface{}, error) { + b := UnsubscribeParams{} + err := json.Unmarshal(body, &b) + return b, err + }, + unsubscribeReqHF, + }, + { + UpdateSubscriberMethod, + func(body []byte) (interface{}, error) { + b := UpdateSubscriberParams{} + err := json.Unmarshal(body, &b) + return b, err + }, + updateSubscriberReqHF, + }, + { + GetLogsMethod, + func(body []byte) (interface{}, error) { + b := GetLogsParams{} + err := json.Unmarshal(body, &b) + return b, err + }, + getProcessLogsReqHF, + }, + { + GetProcessMethod, + func(body []byte) (interface{}, error) { + b := GetProcessParams{} + err := json.Unmarshal(body, &b) + return b, err + }, + getProcessReqHF, + }, + { + GetProcessesMethod, + func(body []byte) (interface{}, error) { + b := GetProcessesParams{} + err := json.Unmarshal(body, &b) + return b, err + }, + getProcessesReqHF, + }, + }, +} + +type ProcessResult struct { + Pid uint64 `json:"pid"` + Text string `json:"text"` +} + +//-- process start +type StartParams struct { + Name string `json:"name"` + CommandLine string `json:"commandLine"` + Type string `json:"type"` + EventTypes string `json:"eventTypes"` +} + +func startProcessReqHF(params interface{}, t *rpc.Transmitter) error { + startParams := params.(StartParams) + command := Command{ + Name: startParams.Name, + CommandLine: startParams.CommandLine, + Type: startParams.Type, + } + if err := checkCommand(&command); err != nil { + return rpc.NewArgsError(err) + } + + _, err := NewBuilder(). + Cmd(command). + FirstSubscriber(Subscriber{ + Id: t.Channel.Id, + Mask: parseTypes(startParams.EventTypes), + Channel: t.Channel.Events, + }). + BeforeEventsHook(func(process MachineProcess) { + t.Send(process) + }). + Start() + return err +} + +//-- process kill +type KillParams struct { + Pid uint64 `json:"pid"` + NativePid uint64 `json:"nativePid"` +} + +func killProcessReqHF(params interface{}, t *rpc.Transmitter) error { + killParams := params.(KillParams) + if err := Kill(killParams.Pid); err != nil { + return asRpcError(err) + } + t.Send(&ProcessResult{ + Pid: killParams.Pid, + Text: "Successfully killed", + }) + return nil +} + +//-- process subscribe +type SubscribeResult struct { + Pid uint64 `json:"pid"` + EventTypes string `json:"eventTypes"` + Text string `json:"text"` +} + +type SubscribeParams struct { + Pid uint64 `json:"pid"` + EventTypes string `json:"eventTypes"` + After string `json:"after"` +} + +func subscribeReqHF(params interface{}, t *rpc.Transmitter) error { + subscribeParams := params.(SubscribeParams) + + mask := maskFromTypes(subscribeParams.EventTypes) + if mask == 0 { + return rpc.NewArgsError(errors.New("Required at least 1 valid event type")) + } + + subscriber := Subscriber{ + Id: t.Channel.Id, + Mask: mask, + Channel: t.Channel.Events, + } + // Check whether subscriber should see previous logs or not + if subscribeParams.After == "" { + if err := AddSubscriber(subscribeParams.Pid, subscriber); err != nil { + return asRpcError(err) + } + } else { + after, err := time.Parse(DateTimeFormat, subscribeParams.After) + if err != nil { + return rpc.NewArgsError(errors.New("Bad format of 'after', " + err.Error())) + } + if err := RestoreSubscriber(subscribeParams.Pid, subscriber, after); err != nil { + return err + } + } + t.Send(&SubscribeResult{ + Pid: subscribeParams.Pid, + EventTypes: subscribeParams.EventTypes, + Text: "Successfully subscribed", + }) + return nil +} + +//-- process unsubscribe +type UnsubscribeParams struct { + Pid uint64 `json:"pid"` +} + +func unsubscribeReqHF(params interface{}, t *rpc.Transmitter) error { + unsubscribeParams := params.(UnsubscribeParams) + if err := RemoveSubscriber(unsubscribeParams.Pid, t.Channel.Id); err != nil { + return asRpcError(err) + } + t.Send(&ProcessResult{ + Pid: unsubscribeParams.Pid, + Text: "Successfully unsubscribed", + }) + return nil +} + +//-- process update subscriber +type UpdateSubscriberParams struct { + Pid uint64 `json:"pid"` + EventTypes string `json:"eventTypes"` +} + +func updateSubscriberReqHF(params interface{}, t *rpc.Transmitter) error { + updateParams := params.(UpdateSubscriberParams) + if updateParams.EventTypes == "" { + return rpc.NewArgsError(errors.New("'eventTypes' required for subscriber update")) + } + if err := UpdateSubscriber(updateParams.Pid, t.Channel.Id, maskFromTypes(updateParams.EventTypes)); err != nil { + return asRpcError(err) + } + t.Send(&SubscribeResult{ + Pid: updateParams.Pid, + EventTypes: updateParams.EventTypes, + Text: "Subscriber successfully updated", + }) + return nil +} + +//-- process get logs +type GetLogsParams struct { + Pid uint64 `json:"pid"` + From string `json:"from"` + Till string `json:"till"` + Limit int `json:"limit"` + Skip int `json:"skip"` +} + +func getProcessLogsReqHF(params interface{}, t *rpc.Transmitter) error { + getLogsParams := params.(GetLogsParams) + + if getLogsParams.Skip < 0 { + getLogsParams.Skip = 0 + } + if getLogsParams.Limit < 0 { + getLogsParams.Limit = 0 + } + + from, err := parseTime(getLogsParams.From, time.Time{}) + if err != nil { + return rpc.NewArgsError(errors.New("Bad format of 'from', " + err.Error())) + } + + till, err := parseTime(getLogsParams.Till, time.Now()) + if err != nil { + return rpc.NewArgsError(errors.New("Bad format of 'till', " + err.Error())) + } + + logs, err := ReadLogs(getLogsParams.Pid, from, till) + if err != nil { + return asRpcError(err) + } + + limit := DefaultLogsPerPageLimit + if getLogsParams.Limit != 0 { + if limit < 1 { + return rpc.NewArgsError(errors.New("Required 'limit' to be > 0")) + } + limit = getLogsParams.Limit + } + + skip := 0 + if getLogsParams.Skip != 0 { + if skip < 0 { + return rpc.NewArgsError(errors.New("Required 'skip' to be >= 0")) + } + skip = getLogsParams.Skip + } + + logsLen := len(logs) + fromIdx := int(math.Max(float64(logsLen-limit-skip), 0)) + toIdx := logsLen - int(math.Min(float64(skip), float64(logsLen))) + + t.Send(logs[fromIdx:toIdx]) + return nil +} + +//-- get process +type GetProcessParams struct { + Pid uint64 `json:"pid"` +} + +func getProcessReqHF(body interface{}, t *rpc.Transmitter) error { + params := body.(GetProcessParams) + p, err := Get(params.Pid) + if err != nil { + return asRpcError(err) + } + t.Send(p) + return nil +} + +//-- get processes +type GetProcessesParams struct { + All bool `json:"all"` +} + +func getProcessesReqHF(body interface{}, t *rpc.Transmitter) error { + params := body.(GetProcessesParams) + t.Send(GetProcesses(params.All)) + return nil +} + +func asRpcError(err error) error { + if npErr, ok := err.(*NoProcessError); ok { + return rpc.NewError(npErr.error, NoSuchProcessErrorCode) + } else if naErr, ok := err.(*NotAliveError); ok { + return rpc.NewError(naErr.error, ProcessNotAliveErrorCode) + } + return err +} diff --git a/exec-agent/src/rest/errors.go b/exec-agent/src/rest/errors.go new file mode 100644 index 00000000000..096771b392b --- /dev/null +++ b/exec-agent/src/rest/errors.go @@ -0,0 +1,41 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package rest + +import ( + "net/http" +) + +type ApiError struct { + error + Code int +} + +func BadRequest(err error) error { + return ApiError{err, http.StatusBadRequest} +} + +func NotFound(err error) error { + return ApiError{err, http.StatusNotFound} +} + +func Conflict(err error) error { + return ApiError{err, http.StatusConflict} +} + +func Forbidden(err error) error { + return ApiError{err, http.StatusForbidden} +} + +func Unauthorized(err error) error { + return ApiError{err, http.StatusUnauthorized} +} diff --git a/exec-agent/src/rest/restutil/util.go b/exec-agent/src/rest/restutil/util.go new file mode 100644 index 00000000000..10e667d234c --- /dev/null +++ b/exec-agent/src/rest/restutil/util.go @@ -0,0 +1,45 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package restutil + +import ( + "encoding/json" + "net/http" + "strconv" +) + +// Writes body as json to the response writer +func WriteJson(w http.ResponseWriter, body interface{}) error { + w.Header().Set("Content-Type", "application/json") + return json.NewEncoder(w).Encode(body) +} + +// Reads json body from the request +func ReadJson(r *http.Request, v interface{}) error { + return json.NewDecoder(r.Body).Decode(v) +} + +func IntQueryParam(r *http.Request, name string, defaultValue int) int { + qp := r.URL.Query().Get(name) + if qp == "" { + return defaultValue + } else { + v, err := strconv.Atoi(qp) + if err != nil { + return defaultValue + } + if v < 0 { + return defaultValue + } + return v + } +} diff --git a/exec-agent/src/rest/route.go b/exec-agent/src/rest/route.go new file mode 100644 index 00000000000..2531acd0e1e --- /dev/null +++ b/exec-agent/src/rest/route.go @@ -0,0 +1,80 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package rest + +import ( + "fmt" + "net/http" + "strings" +) + +const ( + maxNameLen = 40 + maxMethodLen = len("DELETE") +) + +// Handler for http routes +// vars variable contain only path parameters if any specified for given route +type HttpRouteHandlerFunc func(w http.ResponseWriter, r *http.Request, params Params) error + +// An interface for getting mapped path parameters by their names +type Params interface { + + // Gets path parameter by it's name e.g. + // for url template `/process/:id` and actual value `/process/123` + // this method will return string '123' + Get(name string) string +} + +// Describes route for http requests +type Route struct { + + // Http method e.g. 'GET' + Method string + + // The name of the http route, used in logs + // this name is unique for all the application http routes + // example: 'StartProcess' + Name string + + // The path of the http route which this route is mapped to + // example: '/process' + Path string + + // The function used for handling http request + HandleFunc HttpRouteHandlerFunc +} + +// Named group of http routes, those groups +// should be defined by separate apis, and then combined together +type RoutesGroup struct { + + // The name of this group e.g.: 'ProcessRoutes' + Name string + + // The http routes of this group + Items []Route +} + +func (r *Route) String() string { + name := r.Name + " " + strings.Repeat(".", maxNameLen-len(r.Name)) + method := r.Method + strings.Repeat(" ", maxMethodLen-len(r.Method)) + return fmt.Sprintf("%s %s %s", name, method, r.Path) +} + +func WriteError(w http.ResponseWriter, err error) { + if apiErr, ok := err.(ApiError); ok { + http.Error(w, apiErr.Error(), apiErr.Code) + } else { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} diff --git a/exec-agent/src/rpc/channels.go b/exec-agent/src/rpc/channels.go new file mode 100644 index 00000000000..9e73a2c05ec --- /dev/null +++ b/exec-agent/src/rpc/channels.go @@ -0,0 +1,334 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package rpc + +import ( + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/eclipse/che-lib/websocket" + "github.com/eclipse/che/exec-agent/rest" +) + +const ( + ConnectedEventType = "connected" +) + +var ( + upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + prevChanId uint64 = 0 + + channels = channelsMap{items: make(map[string]Channel)} + + HttpRoutes = rest.RoutesGroup{ + "Channel Routes", + []rest.Route{ + { + "GET", + "Connect to Exec-Agent(webscoket)", + "/connect", + registerChannel, + }, + }, + } + + messageHandler = jsonrpc2_0MessageHandler{} +) + +// Published when websocket connection is established +// and channel is ready for interaction +type ChannelConnected struct { + Timed + ChannelId string `json:"channel"` + Text string `json:"text"` +} + +// Describes channel which is websocket connection +// with additional properties required by the application +type Channel struct { + // Unique channel identifier + Id string `json:"id"` + + // When the connection was established + Connected time.Time `json:"connected"` + + // the uri of the request that established this connection + RequestURI string `json:"-"` + + // Go channel for sending events to the websocket. + // All the events are encoded to the json messages and + // send to websocket connection defined by this channel. + Events chan *Event + + // Everything passed to this channel will be encoded + // to json and send to the client. + output chan interface{} + + // If any value is send to this channel then + // physical connection associated with it along with + // output channel will be immediately closed. + drop chan bool + + // Websocket connection + conn *websocket.Conn +} + +// A struct for reading raw websocket messages +type WsMessage struct { + err error + bytes []byte +} + +// Handles raw messages received from websocket channel +type MessageHandler interface { + // handles a message in implementation specific way + handle(message *WsMessage, channel Channel) +} + +// Defines lockable map for managing channels +type channelsMap struct { + sync.RWMutex + items map[string]Channel +} + +// Gets channel by the channel id, if there is no such channel +// then returned 'ok' is false. +func GetChannel(chanId string) (Channel, bool) { + channels.RLock() + defer channels.RUnlock() + item, ok := channels.items[chanId] + return item, ok +} + +// Returns all the currently registered channels. +func GetChannels() []Channel { + channels.RLock() + defer channels.RUnlock() + all := make([]Channel, len(channels.items)) + idx := 0 + for _, v := range channels.items { + all[idx] = v + idx++ + } + return all +} + +// Drops the channel with the given id. +func DropChannel(id string) { + if c, ok := GetChannel(id); ok { + c.drop <- true + } +} + +// Saves the channel with the given identifier and returns true. +// If the channel with the given identifier already exists then false is returned +// and the channel is not saved. +func saveChannel(channel Channel) bool { + channels.Lock() + defer channels.Unlock() + _, ok := channels.items[channel.Id] + if ok { + return false + } + channels.items[channel.Id] = channel + return true +} + +// Removes channel +func removeChannel(channel Channel) { + channels.Lock() + defer channels.Unlock() + delete(channels.items, channel.Id) +} + +func registerChannel(w http.ResponseWriter, r *http.Request, _ rest.Params) error { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println("Couldn't establish websocket connection " + err.Error()) + return nil + } + + channel := Channel{ + Id: "channel-" + strconv.Itoa(int(atomic.AddUint64(&prevChanId, 1))), + Connected: time.Now(), + RequestURI: r.RequestURI, + Events: make(chan *Event), + output: make(chan interface{}), + drop: make(chan bool), + conn: conn, + } + saveChannel(channel) + + log.Printf("A new channel with id '%s' successfully opened", channel.Id) + + go transferAsJson(conn, channel.output) + go redirectEventsToOutput(channel) + go handleMessages(readMessages(conn), channel) + + // Say hello to the client + channel.Events <- NewEvent(ConnectedEventType, &ChannelConnected{ + Timed: Timed{Time: channel.Connected}, + ChannelId: channel.Id, + Text: "Hello!", + }) + return nil +} + +// Handles all the messages from the given channel +// until an error occurs or a drop signal is sent. +// Clears all the associated resources. +func handleMessages(messageChan chan *WsMessage, channel Channel) { + for { + select { + case message := <-messageChan: + if message.err == nil { + messageHandler.handle(message, channel) + } else { + closeErr, ok := message.err.(*websocket.CloseError) + if !ok || !isNormallyClosed(closeErr.Code) { + log.Println("Error reading message, " + message.err.Error()) + } + closeChannel(channel) + return + } + case <-channel.drop: + closeChannel(channel) + return + } + } +} + +// Closes all associated go channels(events, output, drop) +// and physical websocket connection. +func closeChannel(channel Channel) { + close(channel.Events) + close(channel.output) + close(channel.drop) + if err := channel.conn.Close(); err != nil { + log.Println("Error closing connection, " + err.Error()) + } + removeChannel(channel) + log.Printf("Channel with id '%s' successfully closed", channel.Id) +} + +// Reads the message from the websocket connection until error is received, +// returns the channel which should be used for reading such messages. +func readMessages(conn *websocket.Conn) chan *WsMessage { + messagesChan := make(chan *WsMessage) + go func() { + for { + _, bytes, err := conn.ReadMessage() + messagesChan <- &WsMessage{err: err, bytes: bytes} + if err != nil { + close(messagesChan) + break + } + } + }() + return messagesChan +} + +func redirectEventsToOutput(channel Channel) { + for event := range channel.Events { + channel.output <- event + } +} + +// transfers data from channel to physical connection, +// tries to transform data to json. +func transferAsJson(conn *websocket.Conn, c chan interface{}) { + for message := range c { + err := conn.WriteJSON(message) + if err != nil { + log.Printf("Couldn't write message to the channel. Message: %T, %v", message, message) + } + } +} + +// handles messages as jsonrpc as described by package doc +type jsonrpc2_0MessageHandler struct{} + +func (h *jsonrpc2_0MessageHandler) handle(message *WsMessage, channel Channel) { + req := &Request{} + + // try to unmarshal the request + if err := json.Unmarshal(message.bytes, req); err != nil { + // Respond parse error according to specification + channel.output <- &Response{ + Version: "2.0", + Error: &Error{ + Code: ParseErrorCode, + Message: "Invalid json object", + }, + } + log.Printf("Error decoding request '%s', Error: %s \n", string(message.bytes), err.Error()) + return + } + + // ensure provided version is supported + if req.Version != "" && strings.Trim(req.Version, " ") != "2.0" { + channel.output <- &Response{ + Version: "2.0", + Error: &Error{ + Code: InvalidRequestErrorCode, + Message: "'2.0' is the only supported version, use it or omit version at all", + }, + } + return + } + + transmitter := &Transmitter{Channel: channel, id: req.Id} + + opRoute, ok := routes.get(req.Method) + if !ok { + m := fmt.Sprintf("No route for the operation '%s'", req.Method) + transmitter.SendError(NewError(errors.New(m), MethodNotFoundErrorCode)) + return + } + + decodedBody, err := opRoute.DecoderFunc(req.RawParams) + if err != nil { + m := fmt.Sprintf("Error decoding body for the operation '%s'. Error: '%s'", req.Method, err.Error()) + transmitter.SendError(NewError(errors.New(m), InvalidRequestErrorCode)) + return + } + + if err := opRoute.HandlerFunc(decodedBody, transmitter); err != nil { + opError, ok := err.(Error) + if ok { + transmitter.SendError(opError) + } else { + transmitter.SendError(NewError(err, InternalErrorCode)) + } + } +} + +func isNormallyClosed(code int) bool { + return code == websocket.CloseGoingAway || + code == websocket.CloseNormalClosure || + code == websocket.CloseNoStatusReceived +} diff --git a/exec-agent/src/rpc/model.go b/exec-agent/src/rpc/model.go new file mode 100644 index 00000000000..f9e461f7bef --- /dev/null +++ b/exec-agent/src/rpc/model.go @@ -0,0 +1,173 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +// Provides a lightweight implementation of jsonrpc2.0 protocol. +// The original jsonrpc2.0 specification - http://www.jsonrpc.org/specification +// +// The implementations does not fully implement the protocol +// and introduces a few modifications to its terminology in +// term of exec-agent transport needs. +// +// From the specification: +// The Client is defined as the origin of Request objects and the handler of Response objects. +// The Server is defined as the origin of Response objects and the handler of Request objects. +// +// Exec-agent serves as both, server and client as it receives +// Responses and sends Notifications in the same time. +// +// Request. +// It's a message from the physical websocket connection client to the exec-agent server. +// Request(in it's origin form) is considered to be unidirectional. +// WS Client =---> WS Server. +// +// Response. +// It's a message from the the exec-agent server to a websocket client, +// indicates the result of the operation execution requested by certain request. +// Response doesn't exist without request. The response is considered to be unidirectional. +// WS Client <---= WS Server +// +// Event. +// Is a message from the exec-agent server to a websocket client, the analogue +// from the specification is Notification, which is defined as a request +// which doesn't need any response, that's also true for events. +// Events may happen periodically and don't need to be indicated by request. +// WS Client <---X WS Server +package rpc + +import ( + "encoding/json" + "time" +) + +const ( + + // Invalid JSON was received by the server. + ParseErrorCode = -32700 + + // Request object is not valid, fails + // when route decoder can't decode params. + InvalidRequestErrorCode = -32600 + + // There is no route for such method. + MethodNotFoundErrorCode = -32601 + + // When handler parameters are considered as not valid + // this error type should be returned directly from the HandlerFunc + InvalidParamsErrorCode = -32602 + + // When error returned from the Route HandlerFunc is different from Error type + InternalErrorCode = -32603 + + // -32000 to -32099 Reserved for implementation-defined server-errors. +) + +// Describes named operation which is called +// on the websocket client's side and performed +// on the servers's side, if appropriate Route exists. +type Request struct { + + // Version of this request e.g. '2.0'. + Version string `json:"jsonrpc"` + + // The method name which should be proceeded by this call + // usually dot separated resource and action e.g. 'process.start'. + Method string `json:"method"` + + // The unique identifier of this operation request. + // If a client needs to identify the result of the operation execution, + // the id should be passed by the client, then it is guaranteed + // that the client will receive the result frame with the same id. + // The uniqueness of the identifier must be controlled by the client, + // if client doesn't specify the identifier in the operation call, + // the response won't contain the identifier as well. + // + // It is preferable to specify identifier for those calls which may + // either validate data, or produce such information which can't be + // identified by itself. + Id interface{} `json:"id"` + + // Request data, parameters which are needed for operation execution. + RawParams json.RawMessage `json:"params"` +} + +// A message from the server to the client, +// which represents the result of the certain operation execution. +// The result is sent to the client only once per operation. +type Response struct { + + // Version of this response e.g. '2.0'. + Version string `json:"jsonrpc"` + + // The operation call identifier, will be set only + // if the operation contains it. See 'rpc.Request.Id' + Id interface{} `json:"id"` + + // The actual result data, the operation execution result. + Result interface{} `json:"result,omitempty"` + + // Body and Error are mutual exclusive. + // Present only if the operation execution fails due to an error. + Error *Error `json:"error,omitempty"` +} + +// A message from the server to the client, +// which may notify client about any activity that the client is interested in. +// The difference from the 'rpc.Response' is that the event may happen periodically, +// before or even after some operation calls, while the 'rpc.Response' is more like +// result of the operation call execution, which is sent to the client immediately +// after the operation execution is done. +type Event struct { + + // Version of this notification e.g. '2.0' + Version string `json:"jsonrpc"` + + // A type of this operation event, must be always set. + // The type must be generally unique. + EventType string `json:"method"` + + // Event related data. + Body interface{} `json:"params"` +} + +// May be returned by any of route HandlerFunc. +type Error struct { + error `json:"-"` + + // An error code + Code int `json:"code"` + + // A short description of the occurred error. + Message string `json:"message"` +} + +type Timed struct { + Time time.Time `json:"time"` +} + +func NewEvent(eType string, body interface{}) *Event { + return &Event{ + Version: "2.0", + EventType: eType, + Body: body, + } +} + +func NewArgsError(err error) Error { + return NewError(err, InvalidParamsErrorCode) +} + +func NewError(err error, code int) Error { + return Error{ + error: err, + Code: code, + Message: err.Error(), + } +} diff --git a/exec-agent/src/rpc/route.go b/exec-agent/src/rpc/route.go new file mode 100644 index 00000000000..f1653cba6f4 --- /dev/null +++ b/exec-agent/src/rpc/route.go @@ -0,0 +1,88 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package rpc + +import ( + "log" + "sync" +) + +var ( + routes = &routesMap{items: make(map[string]Route)} +) + +// Describes route for rpc requests +type Route struct { + + // The operation name like defined by Request.Method + Method string + + // The decoder used for decoding raw request parameters + // into the certain object. If decoding is okay, then + // decoded value will be passed to the HandlerFunc + // of this request route, so it is up to the route + // - to define type safe couple of DecoderFunc & HandlerFunc. + DecoderFunc func(body []byte) (interface{}, error) + + // Defines handler for decoded request parameters. + // If handler function can't perform the operation then + // handler function should either return an error, or + // send it directly within transmitter#SendError func. + // Params is a value returned from the DecoderFunc. + // If an error is returned from this function and the type + // of the error is different from rpc.Error, it will be + // published as internal rpc error(-32603). + HandlerFunc func(params interface{}, t *Transmitter) error +} + +// Named group of rpc routes +type RoutesGroup struct { + // The name of this group e.g.: 'ProcessRpcRoutes' + Name string + + // Rpc routes of this group + Items []Route +} + +// Defines lockable map for storing operation routes +type routesMap struct { + sync.RWMutex + items map[string]Route +} + +// Gets route by the operation name +func (routes *routesMap) get(method string) (Route, bool) { + routes.RLock() + defer routes.RUnlock() + item, ok := routes.items[method] + return item, ok +} + +// Returns true if route is added and false if route for such method +// already present(won't override it). +func (or *routesMap) add(r Route) bool { + routes.Lock() + defer routes.Unlock() + _, ok := routes.items[r.Method] + if ok { + return false + } + routes.items[r.Method] = r + return true +} + +// Adds a new route, panics if such route already exists. +func RegisterRoute(route Route) { + if !routes.add(route) { + log.Fatalf("Couldn't register a new route, route for the operation '%s' already exists", route.Method) + } +} diff --git a/exec-agent/src/rpc/transmitter.go b/exec-agent/src/rpc/transmitter.go new file mode 100644 index 00000000000..72a2349352b --- /dev/null +++ b/exec-agent/src/rpc/transmitter.go @@ -0,0 +1,41 @@ +// +// Copyright (c) 2012-2016 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package rpc + +// Transmitter is used for sending +// results of the operation executions to the channel. +type Transmitter struct { + + // The id of the request behind this transmitter. + id interface{} + + // The channel to which the message will be send. + Channel Channel +} + +// Wraps the given message with 'rpc.Result' and sends it to the client. +func (t *Transmitter) Send(message interface{}) { + t.Channel.output <- &Response{ + Version: "2.0", + Id: t.id, + Result: message, + } +} + +// Wraps the given error with 'rpc.Result' and sends it to the client. +func (t *Transmitter) SendError(err Error) { + t.Channel.output <- &Response{ + Version: "2.0", + Id: t.id, + Error: &err, + } +} diff --git a/exec-agent/src/static/term.js b/exec-agent/src/static/term.js new file mode 100644 index 00000000000..9d80e4bfc30 --- /dev/null +++ b/exec-agent/src/static/term.js @@ -0,0 +1,5793 @@ +/** + * term.js - an xterm emulator + * Copyright (c) 2012-2013, Christopher Jeffrey (MIT License) + * https://github.com/chjj/term.js + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + * Originally forked from (with the author's permission): + * Fabrice Bellard's javascript vt100 for jslinux: + * http://bellard.org/jslinux/ + * Copyright (c) 2011 Fabrice Bellard + * The original design remains. The terminal itself + * has been extended to include xterm CSI codes, among + * other features. + */ + +;(function() { + +/** + * Terminal Emulation References: + * http://vt100.net/ + * http://invisible-island.net/xterm/ctlseqs/ctlseqs.txt + * http://invisible-island.net/xterm/ctlseqs/ctlseqs.html + * http://invisible-island.net/vttest/ + * http://www.inwap.com/pdp10/ansicode.txt + * http://linux.die.net/man/4/console_codes + * http://linux.die.net/man/7/urxvt + */ + +'use strict'; + +/** + * Shared + */ + +var window = this + , document = this.document; + +/** + * EventEmitter + */ + +function EventEmitter() { + this._events = this._events || {}; +} + +EventEmitter.prototype.addListener = function(type, listener) { + this._events[type] = this._events[type] || []; + this._events[type].push(listener); +}; + +EventEmitter.prototype.on = EventEmitter.prototype.addListener; + +EventEmitter.prototype.removeListener = function(type, listener) { + if (!this._events[type]) return; + + var obj = this._events[type] + , i = obj.length; + + while (i--) { + if (obj[i] === listener || obj[i].listener === listener) { + obj.splice(i, 1); + return; + } + } +}; + +EventEmitter.prototype.off = EventEmitter.prototype.removeListener; + +EventEmitter.prototype.removeAllListeners = function(type) { + if (this._events[type]) delete this._events[type]; +}; + +EventEmitter.prototype.once = function(type, listener) { + function on() { + var args = Array.prototype.slice.call(arguments); + this.removeListener(type, on); + return listener.apply(this, args); + } + on.listener = listener; + return this.on(type, on); +}; + +EventEmitter.prototype.emit = function(type) { + if (!this._events[type]) return; + + var args = Array.prototype.slice.call(arguments, 1) + , obj = this._events[type] + , l = obj.length + , i = 0; + + for (; i < l; i++) { + obj[i].apply(this, args); + } +}; + +EventEmitter.prototype.listeners = function(type) { + return this._events[type] = this._events[type] || []; +}; + +/** + * States + */ + +var normal = 0 + , escaped = 1 + , csi = 2 + , osc = 3 + , charset = 4 + , dcs = 5 + , ignore = 6; + +/** + * Terminal + */ + +function Terminal(options) { + var self = this; + + if (!(this instanceof Terminal)) { + return new Terminal(arguments[0], arguments[1], arguments[2]); + } + + EventEmitter.call(this); + + if (typeof options === 'number') { + options = { + cols: arguments[0], + rows: arguments[1], + handler: arguments[2] + }; + } + + options = options || {}; + + each(keys(Terminal.defaults), function(key) { + if (options[key] == null) { + options[key] = Terminal.options[key]; + // Legacy: + if (Terminal[key] !== Terminal.defaults[key]) { + options[key] = Terminal[key]; + } + } + self[key] = options[key]; + }); + + if (options.colors.length === 8) { + options.colors = options.colors.concat(Terminal._colors.slice(8)); + } else if (options.colors.length === 16) { + options.colors = options.colors.concat(Terminal._colors.slice(16)); + } else if (options.colors.length === 10) { + options.colors = options.colors.slice(0, -2).concat( + Terminal._colors.slice(8, -2), options.colors.slice(-2)); + } else if (options.colors.length === 18) { + options.colors = options.colors.slice(0, -2).concat( + Terminal._colors.slice(16, -2), options.colors.slice(-2)); + } + this.colors = options.colors; + + this.options = options; + + // this.context = options.context || window; + // this.document = options.document || document; + this.parent = options.body || options.parent + || (document ? document.getElementsByTagName('body')[0] : null); + + this.cols = options.cols || options.geometry[0]; + this.rows = options.rows || options.geometry[1]; + + if (options.handler) { + this.on('data', options.handler); + } + + this.ybase = 0; + this.ydisp = 0; + this.x = 0; + this.y = 0; + this.cursorState = 0; + this.cursorHidden = false; + this.convertEol; + this.state = 0; + this.queue = ''; + this.scrollTop = 0; + this.scrollBottom = this.rows - 1; + + // modes + this.applicationKeypad = false; + this.applicationCursor = false; + this.originMode = false; + this.insertMode = false; + this.wraparoundMode = false; + this.normal = null; + + // select modes + this.selectMode = false; + this.visualMode = false; + this.searchMode = false; + this.searchDown; + this.entry = ''; + this.entryPrefix = 'Search: '; + this._real; + this._selected; + this._textarea; + + // charset + this.charset = null; + this.gcharset = null; + this.glevel = 0; + this.charsets = [null]; + + // mouse properties + this.decLocator; + this.x10Mouse; + this.vt200Mouse; + this.vt300Mouse; + this.normalMouse; + this.mouseEvents; + this.sendFocus; + this.utfMouse; + this.sgrMouse; + this.urxvtMouse; + + // misc + this.element; + this.children; + this.refreshStart; + this.refreshEnd; + this.savedX; + this.savedY; + this.savedCols; + + // stream + this.readable = true; + this.writable = true; + + this.defAttr = (0 << 18) | (257 << 9) | (256 << 0); + this.curAttr = this.defAttr; + + this.params = []; + this.currentParam = 0; + this.prefix = ''; + this.postfix = ''; + + this.lines = []; + var i = this.rows; + while (i--) { + this.lines.push(this.blankLine()); + } + + this.tabs; + this.setupStops(); +} + +inherits(Terminal, EventEmitter); + +// back_color_erase feature for xterm. +Terminal.prototype.eraseAttr = function() { + // if (this.is('screen')) return this.defAttr; + return (this.defAttr & ~0x1ff) | (this.curAttr & 0x1ff); +}; + +/** + * Colors + */ + +// Colors 0-15 +Terminal.tangoColors = [ + // dark: + '#2e3436', + '#cc0000', + '#4e9a06', + '#c4a000', + '#3465a4', + '#75507b', + '#06989a', + '#d3d7cf', + // bright: + '#555753', + '#ef2929', + '#8ae234', + '#fce94f', + '#729fcf', + '#ad7fa8', + '#34e2e2', + '#eeeeec' +]; + +Terminal.xtermColors = [ + // dark: + '#000000', // black + '#cd0000', // red3 + '#00cd00', // green3 + '#cdcd00', // yellow3 + '#0000ee', // blue2 + '#cd00cd', // magenta3 + '#00cdcd', // cyan3 + '#e5e5e5', // gray90 + // bright: + '#7f7f7f', // gray50 + '#ff0000', // red + '#00ff00', // green + '#ffff00', // yellow + '#5c5cff', // rgb:5c/5c/ff + '#ff00ff', // magenta + '#00ffff', // cyan + '#ffffff' // white +]; + +// Colors 0-15 + 16-255 +// Much thanks to TooTallNate for writing this. +Terminal.colors = (function() { + var colors = Terminal.tangoColors.slice() + , r = [0x00, 0x5f, 0x87, 0xaf, 0xd7, 0xff] + , i; + + // 16-231 + i = 0; + for (; i < 216; i++) { + out(r[(i / 36) % 6 | 0], r[(i / 6) % 6 | 0], r[i % 6]); + } + + // 232-255 (grey) + i = 0; + for (; i < 24; i++) { + r = 8 + i * 10; + out(r, r, r); + } + + function out(r, g, b) { + colors.push('#' + hex(r) + hex(g) + hex(b)); + } + + function hex(c) { + c = c.toString(16); + return c.length < 2 ? '0' + c : c; + } + + return colors; +})(); + +// Default BG/FG +Terminal.colors[256] = '#000000'; +Terminal.colors[257] = '#f0f0f0'; + +Terminal._colors = Terminal.colors.slice(); + +Terminal.vcolors = (function() { + var out = [] + , colors = Terminal.colors + , i = 0 + , color; + + for (; i < 256; i++) { + color = parseInt(colors[i].substring(1), 16); + out.push([ + (color >> 16) & 0xff, + (color >> 8) & 0xff, + color & 0xff + ]); + } + + return out; +})(); + +/** + * Options + */ + +Terminal.defaults = { + colors: Terminal.colors, + convertEol: false, + termName: 'xterm', + geometry: [80, 24], + cursorBlink: true, + visualBell: false, + popOnBell: false, + scrollback: 1000, + screenKeys: false, + debug: false, + useStyle: false + // programFeatures: false, + // focusKeys: false, +}; + +Terminal.options = {}; + +each(keys(Terminal.defaults), function(key) { + Terminal[key] = Terminal.defaults[key]; + Terminal.options[key] = Terminal.defaults[key]; +}); + +/** + * Focused Terminal + */ + +Terminal.focus = null; + +Terminal.prototype.focus = function() { + if (Terminal.focus === this) return; + + if (Terminal.focus) { + Terminal.focus.blur(); + } + + if (this.sendFocus) this.send('\x1b[I'); + this.showCursor(); + + try { + this.element.focus(); + } catch (e) { + ; + } + + this.emit('focus'); + + Terminal.focus = this; +}; + +Terminal.prototype.blur = function() { + if (Terminal.focus !== this) return; + + this.cursorState = 0; + this.refresh(this.y, this.y); + if (this.sendFocus) this.send('\x1b[O'); + + // try { + // this.element.blur(); + // } catch (e) { + // ; + // } + + // this.emit('blur'); + + Terminal.focus = null; +}; + +/** + * Initialize global behavior + */ + +Terminal.prototype.initGlobal = function() { + var document = this.document; + + Terminal._boundDocs = Terminal._boundDocs || []; + if (~indexOf(Terminal._boundDocs, document)) { + return; + } + Terminal._boundDocs.push(document); + + Terminal.bindPaste(document); + + Terminal.bindKeys(document); + + Terminal.bindCopy(document); + + if (this.isMobile) { + this.fixMobile(document); + } + + if (this.useStyle) { + Terminal.insertStyle(document, this.colors[256], this.colors[257]); + } +}; + +/** + * Bind to paste event + */ + +Terminal.bindPaste = function(document) { + // This seems to work well for ctrl-V and middle-click, + // even without the contentEditable workaround. + var window = document.defaultView; + on(window, 'paste', function(ev) { + var term = Terminal.focus; + if (!term) return; + if (ev.clipboardData) { + term.send(ev.clipboardData.getData('text/plain')); + } else if (term.context.clipboardData) { + term.send(term.context.clipboardData.getData('Text')); + } + // Not necessary. Do it anyway for good measure. + term.element.contentEditable = 'inherit'; + return cancel(ev); + }); +}; + +/** + * Global Events for key handling + */ + +Terminal.bindKeys = function(document) { + // We should only need to check `target === body` below, + // but we can check everything for good measure. + on(document, 'keydown', function(ev) { + if (!Terminal.focus) return; + var target = ev.target || ev.srcElement; + if (!target) return; + if (target === Terminal.focus.element + || target === Terminal.focus.context + || target === Terminal.focus.document + || target === Terminal.focus.body + || target === Terminal._textarea + || target === Terminal.focus.parent) { + return Terminal.focus.keyDown(ev); + } + }, true); + + on(document, 'keypress', function(ev) { + if (!Terminal.focus) return; + var target = ev.target || ev.srcElement; + if (!target) return; + if (target === Terminal.focus.element + || target === Terminal.focus.context + || target === Terminal.focus.document + || target === Terminal.focus.body + || target === Terminal._textarea + || target === Terminal.focus.parent) { + return Terminal.focus.keyPress(ev); + } + }, true); + + // If we click somewhere other than a + // terminal, unfocus the terminal. + on(document, 'mousedown', function(ev) { + if (!Terminal.focus) return; + + var el = ev.target || ev.srcElement; + if (!el) return; + + do { + if (el === Terminal.focus.element) return; + } while (el = el.parentNode); + + Terminal.focus.blur(); + }); +}; + +/** + * Copy Selection w/ Ctrl-C (Select Mode) + */ + +Terminal.bindCopy = function(document) { + var window = document.defaultView; + + // if (!('onbeforecopy' in document)) { + // // Copies to *only* the clipboard. + // on(window, 'copy', function fn(ev) { + // var term = Terminal.focus; + // if (!term) return; + // if (!term._selected) return; + // var text = term.grabText( + // term._selected.x1, term._selected.x2, + // term._selected.y1, term._selected.y2); + // term.emit('copy', text); + // ev.clipboardData.setData('text/plain', text); + // }); + // return; + // } + + // Copies to primary selection *and* clipboard. + // NOTE: This may work better on capture phase, + // or using the `beforecopy` event. + on(window, 'copy', function(ev) { + var term = Terminal.focus; + if (!term) return; + if (!term._selected) return; + var textarea = term.getCopyTextarea(); + var text = term.grabText( + term._selected.x1, term._selected.x2, + term._selected.y1, term._selected.y2); + term.emit('copy', text); + textarea.focus(); + textarea.textContent = text; + textarea.value = text; + textarea.setSelectionRange(0, text.length); + setTimeout(function() { + term.element.focus(); + term.focus(); + }, 1); + }); +}; + +/** + * Fix Mobile + */ + +Terminal.prototype.fixMobile = function(document) { + var self = this; + + var textarea = document.createElement('textarea'); + textarea.style.position = 'absolute'; + textarea.style.left = '-32000px'; + textarea.style.top = '-32000px'; + textarea.style.width = '0px'; + textarea.style.height = '0px'; + textarea.style.opacity = '0'; + textarea.style.backgroundColor = 'transparent'; + textarea.style.borderStyle = 'none'; + textarea.style.outlineStyle = 'none'; + textarea.autocapitalize = 'none'; + textarea.autocorrect = 'off'; + + document.getElementsByTagName('body')[0].appendChild(textarea); + + Terminal._textarea = textarea; + + setTimeout(function() { + textarea.focus(); + }, 1000); + + if (this.isAndroid) { + on(textarea, 'change', function() { + var value = textarea.textContent || textarea.value; + textarea.value = ''; + textarea.textContent = ''; + self.send(value + '\r'); + }); + } +}; + +/** + * Insert a default style + */ + +Terminal.insertStyle = function(document, bg, fg) { + var style = document.getElementById('term-style'); + if (style) return; + + var head = document.getElementsByTagName('head')[0]; + if (!head) return; + + var style = document.createElement('style'); + style.id = 'term-style'; + + // textContent doesn't work well with IE for