Skip to content

Commit

Permalink
revise readme
Browse files Browse the repository at this point in the history
  • Loading branch information
gj authored and gj committed Sep 8, 2018
1 parent ed5b0ca commit 8b1c59c
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 20 deletions.
21 changes: 11 additions & 10 deletions botflow/botframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,17 @@ def make_bot(cls, i, o, f):


if not isinstance(f, typing.Callable):
if isinstance(f,(list,types.GeneratorType,range)):

tb = LoopBot(i, o, f)
bi = tb.make_botinfo()
return [bi]



else:
f = raw_value_wrap(f)
f = raw_value_wrap(f)
# if isinstance(f,(list,types.GeneratorType,range)):
#
# tb = LoopBot(i, o, f)
# bi = tb.make_botinfo()
# return [bi]
#
#
#
# else:
# f = raw_value_wrap(f)

if isinstance(f, route.Timer):
f.make_route_bot(i,o)
Expand Down
2 changes: 1 addition & 1 deletion botflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self):
self.main_lock._locked=True
self.check_stoping=True
self.default_queue_max_size=1000
self.backpressure_rate_limit=1000 #per sec
self.backpressure_rate_limit=0 #per sec


def __repr__(self):
Expand Down
11 changes: 5 additions & 6 deletions botflow/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,19 @@ def __init__(self,speed):

async def __call__(self,data):
self.processed_count += 1
if self.processed_count%100==0:
if self.processed_count > self.speed_limit:
await self.lock.acquire()
end = datetime.datetime.now()
s=(end-self.start_time).total_seconds()
speed_now=self.processed_count/s
if speed_now>(self.speed_limit*1.1) :
sleep_time=self.processed_count/self.speed_limit-s
print(f"need to sleep{sleep_time}")
if sleep_time > 2:
sleep_time=2
self.start_time = datetime.datetime.now()
await asyncio.sleep(sleep_time)

else:
self.start_time = datetime.datetime.now()
self.processed_count=0
self.start_time=datetime.datetime.now()

self.lock.release()

return data
Expand Down
20 changes: 20 additions & 0 deletions botflow/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self,maxsize=None,loop=None):
self.high_water = 0
self.qm.add(self)
self.put_count=0
self.get_count=0

self.start_time=datetime.datetime.now()
self.speed_limit=config.backpressure_rate_limit
Expand All @@ -51,6 +52,25 @@ def __init__(self,maxsize=None,loop=None):


async def readable(self):
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
await getter
except:
getter.cancel() # Just in case getter is not done yet.

try:
self._getters.remove(getter)
except ValueError:
pass

if not self.empty() and not getter.cancelled():
# We were woken up by put_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._getters)
raise

pass
#TODO

Expand Down
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def read(fname):
description='Data-driven and Reactive programming framework'
' ',
long_description=read("README.rst"),
version='0.1.9',
version='0.1.9.1',
url='https://github.com/kkyx/botflow',
author='Guojian Li',
author_email='guojianlee@gmail.com',
Expand All @@ -24,10 +24,9 @@ def read(fname):
'License :: OSI Approved :: BSD License',
'Programming Language :: Python :: 3'
],
packages=['databot', 'databot.ex'],
packages=['botflow', 'botflow.ex'],
install_requires=[
'aiohttp>=3.3.0',
'aiomysql>=0.0.19',
'graphviz'

],
Expand Down
69 changes: 69 additions & 0 deletions tests/test_backpressure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from botflow import *
from botflow.node import SpeedLimit
from botflow.config import config
import logging
logger=logging.getLogger("botflow.queue")
logger.setLevel(logging.DEBUG)
sum=0

def sum_up(i):

global sum
sum+=i
return i


def test_sum():
target=config.default_queue_max_size*3
Bot.reset()
Pipe(
range(target),
sum_up,
)


Bot.run()
print(sum)

Bot.debug_print()
to_sum=(0+target-1)*target/2
assert sum == to_sum


def test_speed():
limited_speed=20

import datetime
start=datetime.datetime.now()
count=0
speed_record=[]
def speed_rate(i):
nonlocal count,speed_record,start
count+=1
if count>=limited_speed:
end=datetime.datetime.now()
s=(end-start).total_seconds()
speed_record.append(count/s)
count=0
start=datetime.datetime.now()


Bot.reset()
Pipe(
range(limited_speed*11),
SpeedLimit(limited_speed),
speed_rate

)

Bot.run()
ok_count=0
for s in speed_record:
up=limited_speed*1.1
low=limited_speed *0.9
if s>low and s<up:
ok_count+=1
assert ok_count>=7

# assert s < up
# assert s > low

0 comments on commit 8b1c59c

Please sign in to comment.