Skip to content

Commit

Permalink
Merge pull request #28 from kkyon/0.2.0
Browse files Browse the repository at this point in the history
0.2.0
  • Loading branch information
kkyon authored Sep 14, 2018
2 parents 96172c0 + 9609087 commit d959939
Show file tree
Hide file tree
Showing 39 changed files with 1,106 additions and 856 deletions.
69 changes: 57 additions & 12 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
=======
Botflow
=======
0.2.0 alpha


* Data-driven programming framework
* Paralleled in coroutines and ThreadPool
* Type- and content-based route function
* Interactive programming with Jupyter Notebook


Installing
Expand Down Expand Up @@ -71,6 +74,20 @@ _Load the price of Bitcoin every 2 seconds. Advantage price aggregator sample ca
Bot.run()
main()
**Or write in chain style**


.. code-block:: python
from botflow import *
p_cd_bitcoin=Pipe().Timer(delay=2).Loop("http://api.coindesk.com/v1/bpi/currentprice.json")\
.HttpLoader().Map(lambda r: r.json['bpi']['USD']['rate_float']).Map(print)
p_cd_bitcoin.run()
- **Http server Support OOB** Publish your data pipe to public quickly.

Expand Down Expand Up @@ -127,7 +144,28 @@ Botflow will replay the data from nearest completed node, usually step N-1.
It will save a lot of time in the development phase.

Release
=======

:**0.2.0**: Milestone release.:

# Jupyter support. Able to run inside Jupyter note book.

# pipe can be nest in another Pipe.


p1=Pipe(get_image)
p2=Pipe(get_price)
p_get_all=Pipe(Zip(p1,p2)).Filter

# Support Chain style pipe line creating.

Pipe(range(1,10)).Map(lambda x:x+1).Fiter(lambda x:x>2)

same as :

Pipe(range(1,10),lambda x:x+1,Filter(lambda x:x>2))



:**0.1.9**: Major change see below .:

# Backpressure rate limit support
Expand Down Expand Up @@ -166,30 +204,37 @@ Data-driven programming is typically applied to streams of structured data for f

Botflow has a few basic concepts to implement Data-driven programming .

- **Pipe**
It is the main stream process of the program. All units will work inside.
- **Node**
It is callable unit.Any callable function and object can work as Node. It is driven by data. Custom functions work as Nodes.
- **Source**
It is feed stream data to the pipe.

* **Timer**: It will send a message in the pipe by timer param. **delay**, **max_time** **until** some finished
* **Pipe.run**: you can use Pipe.run to trigger the data into pipe. By default it will feed int **0**



- **Function**
It is callable unit.Any callable function and object can work as Node. It is driven by data. Custom functions work as Map unit.
There are some built-in nodes:


* **Timer**: It will send a message in the pipe by timer param. **delay**, **max_time** **until** some finished
* **HttpLoader**: Get a url and return the HTTP response

* **Fetch**: (Alias:HttpLoader) Get a url and return the HTTP response
* **AioFile**: for file I/O.
* **SpeedLimit**: limit the stream speed limit
* **Delay**: delay in special second.
* **Zip** : Wait for all branched to finish and merged the result into a tuple.
* **Map** : Work ad Convert unit.
* **Filter** : Drop data from pipe if it does not match some condition
* **Flat** : Drop data from pipe if it does not match some condition


- **Route**
It will be used to create a complex data flow network, not just one main process. Botflow can nest Routes inside Routes.
It is a powerful concept.
There are some pre built-in Route:
* **Branch** : Duplicate data from parent pipe to a branch.
* **Line** : Extend the length of route.
* **Join** : Duplicate data to many branches, and return result to pipe.
* **LinkTo**: Route flow to any Node or Route for making loop , circle
* **Pipe**: It is the main stream process of the program. All units will work inside.
* **Tee** : (Alias:Branch) Duplicate data from parent pipe to a child pipe as branch.
* **Zip** : Combine multi pipes result to list.
* **Link**: (Alias: LinkTo) Route flow to any Node or Route for making loop , circle


All units (Pipe, Node, Route) communicate via queues and perform parallel computation in coroutines.
Expand Down
7 changes: 4 additions & 3 deletions botflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .route import Pipe,Timer,Branch,Join,Return,SendTo,Line,Link
from .node import Node,Zip,Filter,Delay,SpeedLimit
from .route import Timer,Branch,Join,Link,Zip
from .pipe import Pipe
from .function import Function,Filter,Delay,SpeedLimit,Flat,ToText
from . import route
from botflow.ex.aiofile import AioFile
from .bdata import Bdata,Databoard
Expand All @@ -11,5 +12,5 @@

__all__ = ["Pipe","Timer","Branch","Join","Zip","HttpRequest",
"HttpLoader", "AioFile", "route",
"Bdata","HttpServer","BotFlow","SendTo","Bot","Delay","SpeedLimit","Line","Link"]
"Bdata","HttpServer","BotFlow","Bot","Delay","SpeedLimit","Link","Function","Flat","ToText"]

