Skip to content

Commit

Permalink
@peter/retriever test (0glabs#22)
Browse files Browse the repository at this point in the history
* retriever test
  • Loading branch information
0g-peterzhb authored Mar 28, 2024
1 parent f76949b commit 2824603
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 56 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ jobs:
- name: Build Protobuf
run: |
cp api/proto/disperser/disperser.proto tests/
cd tests && python -m grpc_tools.protoc --proto_path=. ./disperser.proto --python_out=. --grpc_python_out=.
cp api/proto/retriever/retriever.proto tests/
cd tests && python -m grpc_tools.protoc --proto_path=. ./disperser.proto --python_out=. --grpc_python_out=. && python -m grpc_tools.protoc --proto_path=. ./retriever.proto --python_out=. --grpc_python_out=.
- name: Run tests
run: |
Expand Down
17 changes: 9 additions & 8 deletions tests/da_put_get_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,28 @@ def setup_params(self):
self.num_nodes = 1

def run_test(self):
client = self.da_services[-1]
disperser = self.da_services[-2]

data = randbytes(507904)
disperse_response = client.disperse_blob(data)
disperse_response = disperser.disperse_blob(data)

self.log.info(disperse_response)
request_id = disperse_response.request_id
reply = client.get_blob_status(request_id)
reply = disperser.get_blob_status(request_id)
count = 0
while reply.status != BlobStatus.CONFIRMED and count <= 5:
reply = client.get_blob_status(request_id)
self.log.info(f'blob status {reply.status}')
reply = disperser.get_blob_status(request_id)
count += 1
time.sleep(10)

info = reply.info
self.log.info(f'reply info {info}')
# retrieve the blob
reply = client.retrieve_blob(info)
self.log.info(f'reply data {reply.data}')
reply = disperser.retrieve_blob(info)
assert_equal(reply.data[:len(data)], data)

retriever = self.da_services[-1]
retriever_response = retriever.retrieve_blob(info)
assert_equal(retriever_response.data[:len(data)], data)


if __name__ == "__main__":
Expand Down
43 changes: 23 additions & 20 deletions tests/da_test_framework/da_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,29 @@ def __init__(
log,
None,
)
self.args = [binary, "--batcher.pull-interval", "10s",
"--chain.rpc", local_conf['blockchain_rpc_endpoint'],
"--chain.private-key", GENESIS_PRIV_KEY,
"--batcher.finalizer-interval", "20s",
"--batcher.aws.region", "us-east-1",
"--batcher.aws.access-key-id", "localstack",
"--batcher.aws.secret-access-key", "localstack",
"--batcher.aws.endpoint-url", "http://0.0.0.0:4566",
"--batcher.s3-bucket-name", "test-zgda-blobstore",
"--batcher.dynamodb-table-name", "test-BlobMetadata",
"--encoder-socket", "0.0.0.0:34000",
"--batcher.batch-size-limit", "10000",
"--batcher.srs-order", "300000",
"--encoding-timeout", "10s",
"--chain-read-timeout", "12s",
"--chain-write-timeout", "13s",
"--batcher.storage.node-url", f'http://{local_conf["node_rpc_endpoint"]}',
"--batcher.storage.kv-url", f'http://{local_conf["kv_rpc_endpoint"]}',
"--batcher.storage.kv-stream-id", local_conf['stream_id'],
"--batcher.storage.flow-contract", local_conf['log_contract_address']]
self.args = [
binary,
"--batcher.pull-interval", "10s",
"--chain.rpc", local_conf['blockchain_rpc_endpoint'],
"--chain.private-key", GENESIS_PRIV_KEY,
"--batcher.finalizer-interval", "20s",
"--batcher.aws.region", "us-east-1",
"--batcher.aws.access-key-id", "localstack",
"--batcher.aws.secret-access-key", "localstack",
"--batcher.aws.endpoint-url", "http://0.0.0.0:4566",
"--batcher.s3-bucket-name", "test-zgda-blobstore",
"--batcher.dynamodb-table-name", "test-BlobMetadata",
"--encoder-socket", "0.0.0.0:34000",
"--batcher.batch-size-limit", "10000",
"--batcher.srs-order", "300000",
"--encoding-timeout", "10s",
"--chain-read-timeout", "12s",
"--chain-write-timeout", "13s",
"--batcher.storage.node-url", f'http://{local_conf["node_rpc_endpoint"]}',
"--batcher.storage.kv-url", f'http://{local_conf["kv_rpc_endpoint"]}',
"--batcher.storage.kv-stream-id", local_conf['stream_id'],
"--batcher.storage.flow-contract", local_conf['log_contract_address']
]

def start(self):
self.log.info("Start DA batcher")
Expand Down
31 changes: 17 additions & 14 deletions tests/da_test_framework/da_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@

class DAEncoder(TestNode):
def __init__(
self,
root_dir,
binary,
updated_config,
log,
self,
root_dir,
binary,
updated_config,
log,
):
local_conf = dict(log_config_file="log_config")

Expand All @@ -32,15 +32,18 @@ def __init__(
log,
None,
)
self.args = [binary, "--disperser-encoder.grpc-port", "34000",
"--disperser-encoder.metrics-http-port", "9109",
"--kzg.g1-path", f"{__file_path__}/../../inabox/resources/kzg/g1.point.300000",
"--kzg.g2-path", f"{__file_path__}/../../inabox/resources/kzg/g2.point.300000",
"--kzg.cache-path", f"{__file_path__}/../../inabox/resources/kzg/SRSTables",
"--kzg.srs-order", "300000",
"--kzg.num-workers", "12",
"--disperser-encoder.log.level-std", "trace",
"--disperser-encoder.log.level-file", "trace"]
self.args = [
binary,
"--disperser-encoder.grpc-port", "34000",
"--disperser-encoder.metrics-http-port", "9109",
"--kzg.g1-path", f"{__file_path__}/../../inabox/resources/kzg/g1.point.300000",
"--kzg.g2-path", f"{__file_path__}/../../inabox/resources/kzg/g2.point.300000",
"--kzg.cache-path", f"{__file_path__}/../../inabox/resources/kzg/SRSTables",
"--kzg.srs-order", "300000",
"--kzg.num-workers", "12",
"--disperser-encoder.log.level-std", "trace",
"--disperser-encoder.log.level-file", "trace",
]

def start(self):
self.log.info("Start DA encoder")
Expand Down
1 change: 1 addition & 0 deletions tests/da_test_framework/da_node_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ class DANodeType(Enum):
DA_ENCODER = 4
DA_BATCHER = 5
DA_SERVER = 6
DA_RETRIEVER = 7
81 changes: 81 additions & 0 deletions tests/da_test_framework/da_retriever.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import os
import sys
import time
import grpc
import retriever_pb2 as pb2
import retriever_pb2_grpc as pb2_grpc
from da_test_framework.da_node_type import DANodeType

sys.path.append("../0g-storage-kv/tests")

from test_framework.blockchain_node import TestNode


__file_path__ = os.path.dirname(os.path.realpath(__file__))


class DARetriever(TestNode):
    """Test-framework wrapper around the DA retriever service binary.

    Spawns the retriever process with storage and KZG arguments derived
    from ``updated_config`` and exposes a gRPC client for blob retrieval.
    """

    def __init__(
        self,
        root_dir,
        binary,
        updated_config,
        log,
    ):
        # Base config; caller-supplied values override the defaults.
        # NOTE: debug print() calls removed for consistency with DAServer.
        local_conf = dict(log_config_file="log_config")
        local_conf.update(updated_config)

        data_dir = os.path.join(root_dir, "da_retriever")
        # Address the retriever's gRPC server listens on; must stay in sync
        # with the --retriever.grpc-port argument below.
        self.grpc_url = "0.0.0.0:32011"
        super().__init__(
            DANodeType.DA_RETRIEVER,
            14,
            data_dir,
            None,
            binary,
            local_conf,
            log,
            None,
        )
        self.args = [
            binary,
            "--retriever.hostname", "localhost",
            "--retriever.grpc-port", "32011",
            "--retriever.storage.node-url", f'http://{local_conf["node_rpc_endpoint"]}',
            "--retriever.storage.kv-url", f'http://{local_conf["kv_rpc_endpoint"]}',
            "--retriever.storage.kv-stream-id", local_conf['stream_id'],
            "--retriever.storage.flow-contract", local_conf['log_contract_address'],
            "--retriever.log.level-std", "trace",
            "--kzg.g1-path", f"{__file_path__}/../../inabox/resources/kzg/g1.point.300000",
            "--kzg.g2-path", f"{__file_path__}/../../inabox/resources/kzg/g2.point.300000",
            "--kzg.cache-path", f"{__file_path__}/../../inabox/resources/kzg/SRSTables",
            "--kzg.srs-order", "300000",
        ]

    def wait_for_rpc_connection(self):
        """Create the gRPC channel and client stub for the retriever service."""
        # TODO: replace the fixed sleep with a real health check of service
        # availability.
        time.sleep(3)
        self.channel = grpc.insecure_channel(self.grpc_url)
        # bind the client and the server
        self.stub = pb2_grpc.RetrieverStub(self.channel)

    def start(self):
        """Start the DA retriever process."""
        self.log.info("Start DA retriever")
        super().start()

    def stop(self):
        """Stop the retriever, tolerating the known 'no RPC connection' case."""
        self.log.info("Stop DA retriever")
        try:
            super().stop(kill=True, wait=False)
        except AssertionError as e:
            # The base class asserts on a live RPC connection during shutdown;
            # the retriever is gRPC-only, so this particular failure is benign.
            if "no RPC connection" in repr(e):
                self.log.debug("Stop DA retriever: no RPC connection")
            else:
                raise

    def retrieve_blob(self, info):
        """Fetch a blob through the retriever's gRPC API.

        ``info`` is the blob info returned by the disperser; the batch header
        hash and blob index from its verification proof identify the blob.
        Returns the RetrieveBlob gRPC response.
        """
        proof = info.blob_verification_proof
        message = pb2.BlobRequest(
            batch_header_hash=proof.batch_metadata.batch_header_hash,
            blob_index=proof.blob_index,
        )
        return self.stub.RetrieveBlob(message)
24 changes: 11 additions & 13 deletions tests/da_test_framework/da_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

sys.path.append("../0g-storage-kv/tests")

from test_framework.blockchain_node import TestNode, FailedToStartError
from utility.simple_rpc_proxy import SimpleRpcProxy
from test_framework.blockchain_node import TestNode


__file_path__ = os.path.dirname(os.path.realpath(__file__))
Expand All @@ -24,8 +23,6 @@ def __init__(
log,
):
local_conf = dict(log_config_file="log_config")
print(updated_config)
print(local_conf)
local_conf.update(updated_config)

data_dir = os.path.join(root_dir, "da_server")
Expand All @@ -40,13 +37,16 @@ def __init__(
log,
None,
)
self.args = [binary, "--disperser-server.grpc-port", "51001",
"--disperser-server.s3-bucket-name", "test-zgda-blobstore",
"--disperser-server.dynamodb-table-name", "test-BlobMetadata",
"--disperser-server.aws.region", "us-east-1",
"--disperser-server.aws.access-key-id", "localstack",
"--disperser-server.aws.secret-access-key", "localstack",
"--disperser-server.aws.endpoint-url", "http://0.0.0.0:4566"]
self.args = [
binary,
"--disperser-server.grpc-port", "51001",
"--disperser-server.s3-bucket-name", "test-zgda-blobstore",
"--disperser-server.dynamodb-table-name", "test-BlobMetadata",
"--disperser-server.aws.region", "us-east-1",
"--disperser-server.aws.access-key-id", "localstack",
"--disperser-server.aws.secret-access-key", "localstack",
"--disperser-server.aws.endpoint-url", "http://0.0.0.0:4566"
]

def wait_for_rpc_connection(self):
# TODO: health check of service availability
Expand All @@ -69,10 +69,8 @@ def disperse_blob(self, data):

def retrieve_blob(self, info):
message = pb2.RetrieveBlobRequest(batch_header_hash=info.blob_verification_proof.batch_metadata.batch_header_hash, blob_index=info.blob_verification_proof.blob_index)
self.log.info(f'retrieve blob {message}')
return self.stub.RetrieveBlob(message)

def get_blob_status(self, request_id):
message = pb2.BlobStatusRequest(request_id=request_id)
self.log.info(f'get blob status {message}')
return self.stub.GetBlobStatus(message)
19 changes: 19 additions & 0 deletions tests/da_test_framework/da_test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from da_test_framework.da_encoder import DAEncoder
from da_test_framework.da_batcher import DABatcher
from da_test_framework.da_server import DAServer
from da_test_framework.da_retriever import DARetriever

__file_path__ = os.path.dirname(os.path.realpath(__file__))
binary_ext = ".exe" if is_windows_platform() else ""
Expand Down Expand Up @@ -62,6 +63,9 @@ def __init__(
self.__default_da_server_binary__ = os.path.join(
tmp_dir, "da_server" + binary_ext
)
self.__default_da_retriever_binary__ = os.path.join(
tmp_dir, "da_retriever" + binary_ext
)

def add_arguments(self, parser: argparse.ArgumentParser):
super().add_arguments(parser)
Expand Down Expand Up @@ -92,12 +96,20 @@ def add_arguments(self, parser: argparse.ArgumentParser):
default=self.__default_da_server_binary__,
type=str,
)

parser.add_argument(
"--da-retriever-binary",
dest="da_retriever",
default=self.__default_da_retriever_binary__,
type=str,
)

def setup_nodes(self):
self.localstack_binary = self.options.localstack
self.da_encoder_binary = self.options.da_encoder
self.da_batcher_binary = self.options.da_batcher
self.da_server_binary = self.options.da_server
self.da_retriever_binary = self.options.da_retriever

self.build_binary()

Expand All @@ -109,6 +121,7 @@ def setup_nodes(self):
assert os.path.exists(self.da_encoder_binary), f"da encoder binary not found: {self.da_encoder_binary}"
assert os.path.exists(self.da_batcher_binary), f"da batcher binary not found: {self.da_batcher_binary}"
assert os.path.exists(self.da_server_binary), f"da server binary not found: {self.da_server_binary}"
assert os.path.exists(self.da_retriever_binary), f"da retriever binary not found: {self.da_retriever_binary}"

super().setup_nodes()
self.stream_ids = [to_stream_id(i) for i in range(MAX_STREAM_ID)]
Expand Down Expand Up @@ -142,6 +155,10 @@ def build_binary(self):
self.build_da_node(self.da_batcher_binary, os.path.join(da_disperser_cmd, "batcher"))
if not os.path.exists(self.da_server_binary):
self.build_da_node(self.da_server_binary, os.path.join(da_disperser_cmd, "apiserver"))

da_retriever_cmd = os.path.join(da_root, "retriever", "cmd")
if not os.path.exists(self.da_retriever_binary):
self.build_da_node(self.da_retriever_binary, da_retriever_cmd)

def build_zgs_node(self, zgs_node_path, zgs_cli_path):
zgs_root_path = os.path.join(__file_path__, "..", "..", "0g-storage-kv", "0g-storage-node")
Expand Down Expand Up @@ -204,6 +221,8 @@ def setup_da_nodes(self, stream_id):
updated_config['log_contract_address'] = self.contract.address()
self.setup_da_node(DABatcher, self.da_batcher_binary, updated_config)
self.setup_da_node(DAServer, self.da_server_binary)

self.setup_da_node(DARetriever, self.da_retriever_binary, updated_config)
self.log.info("All DA service started")

def setup_da_node(self, clazz, binary, updated_config={}):
Expand Down

0 comments on commit 2824603

Please sign in to comment.