Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Python client disclaimers #35

Merged
merged 2 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ $RECYCLE.BIN/
# Dependency directory for Go examples
go/vendor/

*/target/
*/target/

# Python compiled files
*.pyo
*.pyc
6 changes: 3 additions & 3 deletions python/InventoryAppExample/InventoryApp.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from util.ChangeEventHeaderUtility import process_bitmap

my_publish_topic = '/event/NewOrderConfirmation__e'
latest_replay_id = None


def make_publish_request(schema_id, record_id, obj):
Expand Down Expand Up @@ -71,7 +70,7 @@ def process_order(event, pubsub):
# If all requested events are delivered, release the semaphore
# so that a new FetchRequest gets sent by `PubSub.fetch_req_stream()`.
if event.pending_num_requested == 0:
pubsub.semaphore.release()
pubsub.release_subscription_semaphore()

for evt in event.events:
payload_bytes = evt.event.payload
Expand Down Expand Up @@ -126,7 +125,8 @@ def process_order(event, pubsub):
print("[", time.strftime('%b %d, %Y %l:%M%p %Z'), "] The subscription is active.")

# The replay_id is used to resubscribe after this position in the stream if the client disconnects.
latest_replay_id = event.latest_replay_id
# Implement storage of replay for resubscribe!!!
event.latest_replay_id


def run(argument_dict):
Expand Down
37 changes: 28 additions & 9 deletions python/InventoryAppExample/PubSub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
PubSub.py

This file defines the class `PubSub`, which contains common functionality for
both publisher and subscriber clients.
both publisher and subscriber clients.
"""

import io
Expand Down Expand Up @@ -38,7 +38,7 @@ class PubSub(object):
"""
Class with helpers to use the Salesforce Pub/Sub API.
"""
semaphore = threading.Semaphore(1)

json_schema_dict = {}

def __init__(self, argument_dict):
Expand All @@ -54,12 +54,23 @@ def __init__(self, argument_dict):
self.session_id = None
self.pb2 = pb2
self.topic_name = get_argument('topic', argument_dict)
# If the API version is not provided as an argument, use a default value
# If the API version is not provided as an argument, use a default value
if get_argument('apiVersion', argument_dict) == None:
self.apiVersion = '55.0'
self.apiVersion = '57.0'
else:
# Otherwise, get the version from the argument
self.apiVersion = get_argument('apiVersion', argument_dict)
"""
Semaphore used for subscriptions. This keeps the subscription stream open
to receive events and to notify when to send the next FetchRequest.
See Python Quick Start for more information.
https://developer.salesforce.com/docs/platform/pub-sub-api/guide/qs-python-quick-start.html
There is probably a better way to do this. This is only sample code. Please
use your own discretion when writing your production Pub/Sub API client.
Make sure to use only one semaphore per subscribe call if you are planning
to share the same instance of PubSub.
"""
self.semaphore = threading.Semaphore(1)

def auth(self):
"""
Expand All @@ -84,7 +95,7 @@ def auth(self):
self.url = "{}://{}".format(url_parts.scheme, url_parts.netloc)
self.session_id = res_xml[4].text
except IndexError:
print("An exception occurred. Check the response XML below:",
print("An exception occurred. Check the response XML below:",
res.__dict__)

# Get org ID from UserInfo
Expand All @@ -97,6 +108,12 @@ def auth(self):
('instanceurl', self.url),
('tenantid', self.tenant_id))

def release_subscription_semaphore(self):
"""
Release semaphore so FetchRequest can be sent
"""
self.semaphore.release()

def make_fetch_request(self, topic, replay_type, replay_id, num_requested):
"""
Creates a FetchRequest per the proto file.
Expand All @@ -122,7 +139,9 @@ def fetch_req_stream(self, topic, replay_type, replay_id, num_requested):
Returns a FetchRequest stream for the Subscribe RPC.
"""
while True:
# Only send FetchRequest when needed. Semaphore release indicates need for new FetchRequest
self.semaphore.acquire()
print("Sending Fetch Request")
yield self.make_fetch_request(topic, replay_type, replay_id, num_requested)