9 changes: 8 additions & 1 deletion botflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,11 @@ def flatten(d):


class BotExit(Exception):
pass
pass


_BOT_LOOP=asyncio.new_event_loop()

def get_loop():
# _BOT_LOOP.set_debug(enabled=True)
return _BOT_LOOP
6 changes: 3 additions & 3 deletions botflow/bdata.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .base import CountRef,Singleton
from .base import CountRef,Singleton,get_loop

import asyncio
import logging
Expand Down Expand Up @@ -133,7 +133,7 @@ def create_future(self,ori,callback):
if ori == 0:
raise Exception("can't wait for 0 input")
return
future = asyncio.get_event_loop().create_future()
future = get_loop().create_future()
future.add_done_callback(callback)
self._futures[ori] = future
return future
Expand All @@ -142,7 +142,7 @@ async def wait_ori(self, ori):
if ori == 0 or ori in self._futures:
raise Exception("can't wait for 0 input")
return
future = asyncio.get_event_loop().create_future()
future = get_loop().create_future()
self._futures[ori] = future

return await future
Expand Down
24 changes: 15 additions & 9 deletions botflow/bot.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import asyncio
import logging
from .nodebase import Node
from .functionbase import Function
from . import function
from .bdata import Bdata
from .config import config
import typing,types
from .botbase import BotBase,BotManager,BotInfo,filter_out
from .base import BotExit,flatten
from .base import BotExit,flatten,get_loop

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -33,7 +34,7 @@ def __init__(self,input_q,output_q,func):

async def pre_hook(self):

if isinstance(self.func, Node):
if isinstance(self.func, Function):
await self.func.node_init()

self.raw_bdata = self.func.raw_bdata
Expand All @@ -42,7 +43,7 @@ async def pre_hook(self):
self.raw_bdata = False

async def post_hook(self):
if isinstance(self.func, Node):
if isinstance(self.func, Function):
await self.func.node_close()

async def sync_to_async(self, f, data):
Expand All @@ -64,13 +65,13 @@ async def call_wrap_r(self,func, bdata):

try:

if isinstance(func, Node) and func.raw_bdata:
if isinstance(func, Function) and func.raw_bdata:
param = bdata
else:
param = bdata.data

if hasattr(func, 'boost_type'):
loop = asyncio.get_event_loop()
loop = get_loop()
result = await loop.run_in_executor(None, func, param)


Expand Down Expand Up @@ -121,15 +122,20 @@ async def append_q(self,call_wrap_r,func,bdata,q):
all_none=False
if all_none == False:
self.produced_count += 1
await q.put(Bdata(r, bdata.ori))
if len(r) ==1:
await q.put(Bdata(r[0], bdata.ori))
else:
await q.put(Bdata(r, bdata.ori))

elif isinstance(r,typing.Generator):
for i in r:
self.produced_count+=1
await q.put(Bdata(i, bdata.ori))

else:
if r is not None :
#None only can ignore when Filter Node.

if r is not None or (bdata.ori.data!=0 and not isinstance(func,function.Filter)):
self.produced_count += 1
await q.put(Bdata(r,bdata.ori))

Expand All @@ -153,7 +159,7 @@ def create_coro(self,bdata):
coro = self.append_q(self.merge_list,self.func, bdata, self.output_q)

return coro
elif isinstance(bdata.data,Exception):
elif config.exception_policy != config.Exception_pipein and isinstance(bdata.data,Exception):
return self.output_q.put(bdata)


Expand Down
15 changes: 12 additions & 3 deletions botflow/botbase.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import asyncio
from .config import config
import logging
from .base import Singleton, list_included
from .base import Singleton, list_included,get_loop

from .base import copy_size
from .nodebase import Node
from .functionbase import Function
from .bdata import Bdata
from .queue import SinkQueue
import typing, types
Expand Down Expand Up @@ -57,6 +57,8 @@ def __init__(self):
self._bots = []
self._pipes = set()
self.bot_id = 0
self.loop=get_loop()


def rest(self):
self._bots = []
Expand Down Expand Up @@ -85,6 +87,12 @@ def ready_to_stop(cls, bi):
def new_bot_id(self):
self.bot_id += 1
return self.bot_id
def remove_by_pipe(self,pipe):

self._bots = [ b for b in self._bots if b.pipeline != pipe ]




def add_pipes(self, pipe):
self._pipes.add(pipe)
Expand Down Expand Up @@ -226,7 +234,7 @@ async def call_wrap(func, bdata, iq, oq, raw_bdata=False):
ori = bdata.ori

if hasattr(func, 'boost_type'):
loop = asyncio.get_event_loop()
loop = get_loop()
r_or_c = await loop.run_in_executor(None, func, param)

else:
Expand Down Expand Up @@ -355,6 +363,7 @@ async def main_loop(self):
# self.lock.release()

except BotExit:
logger.debug("bot_{} exit".format(id(self)))
break
except:
raise
Expand Down
Loading

0 comments on commit d959939

Please sign in to comment.