Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribe account statuses endpoint #762

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,26 @@ func convertSubscribeOptions(opts ...access.SubscribeOption) *SubscribeConfig {
}
return subsConf
}

func (c *Client) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startBlockHeight uint64,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartHeight(ctx, startBlockHeight, filter)
}

func (c *Client) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromStartBlockID(ctx, startBlockID, filter)
}

func (c *Client) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
) (<-chan flow.AccountStatus, <-chan error, error) {
return c.grpc.SubscribeAccountStatusesFromLatestBlock(ctx, filter)
}
38 changes: 38 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"time"

"github.com/onflow/flow/protobuf/go/flow/executiondata"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/onflow/cadence"
Expand Down Expand Up @@ -76,6 +77,43 @@ func MessageToAccount(m *entities.Account) (flow.Account, error) {
}, nil
}

func MessageToAccountStatus(m *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) {
if m == nil {
return flow.AccountStatus{}, ErrEmptyMessage
}

results, err := MessageToAccountStatusResults(m.GetResults())
if err != nil {
return flow.AccountStatus{}, fmt.Errorf("error converting results: %w", err)
}

return flow.AccountStatus{
BlockID: MessageToIdentifier(m.GetBlockId()),
BlockHeight: m.GetBlockHeight(),
MessageIndex: m.GetMessageIndex(),
Results: results,
}, nil
}

func MessageToAccountStatusResults(m []*executiondata.SubscribeAccountStatusesResponse_Result) ([]*flow.AccountStatusResult, error) {
results := make([]*flow.AccountStatusResult, len(m))
var emptyOptions []jsoncdc.Option

for i, r := range m {
events, err := MessagesToEvents(r.GetEvents(), emptyOptions)
if err != nil {
return nil, fmt.Errorf("error converting events: %w", err)
}

results[i] = &flow.AccountStatusResult{
Address: MessageToIdentifier(r.GetAddress()),
Events: events,
}
}

return results, nil
}

func AccountKeyToMessage(a *flow.AccountKey) *entities.AccountKey {
return &entities.AccountKey{
Index: uint32(a.Index),
Expand Down
135 changes: 135 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,3 +1155,138 @@ func (c *BaseClient) subscribeEvents(

return sub, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startHeight uint64,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartHeightRequest{
StartBlockHeight: startHeight,
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock(
ctx context.Context,
filter flow.AccountStatusFilter,
opts ...grpc.CallOption,
) (<-chan flow.AccountStatus, <-chan error, error) {
request := &executiondata.SubscribeAccountStatusesFromLatestBlockRequest{
EventEncodingVersion: c.eventEncoding,
}
request.Filter = &executiondata.StatusFilter{
EventType: filter.EventTypes,
Address: filter.Addresses,
}

subscribeClient, err := c.executionDataClient.SubscribeAccountStatusesFromLatestBlock(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

accountStatutesChan := make(chan flow.AccountStatus)
errChan := make(chan error)

go func() {
defer close(accountStatutesChan)
defer close(errChan)
receiveAccountStatusesFromClient(ctx, subscribeClient, accountStatutesChan, errChan)
}()

return accountStatutesChan, errChan, nil
}

func receiveAccountStatusesFromClient[Client interface {
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
Recv() (*executiondata.SubscribeAccountStatusesResponse, error)
}](
ctx context.Context,
client Client,
accountStatutesChan chan<- flow.AccountStatus,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
accountStatusResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving account status: %w", err))
return
}

accountStatus, err := convert.MessageToAccountStatus(accountStatusResponse)
if err != nil {
sendErr(fmt.Errorf("error converting message to account status: %w", err))
return
}

illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-ctx.Done():
return
case accountStatutesChan <- accountStatus:
}
}
}
Loading
Loading