Skip to content

Unable to Receive Events from Salesforce Pub/Sub API using Dart and gRPC #38

Open
@josematute

Description

I'm trying to subscribe and listen to events from the Salesforce Pub/Sub API using Dart and gRPC. I've been able to successfully implement this in Python following the quick start guide, but I'm experiencing difficulties in Dart. Despite following a similar pattern, I'm not receiving any events in Dart while the Python script receives them in real time.

Here is the Dart code:

import 'package:grpc/grpc.dart';
    import 'package:pubsub_testing/src/generated/salesforceProtoFile.pbgrpc.dart' as pb_grpc;
    import "package:pubsub_testing/src/generated/salesforceProtoFile.pb.dart" as pb2;
    
    // Session ID, instance URL, and tenant ID are provided.
    
    final authMetadata = CallOptions(metadata: {
      'accesstoken': sessionId,
      'instanceurl': instanceUrl,
      'tenantid': tenantId,
    });

    final channel = ClientChannel(
      'api.pubsub.salesforce.com',
      port: 7443,
    );
    
    final stub = pb_grpc.PubSubClient(channel);
    
    Stream<pb2.FetchRequest> fetchReqStream(String topic) async* {
      while (true) {
        yield pb2.FetchRequest(
          topicName: topic,
          replayPreset: pb2.ReplayPreset.LATEST,
          numRequested: 100,
        );
      }
    }
    
    Future<void> subscribe(String mySubTopic) async {
      print('Subscribing to $mySubTopic');
      try {
        final substream =
            stub.subscribe(fetchReqStream(mySubTopic), options: authMetadata);
        print("substream: $substream");
    
        await for (var event in substream) {
          print("Got an event!\n");
          if (event.events.isNotEmpty) {
            print("Number of events received: ${event.events.length}");
            var payloadbytes = event.events[0].event.payload;
            var schemaid = event.events[0].event.schemaId;
    
            var schema = await stub.getSchema(pb2.SchemaRequest(schemaId: schemaid),
                options: authMetadata);
    
            print("Got an event!\n");
          } else {
            print("[${DateTime.now()}] The subscription is active.");
          }
        }
      } catch (e) {
        print('An error occurred during subscription: $e');
      }
    }

The code runs without throwing an error, and it seems to successfully subscribe because the terminal does not close, indicating that it's waiting for events. However, no events are being printed to the console, even though I know they are being published because my Python script is receiving them.

I've checked the permissions, and the session ID I'm using has the necessary permissions to subscribe to the topic.

Does anyone have experience with this type of situation or could suggest possible solutions? I would appreciate any insights or assistance.

The following is the Python implementation that works(Of course I removed sensible info). Also, I commented out some lines such that I made the code as minimal as possible so it would be easier to replicate in Dart. This file works and receives events in real time:

from __future__ import print_function
# import grpc
import requests
import threading
import io
import pubsub_api_pb2 as pb2
import pubsub_api_pb2_grpc as pb2_grpc
# import avro.schema
# import avro.io
import time
import certifi
import json

semaphore = threading.Semaphore(1)

latest_replay_id = None

# with open(certifi.where(), 'rb') as f:
#     creds = grpc.ssl_channel_credentials(f.read())

with grpc.secure_channel('api.pubsub.salesforce.com:7443', grpc.ssl_channel_credentials(None)) as channel:



    # Store Auth 
    sessionid = ''
    instanceurl = ''
    tenantid = ''
    authmetadata = (('accesstoken', sessionid),
    ('instanceurl', instanceurl),
    ('tenantid', tenantid))
    
    # Generate Stub
    stub = pb2_grpc.PubSubStub(channel)

    # Subscribe to event channel
    def fetchReqStream(topic):
        while True:
            semaphore.acquire()
            yield pb2.FetchRequest(
                topic_name = topic,
                replay_preset = pb2.ReplayPreset.LATEST,
                num_requested = 1)
    
    # Decode event message payload
    def decode(schema, payload):
        schema = avro.schema.parse(schema)
        buf = io.BytesIO(payload)
        decoder = avro.io.BinaryDecoder(buf)
        reader = avro.io.DatumReader(schema)
        ret = reader.read(decoder)
        return ret

    # Make the subscribe call
    mysubtopic = "/event/RS_L__ConversationEvent__e"
    print('Subscribing to ' + mysubtopic)
    substream = stub.Subscribe(fetchReqStream(mysubtopic),
            metadata=authmetadata   )
    for event in substream:
        if event.events:
            semaphore.release()
            print("Number of events received: ", len(event.events))
            payloadbytes = event.events[0].event.payload
            schemaid = event.events[0].event.schema_id
            schema = stub.GetSchema(
                    pb2.SchemaRequest(schema_id=schemaid),
                    metadata=authmetadata).schema_json
            decoded = decode(schema, payloadbytes)
            print("Got an event!", json.dumps(decoded), "\n")
            # print(f"payloadbytes: {payloadbytes}")
            # print(f"schemaid: {schemaid}")
        else:
            print("[", time.strftime('%b %d, %Y %l:%M%p %Z'),
            "] The subscription is active.")
        # latest_replay_id = event.latest_replay_id

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions