Merge pull request #2 from echenran/main
Reformat directory structure w/ improved Python example
echenran authored Aug 23, 2021
2 parents d0ac593 + fb8f269 commit 6d391af
Showing 9 changed files with 458 additions and 268 deletions.
124 changes: 124 additions & 0 deletions InventoryAppExample/InventoryApp.py
@@ -0,0 +1,124 @@
"""
InventoryApp.py
This is a subscriber client that listens for Change Data Capture events for the
Opportunity object and publishes `/event/NewOrderConfirmation__e` events. In
the example, this file would be hosted somewhere outside of Salesforce. The `if
__debug__` conditionals are to slow down the speed of the app for demoing
purposes.
"""
import os, sys

dir_path = os.path.dirname(os.path.realpath(__file__))
parent_dir_path = os.path.abspath(os.path.join(dir_path, os.pardir))
sys.path.insert(0, parent_dir_path)

from datetime import datetime, timedelta
import logging

from PubSub import PubSub
import pubsub_api_pb2 as pb2
from utils.ClientUtil import command_line_input
import time

my_publish_topic = '/event/NewOrderConfirmation__e'
latest_replay_id = None


def make_publish_request(schema_id, record_id, obj):
    """
    Creates a PublishRequest per the proto file.
    """
    req = pb2.PublishRequest(
        topic_name=my_publish_topic,
        events=generate_producer_events(schema_id, record_id, obj))
    return req


def generate_producer_events(schema_id, record_id, obj):
    """
    Encodes the data to be sent in the event and creates a ProducerEvent per
    the proto file.
    """
    schema = obj.get_schema_json(schema_id)
    dt = datetime.now() + timedelta(days=5)
    payload = {
        "CreatedDate": int(datetime.now().timestamp()),
        "CreatedById": '005R0000000cw06IAA',
        "OpptyRecordId__c": record_id,
        "EstimatedDeliveryDate__c": int(dt.timestamp()),
        "Weight__c": 58.2}
    req = {
        "id": "234",
        "schema_id": schema_id,
        "payload": obj.encode(schema, payload),
    }
    return [req]


def process_order(event, pubsub):
    """
    This is a callback that gets passed to the `PubSub.subscribe()` method.
    It decodes the payload of the received event and extracts the opportunity
    ID, then calls a helper function to publish the
    `/event/NewOrderConfirmation__e` event. For simplicity, this sample uses
    an estimated delivery date of five days from the current date. When no
    events are received within a certain time period, the API's subscribe
    method sends keepalive messages and the latest replay ID through this
    callback.
    """
    global latest_replay_id
    if event.events:
        payload_bytes = event.events[0].event.payload
        schema_id = event.events[0].event.schema_id
        decoded = pubsub.decode(pubsub.get_schema_json(schema_id),
                                payload_bytes)

        # Do not process updates to the EstimatedDeliveryDate__c field
        # (these are the integration's own write-backs)
        delivery_date_field = '0x01000000'
        if delivery_date_field in decoded['ChangeEventHeader']['changedFields']:
            return

        record_id = decoded['ChangeEventHeader']['recordIds'][0]

        if __debug__:
            time.sleep(10)
        print("> Received new order! Processing order...")
        if __debug__:
            time.sleep(4)
        print(" Done! Order replicated in inventory system.")
        if __debug__:
            time.sleep(2)
        print("> Calculating estimated delivery date...")
        if __debug__:
            time.sleep(2)
        print(" Done! Sending estimated delivery date back to Salesforce.")
        if __debug__:
            time.sleep(10)

        topic_info = pubsub.get_topic(topic_name=my_publish_topic)

        # Publish the NewOrderConfirmation__e event
        res = pubsub.stub.Publish(
            make_publish_request(topic_info.schema_id, record_id, pubsub),
            metadata=pubsub.metadata)
        if res.results[0].replay_id:
            print("> Event published successfully.")
        else:
            print("> Failed publishing event.")
    else:
        print("[", time.strftime('%b %d, %Y %l:%M%p %Z'),
              "] The subscription is active.")

    # The replay_id is used to resubscribe after this position in the stream
    # if the client disconnects.
    latest_replay_id = event.latest_replay_id


def run(argument_dict):
    cdc_listener = PubSub(argument_dict)
    cdc_listener.auth()

    # Subscribe to Opportunity CDC events
    cdc_listener.subscribe('/data/OpportunityChangeEvent', process_order)


if __name__ == '__main__':
    argument_dict = command_line_input(sys.argv[1:])
    logging.basicConfig()
    run(argument_dict)
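The `latest_replay_id` captured by the callback above is what a client would persist in order to resume a subscription after a disconnect. As a rough sketch of what resuming might look like: this assumes the proto file defines a `CUSTOM` replay preset and a `replay_id` field on `FetchRequest`, as later, generally available versions of the Pub/Sub API do; verify against `pubsub_api.proto` for this pilot.

```python
# Sketch only: pb2.CUSTOM and the replay_id field are assumptions based on
# later versions of the Pub/Sub API; check pubsub_api.proto before relying
# on them.
def make_resume_fetch_request(topic, saved_replay_id):
    """Builds a FetchRequest that resumes after a stored replay ID."""
    return pb2.FetchRequest(
        topic_name=topic,
        replay_preset=pb2.CUSTOM,    # resume from a specific stream position
        replay_id=saved_replay_id,   # e.g. the persisted latest_replay_id
        num_requested=1)
```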
202 changes: 202 additions & 0 deletions InventoryAppExample/PubSub.py
@@ -0,0 +1,202 @@
"""
PubSub.py
This file defines the class `PubSub`, which contains common functionality for
both publisher and subscriber clients.
"""

import io
import threading
import xml.etree.ElementTree as et
from datetime import datetime

import avro.io
import avro.schema
import certifi
import grpc
import requests

import pubsub_api_pb2 as pb2
import pubsub_api_pb2_grpc as pb2_grpc
from urllib.parse import urlparse
from utils.ClientUtil import load_properties

properties = load_properties("../resources/application.properties")

with open(certifi.where(), 'rb') as f:
    secure_channel_credentials = grpc.ssl_channel_credentials(f.read())


def get_argument(key, argument_dict):
    if key in argument_dict.keys():
        return argument_dict[key]
    else:
        return properties.get(key)


class PubSub(object):
    """
    Class with helpers to use the Salesforce Pub/Sub API.
    """
    semaphore = threading.Semaphore(1)

    def __init__(self, argument_dict):
        self.url = get_argument('instance_url', argument_dict)
        self.username = get_argument('username', argument_dict)
        self.password = get_argument('password', argument_dict)
        self.metadata = None
        if get_argument('https', argument_dict) == 'true':
            grpc_host = get_argument('grpcHost', argument_dict)
            grpc_port = get_argument('grpcPort', argument_dict)
            url = grpc_host + ":" + grpc_port
            channel = grpc.secure_channel(url, secure_channel_credentials)
        else:
            channel = grpc.insecure_channel('localhost:7011')
        self.stub = pb2_grpc.PubSubStub(channel)
        self.session_id = None
        self.pb2 = pb2
        self.tenant_id = get_argument('tenant_id', argument_dict)
        self.topic_name = get_argument('topic', argument_dict)

    def auth(self):
        """
        Sends a login request to the Salesforce SOAP API to retrieve a session
        token. The session token is bundled with other identifying information
        to create a tuple of metadata headers, which are needed for every RPC
        call.
        """
        url_suffix = '/services/Soap/u/43.0/'
        headers = {'content-type': 'text/xml', 'SOAPAction': 'Login'}
        xml = "<soapenv:Envelope xmlns:soapenv='http://schemas.xmlsoap.org/soap/envelope/' " + \
              "xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' " + \
              "xmlns:urn='urn:partner.soap.sforce.com'><soapenv:Body>" + \
              "<urn:login><urn:username><![CDATA[" + self.username + \
              "]]></urn:username><urn:password><![CDATA[" + self.password + \
              "]]></urn:password></urn:login></soapenv:Body></soapenv:Envelope>"
        res = requests.post(self.url + url_suffix, data=xml, headers=headers)
        res_xml = et.fromstring(res.content.decode('utf-8'))[0][0][0]

        try:
            url_parts = urlparse(res_xml[3].text)
            self.url = "{}://{}".format(url_parts.scheme, url_parts.netloc)
            self.session_id = res_xml[4].text
        except IndexError:
            print("An exception occurred. Check the response XML below:",
                  res.__dict__)

        # Set metadata headers
        self.metadata = (('x-sfdc-api-session-token', self.session_id),
                         ('x-sfdc-instance-url', self.url),
                         ('x-sfdc-tenant-id', self.tenant_id))

    def make_fetch_request(self, topic):
        """
        Creates a FetchRequest per the proto file.
        """
        return pb2.FetchRequest(
            topic_name=topic,
            replay_preset=pb2.LATEST,
            num_requested=1)

    def fetch_req_stream(self, topic):
        """
        Returns a FetchRequest stream for the Subscribe RPC.
        """
        while True:
            self.semaphore.acquire()
            yield self.make_fetch_request(topic)

    def encode(self, schema, payload):
        """
        Uses Avro and the event schema to encode a payload. The `encode()`
        and `decode()` methods are helpers that serialize and deserialize the
        payloads of events that clients publish and receive using Avro. If
        you develop an implementation in a language other than Python, you
        will need an Avro library in that language to do the same. When
        publishing an event, the plaintext payload must be Avro-encoded with
        the event schema for the API to accept it. When receiving an event,
        the Avro-encoded payload must be Avro-decoded with the event schema
        before you can read it in plaintext.
        """
        schema = avro.schema.parse(schema)
        buf = io.BytesIO()
        encoder = avro.io.BinaryEncoder(buf)
        writer = avro.io.DatumWriter(schema)
        writer.write(payload, encoder)
        return buf.getvalue()

    def decode(self, schema, payload):
        """
        Uses Avro and the event schema to decode a serialized payload. See
        `encode()` above for why Avro is used.
        """
        schema = avro.schema.parse(schema)
        buf = io.BytesIO(payload)
        decoder = avro.io.BinaryDecoder(buf)
        reader = avro.io.DatumReader(schema)
        return reader.read(decoder)

    def get_topic(self, topic_name):
        """
        Uses the GetTopic RPC to retrieve topic info, including its schema ID.
        """
        return self.stub.GetTopic(pb2.TopicRequest(topic_name=topic_name),
                                  metadata=self.metadata)

    def get_schema_json(self, schema_id):
        """
        Uses the GetSchema RPC to retrieve a schema given a schema ID.
        """
        res = self.stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id),
                                  metadata=self.metadata)
        return res.schema_json

    def generate_producer_events(self, schema, schema_id):
        """
        Encodes the data to be sent in the event and creates a ProducerEvent
        per the proto file.
        """
        payload = {
            "CreatedDate": int(datetime.now().timestamp()),
            "CreatedById": '005R0000000cw06IAA',  # Your user ID
            "textt__c": 'text ==t Hello World'
        }
        req = {
            "id": "261",  # This can be anything
            "schema_id": schema_id,
            "payload": self.encode(schema, payload)
        }
        return [req]

    def subscribe(self, topic, callback):
        """
        Calls the Subscribe RPC defined in the proto file and accepts a
        client-defined callback to handle any events that are returned by the
        API. It uses a semaphore to prevent the Python client from closing
        the connection prematurely (this is due to the way Python's gRPC
        library is designed and may not be necessary in other languages;
        Java, for example, does not need it).
        """
        sub_stream = self.stub.Subscribe(self.fetch_req_stream(topic),
                                         metadata=self.metadata)
        for event in sub_stream:
            self.semaphore.release()
            callback(event, self)

    def publish(self, topic_name, schema, schema_id):
        """
        Publishes events to the specified Platform Event topic.
        """
        return self.stub.Publish(
            self.pb2.PublishRequest(
                topic_name=topic_name,
                events=self.generate_producer_events(schema, schema_id)),
            metadata=self.metadata)
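To make the Avro helpers above concrete, here is a minimal, self-contained round trip that mirrors what `encode()` and `decode()` do. The schema is a toy stand-in: in the real flow the schema JSON comes from the `GetSchema` RPC, and the field names below are illustrative only.

```python
import io
import json

import avro.io
import avro.schema

# Toy schema standing in for one fetched via get_schema_json().
schema_json = json.dumps({
    "type": "record",
    "name": "NewOrderConfirmation",
    "fields": [
        {"name": "OpptyRecordId__c", "type": "string"},
        {"name": "Weight__c", "type": "double"},
    ],
})
schema = avro.schema.parse(schema_json)

# Encode: dict -> Avro binary, as done before publishing an event.
buf = io.BytesIO()
avro.io.DatumWriter(schema).write(
    {"OpptyRecordId__c": "006R000000XXXXXXXX", "Weight__c": 58.2},
    avro.io.BinaryEncoder(buf))
payload = buf.getvalue()

# Decode: Avro binary -> dict, as done with received event payloads.
decoded = avro.io.DatumReader(schema).read(
    avro.io.BinaryDecoder(io.BytesIO(payload)))
print(decoded)  # {'OpptyRecordId__c': '006R000000XXXXXXXX', 'Weight__c': 58.2}
```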
36 changes: 36 additions & 0 deletions InventoryAppExample/README.md
@@ -0,0 +1,36 @@
# Pub/Sub API Example - Inventory App

This example of the Pub/Sub API is meant to be a conceptual example only: the
code is not a template, is not meant for copying and pasting, and is not
intended to serve as anything other than a read-only learning resource. We
encourage you to read through the code and this README to understand the
logic underpinning the example, so that you can apply what you learn to your
own implementations. Also note that the structure of this example is only one
way to interact with the Pub/Sub API using Python; you are free to mirror it
in your own code, but it is far from the only way to engage with the API.

The example imagines a scenario in which salespeople closing opportunities in
Salesforce need an "Estimated Delivery Date" field filled in by an integration
between Salesforce and an external inventory app. When an opportunity is
closed in Salesforce, Salesforce publishes a Change Data Capture event. This
event is consumed by an inventory app (`InventoryApp.py`) hosted in an
external system such as AWS, which kicks off the inventory process for the
order: packaging, shipping, and so on. Once the inventory app has calculated
the estimated delivery date for the order, it sends that information back to
Salesforce in the payload of a `NewOrderConfirmation` event. On the Salesforce
side, a subscriber client (`SalesforceListener.py`) receives the
`NewOrderConfirmation` event and uses the date contained in the payload to
update the very opportunity that just closed. This enables the salesperson who
closed the deal to report the estimated delivery date to their customer right
away; the integration acts so quickly that the date appears almost
immediately after the opportunity is closed.
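
Concretely, the round trip looks roughly like this:

```
Opportunity closed in Salesforce
    |  Change Data Capture event (/data/OpportunityChangeEvent)
    v
InventoryApp.py (external system, e.g. AWS)
    |  Platform Event (/event/NewOrderConfirmation__e)
    |  carrying the calculated EstimatedDeliveryDate__c
    v
SalesforceListener.py -> updates the just-closed Opportunity
```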

A video demonstrating this app in action can be found on the
[TrailheaDX](https://www.salesforce.com/trailheadx) website. After
registering/logging in, go to [Product & Partner
Demos](https://www.salesforce.com/trailheadx) and click `Integrations &
Analytics` > `Platform APIs` to watch it.

The proto file for the API can be found [here](https://github.com/developerforce/pub-sub-api-pilot/blob/main/pubsub_api.proto).