add "at_time" source #274

Open
jdye64 opened this issue Sep 18, 2019 · 7 comments

Comments

@jdye64
Contributor

jdye64 commented Sep 18, 2019

I have some optimized Kafka code that I need to run. It consumes data from Kafka in C++ and then creates GPU DataFrames (also in C++) without passing each message back to Python. In my benchmarks, creating the large number of PyObjects that passing messages back would require was rather time consuming.

It occurred to me that if there were a source that acted like a lite version of cron, letting a user say something like "trigger this stream every 10 seconds, every 3 days, every 4 months, every odd Friday, etc.", then my optimized code could be triggered in my map function at my desired interval.

I realize this could be done in the user application by simply calling "emit" at the desired intervals, but it seemed much cleaner to offer a source that does the same.

I'm willing to open a PR for this one but wanted to get anyone's feedback first.
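
For reference, the user-side workaround mentioned above (calling "emit" on a timer) could look roughly like the sketch below. It uses only the standard library; `emit_every` and `received` are hypothetical names for illustration, and in a real application the `emit` argument would be something like the `emit` method of a streamz `Stream`.

```python
import asyncio
import time


async def emit_every(emit, interval, count):
    """Call emit(timestamp) every `interval` seconds, `count` times."""
    for _ in range(count):
        emit(time.time())              # payload here is just the current time
        await asyncio.sleep(interval)  # wait before the next trigger


# Stand-in for a stream: collect whatever gets emitted.
received = []
asyncio.run(emit_every(received.append, 0.01, 3))
```

A dedicated source would hide this loop from the user, which is the point of the request.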

@martindurant
Member

I feel like there is functionality very close to this already, since there are stream nodes (e.g., timed_window) which do things on a timed basis. Would you be interested in implementing the "cron" source? Perhaps it should emit the time, or perhaps some custom payload.
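
As a rough illustration of how a "cron"-like source might decide when to fire, the sketch below computes the next trigger time on a fixed interval grid. The function name `next_fire_time` is hypothetical and this is only one possible approach, not something taken from streamz.

```python
import math
from datetime import datetime, timedelta


def next_fire_time(start, interval, now):
    """Return the first time >= now on the grid start, start + interval, ..."""
    if now <= start:
        return start
    # Number of whole intervals needed to reach or pass `now`.
    steps = math.ceil((now - start) / interval)
    return start + steps * interval


start = datetime(2019, 4, 3, 9, 25, 0)
fire = next_fire_time(start, timedelta(seconds=5), datetime(2019, 4, 3, 9, 25, 7))
# fire is 2019-04-03 09:25:10
```

The source could then emit either this timestamp or a custom payload when the time arrives.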

@zjw0358

zjw0358 commented Oct 16, 2019


from streamz import Stream


@Stream.register_api(staticmethod)
class scheduler(Stream):
    """Scheduler stream: emits the result of each job on a fixed interval.

    Examples:
    s = scheduler()
    # func takes no arguments; its return value is emitted downstream
    s.add_job(name='hello', func=lambda: 'hello', seconds=5, start_date='2019-04-03 09:25:00')
    s.get_jobs()

    s.add_job(func=lambda: print('yahoo'), seconds=5)

    Parameters:
    weeks (int) – number of weeks to wait
    days (int) – number of days to wait
    hours (int) – number of hours to wait
    minutes (int) – number of minutes to wait
    seconds (int) – number of seconds to wait
    start_date (datetime|str) – starting point for the interval calculation
    end_date (datetime|str) – latest possible date/time to trigger on
    timezone (datetime.tzinfo|str) – to use for the date/time calculations.
    jitter (int|None) – advance or delay the job execution by at most jitter seconds.

    """

    def __init__(self, start=True, **kwargs):
        from apscheduler.schedulers.tornado import TornadoScheduler
        from apscheduler.executors.pool import ThreadPoolExecutor
        import pytz

        self._scheduler = TornadoScheduler(
            timezone=pytz.timezone('Asia/Shanghai'),
            executors={'default': ThreadPoolExecutor(20)},
            job_defaults={
                'coalesce': False,
                'max_instances': 1
            }
        )
        super(scheduler, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

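    def start(self):
        # Only start once; starting a running scheduler raises an error.
        if self.stopped:
            self.stopped = False
            self._scheduler.start()

    def stop(self):
        # APScheduler schedulers are stopped with shutdown(), not stop().
        if not self.stopped:
            self._scheduler.shutdown()
            self.stopped = True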

    def add_job(self, func, name=None, **kwargs):
        """Add a job.

        Example:
         s.add_job(name='hello', func=lambda: 'hello',
                   seconds=5, start_date='2019-04-03 09:25:00')
        """
        return self._scheduler.add_job(
            func=lambda: self._emit(func()),
            name=name,
            id=name,
            trigger='interval',
            **kwargs)

    def remove_job(self, name):
        return self._scheduler.remove_job(job_id=name)

    def get_jobs(self):
        return self._scheduler.get_jobs()

@martindurant
Member

@zjw0358 , care to explain this?

@zjw0358

zjw0358 commented Oct 16, 2019

@zjw0358 , care to explain this?

keywords:
from apscheduler.schedulers.tornado import TornadoScheduler
self._scheduler = TornadoScheduler......

func=lambda: self._emit(func()),
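
To spell out the last keyword line: the user's function is wrapped so that, each time the scheduler runs the job, the function's return value is pushed into the stream. A minimal standalone sketch of that wrapping, with the hypothetical names `make_emitting_job` and `results` standing in for the scheduler and the stream, might be:

```python
def make_emitting_job(func, emit):
    """Wrap func so that each call pushes its return value to emit."""
    def job():
        emit(func())
    return job


# Stand-in for the stream's _emit: collect emitted values in a list.
results = []
job = make_emitting_job(lambda: 'hello', results.append)

# The scheduler would call job() on each interval; call it by hand here.
job()
job()
# results is now ['hello', 'hello']
```

Note that the wrapped function is called with no arguments, so the function passed to add_job must not require any.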

@jdye64
Contributor Author

jdye64 commented Dec 9, 2019

@zjw0358 So you are saying this is something already present in Tornado that we just aren't taking advantage of in the scheduler? Am I understanding that correctly?

@miaobainian36

@zjw0358
The code is just what I need, wonderful.

@martindurant
Member

Does this code exist in a repo somewhere? streamz now supports registering nodes via entrypoints, so you can make your class more discoverable, but I would be happy to mention it in the docs too.
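
For anyone packaging a class like the one above, the entry-point registration might look roughly like the fragment below. The package and module names (`my_streamz_ext`, `sources`) are hypothetical, and the group name `streamz.sources` is an assumption based on the streamz plugin documentation, so it should be checked against the docs for the installed version.

```python
# Hypothetical setup.py fragment: maps an entry-point group to the class
# that should be registered. All names here are illustrative assumptions.
entry_points = {
    "streamz.sources": [
        "scheduler = my_streamz_ext.sources:scheduler",
    ],
}

# In setup.py this dict would be passed as:
#     setup(..., entry_points=entry_points)
```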
