Skip to content

Commit

Permalink
Merge pull request #35 from developerforce/jclinn/addPythonDisclaimers
Browse files Browse the repository at this point in the history
add Python client disclaimers
  • Loading branch information
judy-lin authored May 9, 2023
2 parents 5eada80 + 0a81dda commit 1e6fd65
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 19 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ $RECYCLE.BIN/
# Dependency directory for Go examples
go/vendor/

*/target/
*/target/

# Python compiled files
*.pyo
*.pyc
6 changes: 3 additions & 3 deletions python/InventoryAppExample/InventoryApp.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from util.ChangeEventHeaderUtility import process_bitmap

my_publish_topic = '/event/NewOrderConfirmation__e'
latest_replay_id = None


def make_publish_request(schema_id, record_id, obj):
Expand Down Expand Up @@ -71,7 +70,7 @@ def process_order(event, pubsub):
# If all requested events are delivered, release the semaphore
# so that a new FetchRequest gets sent by `PubSub.fetch_req_stream()`.
if event.pending_num_requested == 0:
pubsub.semaphore.release()
pubsub.release_subscription_semaphore()

for evt in event.events:
payload_bytes = evt.event.payload
Expand Down Expand Up @@ -126,7 +125,8 @@ def process_order(event, pubsub):
print("[", time.strftime('%b %d, %Y %l:%M%p %Z'), "] The subscription is active.")

# The replay_id is used to resubscribe after this position in the stream if the client disconnects.
latest_replay_id = event.latest_replay_id
# Implement storage of replay for resubscribe!!!
event.latest_replay_id


def run(argument_dict):
Expand Down
37 changes: 28 additions & 9 deletions python/InventoryAppExample/PubSub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
PubSub.py
This file defines the class `PubSub`, which contains common functionality for
both publisher and subscriber clients.
both publisher and subscriber clients.
"""

import io
Expand Down Expand Up @@ -38,7 +38,7 @@ class PubSub(object):
"""
Class with helpers to use the Salesforce Pub/Sub API.
"""
semaphore = threading.Semaphore(1)

json_schema_dict = {}

def __init__(self, argument_dict):
Expand All @@ -54,12 +54,23 @@ def __init__(self, argument_dict):
self.session_id = None
self.pb2 = pb2
self.topic_name = get_argument('topic', argument_dict)
# If the API version is not provided as an argument, use a default value
# If the API version is not provided as an argument, use a default value
if get_argument('apiVersion', argument_dict) == None:
self.apiVersion = '55.0'
self.apiVersion = '57.0'
else:
# Otherwise, get the version from the argument
self.apiVersion = get_argument('apiVersion', argument_dict)
"""
Semaphore used for subscriptions. This keeps the subscription stream open
to receive events and to notify when to send the next FetchRequest.
See Python Quick Start for more information.
https://developer.salesforce.com/docs/platform/pub-sub-api/guide/qs-python-quick-start.html
There is probably a better way to do this. This is only sample code. Please
use your own discretion when writing your production Pub/Sub API client.
Make sure to use only one semaphore per subscribe call if you are planning
to share the same instance of PubSub.
"""
self.semaphore = threading.Semaphore(1)

def auth(self):
"""
Expand All @@ -84,7 +95,7 @@ def auth(self):
self.url = "{}://{}".format(url_parts.scheme, url_parts.netloc)
self.session_id = res_xml[4].text
except IndexError:
print("An exception occurred. Check the response XML below:",
print("An exception occurred. Check the response XML below:",
res.__dict__)

# Get org ID from UserInfo
Expand All @@ -97,6 +108,12 @@ def auth(self):
('instanceurl', self.url),
('tenantid', self.tenant_id))

def release_subscription_semaphore(self):
"""
Release semaphore so FetchRequest can be sent
"""
self.semaphore.release()

def make_fetch_request(self, topic, replay_type, replay_id, num_requested):
"""
Creates a FetchRequest per the proto file.
Expand All @@ -122,7 +139,9 @@ def fetch_req_stream(self, topic, replay_type, replay_id, num_requested):
Returns a FetchRequest stream for the Subscribe RPC.
"""
while True:
# Only send FetchRequest when needed. Semaphore release indicates need for new FetchRequest
self.semaphore.acquire()
print("Sending Fetch Request")
yield self.make_fetch_request(topic, replay_type, replay_id, num_requested)

def encode(self, schema, payload):
Expand All @@ -136,7 +155,7 @@ def encode(self, schema, payload):
plaintext payload needs to be Avro-encoded with the event schema for
the API to accept it. When receiving an event, the Avro-encoded payload
needs to be Avro-decoded with the event schema for you to read it in
plaintext.
plaintext.
"""
schema = avro.schema.parse(schema)
buf = io.BytesIO()
Expand Down Expand Up @@ -183,7 +202,7 @@ def get_schema_json(self, schema_id):
def generate_producer_events(self, schema, schema_id):
"""
Encodes the data to be sent in the event and creates a ProducerEvent per
the proto file.
the proto file. Change the below payload to match the schema used.
"""
payload = {
"CreatedDate": int(datetime.now().timestamp()),
Expand All @@ -203,7 +222,7 @@ def subscribe(self, topic, replay_type, replay_id, num_requested, callback):
API. It uses a semaphore to prevent the Python client from closing the
connection prematurely (this is due to the way Python's GRPC library is
designed and may not be necessary for other languages--Java, for
example, does not need this).
example, does not need this).
"""
sub_stream = self.stub.Subscribe(self.fetch_req_stream(topic, replay_type, replay_id, num_requested), metadata=self.metadata)
print("> Subscribed to", topic)
Expand All @@ -219,4 +238,4 @@ def publish(self, topic_name, schema, schema_id):
topic_name=topic_name,
events=self.generate_producer_events(schema,
schema_id)),
metadata=self.metadata)
metadata=self.metadata)
2 changes: 2 additions & 0 deletions python/InventoryAppExample/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ Analytics` > `Platform APIs` to watch it.
The proto file for the API can be found [here](https://github.com/developerforce/pub-sub-api/blob/main/pubsub_api.proto).

This example uses Python features that require Python version 3.10 or later, such as the `match` statement.

To build a working client example in Python please follow [the Python Quick Start Guide.](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/qs-python-quick-start.html)
11 changes: 5 additions & 6 deletions python/InventoryAppExample/SalesforceListener.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
from PubSub import PubSub
from utils.ClientUtil import command_line_input

latest_replay_id = None


def process_confirmation(event, pubsub):
"""
Expand All @@ -38,13 +36,13 @@ def process_confirmation(event, pubsub):

if event.events:
print("Number of events received in FetchResponse: ", len(event.events))
# If all requested evetns are delivered, release the semaphore
# If all requested events are delivered, release the semaphore
# so that a new FetchRequest gets sent by `PubSub.fetch_req_stream()`.
if event.pending_num_requested == 0:
pubsub.semaphore.release()
pubsub.release_subscription_semaphore()

for evt in event.events:
# Get the event payload and shema, then decode the payload
# Get the event payload and schema, then decode the payload
payload_bytes = evt.event.payload
json_schema = pubsub.get_schema_json(evt.event.schema_id)
decoded_event = pubsub.decode(json_schema, payload_bytes)
Expand Down Expand Up @@ -73,7 +71,8 @@ def process_confirmation(event, pubsub):
print("[", time.strftime('%b %d, %Y %l:%M%p %Z'), "] The subscription is active.")

# The replay_id is used to resubscribe after this position in the stream if the client disconnects.
latest_replay_id = event.latest_replay_id
# Implement storage of replay for resubscribe!!!
event.latest_replay_id

def run(argument_dict):
sfdc_updater = PubSub(argument_dict)
Expand Down

0 comments on commit 1e6fd65

Please sign in to comment.