feat: support parallel hash computing (#29)
* feat: support parallel compute hash function
flywukong authored Jul 19, 2023
1 parent 42e346d commit a9545a9
Showing 4 changed files with 278 additions and 42 deletions.
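For orientation, here is a minimal sketch of how a caller might use the updated entry point. The import path github.com/bnb-chain/greenfield-common/go/hash is assumed from the repository layout, and the file name, segment size, and shard counts are illustrative only:

package main

import (
	"fmt"
	"os"

	"github.com/bnb-chain/greenfield-common/go/hash"
)

func main() {
	// "object.bin" is a hypothetical input file.
	f, err := os.Open("object.bin")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// isSerial=false selects the new parallel implementation;
	// 16 MB segments with 4 data and 2 parity shards are illustrative values.
	hashes, size, redundancyType, err := hash.ComputeIntegrityHash(f, 16*1024*1024, 4, 2, false)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(hashes), size, redundancyType)
}

Passing isSerial=false routes the call through the new parallel path, which is also what ComputerHashFromFile and ComputerHashFromBuffer now do (see the hash.go diff below).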
23 changes: 0 additions & 23 deletions .github/workflows/go.yml
@@ -63,26 +63,3 @@ jobs:
working-directory: ./go
run: make test

coverage:
name: Golang Code Coverage
strategy:
matrix:
go-version: [ 1.18.x ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- name: Setup GitHub Token
run: git config --global url.https://$GH_ACCESS_TOKEN@github.com/.insteadOf https://github.com/
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
- name: Run coverage
working-directory: ./go
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
# - name: Upload coverage to Codecov
# uses: codecov/codecov-action@v3
# with:
# files: go/coverage.txt
# flags: go
# fail_ci_if_error: true
5 changes: 5 additions & 0 deletions go/hash/checksum.go
@@ -6,6 +6,11 @@ import (
"fmt"
)

type SegmentInfo struct {
SegmentID int
Data []byte
}

// GenerateChecksum generates the checksum of one piece data
func GenerateChecksum(pieceData []byte) []byte {
hash := sha256.New()
195 changes: 181 additions & 14 deletions go/hash/hash.go
@@ -3,8 +3,10 @@ package hash
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"runtime"
"sync"

storageTypes "github.com/bnb-chain/greenfield/x/storage/types"
@@ -13,6 +15,11 @@ import (
"github.com/bnb-chain/greenfield-common/go/redundancy"
)

const (
maxThreadNum = 5
jobChannelSize = 100
)

// IntegrityHasher compute integrityHash
type IntegrityHasher struct {
ecDataHashes [][][]byte
@@ -146,9 +153,21 @@ func (i *IntegrityHasher) computeBufferHash() error {
return nil
}

// ComputeIntegrityHash split the reader into segment, ec encode the data, compute the hash roots of pieces
// return the hash result array list and data segmentSize
func ComputeIntegrityHash(reader io.Reader, segmentSize int64, dataShards, parityShards int) ([][]byte, int64,
// ComputeIntegrityHash returns the integrity hash of the file content and the data size.
// If isSerial is true, the integrity hash is computed by the serial implementation;
// if isSerial is false, it is computed by the parallel implementation.
func ComputeIntegrityHash(reader io.Reader, segmentSize int64, dataShards, parityShards int, isSerial bool) ([][]byte, int64,
storageTypes.RedundancyType, error,
) {
if isSerial {
return ComputeIntegrityHashSerial(reader, segmentSize, dataShards, parityShards)
}
return ComputeIntegrityHashParallel(reader, segmentSize, dataShards, parityShards)
}

// ComputeIntegrityHashSerial splits the reader into segments, EC-encodes the data, and computes the hash roots of the pieces serially.
// It returns the hash result list and the data size.
func ComputeIntegrityHashSerial(reader io.Reader, segmentSize int64, dataShards, parityShards int) ([][]byte, int64,
storageTypes.RedundancyType, error,
) {
var segChecksumList [][]byte
Expand Down Expand Up @@ -179,16 +198,9 @@ func ComputeIntegrityHash(reader io.Reader, segmentSize int64, dataShards, parit
// compute segment hash
checksum := GenerateChecksum(data)
segChecksumList = append(segChecksumList, checksum)
// get erasure encode bytes
encodeShards, err := redundancy.EncodeRawSegment(data, dataShards, parityShards)
if err != nil {
return nil, 0, storageTypes.REDUNDANCY_EC_TYPE, err
}

for index, shard := range encodeShards {
// compute hash of pieces
piecesHash := GenerateChecksum(shard)
encodeDataHash[index] = append(encodeDataHash[index], piecesHash)
if err = encodeAndComputeHash(encodeDataHash, data, dataShards, parityShards); err != nil {
return nil, 0, storageTypes.REDUNDANCY_EC_TYPE, err
}
}
}
@@ -212,6 +224,22 @@ func ComputerHashFromFile(filePath string, segmentSize int64, dataShards, parity
return hashList, contentLen, storageTypes.REDUNDANCY_EC_TYPE, nil
}

func encodeAndComputeHash(encodeDataHash [][][]byte, segment []byte, dataShards, parityShards int) error {
// get erasure encode bytes
encodeShards, err := redundancy.EncodeRawSegment(segment, dataShards, parityShards)
if err != nil {
return err
}

for index, shard := range encodeShards {
// compute hash of pieces
piecesHash := GenerateChecksum(shard)
encodeDataHash[index] = append(encodeDataHash[index], piecesHash)
}

return nil
}

// ComputerHashFromFile open a local file and compute hash result and segmentSize
func ComputerHashFromFile(filePath string, segmentSize int64, dataShards, parityShards int) ([][]byte, int64, storageTypes.RedundancyType, error) {
f, err := os.Open(filePath)
@@ -221,11 +249,150 @@ func ComputerHashFromFile(filePath string, segmentSize int64, dataShards, parity
}
defer f.Close()

return ComputeIntegrityHash(f, segmentSize, dataShards, parityShards)
return ComputeIntegrityHash(f, segmentSize, dataShards, parityShards, false)
}

// ComputerHashFromBuffer support computing hash and segmentSize from byte buffer
func ComputerHashFromBuffer(content []byte, segmentSize int64, dataShards, parityShards int) ([][]byte, int64, storageTypes.RedundancyType, error) {
reader := bytes.NewReader(content)
return ComputeIntegrityHash(reader, segmentSize, dataShards, parityShards)
return ComputeIntegrityHash(reader, segmentSize, dataShards, parityShards, false)
}

// computePieceHashes EC-encodes the segment and returns the hashes of the EC pieces
func computePieceHashes(segment []byte, dataShards, parityShards int) ([][]byte, error) {
// get erasure encode bytes
encodeShards, err := redundancy.EncodeRawSegment(segment, dataShards, parityShards)
if err != nil {
return nil, err
}

var pieceChecksumList [][]byte
for _, shard := range encodeShards {
// compute hash of pieces
piecesHash := GenerateChecksum(shard)
pieceChecksumList = append(pieceChecksumList, piecesHash)
}

return pieceChecksumList, nil
}

// hashWorker receives segment info from the job channel and computes the corresponding segment hash and piece hashes.
// The results are stored in sync maps keyed by segment ID so the integrity hashes can be assembled in order.
func hashWorker(jobs <-chan SegmentInfo, errChan chan<- error, dataShards, parityShards int, wg *sync.WaitGroup,
segmentHashMap *sync.Map, pieceHashMap *sync.Map,
) {
defer wg.Done()

for segInfo := range jobs {
checksum := GenerateChecksum(segInfo.Data)
segmentHashMap.Store(segInfo.SegmentID, checksum)

pieceCheckSumList, err := computePieceHashes(segInfo.Data, dataShards, parityShards)
if err != nil {
errChan <- err
return
}
pieceHashMap.Store(segInfo.SegmentID, pieceCheckSumList)
}
}

// ComputeIntegrityHashParallel splits the reader into segments, EC-encodes the data, and computes the hash roots of the pieces using multiple goroutines.
// It returns the hash result list and the data size.
func ComputeIntegrityHashParallel(reader io.Reader, segmentSize int64, dataShards, parityShards int) ([][]byte, int64, storageTypes.RedundancyType, error) {
var (
segChecksumList [][]byte
ecShards = dataShards + parityShards
contentLen = int64(0)
wg sync.WaitGroup
)
// use sync.Map to store the intermediate hash results keyed by segment ID
segHashMap := &sync.Map{}
pieceHashMap := &sync.Map{}
encodeDataHash := make([][][]byte, ecShards)
// store the result of integrity hash
hashList := make([][]byte, ecShards+1)

jobChan := make(chan SegmentInfo, jobChannelSize)
errChan := make(chan error, 1)
// cap the number of hash workers at maxThreadNum
threadNum := runtime.NumCPU() / 2
if threadNum > maxThreadNum {
threadNum = maxThreadNum
}
// start workers to compute hash of each segment
for i := 0; i < threadNum; i++ {
wg.Add(1)
go hashWorker(jobChan, errChan, dataShards, parityShards, &wg, segHashMap, pieceHashMap)
}

jobNum := 0
for {
seg := make([]byte, segmentSize)
n, err := reader.Read(seg)
if err != nil {
if err != io.EOF {
log.Error().Msg("failed to read content:" + err.Error())
return nil, 0, storageTypes.REDUNDANCY_EC_TYPE, err
}
break
}

if n > 0 && n <= int(segmentSize) {
contentLen += int64(n)
data := seg[:n]
// dispatch the segment to a hash worker
jobChan <- SegmentInfo{SegmentID: jobNum, Data: data}
jobNum++
}
}
close(jobChan)

for i := 0; i < ecShards; i++ {
encodeDataHash[i] = make([][]byte, jobNum)
}

wg.Wait()
close(errChan)

// check error
for err := range errChan {
if err != nil {
log.Error().Msg("err chan detected err:" + err.Error())
return nil, 0, storageTypes.REDUNDANCY_EC_TYPE, err
}
}

for i := 0; i < jobNum; i++ {
segHashValue, ok := segHashMap.Load(i)
if !ok {
return nil, 0, storageTypes.REDUNDANCY_EC_TYPE, fmt.Errorf("fail to load the segment hash")
}
segChecksumList = append(segChecksumList, segHashValue.([]byte))

pieceHashValue, ok := pieceHashMap.Load(i)
if !ok {
return nil, 0, storageTypes.REDUNDANCY_EC_TYPE, fmt.Errorf("fail to load the segment hash")
}
hashValues := pieceHashValue.([][]byte)
for j := 0; j < len(encodeDataHash); j++ {
encodeDataHash[j][i] = hashValues[j]
}
}

// compute the integrity root of pieces of the PrimarySP
hashList[0] = GenerateIntegrityHash(segChecksumList)

// compute the integrity hash of the SecondarySPs
spLen := len(encodeDataHash)
wg.Add(spLen)
for spID, content := range encodeDataHash {
go func(data [][]byte, id int) {
defer wg.Done()
hashList[id+1] = GenerateIntegrityHash(data)
}(content, spID)
}

wg.Wait()
return hashList, contentLen, storageTypes.REDUNDANCY_EC_TYPE, nil
}
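As a sanity check on the new path, one could verify that the serial and parallel implementations agree. The following is only an illustration written against the function signatures in the diff above, not the test shipped with this commit; the data size, segment size, and shard counts are arbitrary, and the test is assumed to live in the hash package:

package hash

import (
	"bytes"
	"crypto/rand"
	"testing"
)

func TestSerialAndParallelAgree(t *testing.T) {
	// generate 10 MB of random content as the test object
	data := make([]byte, 10*1024*1024)
	if _, err := rand.Read(data); err != nil {
		t.Fatal(err)
	}
	segmentSize := int64(1024 * 1024)

	serialHashes, serialSize, _, err := ComputeIntegrityHashSerial(bytes.NewReader(data), segmentSize, 4, 2)
	if err != nil {
		t.Fatal(err)
	}
	parallelHashes, parallelSize, _, err := ComputeIntegrityHashParallel(bytes.NewReader(data), segmentSize, 4, 2)
	if err != nil {
		t.Fatal(err)
	}

	// both paths should report the same content length and hash roots
	if serialSize != parallelSize {
		t.Fatalf("size mismatch: %d vs %d", serialSize, parallelSize)
	}
	if len(serialHashes) != len(parallelHashes) {
		t.Fatalf("hash count mismatch: %d vs %d", len(serialHashes), len(parallelHashes))
	}
	for i := range serialHashes {
		if !bytes.Equal(serialHashes[i], parallelHashes[i]) {
			t.Fatalf("hash mismatch at index %d", i)
		}
	}
}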