Skip to content

Commit

Permalink
v2 dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Nov 8, 2024
1 parent 6e14746 commit 2128649
Show file tree
Hide file tree
Showing 12 changed files with 1,040 additions and 17 deletions.
21 changes: 21 additions & 0 deletions core/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,27 @@ func (h *BlobHeader) Deserialize(data []byte) (*BlobHeader, error) {
return h, err
}

func SerializeMerkleProof(proof *merkletree.Proof) []byte {
proofBytes := make([]byte, 0)
for _, hash := range proof.Hashes {
proofBytes = append(proofBytes, hash[:]...)
}
return proofBytes
}

func DeserializeMerkleProof(data []byte) (*merkletree.Proof, error) {
proof := &merkletree.Proof{}
if len(data)%32 != 0 {
return nil, fmt.Errorf("invalid proof length")
}
for i := 0; i < len(data); i += 32 {
var hash [32]byte
copy(hash[:], data[i:i+32])
proof.Hashes = append(proof.Hashes, hash[:])
}
return proof, nil
}

func encode(obj any) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Expand Down
3 changes: 2 additions & 1 deletion core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (
// TODO(mooselumph): Put these parameters on chain and add on-chain checks to ensure that the number of operators does not
// conflict with the existing on-chain limits
ParametersMap = map[BlobVersion]BlobVersionParameters{
0: {CodingRate: 8, ReconstructionThreshold: 0.22, NumChunks: 8192},
0: {CodingRate: 8, ReconstructionThreshold: 0.22, ConfirmationThreshold: 55, NumChunks: 8192},
}
)

Expand Down Expand Up @@ -228,6 +228,7 @@ type BlobVerificationInfo struct {
type BlobVersionParameters struct {
CodingRate uint32
ReconstructionThreshold float64
ConfirmationThreshold uint8
NumChunks uint32
}

Expand Down
10 changes: 1 addition & 9 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (b *Batcher) updateConfirmationInfo(
blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex])
continue
}
proof = serializeProof(merkleProof)
proof = core.SerializeMerkleProof(merkleProof)
}

confirmationInfo := &disperser.ConfirmationInfo{
Expand Down Expand Up @@ -563,14 +563,6 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
return nil
}

func serializeProof(proof *merkletree.Proof) []byte {
proofBytes := make([]byte, 0)
for _, hash := range proof.Hashes {
proofBytes = append(proofBytes, hash[:]...)
}
return proofBytes
}

func (b *Batcher) parseBatchIDFromReceipt(txReceipt *types.Receipt) (uint32, error) {
if len(txReceipt.Logs) == 0 {
return 0, errors.New("failed to get transaction receipt with logs")
Expand Down
100 changes: 99 additions & 1 deletion disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
Value: strconv.Itoa(int(status)),
},
":updatedAt": &types.AttributeValueMemberN{
Value: strconv.FormatInt(time.Now().Unix(), 10),
Value: strconv.FormatInt(int64(lastUpdatedAt), 10),
}})
if err != nil {
return nil, err
Expand Down Expand Up @@ -422,6 +422,76 @@ func (s *BlobMetadataStore) GetAttestation(ctx context.Context, batchHeaderHash
return attestation, nil
}

func (s *BlobMetadataStore) PutBlobVerificationInfo(ctx context.Context, verificationInfo *corev2.BlobVerificationInfo) error {
item, err := MarshalBlobVerificationInfo(verificationInfo)
if err != nil {
return err
}

err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
if errors.Is(err, commondynamodb.ErrConditionFailed) {
return common.ErrAlreadyExists
}

return err
}

func (s *BlobMetadataStore) GetBlobVerificationInfo(ctx context.Context, blobKey corev2.BlobKey, batchHeaderHash [32]byte) (*corev2.BlobVerificationInfo, error) {
bhh := hex.EncodeToString(batchHeaderHash[:])
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: blobKeyPrefix + blobKey.Hex(),
},
"SK": &types.AttributeValueMemberS{
Value: batchHeaderKeyPrefix + bhh,
},
})

if err != nil {
return nil, err
}

if item == nil {
return nil, fmt.Errorf("%w: verification info not found for key %s", common.ErrMetadataNotFound, blobKey.Hex())
}

info, err := UnmarshalBlobVerificationInfo(item)
if err != nil {
return nil, err
}

return info, nil
}