def encode(self, schema, payload):
Expand All @@ -136,7 +155,7 @@ def encode(self, schema, payload):
plaintext payload needs to be Avro-encoded with the event schema for
the API to accept it. When receiving an event, the Avro-encoded payload
needs to be Avro-decoded with the event schema for you to read it in
plaintext.
plaintext.
"""
schema = avro.schema.parse(schema)
buf = io.BytesIO()
Expand Down Expand Up @@ -183,7 +202,7 @@ def get_schema_json(self, schema_id):
def generate_producer_events(self, schema, schema_id):
"""
Encodes the data to be sent in the event and creates a ProducerEvent per
the proto file.
the proto file. Change the below payload to match the schema used.
"""
payload = {
"CreatedDate": int(datetime.now().timestamp()),
Expand All @@ -203,7 +222,7 @@ def subscribe(self, topic, replay_type, replay_id, num_requested, callback):
API. It uses a semaphore to prevent the Python client from closing the
connection prematurely (this is due to the way Python's GRPC library is
designed and may not be necessary for other languages--Java, for
example, does not need this).
example, does not need this).
"""
sub_stream = self.stub.Subscribe(self.fetch_req_stream(topic, replay_type, replay_id, num_requested), metadata=self.metadata)
print("> Subscribed to", topic)
Expand All @@ -219,4 +238,4 @@ def publish(self, topic_name, schema, schema_id):
topic_name=topic_name,
events=self.generate_producer_events(schema,
schema_id)),
metadata=self.metadata)
metadata=self.metadata)
11 changes: 5 additions & 6 deletions python/InventoryAppExample/SalesforceListener.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
from PubSub import PubSub
from utils.ClientUtil import command_line_input

latest_replay_id = None


def process_confirmation(event, pubsub):
"""
Expand All @@ -38,13 +36,13 @@ def process_confirmation(event, pubsub):

if event.events:
print("Number of events received in FetchResponse: ", len(event.events))
# If all requested evetns are delivered, release the semaphore
# If all requested events are delivered, release the semaphore
# so that a new FetchRequest gets sent by `PubSub.fetch_req_stream()`.
if event.pending_num_requested == 0:
pubsub.semaphore.release()
pubsub.release_subscription_semaphore()

for evt in event.events:
# Get the event payload and shema, then decode the payload
# Get the event payload and schema, then decode the payload
payload_bytes = evt.event.payload
json_schema = pubsub.get_schema_json(evt.event.schema_id)
decoded_event = pubsub.decode(json_schema, payload_bytes)
Expand Down Expand Up @@ -73,7 +71,8 @@ def process_confirmation(event, pubsub):
print("[", time.strftime('%b %d, %Y %l:%M%p %Z'), "] The subscription is active.")

# The replay_id is used to resubscribe after this position in the stream if the client disconnects.
latest_replay_id = event.latest_replay_id
# Implement storage of replay for resubscribe!!!
event.latest_replay_id

def run(argument_dict):
sfdc_updater = PubSub(argument_dict)
Expand Down
8 changes: 8 additions & 0 deletions python/resources/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
username=
password=
url=
topic=
grpcHost=api.pubsub.salesforce.com
grpcPort=443
# API version is used in the REST request for record updates
apiVersion=55.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be changed to apiVersion=57.0?

62 changes: 62 additions & 0 deletions python/utils/ClientUtil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import getopt


def load_properties(filepath, sep='=', comment_char='#'):
"""
Read the file passed as parameter as a properties file.
"""
props = {}
with open(filepath, "rt") as f:
for line in f:
l = line.strip()
if l and not l.startswith(comment_char):
key_value = l.split(sep)
key = key_value[0].strip()
value = sep.join(key_value[1:]).strip().strip('"')
props[key] = value

return props


def command_line_input(argument_list):
argument_dict = {}
options = "uplhotev"

# Long options
long_options = ["username=", "password=", "url=", "grpcHost=", "grpcPort=", "topic=", "tenantId=",
"apiVersion="]

try:
# Parsing argument
arguments, values = getopt.getopt(argument_list, options, long_options)

for currentArgument, currentValue in arguments:
if currentArgument in ("-u", "--username"):
argument_dict["username"] = currentValue

elif currentArgument in ("-p", "--password"):
argument_dict['password'] = currentValue

elif currentArgument in ("-l", "--url"):
argument_dict['url'] = currentValue

elif currentArgument in ("-h", "--grpcHost"):
argument_dict['grpcHost'] = currentValue

elif currentArgument in ("-o", "--grpcPort"):
argument_dict['grpcPort'] = currentValue

elif currentArgument in ("-t", "--topic"):
argument_dict['topic'] = currentValue

elif currentArgument in ("-e", "--tenantId"):
argument_dict['tenant_id'] = currentValue

elif currentArgument in ("-v", "--apiVersion"):
argument_dict['apiVersion'] = currentValue

return argument_dict

except getopt.error as err:
# output error, and return with an error code
print(str(err))
Empty file added python/utils/__init__.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this file be removed?

Empty file.