Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replay marker #10

Closed
maulikagarwal800 opened this issue May 26, 2019 · 4 comments
Closed

Replay marker #10

maulikagarwal800 opened this issue May 26, 2019 · 4 comments
Assignees

Comments

@maulikagarwal800
Copy link

Hi i can't find out the return type of the Replay marker that needs to be passed in the Streaming client class, i have made a custom class to store the Replay id in SQL
When i retrieve it from SQL and try to pass that replay id in the Streaming client, i get an error saying it's not a valid type for the parameter

@maulikagarwal800
Copy link
Author

import asyncio
import json
import shelve
from azure.servicebus.control_client import ServiceBusService
from aiosfstream import SalesforceStreamingClient
from aiosfstream import ReplayMarker
import mysql.connector

mydb = mysql.connector.connect(
host="localhost",
user="root",
passwd="",
database="salesforce"
)

mycursor = mydb.cursor()

            # store *replay_marker* for the given *subscription*

    # retrieve the replay marker for the given *subscription*

class MyReplayMarkerStorage():
def set_replay_marker(self, subscription, replay_marker,date):
sql = "INSERT INTO replay (subscription, replaymarker,date) VALUES (%s, %s,%s)"
val = (subscription, replay_marker,date)
mycursor.execute(sql, val)
mydb.commit()
def get_replay_marker(self, subscription):
sql = "SELECT MAX(replaymarker),MAX(date1) FROM replay WHERE subscription = %s "
val = (subscription,)
mycursor.execute(sql, val)
myresult = mycursor.fetchall()
replay_marker = myresult[0][0]
date = myresult[0][1]
return replay_marker,date
object = MyReplayMarkerStorage()
async def stream_events():
replay,date = object.get_replay_marker("/topic/SalesOrderss")
marker = ReplayMarker(str(date) ,replay)
print(marker)
async with SalesforceStreamingClient(consumer_key="",replay = marker) as client:
await client.subscribe("/topic/SalesOrderss")
async for message in client:
topic = message["channel"]
data = message["data"]
date = message ["data"]["event"]["createdDate"]
print(date)
print(message)
event = data["event"]
replay = event["replayId"]
print(type(replay))
object.set_replay_marker(topic,replay,date)
json = data['sobject']
service_name = 'aligntechpoc'
key_name = 'rpolicy' # SharedAccessKeyName from Azure portal
key_value = '88PqRJLNpiAPao0UVyDDr9uhjexCaEZGrqCvV+nXh2I=' # SharedAccessKey from Azure portal
# sbs = ServiceBusService(service_name,shared_access_key_name=key_name,shared_access_key_value=key_value)
# sbs.send_event("aligntechrec",json)
print(message)

if name == "main":
loop = asyncio.get_event_loop()
loop.run_until_complete(stream_events())

@robertmrk robertmrk self-assigned this May 28, 2019
@robertmrk
Copy link
Owner

Hi @maulikagarwal800

To be honest your code example looks really messy, even after applying correct indentation. So instead of editing your example, I created a basic ReplayMarkerStorage implementation for you, for storing ReplayMarker objects in a MySQL database.
You can use it as-is, but I would recommend to extend it with some local caching, and flush ReplayMarker objects to the database periodically, or on shutdown. Or use some simpler key-value store instead.

BTW. I'm not sure but it seems like your example contains some authentication credentials. Please remove it if possible.

import asyncio
from typing import Optional

import aiomysql  # type: ignore
from aiosfstream import (
    SalesforceStreamingClient,
    ReplayMarker,
    ReplayMarkerStorage,
)


MYSQL_HOST = "<mysql_host>"
MYSQL_PORT = 3306
MYSQL_USER = "<mysql_user>"
MYSQL_PASSWORD = "<mysql_password>"
MYSQL_DATABASE = "<mysql_database>"

SF_CONSUMER_KEY = "<salesforce_consumer_key>"
SF_CONSUMER_SECRET = "<salesforce_consumer_secret>"
SF_USERNAME = "<salesforce_username>"
SF_PASSWORD = "<salesforce_password>"


class MySQLReplayMarkerStorage(ReplayMarkerStorage):
    """Replay marker storage implementation for storing replay markers in
    a MySQL table
    """
    #: SQL table creation statement template
    TABLE_CREATION_TEMPLATE = """
        CREATE TABLE IF NOT EXISTS `{table_name}` (
            `subscription` VARCHAR(255) NOT NULL,
            `date` VARCHAR(32) NOT NULL,
            `replay_id` INT NOT NULL,
            CONSTRAINT `pk_replay` PRIMARY KEY (`subscription`)
        )
    """
    #: SQL statement template for setting the value of a replay marker
    SET_REPLAY_MARKER_TEMPLATE = """
        REPLACE INTO `{table_name}` (`subscription`, `date`, `replay_id`)
        VALUES (%s, %s, %s)
    """
    #: SQL statement template for getting the value of a replay marker
    GET_REPLAY_MARKER_TEMPLATE = """
        SELECT `date`, `replay_id`
        FROM `{table_name}`
        WHERE `subscription`=%s
    """

    def __init__(self, connection_pool: aiomysql.Pool,
                 table_name: str = "replay") -> None:
        """
        :param connection_pool: MySQL connection pool
        :param table_name: Name of the table for storing replay markers
        """
        super().__init__()
        self.connection_pool = connection_pool
        self.table_name = table_name

    def render_sql(self, template: str) -> str:
        """Create an SQL statement from the given *template* by inserting the
        correct table name

        :param template: SQL statement template
        :return: The rendered SQL statement
        """
        return template.format(table_name=self.table_name)

    async def ensure_table_exists(self) -> None:
        """Create the table for storing the replay markers if it doesn't
        already exist
        """
        async with self.connection_pool.acquire() as connection:
            async with connection.cursor() as cursor:
                sql = self.render_sql(self.TABLE_CREATION_TEMPLATE)
                await cursor.execute(sql)
                await connection.commit()

    async def set_replay_marker(self, subscription: str,
                                replay_marker: ReplayMarker) -> None:
        """Store the *replay_marker* for the given *subscription*

        :param subscription: Name of the subscribed channel
        :param replay_marker: A replay marker
        """
        async with self.connection_pool.acquire() as connection:
            async with connection.cursor() as cursor:
                sql = self.render_sql(self.SET_REPLAY_MARKER_TEMPLATE)
                await cursor.execute(sql, (
                    subscription,
                    replay_marker.date,
                    replay_marker.replay_id
                ))
                await connection.commit()

    async def get_replay_marker(self, subscription: str) \
            -> Optional[ReplayMarker]:
        """Retrieve a stored replay marker for the given *subscription*

        :param subscription: Name of the subscribed channel
        :return: A replay marker or ``None`` if there is nothing stored for \
        the given *subscription*
        """
        async with self.connection_pool.acquire() as connection:
            async with connection.cursor() as cursor:
                sql = self.render_sql(self.GET_REPLAY_MARKER_TEMPLATE)
                await cursor.execute(sql, (subscription,))
                result = await cursor.fetchone()

                if result is None:
                    return None

                return ReplayMarker(**result)


async def stream_events() -> None:
    """Stream event messages using the Streaming API"""

    # create the MySQL connection pool
    async with aiomysql.create_pool(
        host=MYSQL_HOST,
        port=MYSQL_PORT,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        db=MYSQL_DATABASE,
        cursorclass=aiomysql.DictCursor,
    ) as connection_pool:

        # create the replay marker storage object
        replay_storage = MySQLReplayMarkerStorage(connection_pool)
        # make sure the required table exists
        await replay_storage.ensure_table_exists()

        # create the Streaming API client
        async with SalesforceStreamingClient(
            consumer_key=SF_CONSUMER_KEY,
            consumer_secret=SF_CONSUMER_SECRET,
            username=SF_USERNAME,
            password=SF_PASSWORD,
            replay=replay_storage,
        ) as client:

            # subscribe to PushTopic or CDC or ...
            await client.subscribe("/data/ChangeEvents")

            # listen for incoming messages
            async for message in client:
                print(message)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(stream_events())

@maulikagarwal800
Copy link
Author

Thanks a lot, the credentials are invalid hence no harm there

@welcomemat-services
Copy link

@robertmrk I have modelled my code similar to above but it is closing the connection abruptly if it receives an existing replay id. Please see my issue here and an not sure what I am doing wrong there. Any input there will help me greatly.

#19 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants