From 47c87b9f5b3bb220ce2a7bed8ce2a8c28514205f Mon Sep 17 00:00:00 2001 From: Samuel Stauffer Date: Tue, 19 Apr 2016 10:13:42 -0700 Subject: [PATCH] Vendor throttle pkg to avoid a dependency --- .travis.yml | 3 +- zk/throttle_test.go | 136 ++++++++++++++++++++++++++++++++++++++++++++ zk/zk_test.go | 10 ++-- 3 files changed, 142 insertions(+), 7 deletions(-) create mode 100644 zk/throttle_test.go diff --git a/.travis.yml b/.travis.yml index 0dac91b8..b94cf732 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: go go: - - 1.4.2 + - 1.6.1 + - tip sudo: false diff --git a/zk/throttle_test.go b/zk/throttle_test.go new file mode 100644 index 00000000..633ce05f --- /dev/null +++ b/zk/throttle_test.go @@ -0,0 +1,136 @@ +/* +Copyright 2012 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Vendored from go4.org/net/throttle + +package zk + +import ( + "fmt" + "net" + "sync" + "time" +) + +const unitSize = 1400 // read/write chunk size. ~MTU size. + +type Rate struct { + KBps int // or 0, to not rate-limit bandwidth + Latency time.Duration +} + +// byteTime returns the time required for n bytes. +func (r Rate) byteTime(n int) time.Duration { + if r.KBps == 0 { + return 0 + } + return time.Duration(float64(n)/1024/float64(r.KBps)) * time.Second +} + +type Listener struct { + net.Listener + Down Rate // server Writes to Client + Up Rate // server Reads from client +} + +func (ln *Listener) Accept() (net.Conn, error) { + c, err := ln.Listener.Accept() + time.Sleep(ln.Up.Latency) + if err != nil { + return nil, err + } + tc := &conn{Conn: c, Down: ln.Down, Up: ln.Up} + tc.start() + return tc, nil +} + +type nErr struct { + n int + err error +} + +type writeReq struct { + writeAt time.Time + p []byte + resc chan nErr +} + +type conn struct { + net.Conn + Down Rate // for reads + Up Rate // for writes + + wchan chan writeReq + closeOnce sync.Once + closeErr error +} + +func (c *conn) start() { + c.wchan = make(chan writeReq, 1024) + go c.writeLoop() +} + +func (c *conn) writeLoop() { + for req := range c.wchan { + time.Sleep(req.writeAt.Sub(time.Now())) + var res nErr + for len(req.p) > 0 && res.err == nil { + writep := req.p + if len(writep) > unitSize { + writep = writep[:unitSize] + } + n, err := c.Conn.Write(writep) + time.Sleep(c.Up.byteTime(len(writep))) + res.n += n + res.err = err + req.p = req.p[n:] + } + req.resc <- res + } +} + +func (c *conn) Close() error { + c.closeOnce.Do(func() { + err := c.Conn.Close() + close(c.wchan) + c.closeErr = err + }) + return c.closeErr +} + +func (c *conn) Write(p []byte) (n int, err error) { + defer func() { + if e := recover(); e != nil { + n = 0 + err = fmt.Errorf("%v", err) + return + } + }() + resc := make(chan nErr, 1) + c.wchan <- writeReq{time.Now().Add(c.Up.Latency), p, resc} + res := <-resc + return res.n, res.err +} + +func (c *conn) Read(p []byte) (n int, err error) { + const max = 1024 + if len(p) > max { + p = p[:max] + } + n, err = c.Conn.Read(p) + time.Sleep(c.Down.byteTime(n)) + return +} diff --git a/zk/zk_test.go b/zk/zk_test.go index 10e0b586..c52cb0c9 100644 --- a/zk/zk_test.go +++ b/zk/zk_test.go @@ -7,8 +7,6 @@ import ( "strings" "testing" "time" - - "camlistore.org/pkg/throttle" ) func TestCreate(t *testing.T) { @@ -411,8 +409,8 @@ func TestSlowServer(t *testing.T) { realAddr := fmt.Sprintf("127.0.0.1:%d", ts.Servers[0].Port) proxyAddr, stopCh, err := startSlowProxy(t, - throttle.Rate{}, throttle.Rate{}, - realAddr, func(ln *throttle.Listener) { + Rate{}, Rate{}, + realAddr, func(ln *Listener) { if ln.Up.Latency == 0 { ln.Up.Latency = time.Millisecond * 2000 ln.Down.Latency = time.Millisecond * 2000 @@ -461,12 +459,12 @@ func TestSlowServer(t *testing.T) { } } -func startSlowProxy(t *testing.T, up, down throttle.Rate, upstream string, adj func(ln *throttle.Listener)) (string, chan bool, error) { +func startSlowProxy(t *testing.T, up, down Rate, upstream string, adj func(ln *Listener)) (string, chan bool, error) { ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return "", nil, err } - tln := &throttle.Listener{ + tln := &Listener{ Listener: ln, Up: up, Down: down,