README
Evan Culver committed Apr 18, 2020
1 parent 16ad649 commit da355e8
Showing 4 changed files with 179 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -2,3 +2,5 @@
 cmd/*/*
 .build
 !cmd/*/main.go
+!cmd/*/README.md
+!cmd/*/*.jpg
52 changes: 52 additions & 0 deletions cmd/ncopy/README.md
@@ -0,0 +1,52 @@
# ncopy

I'm calling this `ncopy` for lack of a better name. The setup is this:

Assume you have a function, `copyFile(src, dst)`, that copies files from one machine
(`src`) to another machine (`dst`). Write an algorithm to efficiently copy a file
to N machines. Hint: must be done non-sequentially.

# Solution

For the sake of making this demonstrable, we're going to write `copyFile` as a function
that sleeps for a random amount of time between 3 and 15 seconds to simulate some blocking
operation resembling a real file transfer.

Obviously, the simplest solution is to just roll through the list of destinations sequentially, calling `copyFile`
for each one. But this is inefficient: each copy blocks until it finishes, so with thousands of machines it
would take a prohibitive amount of time to complete.
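
For reference, that sequential baseline only takes a few lines. This is a sketch, not part of the actual implementation: `naiveCopy` is a hypothetical name, and `copyFile` is passed in as a parameter so the sketch stands alone.

```go
package main

import "fmt"

// naiveCopy is the sequential baseline: one source pushes the file to every
// destination, one blocking copyFile call at a time. With n destinations and
// copies taking up to 15 seconds each, wall-clock time grows linearly with n.
func naiveCopy(copyFile func(src, dst int), src int, dsts []int) {
	for _, dst := range dsts {
		copyFile(src, dst) // each call blocks until the transfer finishes
	}
}

func main() {
	// use a stub copyFile that records calls instead of sleeping
	var order [][2]int
	stub := func(src, dst int) { order = append(order, [2]int{src, dst}) }
	naiveCopy(stub, 0, []int{1, 2, 3})
	fmt.Println(order) // every copy originates from node 0, in order
}
```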

So, instead, we're going to start by focusing on two optimizations:

1) do M `copyFile` operations concurrently from any given source
2) as soon as a machine receives the file, use it to send M operations to any remaining hosts

It's as though there are two buckets, one for sources and one for destinations:

![Whiteboard](https://github.com/eculver/go-play/blob/ncopy/cmd/ncopy/ncopy.jpg)

To achieve the concurrency in Go, I decided to use channels to coordinate the effort. There
are two that we care about: `seeds` and `sends`. The `seeds` channel is populated with M source values
to represent that there are M candidates to send from, and the `sends` channel carries the destinations
that are still waiting for the file. The result doesn't amount to that much code.

# Alternative Solutions

I thought about managing a heap-based priority queue where each node in the queue represents a source, with its
priority tied to the number of in-flight `copyFile` operations, but it quickly felt like overkill just to get
something working. Plus, treating the number of in-flight operations as the priority means that every time a
`copyFile` finishes, we would need to `Fix` the heap, an `O(log n)` operation that isn't necessary here.

I'm sure there are other solutions I didn't consider. In a real-world scenario, we might also want to evaluate
and isolate nodes that are impacted by reduced I/O: if a node is subject to relative network congestion, it
would probably make sense to avoid using it as a seed node.

There's also the real-world situation where not every `copyFile` operation completes successfully, so retries should be considered.
When a retry is needed, we'd probably want to know whether the failure was related to the source node and potentially isolate that node
from future copies, or maybe even put it in some sort of "penalty box" to account for transient failures.

# Results

Generally, the more nodes there are, the greater the parallelism that can be achieved, for essentially just the cost
of keeping integers in a channel. There are no expensive seeks or traversals at play since there is no prioritization. So we're
looking at `O(n*m)` space complexity, since the implementation eventually puts `m` copies of each source into the seed channel,
and `O(n)` time complexity to iterate over all destinations.
125 changes: 125 additions & 0 deletions cmd/ncopy/main.go
@@ -0,0 +1,125 @@
package main

import (
	"flag"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	defaultNumDsts     = 100
	defaultConcurrency = 5
)

// keep track of the total amount of time spent just for copying so that we
// can report on an "improvement" metric that is just the difference between
// the total copy time and the actual time it took to complete; the mutex
// guards the counter since copies run concurrently
var (
	totalCopyTime = time.Duration(0)
	copyTimeLock  sync.Mutex
)

// copyFile simulates copying a file from a src node to a dst node. ncopy only
// assumes that such a function exists; for the sake of this example it just
// blocks for a random amount of time between 3 and 15 seconds.
func copyFile(src, dst int) {
	secs := rand.Intn(13) + 3
	durs := time.Duration(secs) * time.Second
	copyTimeLock.Lock()
	totalCopyTime += durs
	copyTimeLock.Unlock()
	fmt.Printf("start copy from %d to %d (%d secs)\n", src, dst, secs)
	time.Sleep(durs)
	fmt.Printf("done copying from %d to %d\n", src, dst)
}

func main() {
	var (
		wg         sync.WaitGroup
		resultLock sync.Mutex

		numDsts     = flag.Int("num-dsts", defaultNumDsts, "Number of destinations to send to")
		concurrency = flag.Int("concurrency", defaultConcurrency, "Max number of concurrent sends from a source")
	)
	flag.Parse()
	rand.Seed(time.Now().UnixNano())

	results := make(map[int][]int)
	seeds := make(chan int)
	sends := make(chan int)
	dsts := make([]int, *numDsts)
	// generate a list of integers to use as destination IDs;
	// node 0 will act as the initial seed so start at 1
	for i := 1; i <= *numDsts; i++ {
		dsts[i-1] = i
	}

	// seed makes a source available for x concurrent sends
	seed := func(src, x int) {
		for i := 0; i < x; i++ {
			seeds <- src
		}
	}

	saveResult := func(src, dst int) {
		resultLock.Lock()
		defer resultLock.Unlock()
		results[src] = append(results[src], dst)
	}

	// sendTo initiates a send to dst from the next available source
	sendTo := func(dst int) {
		src := <-seeds
		copyFile(src, dst)
		saveResult(src, dst)
		wg.Done()
		// dst now has the file, so make it available as a source for up to
		// `concurrency` concurrent sends; any leftover seed goroutines just
		// block on the channel until the program exits
		go seed(dst, *concurrency)
		// and put src back out there too since it's available again
		go seed(src, 1)
	}

	// mark this as our "start" time since everything else is setup
	start := time.Now()

	// populate the channel of sends that need to happen
	go func() {
		for _, dst := range dsts {
			sends <- dst
		}
		close(sends)
	}()

	// populate the initial "set" of sources
	go seed(0, *concurrency)

	// block until all dsts have been sent to
	wg.Add(*numDsts)
	for dst := range sends {
		go sendTo(dst)
	}
	wg.Wait()

	timing := time.Since(start)

	// spit out some values humans may care about
	fmt.Printf("\n\nresults:\n")

	timingDiffAbs := totalCopyTime - timing
	timingDiffPct := (float64(timingDiffAbs) / float64(totalCopyTime)) * 100
	max := 0
	maxSrc := -1
	for src, dsts := range results {
		if len(dsts) > max {
			max = len(dsts)
			maxSrc = src
		}
		fmt.Printf("%d sent to %d dsts: %v\n", src, len(dsts), dsts)
	}
	fmt.Println("--------")
	fmt.Printf("max sends: node %d sent to %d nodes\n", maxSrc, max)
	fmt.Printf("improvement over baseline: %v (%.2f%%)\n", timingDiffAbs, timingDiffPct)
}
Binary file added cmd/ncopy/ncopy.jpg