func (s *BlobMetadataStore) GetBlobVerificationInfos(ctx context.Context, blobKey corev2.BlobKey) ([]*corev2.BlobVerificationInfo, error) {
items, err := s.dynamoDBClient.Query(ctx, s.tableName, "PK = :pk AND begins_with(SK, :prefix)", commondynamodb.ExpressionValues{
":pk": &types.AttributeValueMemberS{
Value: blobKeyPrefix + blobKey.Hex(),
},
":prefix": &types.AttributeValueMemberS{
Value: batchHeaderKeyPrefix,
},
})

if err != nil {
return nil, err
}

if len(items) == 0 {
return nil, fmt.Errorf("%w: verification info not found for key %s", common.ErrMetadataNotFound, blobKey.Hex())
}

responses := make([]*corev2.BlobVerificationInfo, len(items))
for i, item := range items {
responses[i], err = UnmarshalBlobVerificationInfo(item)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal verification info: %w", err)
}
}

return responses, nil
}

func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput {
return &dynamodb.CreateTableInput{
AttributeDefinitions: []types.AttributeDefinition{
Expand Down Expand Up @@ -749,6 +819,34 @@ func UnmarshalBatchHeader(item commondynamodb.Item) (*corev2.BatchHeader, error)
return &header, nil
}

func MarshalBlobVerificationInfo(verificationInfo *corev2.BlobVerificationInfo) (commondynamodb.Item, error) {
fields, err := attributevalue.MarshalMap(verificationInfo)
if err != nil {
return nil, fmt.Errorf("failed to marshal blob verification info: %w", err)
}

bhh, err := verificationInfo.BatchHeader.Hash()
if err != nil {
return nil, fmt.Errorf("failed to hash batch header: %w", err)
}
hashstr := hex.EncodeToString(bhh[:])

fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + verificationInfo.BlobKey.Hex()}
fields["SK"] = &types.AttributeValueMemberS{Value: batchHeaderKeyPrefix + hashstr}

return fields, nil
}

func UnmarshalBlobVerificationInfo(item commondynamodb.Item) (*corev2.BlobVerificationInfo, error) {
verificationInfo := corev2.BlobVerificationInfo{}
err := attributevalue.UnmarshalMap(item, &verificationInfo)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal blob verification info: %w", err)
}

return &verificationInfo, nil
}

func MarshalAttestation(attestation *corev2.Attestation) (commondynamodb.Item, error) {
fields, err := attributevalue.MarshalMap(attestation)
if err != nil {
Expand Down
44 changes: 44 additions & 0 deletions disperser/common/v2/blobstore/dynamo_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, queued, 1)
assert.Equal(t, metadata1, queued[0])
// query to get newer blobs should result in 0 results
queued, err = blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued, metadata1.UpdatedAt+100)
assert.NoError(t, err)
assert.Len(t, queued, 0)

certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified, 0)
assert.NoError(t, err)
assert.Len(t, certified, 1)
Expand Down Expand Up @@ -150,6 +155,45 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1, fragmentInfo)
assert.ErrorIs(t, err, common.ErrAlreadyExists)

// get multiple certs
numCerts := 100
keys := make([]corev2.BlobKey, numCerts)
for i := 0; i < numCerts; i++ {
blobCert := &corev2.BlobCertificate{
BlobHeader: &corev2.BlobHeader{
BlobVersion: 0,
QuorumNumbers: []core.QuorumID{0},
BlobCommitments: mockCommitment,
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x123",
BinIndex: uint32(i),
CumulativePayment: big.NewInt(321),
},
Signature: []byte("signature"),
},
RelayKeys: []corev2.RelayKey{0},
}
blobKey, err := blobCert.BlobHeader.BlobKey()
assert.NoError(t, err)
keys[i] = blobKey
err = blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo)
assert.NoError(t, err)
}

certs, fragmentInfos, err := blobMetadataStore.GetBlobCertificates(ctx, keys)
assert.NoError(t, err)
assert.Len(t, certs, numCerts)
assert.Len(t, fragmentInfos, numCerts)
binIndexes := make(map[uint32]struct{})
for i := 0; i < numCerts; i++ {
assert.Equal(t, fragmentInfos[i], fragmentInfo)
binIndexes[certs[i].BlobHeader.PaymentMetadata.BinIndex] = struct{}{}
}
assert.Len(t, binIndexes, numCerts)
for i := 0; i < numCerts; i++ {
assert.Contains(t, binIndexes, uint32(i))
}

deleteItems(t, []commondynamodb.Key{
{
"PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey.Hex()},
Expand Down
Loading

0 comments on commit 2128649

Please sign in to comment.