Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a toxic that closes a connection after given time period #195

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added toxics/debug.test
Binary file not shown.
48 changes: 48 additions & 0 deletions toxics/limit_time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package toxics

import "time"

// LimitTimeToxic has shuts connection after given time
type LimitTimeToxic struct {
Time int64 `json:"time"`
}

type LimitTimeToxicState struct {
ElapsedMilliseconds int64
}

func (t *LimitTimeToxic) Pipe(stub *ToxicStub) {
state := stub.State.(*LimitTimeToxicState)

if state.ElapsedMilliseconds >= t.Time {
stub.Close()
return
}

timeout := time.Duration(t.Time-state.ElapsedMilliseconds) * time.Millisecond
start := time.Now()
for {
select {
case <-time.After(timeout):
stub.Close()
return
case <-stub.Interrupt:
state.ElapsedMilliseconds = int64(time.Since(start) / time.Millisecond)
return
case c := <-stub.Input:
if c == nil {
stub.Close()
return
}
stub.Output <- c
}
}
}

func (t *LimitTimeToxic) NewState() interface{} {
return new(LimitTimeToxicState)
}

func init() {
Register("limit_time", new(LimitTimeToxic))
}
96 changes: 96 additions & 0 deletions toxics/limit_time_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package toxics_test

import (
"bytes"
"testing"
"time"

"github.com/Shopify/toxiproxy/stream"
"github.com/Shopify/toxiproxy/toxics"
)

func TestLimitTimeToxicContinuesAfterInterrupt(t *testing.T) {
timeout := int64(1000)
toxic := &toxics.LimitTimeToxic{Time: timeout}

input := make(chan *stream.StreamChunk)
output := make(chan *stream.StreamChunk)
stub := toxics.NewToxicStub(input, output)
stub.State = toxic.NewState()

// Wait for half the timeout and interrupt
go func() {
time.Sleep(time.Duration(timeout/2) * time.Millisecond)
stub.Interrupt <- struct{}{}
}()

start := time.Now()
toxic.Pipe(stub)
elapsed1 := time.Since(start)
if int64(elapsed1/time.Millisecond) >= timeout {
t.Error("Interrupt did not immediately return from pipe")
}

// Without sending anything then pipe should wait for remainder of timeout and close stub
toxic.Pipe(stub)
elapsedTotal := time.Since(start)

if int64(elapsedTotal/time.Millisecond) > int64((float64(timeout) * 1.1)) {
t.Error("Timeout started again after interrupt")
}

if int64(elapsedTotal/time.Millisecond) < timeout {
t.Error("Did not wait for timeout to elapse")
}

if !stub.Closed() {
t.Error("Did not close pipe after timeout")
}
}

func TestLimitTimeToxicNilInputShouldClosePipe(t *testing.T) {
timeout := int64(30000)
toxic := &toxics.LimitTimeToxic{Time: timeout}

input := make(chan *stream.StreamChunk)
output := make(chan *stream.StreamChunk)
stub := toxics.NewToxicStub(input, output)
stub.State = toxic.NewState()

go func() {
input <- nil
}()

start := time.Now()
toxic.Pipe(stub)
elapsed1 := time.Since(start)
if int64(elapsed1/time.Millisecond) >= timeout {
t.Error("Did not immediately close pipe")
}

if !stub.Closed() {
t.Error("Did not close pipe")
}

}

func TestLimitTimeToxicSendsDataThroughBeforeTimeoutReached(t *testing.T) {
timeout := int64(30000)
toxic := &toxics.LimitTimeToxic{Time: timeout}

input := make(chan *stream.StreamChunk)
output := make(chan *stream.StreamChunk)
stub := toxics.NewToxicStub(input, output)
stub.State = toxic.NewState()

go toxic.Pipe(stub)

inputBuffer := buffer(100)
input <- &stream.StreamChunk{Data: inputBuffer}

sentData := <-output

if !bytes.Equal(sentData.Data, inputBuffer) {
t.Error("Data did not get sent through")
}
}