Skip to content

Commit

Permalink
add Update parity function (#60)
Browse files Browse the repository at this point in the history
Add Update parity function
  • Loading branch information
chenzhongtao authored and klauspost committed Aug 20, 2017
1 parent dc6af2d commit d78bf47
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 6 deletions.
20 changes: 19 additions & 1 deletion galois_amd64.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ func galMulAVX2Xor(low, high, in, out []byte)
//go:noescape
func galMulAVX2(low, high, in, out []byte)

// This is what the assembler routines does in blocks of 16 bytes:
//go:noescape
func sSE2XorSlice(in, out []byte)

// This is what the assembler rountes does in blocks of 16 bytes:
/*
func galMulSSSE3(low, high, in, out []byte) {
for n, input := range in {
Expand Down Expand Up @@ -71,3 +74,18 @@ func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) {
}
}
}

// slice galois add
func sliceXor(in, out []byte, sse2 bool) {
var done int
if sse2 {
sSE2XorSlice(in, out)
done = (len(in) >> 4) << 4
}
remain := len(in) - done
if remain > 0 {
for i := done; i < len(in); i++ {
out[i] ^= in[i]
}
}
}
24 changes: 24 additions & 0 deletions galois_amd64.s
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,27 @@ done_avx2:

BYTE $0xc5; BYTE $0xf8; BYTE $0x77 // VZEROUPPER
RET



// func sSE2XorSlice(in, out []byte)
TEXT ·sSE2XorSlice(SB), 7, $0
MOVQ in+0(FP), SI // SI: &in
MOVQ in_len+8(FP), R9 // R9: len(in)
MOVQ out+24(FP), DX // DX: &out
SHRQ $4, R9 // len(in) / 16
CMPQ R9, $0
JEQ done_xor_sse2

loopback_xor_sse2:
MOVOU (SI), X0 // in[x]
MOVOU (DX), X1 // out[x]
PXOR X0, X1
MOVOU X1, (DX)
ADDQ $16, SI // in+=16
ADDQ $16, DX // out+=16
SUBQ $1, R9
JNZ loopback_xor_sse2

done_xor_sse2:
RET
7 changes: 7 additions & 0 deletions galois_noasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,10 @@ func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) {
out[n] ^= mt[input]
}
}

// slice galois add
func sliceXor(in, out []byte, sse2 bool) {
for n, input := range in {
out[n] ^= input
}
}
47 changes: 47 additions & 0 deletions galois_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,50 @@ func TestGalois(t *testing.T) {
t.Fatal("galExp(13, 7) != 43")
}
}

func TestSliceGalADD(t *testing.T) {

lengthList := []int{16, 32, 34}
for _, length := range lengthList {
in := make([]byte, length)
fillRandom(in)
out := make([]byte, length)
fillRandom(out)
expect := make([]byte, length)
for i := range expect {
expect[i] = in[i] ^ out[i]
}
sliceXor(in, out, false)
if 0 != bytes.Compare(out, expect) {
t.Errorf("got %#v, expected %#v", out, expect)
}
fillRandom(out)
for i := range expect {
expect[i] = in[i] ^ out[i]
}
sliceXor(in, out, true)
if 0 != bytes.Compare(out, expect) {
t.Errorf("got %#v, expected %#v", out, expect)
}
}

for i := 0; i < 256; i++ {
a := byte(i)
for j := 0; j < 256; j++ {
b := byte(j)
for k := 0; k < 256; k++ {
c := byte(k)
x := galAdd(a, galAdd(b, c))
y := galAdd(galAdd(a, b), c)
if x != y {
t.Fatal("add does not match:", x, "!=", y)
}
x = galMultiply(a, galMultiply(b, c))
y = galMultiply(galMultiply(a, b), c)
if x != y {
t.Fatal("multiply does not match:", x, "!=", y)
}
}
}
}
}
15 changes: 11 additions & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
type Option func(*options)

type options struct {
maxGoroutines int
minSplitSize int
useAVX2, useSSSE3 bool
usePAR1Matrix bool
maxGoroutines int
minSplitSize int
useAVX2, useSSSE3, useSSE2 bool
usePAR1Matrix bool
}

var defaultOptions = options{
Expand All @@ -28,6 +28,7 @@ func init() {
// Detect CPU capabilities.
defaultOptions.useSSSE3 = cpuid.CPU.SSSE3()
defaultOptions.useAVX2 = cpuid.CPU.AVX2()
defaultOptions.useSSE2 = cpuid.CPU.SSE2()
}

// WithMaxGoroutines is the maximum number of goroutines number for encoding & decoding.
Expand Down Expand Up @@ -67,6 +68,12 @@ func withAVX2(enabled bool) Option {
}
}

func withSSE2(enabled bool) Option {
return func(o *options) {
o.useSSE2 = enabled
}
}

// WithPAR1Matrix causes the encoder to build the matrix how PARv1
// does. Note that the method they use is buggy, and may lead to cases
// where recovery is impossible, even if there are enough parity
Expand Down
105 changes: 104 additions & 1 deletion reedsolomon.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ type Encoder interface {
// calling the Verify function is likely to fail.
ReconstructData(shards [][]byte) error

// Update parity is use for change a few data shards and update it's parity.
// Input 'newDatashards' containing data shards changed.
// Input 'shards' containing old data shards (if data shard not changed, it can be nil) and old parity shards.
// new parity shards will in shards[DataShards:]
// Update is very useful if DataShards much larger than ParityShards and changed data shards is few. It will
// faster than Encode and not need read all data shards to encode.
Update(shards [][]byte, newDatashards [][]byte) error

// Split a data slice into the number of shards given to the encoder,
// and create empty parity shards.
//
Expand Down Expand Up @@ -221,7 +229,7 @@ func New(dataShards, parityShards int, opts ...Option) (Encoder, error) {
}

// ErrTooFewShards is returned if too few shards where given to
// Encode/Verify/Reconstruct. It will also be returned from Reconstruct
// Encode/Verify/Reconstruct/Update. It will also be returned from Reconstruct
// if there were too few shards to reconstruct the missing data.
var ErrTooFewShards = errors.New("too few shards given")

Expand Down Expand Up @@ -249,6 +257,101 @@ func (r reedSolomon) Encode(shards [][]byte) error {
return nil
}

// ErrInvalidInput is returned if invalid input parameter of Update.
var ErrInvalidInput = errors.New("invalid input")

func (r reedSolomon) Update(shards [][]byte, newDatashards [][]byte) error {
if len(shards) != r.Shards {
return ErrTooFewShards
}

if len(newDatashards) != r.DataShards {
return ErrTooFewShards
}

err := checkShards(shards, true)
if err != nil {
return err
}

err = checkShards(newDatashards, true)
if err != nil {
return err
}

for i := range newDatashards {
if newDatashards[i] != nil && shards[i] == nil {
return ErrInvalidInput
}
}
for _, p := range shards[r.DataShards:] {
if p == nil {
return ErrInvalidInput
}
}

shardSize := shardSize(shards)

// Get the slice of output buffers.
output := shards[r.DataShards:]

// Do the coding.
r.updateParityShards(r.parity, shards[0:r.DataShards], newDatashards[0:r.DataShards], output, r.ParityShards, shardSize)
return nil
}

func (r reedSolomon) updateParityShards(matrixRows, oldinputs, newinputs, outputs [][]byte, outputCount, byteCount int) {
if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize {
r.updateParityShardsP(matrixRows, oldinputs, newinputs, outputs, outputCount, byteCount)
return
}

for c := 0; c < r.DataShards; c++ {
in := newinputs[c]
if in == nil {
continue
}
oldin := oldinputs[c]
// oldinputs data will be change
sliceXor(in, oldin, r.o.useSSE2)
for iRow := 0; iRow < outputCount; iRow++ {
galMulSliceXor(matrixRows[iRow][c], oldin, outputs[iRow], r.o.useSSSE3, r.o.useAVX2)
}
}
}

func (r reedSolomon) updateParityShardsP(matrixRows, oldinputs, newinputs, outputs [][]byte, outputCount, byteCount int) {
var wg sync.WaitGroup
do := byteCount / r.o.maxGoroutines
if do < r.o.minSplitSize {
do = r.o.minSplitSize
}
start := 0
for start < byteCount {
if start+do > byteCount {
do = byteCount - start
}
wg.Add(1)
go func(start, stop int) {
for c := 0; c < r.DataShards; c++ {
in := newinputs[c]
if in == nil {
continue
}
oldin := oldinputs[c]
// oldinputs data will be change
sliceXor(in[start:stop], oldin[start:stop], r.o.useSSE2)
for iRow := 0; iRow < outputCount; iRow++ {
galMulSliceXor(matrixRows[iRow][c], oldin[start:stop], outputs[iRow][start:stop], r.o.useSSSE3, r.o.useAVX2)
}
}
wg.Done()
}(start, start+do)
start += do
}
wg.Wait()
}

// Verify returns true if the parity shards contain the right data.
// The data is the same format as Encode. No data is modified.
func (r reedSolomon) Verify(shards [][]byte) (bool, error) {
Expand Down
102 changes: 102 additions & 0 deletions reedsolomon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,108 @@ func testEncoding(t *testing.T, o ...Option) {
}
}

func TestUpdate(t *testing.T) {
testEncoding(t)
for _, o := range testOpts() {
testUpdate(t, o...)
}
}

func testUpdate(t *testing.T, o ...Option) {
perShard := 50000
r, err := New(10, 3, o...)
if err != nil {
t.Fatal(err)
}
shards := make([][]byte, 13)
for s := range shards {
shards[s] = make([]byte, perShard)
}

rand.Seed(0)
for s := 0; s < 13; s++ {
fillRandom(shards[s])
}

err = r.Encode(shards)
if err != nil {
t.Fatal(err)
}
ok, err := r.Verify(shards)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("Verification failed")
}

newdatashards := make([][]byte, 10)
for s := 0; s < 10; s++ {
newdatashards[s] = make([]byte, perShard)
fillRandom(newdatashards[s])
err = r.Update(shards, newdatashards)
if err != nil {
t.Fatal(err)
}
shards[s] = newdatashards[s]
ok, err := r.Verify(shards)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("Verification failed")
}
newdatashards[s] = nil
}
for s := 0; s < 9; s++ {
newdatashards[s] = make([]byte, perShard)
newdatashards[s+1] = make([]byte, perShard)
fillRandom(newdatashards[s])
fillRandom(newdatashards[s+1])
err = r.Update(shards, newdatashards)
if err != nil {
t.Fatal(err)
}
shards[s] = newdatashards[s]
shards[s+1] = newdatashards[s+1]
ok, err := r.Verify(shards)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("Verification failed")
}
newdatashards[s] = nil
newdatashards[s+1] = nil
}
for newNum := 1; newNum <= 10; newNum++ {
for s := 0; s <= 10-newNum; s++ {
for i := 0; i < newNum; i++ {
newdatashards[s+i] = make([]byte, perShard)
fillRandom(newdatashards[s+i])
}
err = r.Update(shards, newdatashards)
if err != nil {
t.Fatal(err)
}
for i := 0; i < newNum; i++ {
shards[s+i] = newdatashards[s+i]
}
ok, err := r.Verify(shards)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("Verification failed")
}
for i := 0; i < newNum; i++ {
newdatashards[s+i] = nil
}
}
}

}

func TestReconstruct(t *testing.T) {
testReconstruct(t)
for _, o := range testOpts() {
Expand Down

0 comments on commit d78bf47

Please sign in to comment.