Skip to content
This repository was archived by the owner on Mar 22, 2024. It is now read-only.

Commit def1ffd

Browse files
committed
merge both TestCrash and Interrupt into the receiver
2 parents 93c4746 + 61f7aa7 commit def1ffd

File tree

5 files changed

+120
-22
lines changed

5 files changed

+120
-22
lines changed

internal/gateway/logging.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type LogLevel zerolog.Level
1616
const (
1717
LogDebug LogLevel = LogLevel(zerolog.DebugLevel)
1818
LogInfo LogLevel = LogLevel(zerolog.InfoLevel)
19+
LogWarn LogLevel = LogLevel(zerolog.WarnLevel)
1920
LogError LogLevel = LogLevel(zerolog.ErrorLevel)
2021
)
2122

internal/gateway/receiver/mock_receiver.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,27 @@ import (
1010
// MockReceiver is a mocked implementation of the Receiver interface, for testing
1111
// Can implement fault injection
1212
type MockReceiver struct {
13+
CvmfsReceiver
1314
}
1415

1516
// NewMockReceiver constructs a new MockReceiver object which implements the
1617
// Receiver interface
17-
func NewMockReceiver(ctx context.Context) (Receiver, error) {
18-
return &MockReceiver{}, nil
18+
func NewMockReceiver(ctx context.Context, execPath string, args ...string) (Receiver, error) {
19+
CvmfsReceiver, err := NewCvmfsReceiver(ctx, execPath, args...)
20+
return &MockReceiver{*CvmfsReceiver}, err
1921
}
2022

21-
// Quit command
22-
func (r *MockReceiver) Quit() error {
23-
return nil
24-
}
25-
26-
// Echo command
27-
func (r *MockReceiver) Echo() error {
23+
func (r *MockReceiver) Commit(leasePath, oldRootHash, newRootHash string, tag gw.RepositoryTag) error {
24+
gw.LogC(r.CvmfsReceiver.ctx, "mock receiver", gw.LogWarn).
25+
Str("leaserPath", leasePath).
26+
Msg("Requested commit agains a testing mock, shortcircuit success")
2827
return nil
2928
}
3029

31-
// SubmitPayload command
3230
func (r *MockReceiver) SubmitPayload(leasePath string, payload io.Reader, digest string, headerSize int) error {
33-
return nil
34-
}
35-
36-
// Commit command
37-
func (r *MockReceiver) Commit(leasePath, oldRootHash, newRootHash string, tag gw.RepositoryTag) error {
31+
gw.LogC(r.CvmfsReceiver.ctx, "mock receiver", gw.LogWarn).
32+
Str("leaserPath", leasePath).
33+
Msg("Requested submit agains a testing mock, shortcircuit success")
3834
return nil
3935
}
4036

internal/gateway/receiver/pool.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,19 @@ func (p commitTask) Context() context.Context {
5656
return p.ctx
5757
}
5858

59+
type testCrashTask struct {
60+
ctx context.Context
61+
replyChan chan<- error
62+
}
63+
64+
func (p testCrashTask) Reply() chan<- error {
65+
return p.replyChan
66+
}
67+
68+
func (p testCrashTask) Context() context.Context {
69+
return p.ctx
70+
}
71+
5972
// Pool maintains a number of parallel receiver workers to service
6073
// payload submission and commit requests. Payload submissions are done in
6174
// parallel, using Config.NumReceivers workers, while only a single commit
@@ -111,6 +124,14 @@ func (p *Pool) CommitLease(ctx context.Context, leasePath, oldRootHash, newRootH
111124
return result
112125
}
113126

127+
// testCrash is deliberately unexported; unexporting alone is not enough to keep it out of production use, but it is a good start
128+
func (p *Pool) testCrash(ctx context.Context) error {
129+
reply := make(chan error)
130+
p.tasks <- testCrashTask{ctx, reply}
131+
result := <-reply
132+
return result
133+
}
134+
114135
func worker(tasks <-chan task, pool *Pool, workerIdx int) {
115136
gw.Log("worker_pool", gw.LogDebug).
116137
Int("worker_id", workerIdx).
@@ -149,6 +170,9 @@ M:
149170
case commitTask:
150171
result = receiver.Commit(t.leasePath, t.oldRootHash, t.newRootHash, t.tag)
151172
taskType = "commit"
173+
case testCrashTask:
174+
result = receiver.TestCrash()
175+
taskType = "testcrash"
152176
default:
153177
task.Reply() <- fmt.Errorf("unknown task type")
154178
return

internal/gateway/receiver/receiver.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"io"
99
"os"
1010
"os/exec"
11-
"strconv"
1211
"strings"
1312

1413
gw "github.com/cvmfs/gateway/internal/gateway"
@@ -36,7 +35,8 @@ const (
3635
receiverCheckToken // Unused
3736
receiverSubmitPayload
3837
receiverCommit
39-
receiverError // Unused
38+
receiverError // Unused
39+
receiverTestCrash // Used only in testing
4040
)
4141

4242
// Receiver contains the operations that "receiver" worker processes perform
@@ -47,15 +47,16 @@ type Receiver interface {
4747
Commit(leasePath, oldRootHash, newRootHash string, tag gw.RepositoryTag) error
4848
Interrupt() error // like Ctrl-C (SIGINT, kill -2)
4949
// Kill() error // like kill -9 (SIGKILL)
50+
TestCrash() error
5051
}
5152

5253
// NewReceiver is the factory method for Receiver types
53-
func NewReceiver(ctx context.Context, execPath string, mock bool) (Receiver, error) {
54+
func NewReceiver(ctx context.Context, execPath string, mock bool, args ...string) (Receiver, error) {
5455
if mock {
55-
return NewMockReceiver(ctx)
56+
return NewMockReceiver(ctx, execPath, append(args, "-w ''")...)
5657
}
5758

58-
return NewCvmfsReceiver(ctx, execPath)
59+
return NewCvmfsReceiver(ctx, execPath, args...)
5960
}
6061

6162
// CvmfsReceiver spawns an external cvmfs_receiver worker process
@@ -74,12 +75,14 @@ type ReceiverReply struct {
7475
}
7576

7677
// NewCvmfsReceiver will spawn an external cvmfs_receiver worker process and wait for a command
77-
func NewCvmfsReceiver(ctx context.Context, execPath string) (*CvmfsReceiver, error) {
78+
func NewCvmfsReceiver(ctx context.Context, execPath string, args ...string) (*CvmfsReceiver, error) {
7879
if _, err := os.Stat(execPath); os.IsNotExist(err) {
7980
return nil, errors.Wrap(err, "worker process executable not found")
8081
}
8182

82-
cmd := exec.Command(execPath, "-i", strconv.Itoa(3), "-o", strconv.Itoa(4))
83+
cmdLine := []string{"-i", "3", "-o", "4"}
84+
cmdLine = append(cmdLine, args...)
85+
cmd := exec.Command(execPath, cmdLine...)
8386

8487
workerInRead, workerInWrite, err := os.Pipe()
8588
if err != nil {
@@ -115,6 +118,12 @@ func NewCvmfsReceiver(ctx context.Context, execPath string) (*CvmfsReceiver, err
115118
return nil, errors.Wrap(err, "could not start worker process")
116119
}
117120

121+
// It is necessary to close these two files; otherwise, if the receiver crashes,
122+
// a read on the `workerOutRead` / `workerCmdOut` will hang forever.
123+
// details: https://web.archive.org/web/20200429092830/https://redbeardlab.com/2020/04/29/on-linux-pipes-fork-and-passing-file-descriptors-to-other-process/
124+
workerInRead.Close()
125+
workerOutWrite.Close()
126+
118127
gw.LogC(ctx, "receiver", gw.LogDebug).
119128
Str("command", "start").
120129
Msg("worker process ready")
@@ -241,10 +250,21 @@ func (r *CvmfsReceiver) Interrupt() error {
241250
return r.worker.Process.Signal(os.Interrupt)
242251
}
243252

253+
// TestCrash is used only in testing: it sends the receiverTestCrash request to the worker and returns the resulting error
254+
func (r *CvmfsReceiver) TestCrash() error {
255+
reply, err := r.call(receiverTestCrash, nil, nil)
256+
if err != nil {
257+
return err
258+
}
259+
result := toReceiverError(reply)
260+
return result
261+
}
262+
244263
func (r *CvmfsReceiver) call(reqID receiverOp, msg []byte, payload io.Reader) ([]byte, error) {
245264
if err := r.request(reqID, msg, payload); err != nil {
246265
return nil, err
247266
}
267+
248268
return r.reply()
249269
}
250270

@@ -269,12 +289,18 @@ func (r *CvmfsReceiver) request(reqID receiverOp, msg []byte, payload io.Reader)
269289
func (r *CvmfsReceiver) reply() ([]byte, error) {
270290
buf := make([]byte, 4)
271291
if _, err := io.ReadFull(r.workerCmdOut, buf); err != nil {
292+
if (err == io.EOF) || (err == io.ErrUnexpectedEOF) {
293+
return nil, errors.Wrap(err, "possible that the receiver crashed")
294+
}
272295
return nil, errors.Wrap(err, "could not read reply size")
273296
}
274297
repSize := int32(binary.LittleEndian.Uint32(buf))
275298

276299
reply := make([]byte, repSize)
277300
if _, err := io.ReadFull(r.workerCmdOut, reply); err != nil {
301+
if (err == io.EOF) || (err == io.ErrUnexpectedEOF) {
302+
return nil, errors.Wrap(err, "possible that the receiver crashed")
303+
}
278304
return nil, errors.Wrap(err, "could not read reply body")
279305
}
280306

internal/gateway/receiver/receiver_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,59 @@ func TestReceiverCycle(t *testing.T) {
1212
}
1313
if err := receiver.Echo(); err != nil {
1414
t.Fatalf("echo request failed: %v", err)
15+
1516
}
1617
if err := receiver.Quit(); err != nil {
1718
t.Fatalf("quit request failed: %v", err)
1819
}
1920
}
21+
22+
func TestReceiverOnCrashWeReturnError(t *testing.T) {
23+
receiver, err := NewReceiver(context.TODO(), "/usr/bin/cvmfs_receiver", true)
24+
if err != nil {
25+
t.Fatalf("could not start receiver: %v", err)
26+
}
27+
if err := receiver.Echo(); err != nil {
28+
t.Fatalf("echo request failed: %v", err)
29+
}
30+
err = receiver.TestCrash()
31+
// Note that we fail when err == nil (not err != nil): the crash request is expected to return an error
32+
if err == nil {
33+
t.Fatalf("crash request failed: %v", err)
34+
}
35+
}
36+
37+
func TestReceiverAfterCrashWeCanStillCallCommandAndTheyWillReturnAnError(t *testing.T) {
38+
receiver, err := NewReceiver(context.TODO(), "/usr/bin/cvmfs_receiver", true)
39+
if err != nil {
40+
t.Fatalf("could not start receiver: %v", err)
41+
}
42+
if err := receiver.Echo(); err != nil {
43+
t.Fatalf("echo request failed: %v", err)
44+
}
45+
46+
receiver.TestCrash()
47+
48+
if receiver.Echo() == nil {
49+
t.Fatalf("echo after crash didn't return nil")
50+
}
51+
52+
if receiver.Quit() == nil {
53+
t.Fatalf("quit after crash didn't return nil")
54+
}
55+
}
56+
57+
// Redundant test, but it mimics a problem we found in production:
58+
// after a crash the .Quit() was hanging
59+
func TestReceiverAfterCrashQuiteDoesNotHang(t *testing.T) {
60+
receiver, err := NewReceiver(context.TODO(), "/usr/bin/cvmfs_receiver", true)
61+
if err != nil {
62+
t.Fatalf("could not start receiver: %v", err)
63+
}
64+
65+
receiver.TestCrash()
66+
67+
if receiver.Quit() == nil {
68+
t.Fatalf("quit after crash didn't return nil")
69+
}
70+
}

0 commit comments

Comments
 (0)