Skip to content

Commit ac745eb

Browse files
author
coding-kitties
authored
Merge pull request #39 from coding-kitties/develop
Develop
2 parents ddfccd8 + 87b93a2 commit ac745eb

30 files changed

+877
-534
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
[![Build Status](https://travis-ci.org/investing-algorithms/investing-algorithm-framework.svg?branch=master)](https://travis-ci.org/investing-algorithms/investing-algorithm-framework)
1+
[![Build Status](https://travis-ci.org/coding-kitties/investing-algorithm-framework.svg?branch=master)](https://travis-ci.org/investing-algorithms/investing-algorithm-framework)
22

33
# Investing Algorithm Framework
44

Lines changed: 26 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,117 +1,49 @@
1-
from queue import Queue
2-
from typing import List, Dict
3-
from wrapt import synchronized
4-
from abc import abstractmethod, ABC
1+
from typing import List
2+
from abc import ABC
3+
from concurrent.futures import ThreadPoolExecutor
54

65
from investing_algorithm_framework.core.workers import Worker
7-
from investing_algorithm_framework.core.exceptions import OperationalException
8-
from investing_algorithm_framework.core.utils import StoppableThread
96
from investing_algorithm_framework.core.events.observer import Observer
107
from investing_algorithm_framework.core.events.observable import Observable
118
from investing_algorithm_framework.configuration.config_constants import \
129
DEFAULT_MAX_WORKERS
1310

1411

15-
class Executor(Observable, Observer, ABC):
12+
class Executor(Observable, ABC):
1613
"""
17-
Executor class: functions as an abstract class that will handle the
14+
Executor class: functions as a thread executor that will handle the
1815
executions of workers in asynchronous order.
16+
17+
It will make use of the concurrent library for execution of the workers.
18+
Also the executor functions as an observable instance.
1919
"""
2020

21-
def __init__(self, max_workers: int = DEFAULT_MAX_WORKERS):
21+
def __init__(
22+
self,
23+
workers: List[Worker],
24+
max_concurrent_workers: int = DEFAULT_MAX_WORKERS
25+
) -> None:
2226
super(Executor, self).__init__()
2327

24-
self._max_workers = max_workers
25-
self._pending_workers: Queue = None
26-
self._running_threads: Dict[Worker, StoppableThread] = {}
28+
self._workers = workers
29+
self._max_concurrent_workers = max_concurrent_workers
2730

2831
def start(self) -> None:
2932
"""
30-
Main entry for the executor.
31-
"""
32-
33-
self._initialize()
34-
self.run_jobs()
35-
36-
def stop(self) -> None:
37-
"""
38-
Function that will stop all running workers.
39-
"""
40-
41-
for worker in self._running_threads:
42-
self.stop_running_worker(worker)
43-
44-
self.clean_up()
45-
46-
def clean_up(self):
47-
"""
48-
Clean ups the resources.
49-
"""
50-
51-
self._pending_workers: Queue = None
52-
self._running_threads: Dict[Worker, StoppableThread] = {}
53-
54-
def _initialize(self):
55-
"""
56-
Functions that initializes the pending workers.
57-
"""
58-
59-
workers = self.create_workers()
60-
61-
if not workers or len(workers) == 0:
62-
raise OperationalException("There where no workers initialized "
63-
"for the executor instance")
64-
65-
self._pending_workers = Queue()
66-
67-
for worker in workers:
68-
self._pending_workers.put(worker)
69-
70-
@abstractmethod
71-
def create_workers(self) -> List[Worker]:
72-
"""
73-
Abstract function that will create the workers.
74-
"""
75-
pass
33+
Main entry for the executor. The executor creates a ThreadPoolExecutor
34+
with the given amount of max_workers.
7635
77-
def run_jobs(self) -> None:
36+
It will then pass all the workers to the ThreadPoolExecutor. When
37+
finished it will update all its observers
7838
"""
79-
Function that will start all the workers.
80-
"""
81-
82-
worker_iteration = self._max_workers - len(
83-
self._running_threads.keys()
84-
)
8539

86-
while worker_iteration > 0 and not self._pending_workers.empty():
87-
worker = self._pending_workers.get()
88-
worker_iteration -= 1
89-
thread = StoppableThread(target=worker.start)
90-
worker.add_observer(self)
91-
self._running_threads[worker] = thread
92-
thread.start()
40+
with ThreadPoolExecutor(max_workers=self._max_concurrent_workers) as \
41+
executor:
9342

94-
@synchronized
95-
def update(self, observable, **kwargs) -> None:
96-
"""
97-
Observer implementation.
98-
"""
43+
for worker in self.workers:
44+
executor.submit(worker.start)
9945

100-
if observable in self._running_threads:
101-
del self._running_threads[observable]
102-
103-
if not self.processing:
104-
self.notify_observers()
105-
else:
106-
self.run_jobs()
107-
108-
def stop_running_worker(self, worker: Worker) -> None:
109-
"""
110-
Function that will stop a running worker.
111-
"""
112-
113-
thread = self._running_threads[worker]
114-
thread.kill()
46+
self.notify_observers()
11547

11648
def add_observer(self, observer: Observer) -> None:
11749
super(Executor, self).add_observer(observer)
@@ -120,12 +52,5 @@ def remove_observer(self, observer: Observer) -> None:
12052
super(Executor, self).remove_observer(observer)
12153

12254
@property
123-
def processing(self) -> bool:
124-
"""
125-
Property that will show if the executor is running.
126-
"""
127-
128-
return (self._pending_workers is not None
129-
and not self._pending_workers.empty()) or \
130-
(self._running_threads is not None
131-
and len(self._running_threads.keys()) > 0)
55+
def workers(self) -> List[Worker]:
56+
return self._workers

investing_algorithm_framework/core/state/state.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66

77
class State(ABC):
88
"""
9-
Represents a state of the Bot, these state are use by the BotContext.
10-
Each implemented state represents a work mode for the
11-
investing_algorithm_framework.
9+
Represents a state of the context, these state are use by the Context.
10+
Each implemented state represents a work mode for a application created
11+
with investing_algorithm_framework.
1212
"""
1313

1414
# Transition state for the next BotState
@@ -24,7 +24,7 @@ def __init__(self, context) -> None:
2424
def start(self):
2525

2626
# Will stop the state if pre-conditions are not met
27-
if not self.validate_state():
27+
if not self.validate_state(pre_state=True):
2828
return
2929

3030
while True:
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from investing_algorithm_framework.core.workers.worker import Worker
22
from investing_algorithm_framework.core.workers.scheduled_worker \
33
import ScheduledWorker
4+
from investing_algorithm_framework.core.workers.relational_worker import \
5+
RelationalWorker
46

5-
__all__ = ['Worker', 'ScheduledWorker']
7+
__all__ = ['Worker', 'ScheduledWorker', 'RelationalWorker']
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from abc import ABC
2+
from typing import Dict, Any
3+
4+
from investing_algorithm_framework.core.workers.worker import Worker
5+
from investing_algorithm_framework.core.exceptions import OperationalException
6+
7+
8+
class RelationalWorker(Worker, ABC):
9+
"""
10+
RelationalWorker will start after it's relational worker has run.
11+
12+
It will check if the related worked had run, and if this is
13+
true it will start itself. Use this worker if you want to create
14+
chains of workers that are depended on each other.
15+
"""
16+
run_after: Worker
17+
18+
def start(self, **kwargs: Dict[str, Any]) -> None:
19+
20+
# Only run if the last time this worker stared is before
21+
# the last time the 'run_after' worker had finished.
22+
if not self.run_after:
23+
raise OperationalException(
24+
'The run_after worker is not set, make sure you set this '
25+
'attribute to let the RelationalWorker run properly.'
26+
)
27+
28+
if self.last_run is not None:
29+
30+
if self.run_after.last_run is None:
31+
raise OperationalException(
32+
"Relational Worker has run before 'run_after' worker."
33+
)
34+
35+
if self.run_after.last_run > self.last_run:
36+
super(RelationalWorker, self).start()
37+
38+
elif self.run_after.last_run is not None:
39+
super(RelationalWorker, self).start()
Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,58 @@
11
from abc import ABC
2+
from datetime import datetime
3+
from typing import Dict, Any
24

35
from investing_algorithm_framework.core.utils import TimeUnit
46
from investing_algorithm_framework.core.workers.worker import Worker
57

68

79
class ScheduledWorker(Worker, ABC):
10+
time_unit: TimeUnit = None
11+
time_interval: int = None
12+
13+
def start(self, **kwargs: Dict[str, Any]) -> None:
14+
15+
# If the worker has never run, run it
16+
if self.last_run is None:
17+
super(ScheduledWorker, self).start()
18+
19+
else:
20+
# Get the current time
21+
elapsed_time = datetime.now() - self.last_run
22+
23+
# Second evaluation
24+
if self.get_time_unit() is TimeUnit.SECOND:
25+
seconds = elapsed_time.total_seconds()
26+
27+
if seconds > self.get_time_interval():
28+
super(ScheduledWorker, self).start()
29+
30+
# Minute evaluation
31+
elif self.get_time_unit() is TimeUnit.MINUTE:
32+
minutes = divmod(elapsed_time.total_seconds(), 60)
33+
34+
if minutes > self.get_time_interval():
35+
super(ScheduledWorker, self).start()
36+
37+
# Hour evaluation
38+
elif self.get_time_unit() is TimeUnit.HOUR:
39+
hours = divmod(elapsed_time.total_seconds(), 3600)
40+
41+
if hours > self.get_time_interval():
42+
super(ScheduledWorker, self).start()
843

944
def get_time_unit(self) -> TimeUnit:
1045
assert getattr(self, 'time_unit', None) is not None, (
1146
"{} should either include a time_unit attribute, or override the "
12-
"`get_time_unit()`, method.".format(self.__class__.__name__)
47+
"`get_time_unit()` method.".format(self.__class__.__name__)
1348
)
1449

1550
return getattr(self, 'time_unit')
1651

1752
def get_time_interval(self) -> int:
1853
assert getattr(self, 'time_interval', None) is not None, (
19-
"{} should either include a time_interval attribute, or "
20-
"override the `get_time_interval()`, "
21-
"method.".format(self.__class__.__name__)
54+
"{} should either include a time_interval attribute, or override "
55+
"the `get_time_interval()` method.".format(self.__class__.__name__)
2256
)
2357

2458
return getattr(self, 'time_interval')

investing_algorithm_framework/core/workers/worker.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1+
import logging
12
from abc import abstractmethod, ABC
23
from typing import Dict, Any
4+
from wrapt import synchronized
5+
from datetime import datetime
36

47
from investing_algorithm_framework.core.events.observable import Observable
58
from investing_algorithm_framework.core.events.observer import Observer
69

10+
logger = logging.getLogger('investing_algorithm_framework')
11+
712

813
class Worker(Observable, ABC):
914
"""
@@ -12,15 +17,19 @@ class Worker(Observable, ABC):
1217
"""
1318

1419
id = None
20+
last_run: datetime = None
1521

1622
def start(self, **kwargs: Dict[str, Any]) -> None:
1723
"""
1824
Function that will start the worker, and notify its observers when
1925
it is finished
2026
"""
2127

28+
logger.info("Starting worker {}".format(self.get_id()))
2229
self.work(**kwargs)
2330
self.notify_observers()
31+
self.update_last_run()
32+
logger.info("Worker {} finished".format(self.get_id()))
2433

2534
@abstractmethod
2635
def work(self, **kwargs: Dict[str, Any]) -> None:
@@ -42,3 +51,17 @@ def get_id(self) -> str:
4251
)
4352

4453
return getattr(self, 'id')
54+
55+
@classmethod
56+
@synchronized
57+
def update_last_run(cls) -> None:
58+
"""
59+
Update last run, this function is synchronized, which means that
60+
different instances can update the last_run attribute from different
61+
threads.
62+
"""
63+
cls.last_run = datetime.now()
64+
65+
@classmethod
66+
def get_last_run(cls):
67+
return cls.last_run

investing_algorithm_framework/management/command.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ def execute(self, *args, **options) -> Any:
8989
if response is None and self.success_message is not None:
9090
return self.success_message
9191

92+
return response
93+
9294
@abstractmethod
9395
def handle(self, *args, **options) -> Any:
9496
"""

investing_algorithm_framework/management/command_manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def execute(self) -> None:
4242
]:
4343
response = self.fetch_command(sub_command).\
4444
run_from_argv(self.argv)
45+
response = format_success_message(response)
4546
else:
4647
# Help for sub command
4748
if len(self.argv) > 2:
@@ -55,10 +56,12 @@ def execute(self) -> None:
5556
self.argv[2] = option
5657
response = self.fetch_command(sub_command)\
5758
.run_from_argv(self.argv)
59+
response = format_success_message(response)
5860
else:
5961
# Show general help command
6062
command = HelpCommand()
6163
response = command.run_from_argv(self.argv)
64+
response = format_success_message(response)
6265
except Exception as e:
6366
response = format_error_message(str(e))
6467

0 commit comments

Comments
 (0)