diff --git a/README.md b/README.md index 36dab04..f02c094 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,5 @@ |4|Instead of sending each line to the channel, now sending 100 lines chunked together. Also, to minimise garbage collection, not freeing up memory when resetting a slice. |3:41.76|-161.07|[b7b1781](https://github.com/shraddhaag/1brc/commit/b7b1781f58fd258a06940bd6c05eb404c8a14af6)| |5|Read file in chunks of 100 MB instead of reading line by line. |3:32.62|-9.14|[c26fea4](https://github.com/shraddhaag/1brc/commit/c26fea40019552a7e4fc1c864236f433b1b686f0)| |6|Convert temperature from `string` to `int64`, process in `int64` and convert to `float64` at the end. |2:51.50|-41.14|[7812da4](https://github.com/shraddhaag/1brc/commit/7812da4d0be07dd4686d5f9b9df1e93b08cd0dd1)| -|7|In the city <> temperatures map, replaced the value for each key (city) to preprocessed min, max, count and sum of all temperatures instead of storing all recorded temperatures for the city.|1:39.81|-71.79|| \ No newline at end of file +|7|In the city <> temperatures map, replaced the value for each key (city) to preprocessed min, max, count and sum of all temperatures instead of storing all recorded temperatures for the city.|1:39.81|-71.79|[e5213a8](https://github.com/shraddhaag/1brc/commit/e5213a836b17bec0a858474a11f07c902e724bba)| +|8|Use producer consumer pattern to read file in chunks and process the chunks in parallel.|1:43.82|+14.01|| \ No newline at end of file diff --git a/main.go b/main.go index faf8c1c..96fa637 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "errors" "flag" "fmt" @@ -14,6 +15,7 @@ import ( "sort" "strconv" "strings" + "sync" ) var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") @@ -47,7 +49,7 @@ func main() { defer pprof.StopCPUProfile() } - fmt.Println(evaluate(*input)) + evaluate(*input) if *memprofile != "" { f, err := os.Create("./profiles/" + *memprofile) @@ -62,11 +64,6 @@ func main() { } } 
-type result struct { - city string - temp string -} - func evaluate(input string) string { mapOfTemp, err := readFileLineByLineIntoAMap(input) if err != nil { @@ -104,39 +101,59 @@ func readFileLineByLineIntoAMap(filepath string) (map[string]cityTemperatureInfo if err != nil { panic(err) } + defer file.Close() mapOfTemp := make(map[string]cityTemperatureInfo) - - chanOwner := func() <-chan []string { - resultStream := make(chan []string, 100) - toSend := make([]string, 100) - // reading 100MB per request - chunkSize := 100 * 1024 * 1024 - buf := make([]byte, chunkSize) - var stringsBuilder strings.Builder - stringsBuilder.Grow(500) - var count int + resultStream := make(chan []string, 100) + chunkStream := make(chan []byte, 15) + chunkSize := 64 * 1024 * 1024 + var wg sync.WaitGroup + + // spawn workers to consume (process) file chunks read + for i := 0; i < runtime.NumCPU()-1; i++ { + wg.Add(1) go func() { - defer close(resultStream) - for { - readTotal, err := file.Read(buf) - if err != nil { - if errors.Is(err, io.EOF) { - count = processReadChunk(buf, readTotal, count, &stringsBuilder, toSend, resultStream) - break - } - panic(err) - } - count = processReadChunk(buf, readTotal, count, &stringsBuilder, toSend, resultStream) - } - if count != 0 { - resultStream <- toSend[:count] + for chunk := range chunkStream { + processReadChunk(chunk, resultStream) } + wg.Done() }() - return resultStream } - resultStream := chanOwner() + // spawn a goroutine to read file in chunks and send it to the chunk channel for further processing + go func() { + buf := make([]byte, chunkSize) + leftover := make([]byte, 0, chunkSize) + for { + readTotal, err := file.Read(buf) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + panic(err) + } + buf = buf[:readTotal] + + toSend := make([]byte, readTotal) + copy(toSend, buf) + + lastNewLineIndex := bytes.LastIndex(buf, []byte{'\n'}) + + toSend = append(leftover, buf[:lastNewLineIndex+1]...) 
+ leftover = make([]byte, len(buf[lastNewLineIndex+1:])) + copy(leftover, buf[lastNewLineIndex+1:]) + + chunkStream <- toSend + + } + close(chunkStream) + + // wait for all chunks to be processed before closing the result stream + wg.Wait() + close(resultStream) + }() + + // process all city temperatures derived after processing the file chunks for t := range resultStream { for _, text := range t { index := strings.Index(text, ";") @@ -166,7 +183,7 @@ readFileLineByLineIntoAMap(filepath string) (map[string]cityTemperatureInfo } } } - // fmt.Println(mapOfTemp) + return mapOfTemp, nil } @@ -176,8 +193,12 @@ func convertStringToInt64(input string) int64 { return output } -func processReadChunk(buf []byte, readTotal, count int, stringsBuilder *strings.Builder, toSend []string, resultStream chan<- []string) int { - for _, char := range buf[:readTotal] { +func processReadChunk(buf []byte, resultStream chan<- []string) { + var count int + var stringsBuilder strings.Builder + toSend := make([]string, 100) + + for _, char := range buf { if char == '\n' { if stringsBuilder.Len() != 0 { toSend[count] = stringsBuilder.String() @@ -195,8 +216,9 @@ func processReadChunk(buf []byte, readTotal, count int, stringsBuilder *strings. stringsBuilder.WriteByte(char) } } - - return count + if count != 0 { + resultStream <- toSend[:count] + } } func round(x float64) float64 { diff --git a/profiles/cpu-parallel.prof b/profiles/cpu-parallel.prof new file mode 100644 index 0000000..cefee28 Binary files /dev/null and b/profiles/cpu-parallel.prof differ