-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replay marker #10
Comments
import asyncio mydb = mysql.connector.connect( mycursor = mydb.cursor()
class MyReplayMarkerStorage(): if name == "main": |
To be honest your code example looks really messy, even after applying correct indentation. So instead of editing your example, I created a basic BTW. I'm not sure but it seems like your example contains some authentication credentials. Please remove it if possible. import asyncio
from typing import Optional
import aiomysql # type: ignore
from aiosfstream import (
SalesforceStreamingClient,
ReplayMarker,
ReplayMarkerStorage,
)
MYSQL_HOST = "<mysql_host>"
MYSQL_PORT = 3306
MYSQL_USER = "<mysql_user>"
MYSQL_PASSWORD = "<mysql_password>"
MYSQL_DATABASE = "<mysql_database>"
SF_CONSUMER_KEY = "<salesforce_consumer_key>"
SF_CONSUMER_SECRET = "<salesforce_consumer_secret>"
SF_USERNAME = "<salesforce_username>"
SF_PASSWORD = "<salesforce_password>"
class MySQLReplayMarkerStorage(ReplayMarkerStorage):
"""Replay marker storage implementation for storing replay markers in
a MySQL table
"""
#: SQL table creation statement template
TABLE_CREATION_TEMPLATE = """
CREATE TABLE IF NOT EXISTS `{table_name}` (
`subscription` VARCHAR(255) NOT NULL,
`date` VARCHAR(32) NOT NULL,
`replay_id` INT NOT NULL,
CONSTRAINT `pk_replay` PRIMARY KEY (`subscription`)
)
"""
#: SQL statement template for setting the value of a replay marker
SET_REPLAY_MARKER_TEMPLATE = """
REPLACE INTO `{table_name}` (`subscription`, `date`, `replay_id`)
VALUES (%s, %s, %s)
"""
#: SQL statement template for getting the value of a replay marker
GET_REPLAY_MARKER_TEMPLATE = """
SELECT `date`, `replay_id`
FROM `{table_name}`
WHERE `subscription`=%s
"""
def __init__(self, connection_pool: aiomysql.Pool,
table_name: str = "replay") -> None:
"""
:param connection_pool: MySQL connection pool
:param table_name: Name of the table for storing replay markers
"""
super().__init__()
self.connection_pool = connection_pool
self.table_name = table_name
def render_sql(self, template: str) -> str:
"""Create an SQL statement from the given *template* by inserting the
correct table name
:param template: SQL statement template
:return: The rendered SQL statement
"""
return template.format(table_name=self.table_name)
async def ensure_table_exists(self) -> None:
"""Create the table for storing the replay markers if it doesn't
already exist
"""
async with self.connection_pool.acquire() as connection:
async with connection.cursor() as cursor:
sql = self.render_sql(self.TABLE_CREATION_TEMPLATE)
await cursor.execute(sql)
await connection.commit()
async def set_replay_marker(self, subscription: str,
replay_marker: ReplayMarker) -> None:
"""Store the *replay_marker* for the given *subscription*
:param subscription: Name of the subscribed channel
:param replay_marker: A replay marker
"""
async with self.connection_pool.acquire() as connection:
async with connection.cursor() as cursor:
sql = self.render_sql(self.SET_REPLAY_MARKER_TEMPLATE)
await cursor.execute(sql, (
subscription,
replay_marker.date,
replay_marker.replay_id
))
await connection.commit()
async def get_replay_marker(self, subscription: str) \
-> Optional[ReplayMarker]:
"""Retrieve a stored replay marker for the given *subscription*
:param subscription: Name of the subscribed channel
:return: A replay marker or ``None`` if there is nothing stored for \
the given *subscription*
"""
async with self.connection_pool.acquire() as connection:
async with connection.cursor() as cursor:
sql = self.render_sql(self.GET_REPLAY_MARKER_TEMPLATE)
await cursor.execute(sql, (subscription,))
result = await cursor.fetchone()
if result is None:
return None
return ReplayMarker(**result)
async def stream_events() -> None:
"""Stream event messages using the Streaming API"""
# create the MySQL connection pool
async with aiomysql.create_pool(
host=MYSQL_HOST,
port=MYSQL_PORT,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
db=MYSQL_DATABASE,
cursorclass=aiomysql.DictCursor,
) as connection_pool:
# create the replay marker storage object
replay_storage = MySQLReplayMarkerStorage(connection_pool)
# make sure the required table exists
await replay_storage.ensure_table_exists()
# create the Streaming API client
async with SalesforceStreamingClient(
consumer_key=SF_CONSUMER_KEY,
consumer_secret=SF_CONSUMER_SECRET,
username=SF_USERNAME,
password=SF_PASSWORD,
replay=replay_storage,
) as client:
# subscribe to PushTopic or CDC or ...
await client.subscribe("/data/ChangeEvents")
# listen for incoming messages
async for message in client:
print(message)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(stream_events()) |
Thanks a lot, the credentials are invalid hence no harm there |
@robertmrk I have modelled my code similar to above but it is closing the connection abruptly if it receives an existing replay id. Please see my issue here and an not sure what I am doing wrong there. Any input there will help me greatly. |
Hi i can't find out the return type of the Replay marker that needs to be passed in the Streaming client class, i have made a custom class to store the Replay id in SQL
When i retrieve it from SQL and try to pass that replay id in the Streaming client, i get an error saying it's not a valid type for the parameter
The text was updated successfully, but these errors were encountered: