Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribe blocks endpoints #758

Conversation

illia-malachyn
Copy link
Contributor

Part of #746

@franklywatson franklywatson changed the title Add subscribe blocks enpoinds Add subscribe blocks endpoints Sep 18, 2024
@codecov-commenter
Copy link

codecov-commenter commented Sep 19, 2024

Codecov Report

Attention: Patch coverage is 63.21839% with 32 lines in your changes missing coverage. Please review.

Project coverage is 53.71%. Comparing base (7b58582) to head (ec522cd).
Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
access/grpc/grpc.go 75.34% 11 Missing and 7 partials ⚠️
access/grpc/convert/convert.go 0.00% 8 Missing ⚠️
access/grpc/client.go 0.00% 6 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #758      +/-   ##
==========================================
+ Coverage   52.92%   53.71%   +0.79%     
==========================================
  Files          35       35              
  Lines        5580     5667      +87     
==========================================
+ Hits         2953     3044      +91     
+ Misses       2231     2205      -26     
- Partials      396      418      +22     
Flag Coverage Δ
unittests 53.71% <63.21%> (+0.79%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@janezpodhostnik janezpodhostnik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of repeated boilerplate happening here.

How about using a generic method like this

func genericSubscribe[Receive, Response any](
	ctx context.Context,
	receive func() (Receive, error),
	convert func(Receive) (Response, error),
	topicNameForErrors string,
) (<-chan Response, <-chan error) {
	sub := make(chan Response)
	errChan := make(chan error)

	sendErr := func(err error) {
		select {
		case <-ctx.Done():
		case errChan <- err:
		}
	}

	go func() {
		defer close(sub)
		defer close(errChan)

		for {
			resp, err := receive()
			if err != nil {
				if err == io.EOF {
					return
				}

				sendErr(fmt.Errorf("error receiving %s: %w", topicNameForErrors, err))
				return
			}

			response, err := convert(resp)
			if err != nil {
				sendErr(fmt.Errorf("error converting %s: %w", topicNameForErrors, err))
				return
			}

			select {
			case <-ctx.Done():
				return
			case sub <- response:
			}
		}
	}()

	return sub, errChan
}

Using it would look like this in the case of subscribeExecutionData:

func (c *BaseClient) subscribeExecutionData(
	ctx context.Context,
	req *executiondata.SubscribeExecutionDataRequest,
	opts ...grpc.CallOption,
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
	stream, err := c.executionDataClient.SubscribeExecutionData(ctx, req, opts...)
	if err != nil {
		return nil, nil, err
	}

	sub, errChan := genericSubscribe(
		ctx,
		stream.Recv,
		func(resp *executiondata.SubscribeExecutionDataResponse) (flow.ExecutionDataStreamResponse, error) {
			execData, err := convert.MessageToBlockExecutionData(resp.GetBlockExecutionData())
			if err != nil {
				return flow.ExecutionDataStreamResponse{}, fmt.Errorf("error converting execution data for block %d: %w", resp.GetBlockHeight(), err)
			}

			response := flow.ExecutionDataStreamResponse{
				Height:         resp.BlockHeight,
				ExecutionData:  execData,
				BlockTimestamp: resp.BlockTimestamp.AsTime(),
			}
			return response, nil
		},
		"execution data",
	)

	return sub, errChan, nil
}

Another question I have is how do we feel about changing the return type to (<-chan flow.ExecutionDataStreamResponse, <-chan error) from (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) by simply sending that first error on the error channel. This might be easier to handle.

cc: @peterargue

@illia-malachyn
Copy link
Contributor Author

@janezpodhostnik Good catch! I also noticed it and created a separate issue for this. I want to unify all code duplication at one place after we merge them in

@illia-malachyn
Copy link
Contributor Author

illia-malachyn commented Sep 25, 2024

There's a number of different issues that share similar code/functionality, so we better "deduplicate" it after we merge all of them.
What do you think about it?

@franklywatson
Copy link
Contributor

@illia-malachyn sounds good thanks

access/grpc/grpc.go Outdated Show resolved Hide resolved
@peterargue
Copy link
Contributor

peterargue commented Sep 27, 2024

@illia-malachyn can you merge master.

@franklywatson franklywatson merged commit bd1d074 into onflow:master Oct 1, 2024
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants