Unable to Receive Events from Salesforce Pub/Sub API using Dart and gRPC #38
Description
I'm trying to subscribe and listen to events from the Salesforce Pub/Sub API using Dart and gRPC. I've been able to successfully implement this in Python following the quick start guide, but I'm experiencing difficulties in Dart. Despite following a similar pattern, I'm not receiving any events in Dart while the Python script receives them in real time.
Here is the Dart code:
import 'package:grpc/grpc.dart';
import 'package:pubsub_testing/src/generated/salesforceProtoFile.pbgrpc.dart' as pb_grpc;
import "package:pubsub_testing/src/generated/salesforceProtoFile.pb.dart" as pb2;
// Session ID, instance URL, and tenant ID are provided.
final authMetadata = CallOptions(metadata: {
'accesstoken': sessionId,
'instanceurl': instanceUrl,
'tenantid': tenantId,
});
final channel = ClientChannel(
'api.pubsub.salesforce.com',
port: 7443,
);
final stub = pb_grpc.PubSubClient(channel);
Stream<pb2.FetchRequest> fetchReqStream(String topic) async* {
while (true) {
yield pb2.FetchRequest(
topicName: topic,
replayPreset: pb2.ReplayPreset.LATEST,
numRequested: 100,
);
}
}
Future<void> subscribe(String mySubTopic) async {
print('Subscribing to $mySubTopic');
try {
final substream =
stub.subscribe(fetchReqStream(mySubTopic), options: authMetadata);
print("substream: $substream");
await for (var event in substream) {
print("Got an event!\n");
if (event.events.isNotEmpty) {
print("Number of events received: ${event.events.length}");
var payloadbytes = event.events[0].event.payload;
var schemaid = event.events[0].event.schemaId;
var schema = await stub.getSchema(pb2.SchemaRequest(schemaId: schemaid),
options: authMetadata);
print("Got an event!\n");
} else {
print("[${DateTime.now()}] The subscription is active.");
}
}
} catch (e) {
print('An error occurred during subscription: $e');
}
}
The code runs without throwing an error, and it seems to successfully subscribe because the terminal does not close, indicating that it's waiting for events. However, no events are being printed to the console, even though I know they are being published because my Python script is receiving them.
I've checked the permissions, and the session ID I'm using has the necessary permissions to subscribe to the topic.
Does anyone have experience with this type of situation or could suggest possible solutions? I would appreciate any insights or assistance.
The following is the Python implementation that works(Of course I removed sensible info). Also, I commented out some lines such that I made the code as minimal as possible so it would be easier to replicate in Dart. This file works and receives events in real time:
from __future__ import print_function
# import grpc
import requests
import threading
import io
import pubsub_api_pb2 as pb2
import pubsub_api_pb2_grpc as pb2_grpc
# import avro.schema
# import avro.io
import time
import certifi
import json
semaphore = threading.Semaphore(1)
latest_replay_id = None
# with open(certifi.where(), 'rb') as f:
# creds = grpc.ssl_channel_credentials(f.read())
with grpc.secure_channel('api.pubsub.salesforce.com:7443', grpc.ssl_channel_credentials(None)) as channel:
# Store Auth
sessionid = ''
instanceurl = ''
tenantid = ''
authmetadata = (('accesstoken', sessionid),
('instanceurl', instanceurl),
('tenantid', tenantid))
# Generate Stub
stub = pb2_grpc.PubSubStub(channel)
# Subscribe to event channel
def fetchReqStream(topic):
while True:
semaphore.acquire()
yield pb2.FetchRequest(
topic_name = topic,
replay_preset = pb2.ReplayPreset.LATEST,
num_requested = 1)
# Decode event message payload
def decode(schema, payload):
schema = avro.schema.parse(schema)
buf = io.BytesIO(payload)
decoder = avro.io.BinaryDecoder(buf)
reader = avro.io.DatumReader(schema)
ret = reader.read(decoder)
return ret
# Make the subscribe call
mysubtopic = "/event/RS_L__ConversationEvent__e"
print('Subscribing to ' + mysubtopic)
substream = stub.Subscribe(fetchReqStream(mysubtopic),
metadata=authmetadata )
for event in substream:
if event.events:
semaphore.release()
print("Number of events received: ", len(event.events))
payloadbytes = event.events[0].event.payload
schemaid = event.events[0].event.schema_id
schema = stub.GetSchema(
pb2.SchemaRequest(schema_id=schemaid),
metadata=authmetadata).schema_json
decoded = decode(schema, payloadbytes)
print("Got an event!", json.dumps(decoded), "\n")
# print(f"payloadbytes: {payloadbytes}")
# print(f"schemaid: {schemaid}")
else:
print("[", time.strftime('%b %d, %Y %l:%M%p %Z'),
"] The subscription is active.")
# latest_replay_id = event.latest_replay_id