Skip to content

Commit 0848bfe

Browse files
tzdyballiamsi
andauthored
feat: Celestia DA Layer Client implementation (#399)
* Celestia Node RPC Client * celestia DA implementation * mock celestia-node RPC server Co-authored-by: Ismail Khoffi <Ismail.Khoffi@gmail.com>
1 parent 308322f commit 0848bfe

File tree

14 files changed

+843
-16
lines changed

14 files changed

+843
-16
lines changed

da/celestia/celestia.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package celestia
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"time"
7+
8+
"github.com/gogo/protobuf/proto"
9+
10+
"github.com/celestiaorg/optimint/da"
11+
"github.com/celestiaorg/optimint/libs/cnrc"
12+
"github.com/celestiaorg/optimint/log"
13+
"github.com/celestiaorg/optimint/store"
14+
"github.com/celestiaorg/optimint/types"
15+
pb "github.com/celestiaorg/optimint/types/pb/optimint"
16+
)
17+
18+
// DataAvailabilityLayerClient use celestia-node public API.
19+
type DataAvailabilityLayerClient struct {
20+
client *cnrc.Client
21+
22+
config Config
23+
logger log.Logger
24+
}
25+
26+
var _ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{}
27+
var _ da.BlockRetriever = &DataAvailabilityLayerClient{}
28+
29+
type Config struct {
30+
BaseURL string `json:"base_url"`
31+
Timeout time.Duration `json:"timeout"`
32+
GasLimit uint64 `json:"gas_limit"`
33+
NamespaceID [8]byte `json:"namespace_id"`
34+
}
35+
36+
func (c *DataAvailabilityLayerClient) Init(config []byte, kvStore store.KVStore, logger log.Logger) error {
37+
c.logger = logger
38+
39+
if len(config) > 0 {
40+
return json.Unmarshal(config, &c.config)
41+
}
42+
43+
return nil
44+
}
45+
46+
func (c *DataAvailabilityLayerClient) Start() error {
47+
c.logger.Info("starting Celestia Data Availability Layer Client", "baseURL", c.config.BaseURL)
48+
var err error
49+
c.client, err = cnrc.NewClient(c.config.BaseURL, cnrc.WithTimeout(c.config.Timeout))
50+
return err
51+
}
52+
53+
func (c *DataAvailabilityLayerClient) Stop() error {
54+
c.logger.Info("stopping Celestia Data Availability Layer Client")
55+
return nil
56+
}
57+
58+
func (c *DataAvailabilityLayerClient) SubmitBlock(block *types.Block) da.ResultSubmitBlock {
59+
blob, err := block.MarshalBinary()
60+
if err != nil {
61+
return da.ResultSubmitBlock{
62+
DAResult: da.DAResult{
63+
Code: da.StatusError,
64+
Message: err.Error(),
65+
},
66+
}
67+
}
68+
69+
txResponse, err := c.client.SubmitPFD(context.TODO(), c.config.NamespaceID, blob, c.config.GasLimit)
70+
71+
if err != nil {
72+
return da.ResultSubmitBlock{
73+
DAResult: da.DAResult{
74+
Code: da.StatusError,
75+
Message: err.Error(),
76+
},
77+
}
78+
}
79+
80+
return da.ResultSubmitBlock{
81+
DAResult: da.DAResult{
82+
Code: da.StatusSuccess,
83+
Message: "tx hash: " + txResponse.TxHash,
84+
DAHeight: uint64(txResponse.Height),
85+
},
86+
}
87+
}
88+
89+
func (c *DataAvailabilityLayerClient) CheckBlockAvailability(dataLayerHeight uint64) da.ResultCheckBlock {
90+
shares, err := c.client.NamespacedShares(context.TODO(), c.config.NamespaceID, dataLayerHeight)
91+
if err != nil {
92+
return da.ResultCheckBlock{
93+
DAResult: da.DAResult{
94+
Code: da.StatusError,
95+
Message: err.Error(),
96+
},
97+
}
98+
}
99+
100+
return da.ResultCheckBlock{
101+
DAResult: da.DAResult{
102+
Code: da.StatusSuccess,
103+
DAHeight: dataLayerHeight,
104+
},
105+
DataAvailable: len(shares) > 0,
106+
}
107+
}
108+
109+
func (c *DataAvailabilityLayerClient) RetrieveBlocks(dataLayerHeight uint64) da.ResultRetrieveBlocks {
110+
data, err := c.client.NamespacedData(context.TODO(), c.config.NamespaceID, dataLayerHeight)
111+
if err != nil {
112+
return da.ResultRetrieveBlocks{
113+
DAResult: da.DAResult{
114+
Code: da.StatusError,
115+
Message: err.Error(),
116+
},
117+
}
118+
}
119+
120+
blocks := make([]*types.Block, len(data))
121+
for i, msg := range data {
122+
var block pb.Block
123+
err = proto.Unmarshal(msg, &block)
124+
if err != nil {
125+
c.logger.Error("failed to unmarshal block", "daHeight", dataLayerHeight, "position", i, "error", err)
126+
continue
127+
}
128+
blocks[i] = new(types.Block)
129+
err := blocks[i].FromProto(&block)
130+
if err != nil {
131+
return da.ResultRetrieveBlocks{
132+
DAResult: da.DAResult{
133+
Code: da.StatusError,
134+
Message: err.Error(),
135+
},
136+
}
137+
}
138+
}
139+
140+
return da.ResultRetrieveBlocks{
141+
DAResult: da.DAResult{
142+
Code: da.StatusSuccess,
143+
DAHeight: dataLayerHeight,
144+
},
145+
Blocks: blocks,
146+
}
147+
}

da/celestia/mock/messages.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package mock
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
)
7+
8+
// This code is extracted from celestia-app. It's here to build shares from messages (serialized blocks).
9+
// TODO(tzdybal): if we stop using `/namespaced_shares` we can get rid of this file.
10+
11+
const (
12+
ShareSize = 256
13+
NamespaceSize = 8
14+
MsgShareSize = ShareSize - NamespaceSize
15+
)
16+
17+
// splitMessage breaks the data in a message into the minimum number of
18+
// namespaced shares
19+
func splitMessage(rawData []byte, nid []byte) []NamespacedShare {
20+
shares := make([]NamespacedShare, 0)
21+
firstRawShare := append(append(
22+
make([]byte, 0, ShareSize),
23+
nid...),
24+
rawData[:MsgShareSize]...,
25+
)
26+
shares = append(shares, NamespacedShare{firstRawShare, nid})
27+
rawData = rawData[MsgShareSize:]
28+
for len(rawData) > 0 {
29+
shareSizeOrLen := min(MsgShareSize, len(rawData))
30+
rawShare := append(append(
31+
make([]byte, 0, ShareSize),
32+
nid...),
33+
rawData[:shareSizeOrLen]...,
34+
)
35+
paddedShare := zeroPadIfNecessary(rawShare, ShareSize)
36+
share := NamespacedShare{paddedShare, nid}
37+
shares = append(shares, share)
38+
rawData = rawData[shareSizeOrLen:]
39+
}
40+
return shares
41+
}
42+
43+
// Share contains the raw share data without the corresponding namespace.
44+
type Share []byte
45+
46+
// NamespacedShare extends a Share with the corresponding namespace.
47+
type NamespacedShare struct {
48+
Share
49+
ID []byte
50+
}
51+
52+
func min(a, b int) int {
53+
if a <= b {
54+
return a
55+
}
56+
return b
57+
}
58+
59+
func zeroPadIfNecessary(share []byte, width int) []byte {
60+
oldLen := len(share)
61+
if oldLen < width {
62+
missingBytes := width - oldLen
63+
padByte := []byte{0}
64+
padding := bytes.Repeat(padByte, missingBytes)
65+
share = append(share, padding...)
66+
return share
67+
}
68+
return share
69+
}
70+
71+
// marshalDelimited marshals the raw data (excluding the namespace) of this
72+
// message and prefixes it with the length of that encoding.
73+
func marshalDelimited(data []byte) ([]byte, error) {
74+
lenBuf := make([]byte, binary.MaxVarintLen64)
75+
length := uint64(len(data))
76+
n := binary.PutUvarint(lenBuf, length)
77+
return append(lenBuf[:n], data...), nil
78+
}
79+
80+
// appendToShares appends raw data as shares.
81+
// Used to build shares from blocks/messages.
82+
func appendToShares(shares []NamespacedShare, nid []byte, rawData []byte) []NamespacedShare {
83+
if len(rawData) <= MsgShareSize {
84+
rawShare := append(append(
85+
make([]byte, 0, len(nid)+len(rawData)),
86+
nid...),
87+
rawData...,
88+
)
89+
paddedShare := zeroPadIfNecessary(rawShare, ShareSize)
90+
share := NamespacedShare{paddedShare, nid}
91+
shares = append(shares, share)
92+
} else { // len(rawData) > MsgShareSize
93+
shares = append(shares, splitMessage(rawData, nid)...)
94+
}
95+
return shares
96+
}
97+
98+
type namespacedSharesResponse struct {
99+
Shares []Share `json:"shares"`
100+
Height uint64 `json:"height"`
101+
}
102+
103+
type namespacedDataResponse struct {
104+
Data [][]byte `json:"data"`
105+
Height uint64 `json:"height"`
106+
}

0 commit comments

Comments
 (0)