From 5815666ae853e0598bf3fe6dac9641a0642bd2d7 Mon Sep 17 00:00:00 2001
From: Greg Soltis
Date: Mon, 27 Jun 2022 12:06:29 -0700
Subject: [PATCH] Watch task outputs to avoid cache restorations (#1269)

Start a daemon, notify it of output globs when a task completes, watch
those outputs, and check whether we can skip cache restoration on
subsequent runs. Uses `grpc` and `protobuf` for communication.

Before shipping:

- [x] Wire up `--no-daemon` flag to `run`
- [x] Add command for `daemon status`
- [x] Add command for `daemon stop`
- [x] Add command for `daemon start`
- [x] Add command for `daemon restart`
- [x] Proper daemon logfile management
- [x] Ensure that the client tries to start the server after maybe killing a previous incarnation
- [x] Investigate golang internal lockfile
- [ ] Verify ordering guarantees on file-watching
- [ ] Sort out proper syntax for `protos` in `Makefile`
- [x] Shut down if repo root is deleted
- [x] Proper signal handling
- [ ] Handle watching too many files
- [ ] Consider using [panicwrap](https://github.com/mitchellh/panicwrap)
---
 .github/workflows/ci-go.yml                   |  22 ++
 .github/workflows/golangci-lint.yml           |  16 +
 .vscode/extensions.json                       |   5 +-
 .vscode/launch.json                           |  24 +-
 .vscode/tasks.json                            |  11 +-
 cli/.gitignore                                |   1 +
 cli/Makefile                                  |  21 +-
 cli/cmd/turbo/main.go                         |  24 +-
 cli/cmd/turbo/signals.go                      |  20 -
 cli/compile-protos                            |   0
 cli/go.mod                                    |   9 +
 cli/go.sum                                    |  22 ++
 cli/internal/daemon/connector/connector.go    | 372 ++++++++++++++++++
 .../daemon/connector/connector_test.go        | 235 +++++++++++
 cli/internal/daemon/connector/fork.go         |  15 +
 cli/internal/daemon/connector/fork_windows.go |  15 +
 cli/internal/daemon/daemon.go                 | 334 ++++++++++++++++
 cli/internal/daemon/daemon_test.go            | 261 ++++++++++++
 cli/internal/daemon/lifecycle.go              | 134 +++++++
 cli/internal/daemon/status.go                 |  92 +++++
 cli/internal/daemonclient/daemonclient.go     |  67 ++++
 cli/internal/filewatcher/filewatcher.go       | 184 +++++++++
 cli/internal/filewatcher/filewatcher_test.go  | 165 ++++++++
 cli/internal/fs/path.go                       |  19 +
 cli/internal/globwatcher/globwatcher.go       | 151 +++++++
 cli/internal/globwatcher/globwatcher_test.go  | 143 +++++++
 cli/internal/run/run.go                       | 108 +++--
 cli/internal/run/run_test.go                  |   2 +-
 cli/internal/runcache/output_watcher.go       |  28 ++
 cli/internal/runcache/runcache.go             |  57 ++-
 cli/internal/server/server.go                 | 174 ++++++++
 cli/internal/server/server_test.go            |  72 ++++
 cli/internal/signals/signals.go               |  60 +++
 cli/internal/turbodprotocol/turbod.proto      |  52 +++
 cli/scripts/monorepo.ts                       |   6 +-
 35 files changed, 2824 insertions(+), 97 deletions(-)
 create mode 100644 cli/.gitignore
 delete mode 100644 cli/cmd/turbo/signals.go
 delete mode 100644 cli/compile-protos
 create mode 100644 cli/internal/daemon/connector/connector.go
 create mode 100644 cli/internal/daemon/connector/connector_test.go
 create mode 100644 cli/internal/daemon/connector/fork.go
 create mode 100644 cli/internal/daemon/connector/fork_windows.go
 create mode 100644 cli/internal/daemon/daemon.go
 create mode 100644 cli/internal/daemon/daemon_test.go
 create mode 100644 cli/internal/daemon/lifecycle.go
 create mode 100644 cli/internal/daemon/status.go
 create mode 100644 cli/internal/daemonclient/daemonclient.go
 create mode 100644 cli/internal/filewatcher/filewatcher.go
 create mode 100644 cli/internal/filewatcher/filewatcher_test.go
 create mode 100644 cli/internal/globwatcher/globwatcher.go
 create mode 100644 cli/internal/globwatcher/globwatcher_test.go
 create mode 100644 cli/internal/runcache/output_watcher.go
 create mode 100644 cli/internal/server/server.go
 create mode 100644 cli/internal/server/server_test.go
 create mode 100644 cli/internal/signals/signals.go
 create mode 100644 cli/internal/turbodprotocol/turbod.proto
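For context, here is a minimal, self-contained sketch of the core idea, illustrative only: the names below are not this PR's API (the real implementation lives in `cli/internal/globwatcher` and is driven by `fsnotify` events). The bookkeeping is: record each task's output globs when it completes, flag a glob when the file watcher reports a matching change, and skip the cache restore only when none of a task's globs have changed.

```go
package main

import (
	"fmt"
	"path"
)

// globWatcher is an in-memory stand-in for the glob watcher: it tracks
// output globs per task hash and which globs have seen a file change
// since they were registered.
type globWatcher struct {
	globsByHash map[string][]string // task hash -> output globs
	changed     map[string]bool     // glob -> changed since registration
}

func newGlobWatcher() *globWatcher {
	return &globWatcher{
		globsByHash: map[string][]string{},
		changed:     map[string]bool{},
	}
}

// WatchGlobs is called after a task finishes writing its outputs.
func (g *globWatcher) WatchGlobs(hash string, globs []string) {
	g.globsByHash[hash] = globs
	for _, pattern := range globs {
		g.changed[pattern] = false
	}
}

// OnFileChange is what a filesystem watcher callback would invoke.
func (g *globWatcher) OnFileChange(file string) {
	for pattern := range g.changed {
		if ok, _ := path.Match(pattern, file); ok {
			g.changed[pattern] = true
		}
	}
}

// ChangedGlobs reports which of a task's output globs saw changes.
// An empty result means the outputs on disk are still valid, so the
// cache restore for that task can be skipped.
func (g *globWatcher) ChangedGlobs(hash string) []string {
	var out []string
	for _, pattern := range g.globsByHash[hash] {
		if g.changed[pattern] {
			out = append(out, pattern)
		}
	}
	return out
}

func main() {
	w := newGlobWatcher()
	w.WatchGlobs("task-hash-1", []string{"dist/*", ".next/*"})
	w.OnFileChange("dist/main.js")
	fmt.Println(w.ChangedGlobs("task-hash-1")) // [dist/*]
}
```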
diff --git a/.github/workflows/ci-go.yml b/.github/workflows/ci-go.yml
index 075623d876641..d2d71988cbb1f 100644
--- a/.github/workflows/ci-go.yml
+++ b/.github/workflows/ci-go.yml
@@ -33,6 +33,17 @@ jobs:
           cache: true
           cache-dependency-path: cli/go.sum
 
+      - name: Set Up Protoc
+        uses: arduino/setup-protoc@v1
+        with:
+          version: "3.x"
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Set Up Go and GRPC protobuf
+        run: |
+          go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.0
+          go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
+
       - uses: pnpm/action-setup@v2.2.2
         with:
           version: 7.2.1
@@ -93,6 +104,17 @@ jobs:
           cache: true
           cache-dependency-path: cli/go.sum
 
+      - name: Set Up Protoc
+        uses: arduino/setup-protoc@v1
+        with:
+          version: "3.x"
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Set Up Go and GRPC protobuf
+        run: |
+          go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.0
+          go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
+
       - uses: pnpm/action-setup@v2.2.2
         with:
           version: 7.2.1
diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml
index eb71beac980a5..a4ca041e22a2a 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -26,11 +26,27 @@ jobs:
       - uses: actions/checkout@v3
         with:
           fetch-depth: 2
+
       - uses: actions/setup-go@v3
         with:
           go-version: 1.17
           cache: true
           cache-dependency-path: cli/go.sum
+
+      - name: Set Up Protoc
+        uses: arduino/setup-protoc@v1
+        with:
+          version: "3.x"
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Set Up Go and GRPC protobuf
+        run: |
+          go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.0
+          go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
+
+      - name: Setup Protos
+        run: cd cli && make compile-protos
+
       - name: golangci-lint
         uses: golangci/golangci-lint-action@v3
         with:
diff --git a/.vscode/extensions.json b/.vscode/extensions.json
index d5f5527865735..39a0e18fd5035 100644
--- a/.vscode/extensions.json
+++ b/.vscode/extensions.json
@@ -1,5 +1,5 @@
 {
-  "recommendations": [
+  "recommendations": [
     "bradlc.vscode-tailwindcss",
     "christian-kohler.npm-intellisense",
     "dbaeumer.vscode-eslint",
@@ -14,6 +14,7 @@
     "ms-vscode-remote.remote-containers",
     "silvenon.mdx",
     "windmilleng.vscode-go-autotest",
-    "yzhang.markdown-all-in-one"
+    "yzhang.markdown-all-in-one",
+    "zxh404.vscode-proto3"
   ]
 }
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 420f68e5ddb8f..f1ccb03811690 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -10,12 +10,28 @@
       "cwd": "${workspaceFolder}/cli",
       "program": "${workspaceFolder}/cli/scripts/e2e/e2e.ts",
       "request": "launch",
-      "skipFiles": [
-        "/**"
-      ],
+      "skipFiles": ["/**"],
       "type": "node",
       "preLaunchTask": "prepare e2e"
     },
+    {
+      "name": "Server",
+      "type": "go",
+      "request": "launch",
+      "mode": "debug",
+      "program": "${workspaceRoot}/cli/cmd/turbo",
+      "cwd": "${workspaceRoot}/examples/basic",
+      "args": ["daemon"]
+    },
+    {
+      "name": "Client",
+      "type": "go",
+      "request": "launch",
+      "mode": "debug",
+      "program": "${workspaceRoot}/cli/cmd/turbo",
+      "cwd": "${workspaceRoot}/examples/basic",
+      "args": ["run", "build", "-vvv"]
+    },
     {
       "name": "turbo --version",
       "type": "go",
@@ -69,6 +85,6 @@
       "program": "${workspaceRoot}/cli/cmd/turbo",
       "cwd": "${workspaceRoot}",
       "args": ["run", "build", "--force"]
-    },
+    }
   ]
 }
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index e22c56c21712d..5c2f624bba061 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -1,6 +1,15 @@
 {
   "version": "2.0.0",
   "tasks": [
+    {
+      "runOptions": { "runOn": "folderOpen" },
+      "label": "Compile protobufs",
+      "type": "shell",
+      "options": {
+        "cwd": "${workspaceRoot}/cli"
+      },
+      "command": "make compile-protos"
+    },
     {
       "type": "shell",
       "label": "prepare e2e",
@@ -9,7 +18,7 @@
     {
       "type": "shell",
       "command": "cd ${cwd}/cli && make install",
-      "label": "make install",
+      "label": "make install"
     },
     {
       "type": "shell",
diff --git a/cli/.gitignore b/cli/.gitignore
new file mode 100644
index 0000000000000..98f78830656c2
--- /dev/null
+++ b/cli/.gitignore
@@ -0,0 +1 @@
+internal/turbodprotocol/*.go
diff --git a/cli/Makefile b/cli/Makefile
index 5a1be363a80fc..f8d6269e4a62b 100644
--- a/cli/Makefile
+++ b/cli/Makefile
@@ -10,10 +10,21 @@ GO_FLAGS += -trimpath
 GO_FILES = $(shell find . -name "*.go")
 SRC_FILES = $(shell find . -name "*.go" | grep -v "_test.go")
+GENERATED_FILES = internal/turbodprotocol/turbod.pb.go internal/turbodprotocol/turbod_grpc.pb.go
 
-turbo: $(SRC_FILES) go.mod go.sum
+turbo: $(GENERATED_FILES) $(SRC_FILES) go.mod
 	CGO_ENABLED=0 go build $(GO_FLAGS) ./cmd/turbo
 
+protoc: internal/turbodprotocol/turbod.proto
+	protoc --go_out=. --go_opt=paths=source_relative \
+		--go-grpc_out=. --go-grpc_opt=paths=source_relative \
+		internal/turbodprotocol/turbod.proto
+
+$(GENERATED_FILES): internal/turbodprotocol/turbod.proto
+	make protoc
+
+compile-protos: $(GENERATED_FILES)
+
 ewatch: scripts/...
 	nodemon --exec "make e2e" -e .ts,.go
 
@@ -29,10 +40,12 @@ TURBO_RACE ?= -race
 clean-go:
 	go clean -testcache ./...
 
-test-go: $(GO_FILES) go.mod go.sum
+test-go: $(GENERATED_FILES) $(GO_FILES) go.mod go.sum
 	go test $(TURBO_RACE) ./...
-lint-go: $(GO_FILES) go.mod go.sum +# protos need to be compiled before linting, since linting needs to pick up +# some types from the generated code +lint-go: $(GENERATED_FILES) $(GO_FILES) go.mod go.sum golangci-lint run --new-from-rev=main fmt-go: $(GO_FILES) go.mod go.sum @@ -49,7 +62,7 @@ cmd/turbo/version.go: version.txt node -e 'console.log(`package main\n\nconst turboVersion = "$(TURBO_VERSION)"`)' > cmd/turbo/version.go.txt mv cmd/turbo/version.go.txt cmd/turbo/version.go -platform-all: cmd/turbo/version.go +platform-all: cmd/turbo/version.go $(GENERATED_FILES) make -j4 \ platform-windows-64 \ platform-windows-32 \ diff --git a/cli/cmd/turbo/main.go b/cli/cmd/turbo/main.go index 79255df0c36bb..cf1d56938d002 100644 --- a/cli/cmd/turbo/main.go +++ b/cli/cmd/turbo/main.go @@ -10,16 +10,16 @@ import ( "github.com/vercel/turborepo/cli/internal/cmd/auth" "github.com/vercel/turborepo/cli/internal/cmd/info" "github.com/vercel/turborepo/cli/internal/config" + "github.com/vercel/turborepo/cli/internal/daemon" "github.com/vercel/turborepo/cli/internal/login" - "github.com/vercel/turborepo/cli/internal/process" prune "github.com/vercel/turborepo/cli/internal/prune" "github.com/vercel/turborepo/cli/internal/run" + "github.com/vercel/turborepo/cli/internal/signals" "github.com/vercel/turborepo/cli/internal/ui" uiPkg "github.com/vercel/turborepo/cli/internal/ui" "github.com/vercel/turborepo/cli/internal/util" "github.com/fatih/color" - hclog "github.com/hashicorp/go-hclog" "github.com/mitchellh/cli" ) @@ -66,18 +66,11 @@ func main() { os.Exit(1) } - var logger hclog.Logger - if cf != nil { - logger = cf.Logger - } else { - logger = hclog.Default() - } - processes := process.NewManager(logger.Named("processes")) - signalCh := watchSignals(func() { processes.Close() }) + signalWatcher := signals.NewWatcher() c.HiddenCommands = []string{"graph"} c.Commands = map[string]cli.CommandFactory{ "run": func() (cli.Command, error) { - return &run.RunCommand{Config: cf, Ui: ui, Processes: processes}, + return &run.RunCommand{Config: cf, UI: ui, SignalWatcher: signalWatcher}, nil }, "prune": func() (cli.Command, error) { @@ -98,6 +91,9 @@ func main() { "bin": func() (cli.Command, error) { return &info.BinCommand{Config: cf, UI: ui}, nil }, + "daemon": func() (cli.Command, error) { + return &daemon.Command{Config: cf, UI: ui, SignalWatcher: signalWatcher}, nil + }, } // Capture the defer statements below so the "done" message comes last @@ -178,8 +174,10 @@ func main() { // or to receive a signal, in which case the signal handler above does the cleanup select { case <-doneCh: - processes.Close() - case <-signalCh: + // We finished whatever task we were running + signalWatcher.Close() + case <-signalWatcher.Done(): + // We caught a signal, which already called the close handlers } os.Exit(exitCode) } diff --git a/cli/cmd/turbo/signals.go b/cli/cmd/turbo/signals.go deleted file mode 100644 index 3329c8a7f6ede..0000000000000 --- a/cli/cmd/turbo/signals.go +++ /dev/null @@ -1,20 +0,0 @@ -package main - -import ( - "os" - "os/signal" - "syscall" -) - -func watchSignals(onClose func()) <-chan struct{} { - // TODO: platform specific signals to watch for? 
- doneCh := make(chan struct{}) - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) - go func() { - <-signalCh - onClose() - close(doneCh) - }() - return doneCh -} diff --git a/cli/compile-protos b/cli/compile-protos deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/cli/go.mod b/cli/go.mod index 445c4e56626e6..629c87faaa238 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -9,9 +9,11 @@ require ( github.com/briandowns/spinner v1.18.1 github.com/deckarep/golang-set v1.8.0 github.com/fatih/color v1.13.0 + github.com/fsnotify/fsnotify v1.5.4 github.com/gobwas/glob v0.2.3 github.com/google/chrometracing v0.0.0-20210413150014-55fded0163e7 github.com/google/uuid v1.3.0 + github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/hashicorp/go-gatedio v0.5.0 github.com/hashicorp/go-hclog v1.2.1 github.com/hashicorp/go-retryablehttp v0.6.8 @@ -21,6 +23,7 @@ require ( github.com/mitchellh/cli v1.1.2 github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/mapstructure v1.4.3 + github.com/nightlyone/lockfile v1.0.0 github.com/pkg/errors v0.9.1 github.com/pyr-sh/dag v1.0.0 github.com/sabhiram/go-gitignore v0.0.0-20201211210132-54b8a0bf510f @@ -29,6 +32,8 @@ require ( github.com/stretchr/testify v1.7.2 github.com/yosuke-furukawa/json5 v0.1.1 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + google.golang.org/grpc v1.46.0 + google.golang.org/protobuf v1.28.0 gopkg.in/yaml.v3 v3.0.1 gotest.tools/v3 v3.2.0 ) @@ -38,7 +43,9 @@ require ( github.com/Masterminds/sprig v2.22.0+incompatible // indirect github.com/armon/go-radix v1.0.0 // indirect github.com/bgentry/speakeasy v0.1.0 // indirect + github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.6 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -56,8 +63,10 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/posener/complete v1.2.3 // indirect golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa // indirect + golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect golang.org/x/term v0.0.0-20210503060354-a79de5458b56 // indirect golang.org/x/text v0.3.7 // indirect + google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect ) diff --git a/cli/go.sum b/cli/go.sum index 30d71d2cc2f5f..31dfa3e3a81ba 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -80,6 +80,8 @@ github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/briandowns/spinner v1.18.1 h1:yhQmQtM1zsqFsouh09Bk/jCjd50pC3EOGsh28gLVvwY= github.com/briandowns/spinner v1.18.1/go.mod h1:mQak9GHqbspjC/5iUx3qMlIho8xBS/ppAL/hX5SmPJU= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -121,6 +123,7 @@ 
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= @@ -128,6 +131,8 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= +github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= +github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -172,6 +177,7 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -218,6 +224,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM= +github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= +github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/consul/api v1.11.0/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M= github.com/hashicorp/consul/sdk v0.8.0/go.mod h1:GBvyrGALthsZObzUGsfgHZQDXjg4lOjagTIwIR1vPms= @@ -341,6 +349,9 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod 
h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nightlyone/lockfile v1.0.0 h1:RHep2cFKK4PonZJDdEl4GmkabuhbsRMgk/k3uAmxBiA= +github.com/nightlyone/lockfile v1.0.0/go.mod h1:rywoIealpdNse2r832aiD9jRk8ErCatROs6LzC841CI= +github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= @@ -419,8 +430,11 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -511,6 +525,7 @@ golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLd golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -606,6 +621,7 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 h1:nonptSpoQ4vQjyraW20DXPAglgQfVnM9ZC6MmNLMR60= golang.org/x/sys 
v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -744,6 +760,7 @@ google.golang.org/genproto v0.0.0-20200228133532-8c2c7df3a383/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= @@ -785,6 +802,7 @@ google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ6 google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211203200212-54befc351ae9/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa h1:I0YcKz0I7OAhddo7ya8kMnvprhcWM045PmkBdMO9zN0= google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -813,6 +831,8 @@ google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnD google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/grpc v1.46.0 h1:oCjezcn6g6A75TGoKYBPgKmVBLexhYLM6MebdrPApP8= +google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -827,6 +847,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/cli/internal/daemon/connector/connector.go b/cli/internal/daemon/connector/connector.go
new file mode 100644
index 0000000000000..c1c77016e636b
--- /dev/null
+++ b/cli/internal/daemon/connector/connector.go
@@ -0,0 +1,372 @@
+package connector
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"github.com/hashicorp/go-hclog"
+	"github.com/nightlyone/lockfile"
+	"github.com/pkg/errors"
+	"github.com/vercel/turborepo/cli/internal/fs"
+	"github.com/vercel/turborepo/cli/internal/turbodprotocol"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/status"
+)
+
+var (
+	// ErrFailedToStart is returned when the daemon process cannot be started
+	ErrFailedToStart     = errors.New("daemon could not be started")
+	errVersionMismatch   = errors.New("daemon version does not match client version")
+	errConnectionFailure = errors.New("could not connect to daemon")
+	// ErrTooManyAttempts is returned when the client fails to connect too many times
+	ErrTooManyAttempts = errors.New("reached maximum number of attempts contacting daemon")
+	// ErrDaemonNotRunning is returned when the client cannot contact the daemon and has
+	// been instructed not to attempt to start a new daemon
+	ErrDaemonNotRunning = errors.New("the daemon is not running")
+)
+
+// Opts is the set of configurable options for the client connection,
+// including some options to be passed through to the daemon process if
+// it needs to be started.
+type Opts struct {
+	ServerTimeout time.Duration
+	DontStart     bool // if true, don't attempt to start the daemon
+}
+
+// Client represents a connection to the daemon process
+type Client struct {
+	turbodprotocol.TurbodClient
+	*grpc.ClientConn
+	SockPath fs.AbsolutePath
+	PidPath  fs.AbsolutePath
+	LogPath  fs.AbsolutePath
+}
+
+// Connector instances are used to create a connection to turbo's daemon process.
+// The daemon will be started, or killed and restarted, if necessary.
+type Connector struct {
+	Logger       hclog.Logger
+	Bin          string
+	Opts         Opts
+	SockPath     fs.AbsolutePath
+	PidPath      fs.AbsolutePath
+	LogPath      fs.AbsolutePath
+	TurboVersion string
+}
+
+// ConnectionError is returned in the error case from connect. It wraps the underlying
+// cause and adds a message with the relevant files for the user to check.
+type ConnectionError struct {
+	SockPath fs.AbsolutePath
+	PidPath  fs.AbsolutePath
+	LogPath  fs.AbsolutePath
+	cause    error
+}
+
+func (ce *ConnectionError) Error() string {
+	return fmt.Sprintf(`connection to turbo daemon process failed. Please ensure the following:
+ - the process identified by the pid in the file at %v is not running, and remove %v
+ - check the logs at %v
+ - the unix domain socket at %v has been removed
+ You can also run without the daemon process by passing --no-daemon`, ce.PidPath, ce.PidPath, ce.LogPath, ce.SockPath)
+}
+
+// Unwrap allows a connection error to work with standard library "errors" and compatible packages
+func (ce *ConnectionError) Unwrap() error {
+	return ce.cause
+}
+
+func (c *Connector) wrapConnectionError(err error) error {
+	return &ConnectionError{
+		SockPath: c.SockPath,
+		PidPath:  c.PidPath,
+		LogPath:  c.LogPath,
+		cause:    err,
+	}
+}
+
+// lockFile returns a pointer to where a lockfile should be.
+// lockfile.New does not perform IO and the only error it produces
+// is in the case a non-absolute path was provided. We're guaranteeing an
+// AbsolutePath, so an error here is an indication of a bug and
+// we should crash.
+func (c *Connector) lockFile() lockfile.Lockfile {
+	lockFile, err := lockfile.New(c.PidPath.ToString())
+	if err != nil {
+		panic(err)
+	}
+	return lockFile
+}
+
+func (c *Connector) addr() string {
+	return fmt.Sprintf("unix://%v", c.SockPath.ToString())
+}
+
+// We defer to the daemon's pid file as the locking mechanism.
+// If it doesn't exist, we will attempt to start the daemon.
+// If the daemon has a different version, ask it to shut down.
+// If the pid file exists but we can't connect, try to kill
+// the daemon.
+// If we can't cause the daemon to remove the pid file, report
+// an error to the user that includes the file location so that
+// they can resolve it.
+const (
+	_maxAttempts       = 3
+	_shutdownTimeout   = 1 * time.Second
+	_socketPollTimeout = 1 * time.Second
+)
+
+// killLiveServer tells a running server to shut down. This method is also responsible
+// for closing this client connection.
+func (c *Connector) killLiveServer(ctx context.Context, client *Client, serverPid int) error {
+	defer func() { _ = client.Close() }()
+
+	_, err := client.Shutdown(ctx, &turbodprotocol.ShutdownRequest{})
+	if err != nil {
+		c.Logger.Error(fmt.Sprintf("failed to shutdown running daemon. attempting to force it closed: %v", err))
+		return c.killDeadServer(serverPid)
+	}
+	// Wait for the server to gracefully exit
+	err = backoff.Retry(func() error {
+		lockFile := c.lockFile()
+		owner, err := lockFile.GetOwner()
+		if os.IsNotExist(err) {
+			// If there is no pid file anymore, we can conclude that the daemon successfully
+			// exited and cleaned up after itself.
+			return nil
+		} else if err != nil {
+			// some other error occurred getting the lockfile owner
+			return backoff.Permanent(err)
+		} else if owner.Pid == serverPid {
+			// We're still waiting for the server to shut down
+			return errNeedsRetry
+		}
+		// if there's no error and the lockfile has a new pid, someone else must've started a new daemon.
+		// Consider the old one killed and move on.
+		return nil
+	}, backoffWithTimeout(_shutdownTimeout))
+	if errors.Is(err, errNeedsRetry) {
+		c.Logger.Error(fmt.Sprintf("daemon did not exit after %v, attempting to force it closed", _shutdownTimeout.String()))
+		return c.killDeadServer(serverPid)
+	} else if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *Connector) killDeadServer(pid int) error {
+	// currently the only error that this constructor returns is
+	// in the case that you don't provide an absolute path.
+	// Given that we require an absolute path as input, this should
+	// hopefully never happen.
+	lockFile := c.lockFile()
+	process, err := lockFile.GetOwner()
+	if err == nil {
+		// Check that this is the same process that we failed to connect to.
+		// Otherwise, connectInternal will loop around again and start with whatever
+		// new process has the pid file.
+		if process.Pid == pid {
+			// we have a process that we need to kill
+			// TODO(gsoltis): graceful kill? the process is already not responding to requests,
+			// but it could be in the middle of a graceful shutdown. Probably should let it clean
+			// itself up, and report an error and defer to a force-kill by the user
+			if err := process.Kill(); err != nil {
+				return err
+			}
+		}
+		return nil
+	} else if errors.Is(err, os.ErrNotExist) {
+		// There's no pid file. Someone else killed it. Returning no error will cause
+		// connectInternal to loop around and try the connection again.
+		return nil
+	}
+	return err
+}
+
+// Connect attempts to create a connection to a turbo daemon.
+// Retries and daemon restarts are built in. If this fails,
+// it is unlikely to succeed after an automated retry.
+func (c *Connector) Connect(ctx context.Context) (*Client, error) {
+	client, err := c.connectInternal(ctx)
+	if err != nil {
+		return nil, c.wrapConnectionError(err)
+	}
+	return client, nil
+}
+
+func (c *Connector) connectInternal(ctx context.Context) (*Client, error) {
+	// for each attempt, we:
+	// 1. try to find or start a daemon process, getting its pid
+	// 2. wait for the unix domain socket file to appear
+	// 3. connect to the unix domain socket. Note that this connection is not validated
+	// 4. send a hello message. This validates the connection as a side effect of
+	//    negotiating versions, which currently requires exact match.
+	// In the event of a live, but incompatible server, we attempt to shut it down and start
+	// a new one. In the event of an unresponsive server, we attempt to kill the process
+	// identified by the pid file, with the hope that it will clean up after itself.
+	// Failures include details about where to find logs, the pid file, and the socket file.
+	for i := 0; i < _maxAttempts; i++ {
+		serverPid, err := c.getOrStartDaemon()
+		if err != nil {
+			// If we fail to even start the daemon process, return immediately, we're unlikely
+			// to succeed without user intervention
+			return nil, err
+		}
+		if err := c.waitForSocket(); errors.Is(err, ErrFailedToStart) {
+			// If we didn't see the socket file, try again. It's possible that
+			// the daemon encountered a transitory error
+			continue
+		} else if err != nil {
+			return nil, err
+		}
+		client, err := c.getClientConn()
+		if err != nil {
+			return nil, err
+		}
+		if err := c.sendHello(ctx, client); err == nil {
+			// We connected and negotiated a version, we're all set
+			return client, nil
+		} else if errors.Is(err, errVersionMismatch) {
+			// We now know we aren't going to return this client,
+			// but killLiveServer still needs it to send the Shutdown request.
+			// killLiveServer will close the client when it is done with it.
+			if err := c.killLiveServer(ctx, client, serverPid); err != nil {
+				return nil, err
+			}
+		} else if errors.Is(err, errConnectionFailure) {
+			// close the client, see if we can kill the stale daemon
+			_ = client.Close()
+			if err := c.killDeadServer(serverPid); err != nil {
+				return nil, err
+			}
+			// if we successfully killed the dead server, loop around and try again
+		} else if err != nil {
+			// Some other error occurred, close the client and
+			// report the error to the user
+			if closeErr := client.Close(); closeErr != nil {
+				// In the event that we fail to close the client, bundle that error along also.
+				// Keep the original error in the error chain, as it's more likely to be useful
+				// or needed for matching on later.
+				err = errors.Wrapf(err, "also failed to close client connection: %v", closeErr)
+			}
+			return nil, err
+		}
+	}
+	return nil, ErrTooManyAttempts
+}
+
+// getOrStartDaemon returns the PID of the daemon process on success. It may start
+// the daemon if it doesn't find one running.
+func (c *Connector) getOrStartDaemon() (int, error) {
+	lockFile := c.lockFile()
+	if daemonProcess, err := lockFile.GetOwner(); errors.Is(err, lockfile.ErrDeadOwner) {
+		// If we've found a pid file but no corresponding process, there's nothing we can do.
+		// We defer to the user to clean up the pid file.
+		return 0, errors.Wrapf(err, "pid file appears stale. If no daemon is running, please remove it: %v", c.PidPath)
+	} else if os.IsNotExist(err) {
+		if c.Opts.DontStart {
+			return 0, ErrDaemonNotRunning
+		}
+		// The pid file doesn't exist. Start a daemon
+		pid, err := c.startDaemon()
+		if err != nil {
+			return 0, err
+		}
+		return pid, nil
+	} else {
+		return daemonProcess.Pid, nil
+	}
+}
+
+func (c *Connector) getClientConn() (*Client, error) {
+	creds := insecure.NewCredentials()
+	conn, err := grpc.Dial(c.addr(), grpc.WithTransportCredentials(creds))
+	if err != nil {
+		return nil, err
+	}
+	tc := turbodprotocol.NewTurbodClient(conn)
+	return &Client{
+		TurbodClient: tc,
+		ClientConn:   conn,
+		SockPath:     c.SockPath,
+		PidPath:      c.PidPath,
+		LogPath:      c.LogPath,
+	}, nil
+}
+
+func (c *Connector) sendHello(ctx context.Context, client turbodprotocol.TurbodClient) error {
+	_, err := client.Hello(ctx, &turbodprotocol.HelloRequest{
+		Version: c.TurboVersion,
+		// TODO: add session id
+	})
+	status := status.Convert(err)
+	switch status.Code() {
+	case codes.OK:
+		return nil
+	case codes.FailedPrecondition:
+		return errVersionMismatch
+	case codes.Unavailable:
+		return errConnectionFailure
+	default:
+		return err
+	}
+}
+
+var errNeedsRetry = errors.New("retry the operation")
+
+// backoffWithTimeout returns an exponential backoff, starting at 2ms and doubling until
+// the specified timeout has elapsed. Note that backoff instances are stateful, so we need
+// a new one each time we do a Retry.
+func backoffWithTimeout(timeout time.Duration) *backoff.ExponentialBackOff {
+	return &backoff.ExponentialBackOff{
+		Multiplier:      2,
+		InitialInterval: 2 * time.Millisecond,
+		MaxElapsedTime:  timeout,
+		Clock:           backoff.SystemClock,
+		Stop:            backoff.Stop,
+	}
+}
+
+// waitForSocket waits for the unix domain socket to appear
+func (c *Connector) waitForSocket() error {
+	// Note that we don't care if this is our daemon
+	// or not. We started a process, but someone else could beat
+	// us to listening. That's fine, we'll check the version
+	// later.
+	err := backoff.Retry(func() error {
+		if !c.SockPath.FileExists() {
+			return errNeedsRetry
+		}
+		return nil
+	}, backoffWithTimeout(_socketPollTimeout))
+	if errors.Is(err, errNeedsRetry) {
+		return ErrFailedToStart
+	} else if err != nil {
+		return err
+	}
+	return nil
+}
+
+// startDaemon starts the daemon and returns the pid for the new process
+func (c *Connector) startDaemon() (int, error) {
+	args := []string{"daemon"}
+	if c.Opts.ServerTimeout != 0 {
+		args = append(args, fmt.Sprintf("--idle-time=%v", c.Opts.ServerTimeout.String()))
+	}
+	c.Logger.Debug(fmt.Sprintf("starting turbod binary %v", c.Bin))
+	cmd := exec.Command(c.Bin, args...)
+	// Give the daemon its own process group id so that any attempts
+	// to kill it and its process tree don't kill this client.
+ cmd.SysProcAttr = getSysProcAttrs() + err := cmd.Start() + if err != nil { + return 0, err + } + return cmd.Process.Pid, nil +} diff --git a/cli/internal/daemon/connector/connector_test.go b/cli/internal/daemon/connector/connector_test.go new file mode 100644 index 0000000000000..778316ef1536a --- /dev/null +++ b/cli/internal/daemon/connector/connector_test.go @@ -0,0 +1,235 @@ +package connector + +import ( + "context" + "errors" + "net" + "os/exec" + "runtime" + "strconv" + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/nightlyone/lockfile" + "github.com/vercel/turborepo/cli/internal/fs" + "github.com/vercel/turborepo/cli/internal/turbodprotocol" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" + "gotest.tools/v3/assert" +) + +// testBin returns a platform-appropriate executable to run node. +// Node works here as an arbitrary process to start, since it's +// required for turbo development. It will obviously not implement +// our grpc service, use a mockServer instance where that's needed. +func testBin() string { + if runtime.GOOS == "windows" { + return "node.exe" + } + return "node" +} + +func getUnixSocket(dir fs.AbsolutePath) fs.AbsolutePath { + return dir.Join("turbod-test.sock") +} + +func getPidFile(dir fs.AbsolutePath) fs.AbsolutePath { + return dir.Join("turbod-test.pid") +} + +func TestConnectFailsWithoutGrpcServer(t *testing.T) { + // We aren't starting a server that is going to write + // to our socket file, so we should see a series of connection + // failures, followed by ErrTooManyAttempts + logger := hclog.Default() + dir := t.TempDir() + dirPath := fs.AbsolutePathFromUpstream(dir) + + sockPath := getUnixSocket(dirPath) + pidPath := getPidFile(dirPath) + ctx := context.Background() + bin := testBin() + c := &Connector{ + Logger: logger, + Bin: bin, + Opts: Opts{}, + SockPath: sockPath, + PidPath: pidPath, + } + // Note that we expect ~3s here, for 3 attempts with a timeout of 1s + _, err := c.connectInternal(ctx) + assert.ErrorIs(t, err, ErrTooManyAttempts) +} + +func TestKillDeadServerNoPid(t *testing.T) { + logger := hclog.Default() + dir := t.TempDir() + dirPath := fs.AbsolutePathFromUpstream(dir) + + sockPath := getUnixSocket(dirPath) + pidPath := getPidFile(dirPath) + c := &Connector{ + Logger: logger, + Bin: "nonexistent", + Opts: Opts{}, + SockPath: sockPath, + PidPath: pidPath, + } + + err := c.killDeadServer(99999) + assert.NilError(t, err, "killDeadServer") +} + +func TestKillDeadServerNoProcess(t *testing.T) { + logger := hclog.Default() + dir := t.TempDir() + dirPath := fs.AbsolutePathFromUpstream(dir) + + sockPath := getUnixSocket(dirPath) + pidPath := getPidFile(dirPath) + // Simulate the socket already existing, with no live daemon + err := sockPath.WriteFile([]byte("junk"), 0644) + assert.NilError(t, err, "WriteFile") + err = pidPath.WriteFile([]byte("99999"), 0644) + assert.NilError(t, err, "WriteFile") + c := &Connector{ + Logger: logger, + Bin: "nonexistent", + Opts: Opts{}, + SockPath: sockPath, + PidPath: pidPath, + } + + err = c.killDeadServer(99999) + assert.ErrorIs(t, err, lockfile.ErrDeadOwner) + stillExists := pidPath.FileExists() + if !stillExists { + t.Error("pidPath should still exist, expected the user to clean it up") + } +} + +func TestKillDeadServerWithProcess(t *testing.T) { + logger := hclog.Default() + dir := t.TempDir() + dirPath := fs.AbsolutePathFromUpstream(dir) + + sockPath := 
getUnixSocket(dirPath) + pidPath := getPidFile(dirPath) + // Simulate the socket already existing, with no live daemon + err := sockPath.WriteFile([]byte("junk"), 0644) + assert.NilError(t, err, "WriteFile") + bin := testBin() + cmd := exec.Command(bin) + err = cmd.Start() + assert.NilError(t, err, "cmd.Start") + pid := cmd.Process.Pid + if pid == 0 { + t.Fatalf("failed to start process %v", bin) + } + + err = pidPath.WriteFile([]byte(strconv.Itoa(pid)), 0644) + assert.NilError(t, err, "WriteFile") + c := &Connector{ + Logger: logger, + Bin: "nonexistent", + Opts: Opts{}, + SockPath: sockPath, + PidPath: pidPath, + } + + err = c.killDeadServer(pid) + assert.NilError(t, err, "killDeadServer") + stillExists := pidPath.FileExists() + if !stillExists { + t.Error("pidPath no longer exists, expected client to not clean it up") + } + err = cmd.Wait() + exitErr := &exec.ExitError{} + if !errors.As(err, &exitErr) { + t.Errorf("expected an exit error from %v, got %v", bin, err) + } +} + +type mockServer struct { + turbodprotocol.UnimplementedTurbodServer + helloErr error + shutdownResp *turbodprotocol.ShutdownResponse + pidFile fs.AbsolutePath +} + +// Simulates server exiting by cleaning up the pid file +func (s *mockServer) Shutdown(ctx context.Context, req *turbodprotocol.ShutdownRequest) (*turbodprotocol.ShutdownResponse, error) { + if err := s.pidFile.Remove(); err != nil { + return nil, err + } + return s.shutdownResp, nil +} + +func (s *mockServer) Hello(ctx context.Context, req *turbodprotocol.HelloRequest) (*turbodprotocol.HelloResponse, error) { + if req.Version == "" { + return nil, errors.New("missing version") + } + return nil, s.helloErr +} + +func TestKillLiveServer(t *testing.T) { + logger := hclog.Default() + dir := t.TempDir() + dirPath := fs.AbsolutePathFromUpstream(dir) + + sockPath := getUnixSocket(dirPath) + pidPath := getPidFile(dirPath) + err := pidPath.WriteFile([]byte("99999"), 0644) + assert.NilError(t, err, "WriteFile") + + ctx := context.Background() + c := &Connector{ + Logger: logger, + Bin: "nonexistent", + Opts: Opts{}, + SockPath: sockPath, + PidPath: pidPath, + TurboVersion: "some-version", + } + + st := status.New(codes.FailedPrecondition, "version mismatch") + mock := &mockServer{ + shutdownResp: &turbodprotocol.ShutdownResponse{}, + helloErr: st.Err(), + pidFile: pidPath, + } + lis := bufconn.Listen(1024 * 1024) + grpcServer := grpc.NewServer() + turbodprotocol.RegisterTurbodServer(grpcServer, mock) + go func(t *testing.T) { + if err := grpcServer.Serve(lis); err != nil { + t.Logf("server closed: %v", err) + } + }(t) + + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return lis.Dial() + }), grpc.WithTransportCredentials(insecure.NewCredentials())) + assert.NilError(t, err, "DialContext") + turboClient := turbodprotocol.NewTurbodClient(conn) + client := &Client{ + TurbodClient: turboClient, + ClientConn: conn, + } + err = c.sendHello(ctx, client) + if !errors.Is(err, errVersionMismatch) { + t.Errorf("sendHello error got %v, want %v", err, errVersionMismatch) + } + err = c.killLiveServer(ctx, client, 99999) + assert.NilError(t, err, "killLiveServer") + // Expect the pid file and socket files to have been cleaned up + if pidPath.FileExists() { + t.Errorf("expected pid file to have been deleted: %v", pidPath) + } + if sockPath.FileExists() { + t.Errorf("expected socket file to have been deleted: %v", sockPath) + } +} diff --git a/cli/internal/daemon/connector/fork.go 
b/cli/internal/daemon/connector/fork.go new file mode 100644 index 0000000000000..8a6d01da558ce --- /dev/null +++ b/cli/internal/daemon/connector/fork.go @@ -0,0 +1,15 @@ +//go:build !windows +// +build !windows + +package connector + +import "syscall" + +// getSysProcAttrs returns the platform-specific attributes we want to +// use while forking the daemon process. Currently this is limited to +// forcing a new process group +func getSysProcAttrs() *syscall.SysProcAttr { + return &syscall.SysProcAttr{ + Setpgid: true, + } +} diff --git a/cli/internal/daemon/connector/fork_windows.go b/cli/internal/daemon/connector/fork_windows.go new file mode 100644 index 0000000000000..b9d6e77908397 --- /dev/null +++ b/cli/internal/daemon/connector/fork_windows.go @@ -0,0 +1,15 @@ +//go:build windows +// +build windows + +package connector + +import "syscall" + +// getSysProcAttrs returns the platform-specific attributes we want to +// use while forking the daemon process. Currently this is limited to +// forcing a new process group +func getSysProcAttrs() *syscall.SysProcAttr { + return &syscall.SysProcAttr{ + CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP, + } +} diff --git a/cli/internal/daemon/daemon.go b/cli/internal/daemon/daemon.go new file mode 100644 index 0000000000000..9faeb9d9aed34 --- /dev/null +++ b/cli/internal/daemon/daemon.go @@ -0,0 +1,334 @@ +package daemon + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "net" + "os" + "time" + + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + "github.com/hashicorp/go-hclog" + "github.com/mitchellh/cli" + "github.com/nightlyone/lockfile" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/vercel/turborepo/cli/internal/config" + "github.com/vercel/turborepo/cli/internal/daemon/connector" + "github.com/vercel/turborepo/cli/internal/fs" + "github.com/vercel/turborepo/cli/internal/server" + "github.com/vercel/turborepo/cli/internal/signals" + "github.com/vercel/turborepo/cli/internal/util" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Command is the wrapper around the daemon command until we port fully to cobra +type Command struct { + Config *config.Config + UI cli.Ui + SignalWatcher *signals.Watcher +} + +// Run runs the daemon command +func (c *Command) Run(args []string) int { + cmd := getCmd(c.Config, c.UI, c.SignalWatcher) + cmd.SetArgs(args) + err := cmd.Execute() + if err != nil { + return 1 + } + return 0 +} + +// Help returns information about the `daemon` command +func (c *Command) Help() string { + cmd := getCmd(c.Config, c.UI, c.SignalWatcher) + return util.HelpForCobraCmd(cmd) +} + +// Synopsis of daemon command +func (c *Command) Synopsis() string { + cmd := getCmd(c.Config, c.UI, c.SignalWatcher) + return cmd.Short +} + +type daemon struct { + logger hclog.Logger + repoRoot fs.AbsolutePath + timeout time.Duration + reqCh chan struct{} + timedOutCh chan struct{} +} + +func getRepoHash(repoRoot fs.AbsolutePath) string { + pathHash := sha256.Sum256([]byte(repoRoot.ToString())) + // We grab a substring of the hash because there is a 108-character limit on the length + // of a filepath for unix domain socket. 
+ return hex.EncodeToString(pathHash[:])[:16] +} + +func getDaemonFileRoot(repoRoot fs.AbsolutePath) fs.AbsolutePath { + tempDir := fs.TempDir("turbod") + hexHash := getRepoHash(repoRoot) + return tempDir.Join(hexHash) +} + +func getLogFilePath(repoRoot fs.AbsolutePath) (fs.AbsolutePath, error) { + hexHash := getRepoHash(repoRoot) + base := repoRoot.Base() + logFilename := fmt.Sprintf("%v-%v.log", hexHash, base) + + logsDir := fs.GetTurboDataDir().Join("logs") + return logsDir.Join(logFilename), nil +} + +func getUnixSocket(repoRoot fs.AbsolutePath) fs.AbsolutePath { + root := getDaemonFileRoot(repoRoot) + return root.Join("turbod.sock") +} + +func getPidFile(repoRoot fs.AbsolutePath) fs.AbsolutePath { + root := getDaemonFileRoot(repoRoot) + return root.Join("turbod.pid") +} + +// logError logs an error and outputs it to the UI. +func (d *daemon) logError(err error) { + d.logger.Error("error", err) +} + +// we're only appending, and we're creating the file if it doesn't exist. +// we do not need to read the log file. +var _logFileFlags = os.O_WRONLY | os.O_APPEND | os.O_CREATE + +func getCmd(config *config.Config, output cli.Ui, signalWatcher *signals.Watcher) *cobra.Command { + var idleTimeout time.Duration + cmd := &cobra.Command{ + Use: "turbo daemon", + Short: "Runs turbod", + SilenceUsage: true, + SilenceErrors: true, + RunE: func(cmd *cobra.Command, args []string) error { + logFilePath, err := getLogFilePath(config.Cwd) + if err != nil { + return err + } + if err := logFilePath.EnsureDir(); err != nil { + return err + } + logFile, err := logFilePath.OpenFile(_logFileFlags, 0644) + if err != nil { + return err + } + defer func() { _ = logFile.Close() }() + logger := hclog.New(&hclog.LoggerOptions{ + Output: io.MultiWriter(logFile, os.Stdout), + Level: hclog.Debug, + Color: hclog.ColorOff, + Name: "turbod", + }) + ctx := cmd.Context() + d := &daemon{ + logger: logger, + repoRoot: config.Cwd, + timeout: idleTimeout, + reqCh: make(chan struct{}), + timedOutCh: make(chan struct{}), + } + turboServer, err := server.New(d.logger.Named("rpc server"), config.Cwd, config.TurboVersion, logFilePath) + if err != nil { + d.logError(err) + return err + } + defer func() { _ = turboServer.Close() }() + err = d.runTurboServer(ctx, turboServer, signalWatcher) + if err != nil { + d.logError(err) + return err + } + return nil + }, + } + cmd.Flags().DurationVar(&idleTimeout, "idle-time", 4*time.Hour, "Set the idle timeout for turbod") + addDaemonSubcommands(cmd, config, output) + return cmd +} + +func addDaemonSubcommands(cmd *cobra.Command, config *config.Config, output cli.Ui) { + addStatusCmd(cmd, config, output) + addStartCmd(cmd, config, output) + addStopCmd(cmd, config, output) + addRestartCmd(cmd, config, output) +} + +var errInactivityTimeout = errors.New("turbod shut down from inactivity") + +// tryAcquirePidfileLock attempts to ensure that only one daemon is running from the given pid file path +// at a time. If this process fails to write its PID to the lockfile, it must exit. +func tryAcquirePidfileLock(pidPath fs.AbsolutePath) (lockfile.Lockfile, error) { + if err := pidPath.EnsureDir(); err != nil { + return "", err + } + lockFile, err := lockfile.New(pidPath.ToString()) + if err != nil { + // lockfile.New should only return an error if it wasn't given an absolute path. + // We are attempting to use the type system to enforce that we are passing an + // absolute path. An error here likely means a bug, and we should crash. 
+ panic(err) + } + if err := lockFile.TryLock(); err != nil { + return "", err + } + return lockFile, nil +} + +type rpcServer interface { + Register(grpcServer server.GRPCServer) +} + +func (d *daemon) runTurboServer(parentContext context.Context, rpcServer rpcServer, signalWatcher *signals.Watcher) error { + ctx, cancel := context.WithCancel(parentContext) + defer cancel() + pidPath := getPidFile(d.repoRoot) + lock, err := tryAcquirePidfileLock(pidPath) + if err != nil { + return errors.Wrapf(err, "failed to lock the pid file at %v. Is another turbo daemon running?", lock) + } + // When we're done serving, clean up the pid file. + // Also, if *this* goroutine panics, make sure we unlock the pid file. + defer func() { + if err := lock.Unlock(); err != nil { + d.logger.Error(errors.Wrapf(err, "failed unlocking pid file at %v", lock).Error()) + } + }() + // This handler runs in request goroutines. If a request causes a panic, + // this handler will get called after a call to recover(), meaning we are + // no longer panicking. We return a server error and cancel our context, + // which triggers a shutdown of the server. + panicHandler := func(thePanic interface{}) error { + cancel() + d.logger.Error(fmt.Sprintf("Caught panic %v", thePanic)) + return status.Error(codes.Internal, "server panicked") + } + + // If we have the lock, assume that we are the owners of the socket file, + // whether it already exists or not. That means we are free to remove it. + sockPath := getUnixSocket(d.repoRoot) + if err := sockPath.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + d.logger.Debug(fmt.Sprintf("Using socket path %v (%v)\n", sockPath, len(sockPath))) + lis, err := net.Listen("unix", sockPath.ToString()) + if err != nil { + return err + } + // We don't need to explicitly close 'lis', the grpc server will handle that + s := grpc.NewServer( + grpc.ChainUnaryInterceptor( + d.onRequest, + grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(panicHandler)), + ), + ) + go d.timeoutLoop(ctx) + + rpcServer.Register(s) + errCh := make(chan error) + go func(errCh chan<- error) { + if err := s.Serve(lis); err != nil { + errCh <- err + } + close(errCh) + }(errCh) + + // Note that we aren't deferring s.GracefulStop here because we also need + // to drain the error channel, which isn't guaranteed to happen until + // the server has stopped. That in turn may depend on GracefulStop being + // called. + // Future work could restructure this to make that simpler. + var exitErr error + select { + case err, ok := <-errCh: + // The server exited + if ok { + exitErr = err + } + case <-d.timedOutCh: + // This is the inactivity timeout case + exitErr = errInactivityTimeout + s.GracefulStop() + case <-ctx.Done(): + // If a request handler panics, it will cancel this context + s.GracefulStop() + case <-signalWatcher.Done(): + // This is fired if caught a signal + s.GracefulStop() + } + // Wait for the server to exit, if it hasn't already. + // When it does, this channel will close. We don't + // care about the error in this scenario because we've + // either requested a close via cancelling the context, + // an inactivity timeout, or caught a signal. 
+ for range errCh { + } + return exitErr +} + +func (d *daemon) onRequest(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + d.reqCh <- struct{}{} + return handler(ctx, req) +} + +func (d *daemon) timeoutLoop(ctx context.Context) { + timeoutCh := time.After(d.timeout) +outer: + for { + select { + case <-d.reqCh: + timeoutCh = time.After(d.timeout) + case <-timeoutCh: + close(d.timedOutCh) + break outer + case <-ctx.Done(): + break outer + } + } +} + +// ClientOpts re-exports connector.Ops to encapsulate the connector package +type ClientOpts = connector.Opts + +// Client re-exports connector.Client to encapsulate the connector package +type Client = connector.Client + +// GetClient returns a client that can be used to interact with the daemon +func GetClient(ctx context.Context, repoRoot fs.AbsolutePath, logger hclog.Logger, turboVersion string, opts ClientOpts) (*Client, error) { + sockPath := getUnixSocket(repoRoot) + pidPath := getPidFile(repoRoot) + logPath, err := getLogFilePath(repoRoot) + if err != nil { + return nil, err + } + bin, err := os.Executable() + if err != nil { + return nil, err + } + c := &connector.Connector{ + Logger: logger.Named("TurbodClient"), + Bin: bin, + Opts: opts, + SockPath: sockPath, + PidPath: pidPath, + LogPath: logPath, + TurboVersion: turboVersion, + } + client, err := c.Connect(ctx) + if err != nil { + return nil, err + } + return client, nil +} diff --git a/cli/internal/daemon/daemon_test.go b/cli/internal/daemon/daemon_test.go new file mode 100644 index 0000000000000..cea97881cbcc3 --- /dev/null +++ b/cli/internal/daemon/daemon_test.go @@ -0,0 +1,261 @@ +package daemon + +import ( + "context" + "errors" + "os/exec" + "runtime" + "strconv" + "sync" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/nightlyone/lockfile" + "github.com/vercel/turborepo/cli/internal/fs" + "github.com/vercel/turborepo/cli/internal/server" + "github.com/vercel/turborepo/cli/internal/signals" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/grpc_testing" + "gotest.tools/v3/assert" +) + +// testBin returns a platform-appropriate node binary. +// We need some process to be running and findable by the +// lockfile library, and we don't particularly care what it is. +// Since node is required for turbo development, it makes a decent +// candidate. +func testBin() string { + if runtime.GOOS == "windows" { + return "node.exe" + } + return "node" +} + +func TestPidFileLock(t *testing.T) { + repoRoot := fs.AbsolutePathFromUpstream(t.TempDir()) + + pidPath := getPidFile(repoRoot) + // the lockfile library handles removing pids from dead owners + _, err := tryAcquirePidfileLock(pidPath) + assert.NilError(t, err, "acquirePidLock") + + // Start up a node process and fake a pid file for it. 
+	// Ensure that we can't start the daemon while the node process is live
+	bin := testBin()
+	node := exec.Command(bin)
+	err = node.Start()
+	assert.NilError(t, err, "Start")
+	stopNode := func() error {
+		if err := node.Process.Kill(); err != nil {
+			return err
+		}
+		// We expect an error from node, we just sent a kill signal
+		_ = node.Wait()
+		return nil
+	}
+	// In case we fail the test, still try to kill the node process
+	t.Cleanup(func() { _ = stopNode() })
+	nodePid := node.Process.Pid
+	err = pidPath.WriteFile([]byte(strconv.Itoa(nodePid)), 0644)
+	assert.NilError(t, err, "WriteFile")
+
+	_, err = tryAcquirePidfileLock(pidPath)
+	assert.ErrorIs(t, err, lockfile.ErrBusy)
+
+	// Stop the node process, but leave the pid file there
+	// This simulates a crash
+	err = stopNode()
+	assert.NilError(t, err, "stopNode")
+	// the lockfile library handles removing pids from dead owners
+	_, err = tryAcquirePidfileLock(pidPath)
+	assert.NilError(t, err, "acquirePidLock")
+}
+
+type testRPCServer struct {
+	grpc_testing.UnimplementedTestServiceServer
+	registered chan struct{}
+}
+
+func (ts *testRPCServer) EmptyCall(ctx context.Context, req *grpc_testing.Empty) (*grpc_testing.Empty, error) {
+	panic("intended to panic")
+}
+
+func (ts *testRPCServer) Register(grpcServer server.GRPCServer) {
+	grpc_testing.RegisterTestServiceServer(grpcServer, ts)
+	ts.registered <- struct{}{}
+}
+
+func newTestRPCServer() *testRPCServer {
+	return &testRPCServer{
+		registered: make(chan struct{}, 1),
+	}
+}
+
+func waitForFile(t *testing.T, filename fs.AbsolutePath, timeout time.Duration) {
+	t.Helper()
+	deadline := time.After(timeout)
+outer:
+	for !filename.FileExists() {
+		select {
+		case <-deadline:
+			break outer
+		case <-time.After(10 * time.Millisecond):
+		}
+	}
+	if !filename.FileExists() {
+		t.Errorf("timed out waiting for %v to exist after %v", filename, timeout)
+	}
+}
+
+func TestDaemonLifecycle(t *testing.T) {
+	logger := hclog.Default()
+	logger.SetLevel(hclog.Debug)
+	repoRoot := fs.AbsolutePathFromUpstream(t.TempDir())
+
+	ts := newTestRPCServer()
+	watcher := signals.NewWatcher()
+	ctx, cancel := context.WithCancel(context.Background())
+
+	d := &daemon{
+		logger:     logger,
+		repoRoot:   repoRoot,
+		timeout:    10 * time.Second,
+		reqCh:      make(chan struct{}),
+		timedOutCh: make(chan struct{}),
+	}
+
+	var serverErr error
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		serverErr = d.runTurboServer(ctx, ts, watcher)
+		wg.Done()
+	}()
+
+	sockPath := getUnixSocket(repoRoot)
+	waitForFile(t, sockPath, 30*time.Second)
+	pidPath := getPidFile(repoRoot)
+	waitForFile(t, pidPath, 1*time.Second)
+	cancel()
+	wg.Wait()
+	assert.NilError(t, serverErr, "runTurboServer")
+	if sockPath.FileExists() {
+		t.Errorf("%v still exists, should have been cleaned up", sockPath)
+	}
+	if pidPath.FileExists() {
+		t.Errorf("%v still exists, should have been cleaned up", pidPath)
+	}
+}
+
+func TestTimeout(t *testing.T) {
+	logger := hclog.Default()
+	logger.SetLevel(hclog.Debug)
+	repoRoot := fs.AbsolutePathFromUpstream(t.TempDir())
+
+	ts := newTestRPCServer()
+	watcher := signals.NewWatcher()
+	ctx := context.Background()
+
+	d := &daemon{
+		logger:     logger,
+		repoRoot:   repoRoot,
+		timeout:    5 * time.Millisecond,
+		reqCh:      make(chan struct{}),
+		timedOutCh: make(chan struct{}),
+	}
+	err := d.runTurboServer(ctx, ts, watcher)
+	if !errors.Is(err, errInactivityTimeout) {
+		t.Errorf("server error got %v, want %v", err, errInactivityTimeout)
+	}
+}
+
+func TestCaughtSignal(t *testing.T) {
+	logger := hclog.Default()
+	logger.SetLevel(hclog.Debug)
+	repoRoot := fs.AbsolutePathFromUpstream(t.TempDir())
+
+	ts := newTestRPCServer()
+	watcher := signals.NewWatcher()
+	ctx := context.Background()
+
+	d := &daemon{
+		logger:     logger,
+		repoRoot:   repoRoot,
+		timeout:    5 * time.Second,
+		reqCh:      make(chan struct{}),
+		timedOutCh: make(chan struct{}),
+	}
+	errCh := make(chan error)
+	go func() {
+		err := d.runTurboServer(ctx, ts, watcher)
+		errCh <- err
+	}()
+	<-ts.registered
+	// grpc doesn't provide a signal to know when the server is serving.
+	// So while this call to Close can race with the call to grpc.Server.Serve, if we've
+	// registered with the turboserver, we've registered all of our
+	// signal handlers as well. We just may or may not be serving when Close()
+	// is called. It shouldn't matter for the purposes of this test:
+	// Either we are serving, and Serve will return with nil when GracefulStop is
+	// called, or we aren't serving yet, and the subsequent call to Serve will
+	// immediately return with grpc.ErrServerStopped. So, both nil and grpc.ErrServerStopped
+	// are acceptable outcomes for runTurboServer. Any other error, or a timeout, is a
+	// failure.
+	watcher.Close()
+
+	err := <-errCh
+	pidPath := getPidFile(repoRoot)
+	if pidPath.FileExists() {
+		t.Errorf("expected to clean up %v, but it still exists", pidPath)
+	}
+	// We'll either get nil or ErrServerStopped, depending on whether
+	// or not we close the signal watcher before grpc.Server.Serve was
+	// called.
+	if err != nil && !errors.Is(err, grpc.ErrServerStopped) {
+		t.Errorf("runTurboServer got err %v, want nil or ErrServerStopped", err)
+	}
+}
+
+func TestCleanupOnPanic(t *testing.T) {
+	logger := hclog.Default()
+	logger.SetLevel(hclog.Debug)
+	repoRoot := fs.AbsolutePathFromUpstream(t.TempDir())
+
+	ts := newTestRPCServer()
+	watcher := signals.NewWatcher()
+	ctx := context.Background()
+
+	d := &daemon{
+		logger:     logger,
+		repoRoot:   repoRoot,
+		timeout:    5 * time.Second,
+		reqCh:      make(chan struct{}),
+		timedOutCh: make(chan struct{}),
+	}
+	errCh := make(chan error)
+	go func() {
+		err := d.runTurboServer(ctx, ts, watcher)
+		errCh <- err
+	}()
+	<-ts.registered
+
+	creds := insecure.NewCredentials()
+	sockFile := getUnixSocket(repoRoot)
+	conn, err := grpc.Dial("unix://"+sockFile.ToString(), grpc.WithTransportCredentials(creds))
+	assert.NilError(t, err, "Dial")
+
+	client := grpc_testing.NewTestServiceClient(conn)
+	_, err = client.EmptyCall(ctx, &grpc_testing.Empty{})
+	if err == nil {
+		t.Error("expected EmptyCall to return an error, got nil")
+	}
+	// wait for the server to finish
+	<-errCh
+
+	pidPath := getPidFile(repoRoot)
+	if pidPath.FileExists() {
+		t.Errorf("expected to clean up %v, but it still exists", pidPath)
+	}
+}
diff --git a/cli/internal/daemon/lifecycle.go b/cli/internal/daemon/lifecycle.go
new file mode 100644
index 0000000000000..9c4a9a48fa470
--- /dev/null
+++ b/cli/internal/daemon/lifecycle.go
@@ -0,0 +1,134 @@
+package daemon
+
+import (
+	"context"
+
+	"github.com/hashicorp/go-hclog"
+	"github.com/mitchellh/cli"
+	"github.com/pkg/errors"
+	"github.com/spf13/cobra"
+	"github.com/vercel/turborepo/cli/internal/config"
+	"github.com/vercel/turborepo/cli/internal/daemon/connector"
+	"github.com/vercel/turborepo/cli/internal/fs"
+	"github.com/vercel/turborepo/cli/internal/turbodprotocol"
+)
+
+func addStartCmd(root *cobra.Command, config *config.Config, output cli.Ui) {
+	cmd := &cobra.Command{
+		Use:           "start",
+		Short:         "Ensures that the turbo daemon is running",
+		SilenceUsage:  true,
+		SilenceErrors: true,
+		RunE: func(cmd *cobra.Command,
args []string) error { + l := &lifecycle{ + repoRoot: config.Cwd, + logger: config.Logger, + output: output, + turboVersion: config.TurboVersion, + } + if err := l.ensureStarted(); err != nil { + l.logError(err) + return err + } + return nil + }, + } + root.AddCommand(cmd) +} + +func addStopCmd(root *cobra.Command, config *config.Config, output cli.Ui) { + cmd := &cobra.Command{ + Use: "stop", + Short: "Stop the turbo daemon", + SilenceUsage: true, + SilenceErrors: true, + RunE: func(cmd *cobra.Command, args []string) error { + l := &lifecycle{ + repoRoot: config.Cwd, + logger: config.Logger, + output: output, + turboVersion: config.TurboVersion, + } + if err := l.ensureStopped(); err != nil { + l.logError(err) + return err + } + return nil + }, + } + root.AddCommand(cmd) +} + +func addRestartCmd(root *cobra.Command, config *config.Config, output cli.Ui) { + cmd := &cobra.Command{ + Use: "restart", + Short: "Restart the turbo daemon", + SilenceUsage: true, + SilenceErrors: true, + RunE: func(cmd *cobra.Command, args []string) error { + l := &lifecycle{ + repoRoot: config.Cwd, + logger: config.Logger, + output: output, + turboVersion: config.TurboVersion, + } + if err := l.ensureStopped(); err != nil { + l.logError(err) + return err + } + if err := l.ensureStarted(); err != nil { + l.logError(err) + return err + } + return nil + }, + } + root.AddCommand(cmd) +} + +type lifecycle struct { + repoRoot fs.AbsolutePath + logger hclog.Logger + output cli.Ui + turboVersion string +} + +// logError logs an error and outputs it to the UI. +func (l *lifecycle) logError(err error) { + l.logger.Error("error", err) + l.output.Error(err.Error()) +} + +func (l *lifecycle) ensureStarted() error { + ctx := context.Background() + client, err := GetClient(ctx, l.repoRoot, l.logger, l.turboVersion, ClientOpts{}) + if err != nil { + return err + } + // We don't really care if we fail to close the client, we're about to exit + _ = client.Close() + l.output.Output("turbo daemon is running") + return nil +} + +func (l *lifecycle) ensureStopped() error { + ctx := context.Background() + client, err := GetClient(ctx, l.repoRoot, l.logger, l.turboVersion, ClientOpts{ + // If the daemon is not running, don't start it, since we're trying to stop it + DontStart: true, + }) + if err != nil { + if errors.Is(err, connector.ErrDaemonNotRunning) { + l.output.Output("turbo daemon is not running") + return nil + } + return err + } + defer func() { _ = client.Close() }() + _, err = client.Shutdown(ctx, &turbodprotocol.ShutdownRequest{}) + if err != nil { + return err + } + l.output.Output("Successfully requested that turbo daemon shut down") + return nil +} diff --git a/cli/internal/daemon/status.go b/cli/internal/daemon/status.go new file mode 100644 index 0000000000000..8e8b40a5be7e5 --- /dev/null +++ b/cli/internal/daemon/status.go @@ -0,0 +1,92 @@ +package daemon + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/mitchellh/cli" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/vercel/turborepo/cli/internal/config" + "github.com/vercel/turborepo/cli/internal/daemon/connector" + "github.com/vercel/turborepo/cli/internal/daemonclient" +) + +func addStatusCmd(root *cobra.Command, config *config.Config, output cli.Ui) { + var outputJSON bool + cmd := &cobra.Command{ + Use: "status", + Short: "Reports the status of the turbo daemon", + SilenceUsage: true, + SilenceErrors: true, + RunE: func(cmd *cobra.Command, args []string) error { + l := &lifecycle{ + repoRoot: config.Cwd, + logger: 
config.Logger, + output: output, + turboVersion: config.TurboVersion, + } + if err := l.status(outputJSON); err != nil { + l.logError(err) + return err + } + return nil + }, + } + cmd.Flags().BoolVar(&outputJSON, "json", false, "Pass --json to report status in JSON format") + root.AddCommand(cmd) +} + +func (l *lifecycle) status(outputJSON bool) error { + ctx := context.Background() + client, err := GetClient(ctx, l.repoRoot, l.logger, l.turboVersion, ClientOpts{ + // If the daemon is not running, the status is that it's not running. + // We don't want to start it just to check the status. + DontStart: true, + }) + if err != nil { + return l.reportStatusError(err, outputJSON) + } + turboClient := daemonclient.New(client) + status, err := turboClient.Status(ctx) + if err != nil { + return l.reportStatusError(err, outputJSON) + } + if outputJSON { + rendered, err := json.MarshalIndent(status, "", " ") + if err != nil { + return err + } + l.output.Output(string(rendered)) + } else { + uptime := time.Duration(int64(status.UptimeMs * 1000 * 1000)) + l.output.Output(fmt.Sprintf("Daemon log file: %v", status.LogFile)) + l.output.Output(fmt.Sprintf("Daemon uptime: %v", uptime.String())) + l.output.Output(fmt.Sprintf("Daemon pid file: %v", client.PidPath)) + l.output.Output(fmt.Sprintf("Daemon socket file: %v", client.SockPath)) + } + return nil +} + +func (l *lifecycle) reportStatusError(err error, outputJSON bool) error { + var msg string + if errors.Is(err, connector.ErrDaemonNotRunning) { + msg = "the daemon is not running" + } else { + msg = err.Error() + } + if outputJSON { + rendered, err := json.MarshalIndent(map[string]string{ + "error": msg, + }, "", " ") + if err != nil { + return err + } + l.output.Output(string(rendered)) + } else { + l.output.Output(fmt.Sprintf("Failed to contact daemon: %v", msg)) + } + return nil +} diff --git a/cli/internal/daemonclient/daemonclient.go b/cli/internal/daemonclient/daemonclient.go new file mode 100644 index 0000000000000..7b38baa438f11 --- /dev/null +++ b/cli/internal/daemonclient/daemonclient.go @@ -0,0 +1,67 @@ +// Package daemonclient is a wrapper around a grpc client +// to talk to turbod +package daemonclient + +import ( + "context" + + "github.com/vercel/turborepo/cli/internal/daemon/connector" + "github.com/vercel/turborepo/cli/internal/fs" + "github.com/vercel/turborepo/cli/internal/turbodprotocol" +) + +// DaemonClient provides access to higher-level functionality from the daemon to a turbo run. +type DaemonClient struct { + client *connector.Client +} + +// Status provides details about the daemon's status +type Status struct { + UptimeMs uint64 `json:"uptimeMs"` + LogFile fs.AbsolutePath `json:"logFile"` + PidFile fs.AbsolutePath `json:"pidFile"` + SockFile fs.AbsolutePath `json:"sockFile"` +} + +// New creates a new instance of a DaemonClient. 
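+//
+// A sketch of typical wiring, mirroring how run.go uses this package
+// (ctx, repoRoot, logger, version, hash, and outputGlobs are assumed to
+// come from the caller):
+//
+//	client, err := daemon.GetClient(ctx, repoRoot, logger, version, daemon.ClientOpts{})
+//	if err != nil {
+//		// fall back to running without the daemon
+//	}
+//	dc := daemonclient.New(client)
+//	changed, err := dc.GetChangedOutputs(ctx, hash, outputGlobs)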
+func New(client *connector.Client) *DaemonClient { + return &DaemonClient{ + client: client, + } +} + +// GetChangedOutputs implements runcache.OutputWatcher.GetChangedOutputs +func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) { + resp, err := d.client.GetChangedOutputs(ctx, &turbodprotocol.GetChangedOutputsRequest{ + Hash: hash, + OutputGlobs: repoRelativeOutputGlobs, + }) + if err != nil { + return nil, err + } + return resp.ChangedOutputGlobs, nil +} + +// NotifyOutputsWritten implements runcache.OutputWatcher.NotifyOutputsWritten +func (d *DaemonClient) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs []string) error { + _, err := d.client.NotifyOutputsWritten(ctx, &turbodprotocol.NotifyOutputsWrittenRequest{ + Hash: hash, + OutputGlobs: repoRelativeOutputGlobs, + }) + return err +} + +// Status returns the DaemonStatus from the daemon +func (d *DaemonClient) Status(ctx context.Context) (*Status, error) { + resp, err := d.client.Status(ctx, &turbodprotocol.StatusRequest{}) + if err != nil { + return nil, err + } + daemonStatus := resp.DaemonStatus + return &Status{ + UptimeMs: daemonStatus.UptimeMsec, + LogFile: d.client.LogPath, + PidFile: d.client.PidPath, + SockFile: d.client.SockPath, + }, nil +} diff --git a/cli/internal/filewatcher/filewatcher.go b/cli/internal/filewatcher/filewatcher.go new file mode 100644 index 0000000000000..6a0146f645563 --- /dev/null +++ b/cli/internal/filewatcher/filewatcher.go @@ -0,0 +1,184 @@ +// Package filewatcher is used to handle watching for file changes inside the monorepo +package filewatcher + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/fsnotify/fsnotify" + "github.com/hashicorp/go-hclog" + "github.com/karrick/godirwalk" + "github.com/pkg/errors" + "github.com/vercel/turborepo/cli/internal/doublestar" + "github.com/vercel/turborepo/cli/internal/fs" +) + +// _ignores is the set of paths we exempt from file-watching +var _ignores = []string{".git", "node_modules"} + +// FileWatchClient defines the callbacks used by the file watching loop. +// All methods are called from the same goroutine so they: +// 1) do not need synchronization +// 2) should minimize the work they are doing when called, if possible +type FileWatchClient interface { + OnFileWatchEvent(ev fsnotify.Event) + OnFileWatchError(err error) + OnFileWatchClosed() +} + +// FileWatcher handles watching all of the files in the monorepo. +// We currently ignore .git and top-level node_modules. We can revisit +// if necessary. 
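+//
+// A sketch of typical usage, following the wiring in server.New: construct
+// the fsnotify watcher, register clients, then start the watch loop:
+//
+//	watcher, err := fsnotify.NewWatcher()
+//	if err != nil {
+//		return err
+//	}
+//	fw := filewatcher.New(logger, repoRoot, watcher)
+//	fw.AddClient(client) // client implements FileWatchClient
+//	if err := fw.Start(); err != nil {
+//		return err
+//	}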
+type FileWatcher struct {
+	*fsnotify.Watcher
+
+	logger         hclog.Logger
+	repoRoot       fs.AbsolutePath
+	excludePattern string
+
+	clientsMu sync.RWMutex
+	clients   []FileWatchClient
+	closed    bool
+}
+
+// New returns a new FileWatcher instance
+func New(logger hclog.Logger, repoRoot fs.AbsolutePath, watcher *fsnotify.Watcher) *FileWatcher {
+	excludes := make([]string, len(_ignores))
+	for i, ignore := range _ignores {
+		excludes[i] = filepath.ToSlash(repoRoot.Join(ignore).ToString() + "/**")
+	}
+	excludePattern := "{" + strings.Join(excludes, ",") + "}"
+	return &FileWatcher{
+		Watcher:        watcher,
+		logger:         logger,
+		repoRoot:       repoRoot,
+		excludePattern: excludePattern,
+	}
+}
+
+// Start recursively adds all directories from the repo root, removes the excluded ones,
+// then fires off a goroutine to respond to filesystem events
+func (fw *FileWatcher) Start() error {
+	if err := fw.watchRecursively(fw.repoRoot); err != nil {
+		return err
+	}
+	// Remove watches on the ignored directories, which are automatically added
+	// because they are children of watched directories.
+	for _, dir := range fw.WatchList() {
+		excluded, err := doublestar.Match(fw.excludePattern, filepath.ToSlash(dir))
+		if err != nil {
+			return err
+		}
+		if excluded {
+			if err := fw.Remove(dir); err != nil {
+				fw.logger.Warn(fmt.Sprintf("failed to remove watch on %v: %v", dir, err))
+			}
+		}
+	}
+	go fw.watch()
+	return nil
+}
+
+func (fw *FileWatcher) watchRecursively(root fs.AbsolutePath) error {
+	err := fs.WalkMode(root.ToString(), func(name string, isDir bool, info os.FileMode) error {
+		excluded, err := doublestar.Match(fw.excludePattern, filepath.ToSlash(name))
+		if err != nil {
+			return err
+		}
+		if excluded {
+			return godirwalk.SkipThis
+		}
+		if info.IsDir() && (info&os.ModeSymlink == 0) {
+			fw.logger.Debug(fmt.Sprintf("started watching %v", name))
+			if err := fw.Add(name); err != nil {
+				return errors.Wrapf(err, "failed adding watch to %v", name)
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// onFileAdded helps us paper over cross-platform inconsistencies in fsnotify.
+// Some fsnotify backends automatically add the contents of directories. Some do
+// not. Adding a watch is idempotent, so anytime any file we care about gets added,
+// watch it.
+func (fw *FileWatcher) onFileAdded(name string) error {
+	info, err := os.Lstat(name)
+	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			// We can race with a file being added and removed. Ignore it
+			return nil
+		}
+		return errors.Wrapf(err, "error checking lstat of new file %v", name)
+	}
+	if info.IsDir() {
+		if err := fw.watchRecursively(fs.AbsolutePath(name)); err != nil {
+			return errors.Wrapf(err, "failed recursive watch of %v", name)
+		}
+	} else {
+		if err := fw.Add(name); err != nil {
+			return errors.Wrapf(err, "failed adding watch to %v", name)
+		}
+	}
+	return nil
+}
+
+// watch is the main file-watching loop. Watching is not recursive,
+// so when new directories are added, they are manually recursively watched.
+func (fw *FileWatcher) watch() {
+outer:
+	for {
+		select {
+		case ev, ok := <-fw.Watcher.Events:
+			if !ok {
+				fw.logger.Info("Events channel closed.
Exiting watch loop") + break outer + } + if ev.Op&fsnotify.Create != 0 { + if err := fw.onFileAdded(ev.Name); err != nil { + fw.logger.Warn(fmt.Sprintf("failed to handle adding %v: %v", ev.Name, err)) + continue outer + } + } + fw.clientsMu.RLock() + for _, client := range fw.clients { + client.OnFileWatchEvent(ev) + } + fw.clientsMu.RUnlock() + case err, ok := <-fw.Watcher.Errors: + if !ok { + fw.logger.Info("Errors channel closed. Exiting watch loop") + break outer + } + fw.clientsMu.RLock() + for _, client := range fw.clients { + client.OnFileWatchError(err) + } + fw.clientsMu.RUnlock() + } + } + fw.clientsMu.Lock() + fw.closed = true + for _, client := range fw.clients { + client.OnFileWatchClosed() + } + fw.clientsMu.Unlock() +} + +// AddClient registers a client for filesystem events +func (fw *FileWatcher) AddClient(client FileWatchClient) { + fw.clientsMu.Lock() + defer fw.clientsMu.Unlock() + fw.clients = append(fw.clients, client) + if fw.closed { + client.OnFileWatchClosed() + } +} diff --git a/cli/internal/filewatcher/filewatcher_test.go b/cli/internal/filewatcher/filewatcher_test.go new file mode 100644 index 0000000000000..826b511174fe3 --- /dev/null +++ b/cli/internal/filewatcher/filewatcher_test.go @@ -0,0 +1,165 @@ +package filewatcher + +import ( + "runtime" + "sync" + "testing" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/hashicorp/go-hclog" + "github.com/vercel/turborepo/cli/internal/fs" + "github.com/vercel/turborepo/cli/internal/util" + "gotest.tools/v3/assert" +) + +type testClient struct { + mu sync.Mutex + createEvents []fsnotify.Event + notify chan<- struct{} +} + +type helper interface { + Helper() +} + +func (c *testClient) OnFileWatchEvent(ev fsnotify.Event) { + if ev.Op&fsnotify.Create != 0 { + c.mu.Lock() + defer c.mu.Unlock() + c.createEvents = append(c.createEvents, ev) + c.notify <- struct{}{} + } +} + +func (c *testClient) OnFileWatchError(err error) {} + +func (c *testClient) OnFileWatchClosed() {} + +func assertSameSet(t *testing.T, gotSlice []string, wantSlice []string) { + // mark this method as a helper + var tt interface{} = t + helper, ok := tt.(helper) + if ok { + helper.Helper() + } + got := util.SetFromStrings(gotSlice) + want := util.SetFromStrings(wantSlice) + extra := got.Difference(want) + missing := want.Difference(got) + if extra.Len() > 0 { + t.Errorf("found extra elements: %v", extra.UnsafeListOfStrings()) + } + if missing.Len() > 0 { + t.Errorf("missing expected elements: %v", missing.UnsafeListOfStrings()) + } +} + +func expectFilesystemEvent(t *testing.T, ch <-chan struct{}) { + // mark this method as a helper + t.Helper() + select { + case <-ch: + return + case <-time.After(1 * time.Second): + t.Error("Timed out waiting for filesystem event") + } +} + +func expectNoFilesystemEvent(t *testing.T, ch <-chan struct{}) { + // mark this method as a helper + t.Helper() + select { + case ev, ok := <-ch: + if ok { + t.Errorf("got unexpected filesystem event %v", ev) + } else { + t.Error("filewatching closed unexpectedly") + } + case <-time.After(100 * time.Millisecond): + return + } +} + +func TestFileWatching(t *testing.T) { + logger := hclog.Default() + repoRoot := fs.AbsolutePathFromUpstream(t.TempDir()) + err := repoRoot.Join(".git").MkdirAll() + assert.NilError(t, err, "MkdirAll") + err = repoRoot.Join("node_modules", "some-dep").MkdirAll() + assert.NilError(t, err, "MkdirAll") + err = repoRoot.Join("parent", "child").MkdirAll() + assert.NilError(t, err, "MkdirAll") + err = repoRoot.Join("parent", "sibling").MkdirAll() + 
assert.NilError(t, err, "MkdirAll")
+
+	// Directory layout:
+	// /
+	//   .git/
+	//   node_modules/
+	//     some-dep/
+	//   parent/
+	//     child/
+	//     sibling/
+
+	watcher, err := fsnotify.NewWatcher()
+	assert.NilError(t, err, "NewWatcher")
+	fw := New(logger, repoRoot, watcher)
+	err = fw.Start()
+	assert.NilError(t, err, "Start")
+	expectedWatching := []string{
+		repoRoot.ToString(),
+		repoRoot.Join("parent").ToString(),
+		repoRoot.Join("parent", "child").ToString(),
+		repoRoot.Join("parent", "sibling").ToString(),
+	}
+	watching := fw.WatchList()
+	assertSameSet(t, watching, expectedWatching)
+
+	// Add a client. Start has already spawned the watch loop.
+	ch := make(chan struct{}, 1)
+	c := &testClient{
+		notify: ch,
+	}
+	fw.AddClient(c)
+
+	fooPath := repoRoot.Join("parent", "child", "foo")
+	err = fooPath.WriteFile([]byte("hello"), 0644)
+	assert.NilError(t, err, "WriteFile")
+	expectFilesystemEvent(t, ch)
+	expectedEvent := fsnotify.Event{
+		Op:   fsnotify.Create,
+		Name: fooPath.ToString(),
+	}
+	c.mu.Lock()
+	got := c.createEvents[len(c.createEvents)-1]
+	c.mu.Unlock()
+	assert.DeepEqual(t, got, expectedEvent)
+	// Windows doesn't watch individual files, only directories
+	if runtime.GOOS != "windows" {
+		expectedWatching = append(expectedWatching, fooPath.ToString())
+	}
+	watching = fw.WatchList()
+	assertSameSet(t, watching, expectedWatching)
+
+	deepPath := repoRoot.Join("parent", "sibling", "deep", "path")
+	err = deepPath.MkdirAll()
+	assert.NilError(t, err, "MkdirAll")
+	// We'll catch an event for "deep", but not "deep/path" since
+	// we don't have a recursive watch
+	expectFilesystemEvent(t, ch)
+
+	expectedWatching = append(expectedWatching, deepPath.ToString(), repoRoot.Join("parent", "sibling", "deep").ToString())
+	watching = fw.WatchList()
+	assertSameSet(t, watching, expectedWatching)
+
+	gitFilePath := repoRoot.Join(".git", "git-file")
+	err = gitFilePath.WriteFile([]byte("nope"), 0644)
+	assert.NilError(t, err, "WriteFile")
+	expectNoFilesystemEvent(t, ch)
+
+	// No change in watchlist
+	watching = fw.WatchList()
+	assertSameSet(t, watching, expectedWatching)
+}
diff --git a/cli/internal/fs/path.go b/cli/internal/fs/path.go
index bcfb74bf1b28e..9f62fa27807fe 100644
--- a/cli/internal/fs/path.go
+++ b/cli/internal/fs/path.go
@@ -8,6 +8,7 @@ import (
 	"path/filepath"
 	"reflect"
 
+	"github.com/adrg/xdg"
 	"github.com/spf13/pflag"
 )
 
@@ -165,6 +166,11 @@ func (ap AbsolutePath) Remove() error {
 	return os.Remove(ap.asString())
 }
 
+// Base implements filepath.Base for an absolute path
+func (ap AbsolutePath) Base() string {
+	return filepath.Base(ap.asString())
+}
+
 // Rename implements os.Rename for absolute paths
 func (ap AbsolutePath) Rename(dest AbsolutePath) error {
 	return os.Rename(ap.asString(), dest.asString())
@@ -201,6 +207,19 @@ func IofsRelativePath(fsysRoot string, absolutePath string) (string, error) {
 	return filepath.Rel(fsysRoot, absolutePath)
 }
 
+// TempDir returns the absolute path of a directory with the given name
+// under the system's default temp directory location
+func TempDir(subDir string) AbsolutePath {
+	return AbsolutePath(os.TempDir()).Join(subDir)
+}
+
+// GetTurboDataDir returns a directory outside of the repo
+// where turbo can store its data files.
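+// With the xdg defaults this typically resolves to ~/.local/share/turborepo
+// on Linux, or ~/Library/Application Support/turborepo on macOS.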
+func GetTurboDataDir() AbsolutePath { + dataHome := AbsolutePathFromUpstream(xdg.DataHome) + return dataHome.Join("turborepo") +} + type pathValue struct { base AbsolutePath current *AbsolutePath diff --git a/cli/internal/globwatcher/globwatcher.go b/cli/internal/globwatcher/globwatcher.go new file mode 100644 index 0000000000000..0cf6b9d97ea79 --- /dev/null +++ b/cli/internal/globwatcher/globwatcher.go @@ -0,0 +1,151 @@ +package globwatcher + +import ( + "errors" + "fmt" + "path/filepath" + "sync" + + "github.com/fsnotify/fsnotify" + "github.com/hashicorp/go-hclog" + "github.com/vercel/turborepo/cli/internal/doublestar" + "github.com/vercel/turborepo/cli/internal/fs" + "github.com/vercel/turborepo/cli/internal/util" +) + +// ErrClosed is returned when attempting to get changed globs after glob watching has closed +var ErrClosed = errors.New("glob watching is closed") + +// GlobWatcher is used to track unchanged globs by hash. Once a glob registers a file change +// it is no longer tracked until a new hash requests it. Once all globs for a particular hash +// have changed, that hash is no longer tracked. +type GlobWatcher struct { + logger hclog.Logger + repoRoot fs.AbsolutePath + + mu sync.RWMutex // protects field below + hashGlobs map[string]util.Set + globStatus map[string]util.Set // glob -> hashes where this glob hasn't changed + + closed bool +} + +// New returns a new GlobWatcher instance +func New(logger hclog.Logger, repoRoot fs.AbsolutePath) *GlobWatcher { + return &GlobWatcher{ + logger: logger, + repoRoot: repoRoot, + hashGlobs: make(map[string]util.Set), + globStatus: make(map[string]util.Set), + } +} + +func (g *GlobWatcher) setClosed() { + g.mu.Lock() + g.closed = true + g.mu.Unlock() +} + +func (g *GlobWatcher) isClosed() bool { + g.mu.RLock() + defer g.mu.RUnlock() + return g.closed +} + +// WatchGlobs registers the given set of globs to be watched for changes and grouped +// under the given hash. This method pairs with GetChangedGlobs to determine which globs +// out of a set of candidates have changed since WatchGlobs was called for the same hash. +func (g *GlobWatcher) WatchGlobs(hash string, globs []string) error { + if g.isClosed() { + return ErrClosed + } + g.mu.Lock() + defer g.mu.Unlock() + g.hashGlobs[hash] = util.SetFromStrings(globs) + for _, glob := range globs { + existing, ok := g.globStatus[glob] + if !ok { + existing = make(util.Set) + } + existing.Add(hash) + g.globStatus[glob] = existing + } + return nil +} + +// GetChangedGlobs returns the subset of the given candidates that we are not currently +// tracking as "unchanged". +func (g *GlobWatcher) GetChangedGlobs(hash string, candidates []string) ([]string, error) { + if g.isClosed() { + // If filewatching has crashed, return all candidates as changed. + return candidates, nil + } + // hashGlobs tracks all of the unchanged globs for a given hash + // If hashGlobs doesn't have our hash, either everything has changed, + // or we were never tracking it. Either way, consider all the candidates + // to be changed globs. + g.mu.RLock() + defer g.mu.RUnlock() + globsToCheck, ok := g.hashGlobs[hash] + if !ok { + return candidates, nil + } + allGlobs := util.SetFromStrings(candidates) + diff := allGlobs.Difference(globsToCheck) + return diff.UnsafeListOfStrings(), nil +} + +// OnFileWatchEvent implements FileWatchClient.OnFileWatchEvent +// On a file change, check if we have a glob that matches this file. 
Invalidate
+// any matching globs, and remove them from the set of unchanged globs for the corresponding
+// hashes. If this is the last glob for a hash, remove the hash from being tracked.
+func (g *GlobWatcher) OnFileWatchEvent(ev fsnotify.Event) {
+	// At this point, we don't care what the Op is, any Op represents a change
+	// that should invalidate matching globs
+	g.logger.Debug(fmt.Sprintf("Got fsnotify event %v", ev))
+	absolutePath := ev.Name
+	repoRelativePath, err := g.repoRoot.RelativePathString(absolutePath)
+	if err != nil {
+		g.logger.Error(fmt.Sprintf("could not get relative path from %v to %v: %v", g.repoRoot, absolutePath, err))
+		return
+	}
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	for glob, hashStatus := range g.globStatus {
+		matches, err := doublestar.Match(glob, filepath.ToSlash(repoRelativePath))
+		if err != nil {
+			g.logger.Error(fmt.Sprintf("failed to check path %v against glob %v: %v", repoRelativePath, glob, err))
+			continue
+		}
+		// If this glob matches, we know that it has changed for every hash that included this glob.
+		// So, we can delete this glob from every hash tracking it as well as stop watching this glob.
+		// Stopping watching here just means dropping the glob from our tracking maps;
+		// the underlying filesystem watches are managed by the file watcher.
+		if matches {
+			delete(g.globStatus, glob)
+			for hashUntyped := range hashStatus {
+				hash := hashUntyped.(string)
+				hashGlobs, ok := g.hashGlobs[hash]
+				if !ok {
+					g.logger.Warn(fmt.Sprintf("failed to find hash %v referenced from glob %v", hash, glob))
+					continue
+				}
+				hashGlobs.Delete(glob)
+				// If we've deleted the last glob for a hash, delete the whole hash entry
+				if hashGlobs.Len() == 0 {
+					delete(g.hashGlobs, hash)
+				}
+			}
+		}
+	}
+}
+
+// OnFileWatchError implements FileWatchClient.OnFileWatchError
+func (g *GlobWatcher) OnFileWatchError(err error) {
+	g.logger.Error(fmt.Sprintf("file watching received an error: %v", err))
+}
+
+// OnFileWatchClosed implements FileWatchClient.OnFileWatchClosed
+func (g *GlobWatcher) OnFileWatchClosed() {
+	g.setClosed()
+	g.logger.Warn("GlobWatching is closing due to file watching closing")
+}
diff --git a/cli/internal/globwatcher/globwatcher_test.go b/cli/internal/globwatcher/globwatcher_test.go
new file mode 100644
index 0000000000000..e9c36bcaeaf3a
--- /dev/null
+++ b/cli/internal/globwatcher/globwatcher_test.go
@@ -0,0 +1,143 @@
+package globwatcher
+
+import (
+	"testing"
+
+	"github.com/fsnotify/fsnotify"
+	"github.com/hashicorp/go-hclog"
+	"github.com/vercel/turborepo/cli/internal/fs"
+	"gotest.tools/v3/assert"
+)
+
+func setup(t *testing.T, repoRoot fs.AbsolutePath) {
+	// Directory layout:
+	// /
+	//   my-pkg/
+	//     irrelevant
+	//     dist/
+	//       dist-file
+	//       distChild/
+	//         child-file
+	//     .next/
+	//       next-file
+	distPath := repoRoot.Join("my-pkg", "dist")
+	childFilePath := distPath.Join("distChild", "child-file")
+	err := childFilePath.EnsureDir()
+	assert.NilError(t, err, "EnsureDir")
+	f, err := childFilePath.Create()
+	assert.NilError(t, err, "Create")
+	err = f.Close()
+	assert.NilError(t, err, "Close")
+	distFilePath := repoRoot.Join("my-pkg", "dist", "dist-file")
+	f, err = distFilePath.Create()
+	assert.NilError(t, err, "Create")
+	err = f.Close()
+	assert.NilError(t, err, "Close")
+	nextFilePath := repoRoot.Join("my-pkg", ".next", "next-file")
+	err = nextFilePath.EnsureDir()
+	assert.NilError(t, err, "EnsureDir")
+	f, err = nextFilePath.Create()
+	assert.NilError(t, err, "Create")
+	err = f.Close()
+	assert.NilError(t, err, "Close")
+	irrelevantPath := repoRoot.Join("my-pkg", "irrelevant")
+	f, err = irrelevantPath.Create()
+	assert.NilError(t, err, "Create")
+	err = f.Close()
+	assert.NilError(t, err, "Close")
+}
+
+func TestTrackOutputs(t *testing.T) {
+	logger := hclog.Default()
+
+	repoRootRaw := t.TempDir()
+	repoRoot := fs.AbsolutePathFromUpstream(repoRootRaw)
+
+	setup(t, repoRoot)
+
+	globWatcher := New(logger, repoRoot)
+
+	globs := []string{
+		"my-pkg/dist/**",
+		"my-pkg/.next/**",
+	}
+	hash := "the-hash"
+	err := globWatcher.WatchGlobs(hash, globs)
+	assert.NilError(t, err, "WatchGlobs")
+
+	changed, err := globWatcher.GetChangedGlobs(hash, globs)
+	assert.NilError(t, err, "GetChangedGlobs")
+	assert.Equal(t, 0, len(changed), "Expected no changed paths")
+
+	// Make an irrelevant change
+	globWatcher.OnFileWatchEvent(fsnotify.Event{
+		Op:   fsnotify.Create,
+		Name: repoRoot.Join("my-pkg", "irrelevant").ToString(),
+	})
+
+	changed, err = globWatcher.GetChangedGlobs(hash, globs)
+	assert.NilError(t, err, "GetChangedGlobs")
+	assert.Equal(t, 0, len(changed), "Expected no changed paths")
+
+	// Make a relevant change
+	globWatcher.OnFileWatchEvent(fsnotify.Event{
+		Op:   fsnotify.Create,
+		Name: repoRoot.Join("my-pkg", "dist", "foo").ToString(),
+	})
+
+	changed, err = globWatcher.GetChangedGlobs(hash, globs)
+	assert.NilError(t, err, "GetChangedGlobs")
+	assert.Equal(t, 1, len(changed), "Expected one changed path remaining")
+	expected := "my-pkg/dist/**"
+	assert.Equal(t, expected, changed[0], "Expected dist glob to have changed")
+
+	// Change a file matching the other glob
+	globWatcher.OnFileWatchEvent(fsnotify.Event{
+		Op:   fsnotify.Create,
+		Name: repoRoot.Join("my-pkg", ".next", "foo").ToString(),
+	})
+	// We should no longer be watching anything, since both globs have
+	// registered changes
+	if len(globWatcher.hashGlobs) != 0 {
+		t.Errorf("expected to not track any hashes, found %v", globWatcher.hashGlobs)
+	}
+
+	// Both globs have changed, we should have stopped tracking
+	// this hash
+	changed, err = globWatcher.GetChangedGlobs(hash, globs)
+	assert.NilError(t, err, "GetChangedGlobs")
+	assert.DeepEqual(t, globs, changed)
+}
+
+func TestWatchSingleFile(t *testing.T) {
+	logger := hclog.Default()
+
+	repoRoot := fs.AbsolutePathFromUpstream(t.TempDir())
+
+	setup(t, repoRoot)
+
+	globWatcher := New(logger, repoRoot)
+	globs := []string{
+		"my-pkg/.next/next-file",
+	}
+	hash := "the-hash"
+	err := globWatcher.WatchGlobs(hash, globs)
+	assert.NilError(t, err, "WatchGlobs")
+
+	assert.Equal(t, 1, len(globWatcher.hashGlobs))
+
+	// A change to an irrelevant file
+	globWatcher.OnFileWatchEvent(fsnotify.Event{
+		Op:   fsnotify.Create,
+		Name: repoRoot.Join("my-pkg", ".next", "foo").ToString(),
+	})
+	assert.Equal(t, 1, len(globWatcher.hashGlobs))
+
+	// Change the watched file
+	globWatcher.OnFileWatchEvent(fsnotify.Event{
+		Op:   fsnotify.Write,
+		Name: repoRoot.Join("my-pkg", ".next", "next-file").ToString(),
+	})
+	assert.Equal(t, 0, len(globWatcher.hashGlobs))
+}
diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go
index 45cccacfe2f30..00343853f9a6c 100644
--- a/cli/internal/run/run.go
+++ b/cli/internal/run/run.go
@@ -23,6 +23,8 @@ import (
 	"github.com/vercel/turborepo/cli/internal/config"
 	"github.com/vercel/turborepo/cli/internal/context"
 	"github.com/vercel/turborepo/cli/internal/core"
+	"github.com/vercel/turborepo/cli/internal/daemon"
+	"github.com/vercel/turborepo/cli/internal/daemonclient"
 	"github.com/vercel/turborepo/cli/internal/fs"
 	"github.com/vercel/turborepo/cli/internal/graphvisualizer"
"github.com/vercel/turborepo/cli/internal/logstreamer" @@ -32,6 +34,7 @@ import ( "github.com/vercel/turborepo/cli/internal/runcache" "github.com/vercel/turborepo/cli/internal/scm" "github.com/vercel/turborepo/cli/internal/scope" + "github.com/vercel/turborepo/cli/internal/signals" "github.com/vercel/turborepo/cli/internal/taskhash" "github.com/vercel/turborepo/cli/internal/ui" "github.com/vercel/turborepo/cli/internal/util" @@ -46,9 +49,9 @@ import ( // RunCommand is a Command implementation that tells Turbo to run a task type RunCommand struct { - Config *config.Config - Ui *cli.ColoredUi - Processes *process.Manager + Config *config.Config + UI *cli.ColoredUi + SignalWatcher *signals.Watcher } // completeGraph represents the common state inferred from the filesystem and pipeline. @@ -91,7 +94,7 @@ occurred again). Arguments passed after '--' will be passed through to the named tasks. ` -func getCmd(config *config.Config, ui cli.Ui, processes *process.Manager) *cobra.Command { +func getCmd(config *config.Config, ui cli.Ui, signalWatcher *signals.Watcher) *cobra.Command { var opts *Opts var flags *pflag.FlagSet cmd := &cobra.Command{ @@ -107,8 +110,9 @@ func getCmd(config *config.Config, ui cli.Ui, processes *process.Manager) *cobra return errors.New("at least one task must be specified") } opts.runOpts.passThroughArgs = passThroughArgs - run := configureRun(config, ui, opts, processes) - return run.run(tasks) + run := configureRun(config, ui, opts, signalWatcher) + ctx := cmd.Context() + return run.run(ctx, tasks) }, } flags = cmd.Flags() @@ -142,7 +146,7 @@ func optsFromFlags(flags *pflag.FlagSet, config *config.Config) *Opts { return opts } -func configureRun(config *config.Config, output cli.Ui, opts *Opts, processes *process.Manager) *run { +func configureRun(config *config.Config, output cli.Ui, opts *Opts, signalWatcher *signals.Watcher) *run { if os.Getenv("TURBO_FORCE") == "true" { opts.runcacheOpts.SkipReads = true } @@ -154,6 +158,8 @@ func configureRun(config *config.Config, output cli.Ui, opts *Opts, processes *p if !config.IsLoggedIn() { opts.cacheOpts.SkipRemote = true } + processes := process.NewManager(config.Logger.Named("processes")) + signalWatcher.AddOnClose(processes.Close) return &run{ opts: opts, config: config, @@ -164,19 +170,19 @@ func configureRun(config *config.Config, output cli.Ui, opts *Opts, processes *p // Synopsis of run command func (c *RunCommand) Synopsis() string { - cmd := getCmd(c.Config, c.Ui, c.Processes) + cmd := getCmd(c.Config, c.UI, c.SignalWatcher) return cmd.Short } // Help returns information about the `run` command func (c *RunCommand) Help() string { - cmd := getCmd(c.Config, c.Ui, c.Processes) + cmd := getCmd(c.Config, c.UI, c.SignalWatcher) return util.HelpForCobraCmd(cmd) } // Run executes tasks in the monorepo func (c *RunCommand) Run(args []string) int { - cmd := getCmd(c.Config, c.Ui, c.Processes) + cmd := getCmd(c.Config, c.UI, c.SignalWatcher) cmd.SetArgs(args) err := cmd.Execute() if err != nil { @@ -197,14 +203,27 @@ type run struct { processes *process.Manager } -func (r *run) run(targets []string) error { +func (r *run) run(ctx gocontext.Context, targets []string) error { startAt := time.Now() - ctx, err := context.New(context.WithGraph(r.config, r.opts.cacheOpts.Dir)) + pkgDepGraph, err := context.New(context.WithGraph(r.config, r.opts.cacheOpts.Dir)) if err != nil { return err } + // This technically could be one flag, but we plan on removing + // the daemon opt-in flag at some point once it stabilizes + if 
r.opts.runOpts.daemonOptIn && !r.opts.runOpts.noDaemon { + turbodClient, err := daemon.GetClient(ctx, r.config.Cwd, r.config.Logger, r.config.TurboVersion, daemon.ClientOpts{}) + if err != nil { + r.logWarning("", errors.Wrap(err, "failed to contact turbod. Continuing in standalone mode")) + } else { + defer func() { _ = turbodClient.Close() }() + r.config.Logger.Debug("running in daemon mode") + daemonClient := daemonclient.New(turbodClient) + r.opts.runcacheOpts.OutputWatcher = daemonClient + } + } - if err := util.ValidateGraph(&ctx.TopologicalGraph); err != nil { + if err := util.ValidateGraph(&pkgDepGraph.TopologicalGraph); err != nil { return errors.Wrap(err, "Invalid package dependency graph") } @@ -221,7 +240,7 @@ func (r *run) run(targets []string) error { return errors.Wrap(err, "failed to create SCM") } } - filteredPkgs, isAllPackages, err := scope.ResolvePackages(&r.opts.scopeOpts, r.config.Cwd.ToStringDuringMigration(), scmInstance, ctx, r.ui, r.config.Logger) + filteredPkgs, isAllPackages, err := scope.ResolvePackages(&r.opts.scopeOpts, r.config.Cwd.ToStringDuringMigration(), scmInstance, pkgDepGraph, r.ui, r.config.Logger) if err != nil { return errors.Wrap(err, "failed to resolve packages to run") } @@ -236,27 +255,27 @@ func (r *run) run(targets []string) error { } } } - r.config.Logger.Debug("global hash", "value", ctx.GlobalHash) + r.config.Logger.Debug("global hash", "value", pkgDepGraph.GlobalHash) r.config.Logger.Debug("local cache folder", "path", r.opts.cacheOpts.Dir) // TODO: consolidate some of these arguments g := &completeGraph{ - TopologicalGraph: ctx.TopologicalGraph, + TopologicalGraph: pkgDepGraph.TopologicalGraph, Pipeline: pipeline, - PackageInfos: ctx.PackageInfos, - GlobalHash: ctx.GlobalHash, - RootNode: ctx.RootNode, + PackageInfos: pkgDepGraph.PackageInfos, + GlobalHash: pkgDepGraph.GlobalHash, + RootNode: pkgDepGraph.RootNode, } rs := &runSpec{ Targets: targets, FilteredPkgs: filteredPkgs, Opts: r.opts, } - packageManager := ctx.PackageManager - return r.runOperation(g, rs, packageManager, startAt) + packageManager := pkgDepGraph.PackageManager + return r.runOperation(ctx, g, rs, packageManager, startAt) } -func (r *run) runOperation(g *completeGraph, rs *runSpec, packageManager *packagemanager.PackageManager, startAt time.Time) error { +func (r *run) runOperation(ctx gocontext.Context, g *completeGraph, rs *runSpec, packageManager *packagemanager.PackageManager, startAt time.Time) error { vertexSet := make(util.Set) for _, v := range g.TopologicalGraph.Vertices() { vertexSet.Add(v) @@ -299,7 +318,7 @@ func (r *run) runOperation(g *completeGraph, rs *runSpec, packageManager *packag } } } else if rs.Opts.runOpts.dryRun { - tasksRun, err := r.executeDryRun(engine, g, hashTracker, rs) + tasksRun, err := r.executeDryRun(ctx, engine, g, hashTracker, rs) if err != nil { return err } @@ -351,7 +370,7 @@ func (r *run) runOperation(g *completeGraph, rs *runSpec, packageManager *packag sort.Strings(packagesInScope) r.ui.Output(fmt.Sprintf(ui.Dim("• Packages in scope: %v"), strings.Join(packagesInScope, ", "))) r.ui.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len())))) - return r.executeTasks(g, rs, engine, packageManager, hashTracker, startAt) + return r.executeTasks(ctx, g, rs, engine, packageManager, hashTracker, startAt) } return nil } @@ -424,8 +443,10 @@ type runOpts struct { dryRun bool dryRunJSON bool // Graph flags - graphDot bool - graphFile 
string + graphDot bool + graphFile string + noDaemon bool + daemonOptIn bool } var ( @@ -457,6 +478,15 @@ func addRunOpts(opts *runOpts, flags *pflag.FlagSet, aliases map[string]string) flags.StringVar(&opts.profile, "profile", "", _profileHelp) flags.BoolVar(&opts.continueOnError, "continue", false, _continueHelp) flags.BoolVar(&opts.only, "only", false, _onlyHelp) + flags.BoolVar(&opts.noDaemon, "no-daemon", false, "Run without using turbo's daemon process") + flags.BoolVar(&opts.daemonOptIn, "experimental-use-daemon", false, "Use the experimental turbo daemon") + // Daemon-related flags hidden for now, we can unhide when daemon is ready. + if err := flags.MarkHidden("experimental-use-daemon"); err != nil { + panic(err) + } + if err := flags.MarkHidden("no-daemon"); err != nil { + panic(err) + } if err := flags.MarkHidden("only"); err != nil { // fail fast if we've messed up our flag configuration panic(err) @@ -618,7 +648,7 @@ func (c *RunCommand) logError(log hclog.Logger, prefix string, err error) { prefix += ": " } - c.Ui.Error(fmt.Sprintf("%s%s%s", ui.ERROR_PREFIX, prefix, color.RedString(" %v", err))) + c.UI.Error(fmt.Sprintf("%s%s%s", ui.ERROR_PREFIX, prefix, color.RedString(" %v", err))) } // logError logs an error and outputs it to the UI. @@ -632,15 +662,14 @@ func (r *run) logWarning(prefix string, err error) { r.ui.Error(fmt.Sprintf("%s%s%s", ui.WARNING_PREFIX, prefix, color.YellowString(" %v", err))) } -func (r *run) executeTasks(g *completeGraph, rs *runSpec, engine *core.Scheduler, packageManager *packagemanager.PackageManager, hashes *taskhash.Tracker, startAt time.Time) error { - goctx := gocontext.Background() +func (r *run) executeTasks(ctx gocontext.Context, g *completeGraph, rs *runSpec, engine *core.Scheduler, packageManager *packagemanager.PackageManager, hashes *taskhash.Tracker, startAt time.Time) error { var analyticsSink analytics.Sink if r.config.IsLoggedIn() { analyticsSink = r.config.ApiClient } else { analyticsSink = analytics.NullSink } - analyticsClient := analytics.NewClient(goctx, analyticsSink, r.config.Logger.Named("analytics")) + analyticsClient := analytics.NewClient(ctx, analyticsSink, r.config.Logger.Named("analytics")) defer analyticsClient.CloseWithTimeout(50 * time.Millisecond) // Theoretically this is overkill, but bias towards not spamming the console once := &sync.Once{} @@ -691,9 +720,9 @@ func (r *run) executeTasks(g *completeGraph, rs *runSpec, engine *core.Scheduler } // run the thing - errs := engine.Execute(g.getPackageTaskVisitor(func(pt *nodes.PackageTask) error { + errs := engine.Execute(g.getPackageTaskVisitor(ctx, func(ctx gocontext.Context, pt *nodes.PackageTask) error { deps := engine.TaskGraph.DownEdges(pt.TaskID) - return ec.exec(pt, deps) + return ec.exec(ctx, pt, deps) }), core.ExecOpts{ Parallel: rs.Opts.runOpts.parallel, Concurrency: rs.Opts.runOpts.concurrency, @@ -738,9 +767,9 @@ type hashedTask struct { Dependents []string `json:"dependents"` } -func (r *run) executeDryRun(engine *core.Scheduler, g *completeGraph, taskHashes *taskhash.Tracker, rs *runSpec) ([]hashedTask, error) { +func (r *run) executeDryRun(ctx gocontext.Context, engine *core.Scheduler, g *completeGraph, taskHashes *taskhash.Tracker, rs *runSpec) ([]hashedTask, error) { taskIDs := []hashedTask{} - errs := engine.Execute(g.getPackageTaskVisitor(func(pt *nodes.PackageTask) error { + errs := engine.Execute(g.getPackageTaskVisitor(ctx, func(ctx gocontext.Context, pt *nodes.PackageTask) error { passThroughArgs := rs.ArgsForTask(pt.Task) deps := 
engine.TaskGraph.DownEdges(pt.TaskID) hash, err := taskHashes.CalculateTaskHash(pt, deps, passThroughArgs) @@ -844,7 +873,7 @@ func (e *execContext) logError(log hclog.Logger, prefix string, err error) { e.ui.Error(fmt.Sprintf("%s%s%s", ui.ERROR_PREFIX, prefix, color.RedString(" %v", err))) } -func (e *execContext) exec(pt *nodes.PackageTask, deps dag.Set) error { +func (e *execContext) exec(ctx gocontext.Context, pt *nodes.PackageTask, deps dag.Set) error { cmdTime := time.Now() targetLogger := e.logger.Named(pt.OutputPrefix()) @@ -883,7 +912,7 @@ func (e *execContext) exec(pt *nodes.PackageTask, deps dag.Set) error { } // Cache --------------------------------------------- taskCache := e.runCache.TaskCache(pt, hash) - hit, err := taskCache.RestoreOutputs(targetUi, targetLogger) + hit, err := taskCache.RestoreOutputs(ctx, targetUi, targetLogger) if err != nil { targetUi.Error(fmt.Sprintf("error fetching from cache: %s", err)) } else if hit { @@ -970,7 +999,7 @@ func (e *execContext) exec(pt *nodes.PackageTask, deps dag.Set) error { if err := closeOutputs(); err != nil { e.logError(targetLogger, "", err) } else { - if err = taskCache.SaveOutputs(targetLogger, targetUi, int(duration.Milliseconds())); err != nil { + if err = taskCache.SaveOutputs(ctx, targetLogger, targetUi, int(duration.Milliseconds())); err != nil { e.logError(targetLogger, "", fmt.Errorf("error caching output: %w", err)) } } @@ -981,8 +1010,9 @@ func (e *execContext) exec(pt *nodes.PackageTask, deps dag.Set) error { return nil } -func (g *completeGraph) getPackageTaskVisitor(visitor func(pt *nodes.PackageTask) error) func(taskID string) error { +func (g *completeGraph) getPackageTaskVisitor(ctx gocontext.Context, visitor func(ctx gocontext.Context, pt *nodes.PackageTask) error) func(taskID string) error { return func(taskID string) error { + name, task := util.GetPackageTaskFromId(taskID) pkg, ok := g.PackageInfos[name] if !ok { @@ -1000,7 +1030,7 @@ func (g *completeGraph) getPackageTaskVisitor(visitor func(pt *nodes.PackageTask // override if we need to... 
pipeline = altpipe } - return visitor(&nodes.PackageTask{ + return visitor(ctx, &nodes.PackageTask{ TaskID: taskID, Task: task, PackageName: name, diff --git a/cli/internal/run/run_test.go b/cli/internal/run/run_test.go index 9c28ea117493e..ba79a59717d32 100644 --- a/cli/internal/run/run_test.go +++ b/cli/internal/run/run_test.go @@ -446,7 +446,7 @@ func TestUsageText(t *testing.T) { output := ui.Default() cmd := &RunCommand{ Config: cf, - Ui: output, + UI: output, } // just ensure it doesn't panic for now usage := cmd.Help() diff --git a/cli/internal/runcache/output_watcher.go b/cli/internal/runcache/output_watcher.go new file mode 100644 index 0000000000000..8d81a83cbf47f --- /dev/null +++ b/cli/internal/runcache/output_watcher.go @@ -0,0 +1,28 @@ +package runcache + +import "context" + +// OutputWatcher instances are responsible for tracking changes to task outputs +type OutputWatcher interface { + // GetChangedOutputs returns which of the given globs have changed since the specified hash was last run + GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) + // NotifyOutputsWritten tells the watcher that the given globs have been cached with the specified hash + NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs []string) error +} + +// NoOpOutputWatcher implements OutputWatcher, but always considers every glob to have changed +type NoOpOutputWatcher struct{} + +var _ OutputWatcher = (*NoOpOutputWatcher)(nil) + +// GetChangedOutputs implements OutputWatcher.GetChangedOutputs. +// Since this is a no-op watcher, no tracking is done. +func (NoOpOutputWatcher) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, error) { + return repoRelativeOutputGlobs, nil +} + +// NotifyOutputsWritten implements OutputWatcher.NotifyOutputsWritten. +// Since this is a no-op watcher, consider all globs to have changed +func (NoOpOutputWatcher) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs []string) error { + return nil +} diff --git a/cli/internal/runcache/runcache.go b/cli/internal/runcache/runcache.go index 0a0a1117d3bad..9fb8d8be55daf 100644 --- a/cli/internal/runcache/runcache.go +++ b/cli/internal/runcache/runcache.go @@ -2,6 +2,7 @@ package runcache import ( "bufio" + "context" "fmt" "io" "os" @@ -30,6 +31,7 @@ type Opts struct { SkipWrites bool TaskOutputModeOverride *util.TaskOutputMode LogReplayer LogReplayer + OutputWatcher OutputWatcher } // AddFlags adds the flags relevant to the runcache package to the given FlagSet @@ -108,6 +110,7 @@ type RunCache struct { writesDisabled bool repoRoot fs.AbsolutePath logReplayer LogReplayer + outputWatcher OutputWatcher colorCache *colorcache.ColorCache } @@ -120,11 +123,15 @@ func New(cache cache.Cache, repoRoot fs.AbsolutePath, opts Opts, colorCache *col writesDisabled: opts.SkipWrites, repoRoot: repoRoot, logReplayer: opts.LogReplayer, + outputWatcher: opts.OutputWatcher, colorCache: colorCache, } if rc.logReplayer == nil { rc.logReplayer = defaultLogReplayer } + if rc.outputWatcher == nil { + rc.outputWatcher = &NoOpOutputWatcher{} + } return rc } @@ -142,25 +149,41 @@ type TaskCache struct { // RestoreOutputs attempts to restore output for the corresponding task from the cache. Returns true // if successful. 
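+// If the configured OutputWatcher reports that none of the task's output
+// globs have changed since this hash was last seen, the cache fetch is
+// skipped entirely.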
-func (tc TaskCache) RestoreOutputs(terminal *cli.PrefixedUi, logger hclog.Logger) (bool, error) { +func (tc TaskCache) RestoreOutputs(ctx context.Context, terminal *cli.PrefixedUi, logger hclog.Logger) (bool, error) { if tc.cachingDisabled || tc.rc.readsDisabled { if tc.taskOutputMode != util.NoTaskOutput { terminal.Output(fmt.Sprintf("cache bypass, force executing %s", ui.Dim(tc.hash))) } return false, nil } - - // TODO(gsoltis): check if we need to restore goes here - // That will be an opportunity to prune down the set of outputs as well - hit, _, _, err := tc.rc.cache.Fetch(tc.rc.repoRoot.ToString(), tc.hash, tc.repoRelativeGlobs) + changedOutputGlobs, err := tc.rc.outputWatcher.GetChangedOutputs(ctx, tc.hash, tc.repoRelativeGlobs) if err != nil { - return false, err - } else if !hit { - if tc.taskOutputMode != util.NoTaskOutput { - terminal.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(tc.hash))) + logger.Warn(fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err)) + terminal.Warn(ui.Dim(fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err))) + changedOutputGlobs = tc.repoRelativeGlobs + } + hasChangedOutputs := len(changedOutputGlobs) > 0 + if hasChangedOutputs { + // Note that we currently don't use the output globs when restoring, but we could in the + // future to avoid doing unnecessary file I/O + hit, _, _, err := tc.rc.cache.Fetch(tc.rc.repoRoot.ToString(), tc.hash, changedOutputGlobs) + if err != nil { + return false, err + } else if !hit { + if tc.taskOutputMode != util.NoTaskOutput { + terminal.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(tc.hash))) + } + return false, nil } - return false, nil + if err := tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs); err != nil { + // Don't fail the whole operation just because we failed to watch the outputs + logger.Warn(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err)) + terminal.Warn(ui.Dim(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err))) + } + } else { + logger.Debug(fmt.Sprintf("Skipping cache check for %v, outputs have not changed since previous run.", tc.pt.TaskID)) } + switch tc.taskOutputMode { // When only showing new task output, cached output should only show the computed hash case util.NewTaskOutput: @@ -239,7 +262,7 @@ func (tc TaskCache) OutputWriter() (io.WriteCloser, error) { var _emptyIgnore []string // SaveOutputs is responsible for saving the outputs of task to the cache, after the task has completed -func (tc TaskCache) SaveOutputs(logger hclog.Logger, terminal cli.Ui, duration int) error { +func (tc TaskCache) SaveOutputs(ctx context.Context, logger hclog.Logger, terminal cli.Ui, duration int) error { if tc.cachingDisabled || tc.rc.writesDisabled { return nil } @@ -263,7 +286,17 @@ func (tc TaskCache) SaveOutputs(logger hclog.Logger, terminal cli.Ui, duration i relativePaths[index] = relativePath } - return tc.rc.cache.Put(tc.pt.Pkg.Dir, tc.hash, duration, relativePaths) + if err = tc.rc.cache.Put(tc.pt.Pkg.Dir, tc.hash, duration, relativePaths); err != nil { + return err + } + err = tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs) + if err != nil { + // Don't fail the cache write because we also failed to record it, we will just do + // extra I/O in the future restoring files that haven't changed from cache + logger.Warn(fmt.Sprintf("Failed to mark 
outputs as cached for %v: %v", tc.pt.TaskID, err)) + terminal.Warn(ui.Dim(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err))) + } + return nil } // TaskCache returns a TaskCache instance, providing an interface to the underlying cache specific diff --git a/cli/internal/server/server.go b/cli/internal/server/server.go new file mode 100644 index 0000000000000..1b7232f1cd573 --- /dev/null +++ b/cli/internal/server/server.go @@ -0,0 +1,174 @@ +package server + +import ( + "context" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/hashicorp/go-hclog" + "github.com/pkg/errors" + "github.com/vercel/turborepo/cli/internal/filewatcher" + "github.com/vercel/turborepo/cli/internal/fs" + "github.com/vercel/turborepo/cli/internal/globwatcher" + "github.com/vercel/turborepo/cli/internal/turbodprotocol" + "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// Server implements the GRPC serverside of TurbodServer +// Note for the future: we don't yet make use of turbo.json +// or the package graph in the server. Once we do, we may need a +// layer of indirection between "the thing that responds to grpc requests" +// and "the thing that holds our persistent data structures" to handle +// changes in the underlying configuration. +type Server struct { + turbodprotocol.UnimplementedTurbodServer + watcher *filewatcher.FileWatcher + globWatcher *globwatcher.GlobWatcher + turboVersion string + started time.Time + logFilePath fs.AbsolutePath + repoRoot fs.AbsolutePath + closerMu sync.Mutex + closer *closer +} + +// GRPCServer is the interface that the turbo server needs to the underlying +// GRPC server. This lets the turbo server register itself, as well as provides +// a hook for shutting down the server. +type GRPCServer interface { + grpc.ServiceRegistrar + GracefulStop() +} + +type closer struct { + grpcServer GRPCServer + once sync.Once +} + +func (c *closer) close() { + // This can get triggered from a request handler (Shutdown). Since + // calling GracefulStop blocks until all request handlers complete, + // we need to run it in a goroutine to let the Shutdown handler complete + // and avoid deadlocking. + c.once.Do(func() { + go func() { + c.grpcServer.GracefulStop() + }() + }) +} + +// New returns a new instance of Server +func New(logger hclog.Logger, repoRoot fs.AbsolutePath, turboVersion string, logFilePath fs.AbsolutePath) (*Server, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + fileWatcher := filewatcher.New(logger.Named("FileWatcher"), repoRoot, watcher) + globWatcher := globwatcher.New(logger.Named("GlobWatcher"), repoRoot) + server := &Server{ + watcher: fileWatcher, + globWatcher: globWatcher, + turboVersion: turboVersion, + started: time.Now(), + logFilePath: logFilePath, + repoRoot: repoRoot, + } + server.watcher.AddClient(globWatcher) + server.watcher.AddClient(server) + if err := server.watcher.Start(); err != nil { + return nil, errors.Wrapf(err, "watching %v", repoRoot) + } + return server, nil +} + +func (s *Server) tryClose() bool { + s.closerMu.Lock() + defer s.closerMu.Unlock() + if s.closer != nil { + s.closer.close() + return true + } + return false +} + +// OnFileWatchEvent implements filewatcher.FileWatchClient.OnFileWatchEvent +// In the event that the root of the monorepo is deleted, shut down the server. 
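+// Nothing sensible is left to watch at that point, and shutting down promptly
+// lets the daemon clean up its pid and socket files instead of idling for a
+// repo that no longer exists.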
+
+// OnFileWatchEvent implements filewatcher.FileWatchClient.OnFileWatchEvent.
+// In the event that the root of the monorepo is deleted, shut down the server.
+func (s *Server) OnFileWatchEvent(ev fsnotify.Event) {
+	if ev.Op&fsnotify.Remove != 0 && ev.Name == s.repoRoot.ToString() {
+		_ = s.tryClose()
+	}
+}
+
+// OnFileWatchError implements filewatcher.FileWatchClient.OnFileWatchError
+func (s *Server) OnFileWatchError(err error) {}
+
+// OnFileWatchClosed implements filewatcher.FileWatchClient.OnFileWatchClosed
+func (s *Server) OnFileWatchClosed() {}
+
+// Close is used for shutting down this copy of the server
+func (s *Server) Close() error {
+	return s.watcher.Close()
+}
+
+// Register registers this server to respond to GRPC requests
+func (s *Server) Register(grpcServer GRPCServer) {
+	s.closerMu.Lock()
+	s.closer = &closer{
+		grpcServer: grpcServer,
+	}
+	s.closerMu.Unlock()
+	turbodprotocol.RegisterTurbodServer(grpcServer, s)
+}
+
+// NotifyOutputsWritten implements the NotifyOutputsWritten rpc from turbod.proto
+func (s *Server) NotifyOutputsWritten(ctx context.Context, req *turbodprotocol.NotifyOutputsWrittenRequest) (*turbodprotocol.NotifyOutputsWrittenResponse, error) {
+	err := s.globWatcher.WatchGlobs(req.Hash, req.OutputGlobs)
+	if err != nil {
+		return nil, err
+	}
+	return &turbodprotocol.NotifyOutputsWrittenResponse{}, nil
+}
+
+// GetChangedOutputs implements the GetChangedOutputs rpc from turbod.proto
+func (s *Server) GetChangedOutputs(ctx context.Context, req *turbodprotocol.GetChangedOutputsRequest) (*turbodprotocol.GetChangedOutputsResponse, error) {
+	changedGlobs, err := s.globWatcher.GetChangedGlobs(req.Hash, req.OutputGlobs)
+	if err != nil {
+		return nil, err
+	}
+	return &turbodprotocol.GetChangedOutputsResponse{
+		ChangedOutputGlobs: changedGlobs,
+	}, nil
+}
+
+// Hello implements the Hello rpc from turbod.proto
+func (s *Server) Hello(ctx context.Context, req *turbodprotocol.HelloRequest) (*turbodprotocol.HelloResponse, error) {
+	clientVersion := req.Version
+	if clientVersion != s.turboVersion {
+		err := status.Errorf(codes.FailedPrecondition, "version mismatch. Client %v Server %v", clientVersion, s.turboVersion)
+		return nil, err
+	}
+	return &turbodprotocol.HelloResponse{}, nil
+}
+
+// Shutdown implements the Shutdown rpc from turbod.proto
+func (s *Server) Shutdown(ctx context.Context, req *turbodprotocol.ShutdownRequest) (*turbodprotocol.ShutdownResponse, error) {
+	if s.tryClose() {
+		return &turbodprotocol.ShutdownResponse{}, nil
+	}
+	err := status.Error(codes.NotFound, "shutdown mechanism not found")
+	return nil, err
+}
+
+// Status implements the Status rpc from turbod.proto
+func (s *Server) Status(ctx context.Context, req *turbodprotocol.StatusRequest) (*turbodprotocol.StatusResponse, error) {
+	uptime := uint64(time.Since(s.started).Milliseconds())
+	return &turbodprotocol.StatusResponse{
+		DaemonStatus: &turbodprotocol.DaemonStatus{
+			LogFile:    s.logFilePath.ToString(),
+			UptimeMsec: uptime,
+		},
+	}, nil
+}
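With the RPC surface complete, a client interaction looks roughly like the following sketch. This is illustrative only: the real client wiring lives in cli/internal/daemonclient and the connector, the socket address below is invented for the example, and the stubs are assumed to be the standard protoc-gen-go / protoc-gen-go-grpc output for turbod.proto (which appears later in this patch).

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/vercel/turborepo/cli/internal/turbodprotocol"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// "unix:///tmp/turbod.sock" is an assumption for illustration; the real
	// daemon negotiates its socket path through the connector.
	conn, err := grpc.DialContext(ctx, "unix:///tmp/turbod.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := turbodprotocol.NewTurbodClient(conn)
	// Hello enforces the version handshake; FailedPrecondition means mismatch.
	if _, err := client.Hello(ctx, &turbodprotocol.HelloRequest{Version: "some-version"}); err != nil {
		log.Fatal(err)
	}
	resp, err := client.GetChangedOutputs(ctx, &turbodprotocol.GetChangedOutputsRequest{
		Hash:        "task-hash",
		OutputGlobs: []string{"dist/**"},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("changed globs:", resp.ChangedOutputGlobs)
}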
diff --git a/cli/internal/server/server_test.go b/cli/internal/server/server_test.go
new file mode 100644
index 0000000000000..9a17777584580
--- /dev/null
+++ b/cli/internal/server/server_test.go
@@ -0,0 +1,72 @@
+package server
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/go-hclog"
+	"google.golang.org/grpc"
+	"gotest.tools/v3/assert"
+
+	turbofs "github.com/vercel/turborepo/cli/internal/fs"
+	"github.com/vercel/turborepo/cli/internal/turbodprotocol"
+)
+
+type mockGrpc struct {
+	stopped chan struct{}
+}
+
+func (m *mockGrpc) GracefulStop() {
+	close(m.stopped)
+}
+
+func (m *mockGrpc) RegisterService(desc *grpc.ServiceDesc, impl interface{}) {}
+
+func TestDeleteRepoRoot(t *testing.T) {
+	logger := hclog.Default()
+	repoRootRaw := t.TempDir()
+	repoRoot := turbofs.AbsolutePathFromUpstream(repoRootRaw)
+
+	grpcServer := &mockGrpc{
+		stopped: make(chan struct{}),
+	}
+
+	s, err := New(logger, repoRoot, "some-version", "/log/file/path")
+	assert.NilError(t, err, "New")
+	s.Register(grpcServer)
+
+	// Delete the repo root, ensure that GracefulStop got called
+	err = repoRoot.Remove()
+	assert.NilError(t, err, "Remove")
+
+	select {
+	case <-grpcServer.stopped:
+	case <-time.After(2 * time.Second):
+		t.Error("timed out waiting for graceful stop to be called")
+	}
+}
+
+func TestShutdown(t *testing.T) {
+	logger := hclog.Default()
+	repoRootRaw := t.TempDir()
+	repoRoot := turbofs.AbsolutePathFromUpstream(repoRootRaw)
+
+	grpcServer := &mockGrpc{
+		stopped: make(chan struct{}),
+	}
+
+	s, err := New(logger, repoRoot, "some-version", "/log/file/path")
+	assert.NilError(t, err, "New")
+	s.Register(grpcServer)
+
+	ctx := context.Background()
+	_, err = s.Shutdown(ctx, &turbodprotocol.ShutdownRequest{})
+	assert.NilError(t, err, "Shutdown")
+	// Ensure that graceful stop gets called
+	select {
+	case <-grpcServer.stopped:
+	case <-time.After(2 * time.Second):
+		t.Error("timed out waiting for graceful stop to be called")
+	}
+}
diff --git a/cli/internal/signals/signals.go b/cli/internal/signals/signals.go
new file mode 100644
index 0000000000000..8634144f02b49
--- /dev/null
+++ b/cli/internal/signals/signals.go
@@ -0,0 +1,60 @@
+package signals
+
+import (
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+)
+
+// Watcher watches for signals delivered to this process and provides
+// the opportunity for turbo to run cleanup
+type Watcher struct {
+	doneCh  chan struct{}
+	closed  bool
+	mu      sync.Mutex
+	closers []func()
+}
+
+// AddOnClose registers a cleanup handler to run when a signal is received
+func (w *Watcher) AddOnClose(closer func()) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	w.closers = append(w.closers, closer)
+}
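The remaining methods (Close, Done, NewWatcher) follow just below; here is how calling code might hook cleanup into the watcher. Usage is hypothetical, not part of this patch:

package main

import (
	"fmt"

	"github.com/vercel/turborepo/cli/internal/signals"
)

func main() {
	watcher := signals.NewWatcher()
	// Register cleanup that must run on SIGINT/SIGTERM/SIGQUIT.
	watcher.AddOnClose(func() { fmt.Println("flushing daemon state") })
	// ... do work ...
	watcher.Close() // or block on <-watcher.Done() until a signal arrives
}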
+
+// Close runs the cleanup handlers registered with this watcher
+func (w *Watcher) Close() {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if w.closed {
+		return
+	}
+	w.closed = true
+	for _, closer := range w.closers {
+		closer()
+	}
+	w.closers = nil
+	close(w.doneCh)
+}
+
+// Done returns a channel that will be closed after all of the cleanup
+// handlers have been run.
+func (w *Watcher) Done() <-chan struct{} {
+	return w.doneCh
+}
+
+// NewWatcher returns a new Watcher instance for watching signals.
+func NewWatcher() *Watcher {
+	// TODO: platform specific signals to watch for?
+	signalCh := make(chan os.Signal, 1)
+	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
+	w := &Watcher{
+		doneCh: make(chan struct{}),
+	}
+	go func() {
+		<-signalCh
+		w.Close()
+	}()
+	return w
+}
diff --git a/cli/internal/turbodprotocol/turbod.proto b/cli/internal/turbodprotocol/turbod.proto
new file mode 100644
index 0000000000000..ff2888bc1e341
--- /dev/null
+++ b/cli/internal/turbodprotocol/turbod.proto
@@ -0,0 +1,52 @@
+syntax = "proto3";
+
+option go_package = "github.com/vercel/turborepo/cli/internal/turbodprotocol";
+
+package turbodprotocol;
+
+service Turbod {
+  rpc Hello (HelloRequest) returns (HelloResponse);
+  rpc Shutdown (ShutdownRequest) returns (ShutdownResponse);
+  rpc Status (StatusRequest) returns (StatusResponse);
+  // RPCs implementing cache output watching
+  rpc NotifyOutputsWritten (NotifyOutputsWrittenRequest) returns (NotifyOutputsWrittenResponse);
+  rpc GetChangedOutputs (GetChangedOutputsRequest) returns (GetChangedOutputsResponse);
+}
+
+message HelloRequest {
+  string version = 1;
+  string session_id = 2;
+}
+
+message HelloResponse {}
+
+message ShutdownRequest {}
+
+message ShutdownResponse {}
+
+message StatusRequest {}
+
+message StatusResponse {
+  DaemonStatus daemonStatus = 1;
+}
+
+message NotifyOutputsWrittenRequest {
+  repeated string output_globs = 1;
+  string hash = 2;
+}
+
+message NotifyOutputsWrittenResponse {}
+
+message GetChangedOutputsRequest {
+  repeated string output_globs = 1;
+  string hash = 2;
+}
+
+message GetChangedOutputsResponse {
+  repeated string changed_output_globs = 1;
+}
+
+message DaemonStatus {
+  string log_file = 1;
+  uint64 uptime_msec = 2;
+}
diff --git a/cli/scripts/monorepo.ts b/cli/scripts/monorepo.ts
index ffde32634acf0..651f339436907 100644
--- a/cli/scripts/monorepo.ts
+++ b/cli/scripts/monorepo.ts
@@ -306,7 +306,11 @@ fs.copyFileSync(
     args?: readonly string[],
     options?: execa.SyncOptions
   ) {
-    return execa.sync(turboPath, [command, ...(args || [])], {
+    const resolvedArgs = [...(args ?? [])];
+    if (process.env.TURBO_USE_DAEMON === "1" && command === "run") {
+      resolvedArgs.push("--experimental-use-daemon");
+    }
+    return execa.sync(turboPath, [command, ...resolvedArgs], {
       cwd: this.root,
       shell: true,
       ...options,