Skip to content

Commit f4529c7

Browse files
+ Move process command to separate file
1 parent 723cb2a commit f4529c7

File tree

3 files changed

+186
-174
lines changed

3 files changed

+186
-174
lines changed

src/thread/cli/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@
99

1010
# Import #
1111
from .base import cli_base as app
12+
from .process import process as process_cli
13+
14+
app.command(name = 'process')(process_cli)

src/thread/cli/base.py

Lines changed: 1 addition & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,8 @@
1-
import os
2-
import time
3-
import json
4-
import inspect
5-
import importlib
6-
71
import typer
82
import logging
9-
from rich.progress import Progress, TextColumn, BarColumn, TimeRemainingColumn, TimeElapsedColumn
10-
from typing import Union, Pattern, Required, Optional, Callable
113

124
from . import __version__
13-
from .utils import DebugOption, VerboseOption, QuietOption, verbose_args_processor, kwargs_processor
5+
from .utils import DebugOption, VerboseOption, QuietOption, verbose_args_processor
146
logger = logging.getLogger('base')
157

168

@@ -123,168 +115,3 @@ def config(configuration: str):
123115
[blue]Configure[/blue] the system. :wrench:
124116
"""
125117
typer.echo('Coming soon!')
126-
127-
128-
129-
@cli_base.command(context_settings = {'allow_extra_args': True}, no_args_is_help = True)
130-
def process(
131-
ctx: typer.Context,
132-
func: str = typer.Argument(help = '(path.to.file:function_name) OR (lambda x: x)'),
133-
dataset: str = typer.Argument(help = '(path/to/file.txt) OR ([ i for i in range(2) ])'),
134-
135-
args: list[str] = typer.Option([], '--args', '-a', help = 'Arguments passed to each thread'),
136-
threads: int = typer.Option(8, '--threads', '-t', help = 'Maximum number of threads (will scale down based on dataset size)'),
137-
138-
daemon: bool = typer.Option(False, '--daemon', '-d', help = 'Threads to run in daemon mode'),
139-
output: str = typer.Option('./output.json', '--output', '-o', help = 'Output file location'),
140-
fileout: bool = typer.Option(True, '--fileout', is_flag = True, help = 'Weather to write output to a file'),
141-
stdout: bool = typer.Option(False, '--stdout', is_flag = True, help = 'Weather to print the output'),
142-
143-
debug: bool = DebugOption,
144-
verbose: bool = VerboseOption,
145-
quiet: bool = QuietOption
146-
):
147-
"""
148-
[bold]Utilise parallel processing on a dataset[/bold]
149-
150-
\b\n
151-
[bold]:glowing_star: Examples[/bold]
152-
Kwargs can be parsed by adding overflow arguments in pairs
153-
[green]$ thread process ... k1 v1 k2 v2[/green]
154-
=> kwargs = {k1: v1, k2: v2}
155-
156-
Single kwargs will be ignored
157-
[green]$ thread process ... a1[/green]
158-
=> kwargs = {}
159-
160-
"""
161-
verbose_args_processor(debug, verbose, quiet)
162-
kwargs = kwargs_processor(ctx)
163-
logger.debug('Processed kwargs: %s' % kwargs)
164-
165-
166-
# Verify output
167-
if not fileout and not stdout:
168-
raise typer.BadParameter('No output method specified')
169-
170-
if fileout and not os.path.exists('/'.join(output.split('/')[:-1])):
171-
raise typer.BadParameter('Output file directory does not exist')
172-
173-
174-
175-
176-
# Loading function
177-
f = None
178-
try:
179-
logger.info('Attempted to interpret function')
180-
f = eval(func) # I know eval is bad practice, but I have yet to find a safer replacement
181-
logger.debug(f'Evaluated function: %s' % f)
182-
183-
if not inspect.isfunction(f):
184-
logger.info('Invalid function')
185-
except Exception:
186-
logger.info('Failed to interpret function')
187-
188-
if not f:
189-
try:
190-
logger.info('Attempting to fetch function file')
191-
192-
fPath, fName = func.split(':')
193-
f = importlib.import_module(fPath).__dict__[fName]
194-
logger.debug(f'Evaluated function: %s' % f)
195-
196-
if not inspect.isfunction(f):
197-
logger.info('Not a function')
198-
raise Exception('Not a function')
199-
except Exception as e:
200-
logger.warning('Failed to fetch function')
201-
raise typer.BadParameter('Failed to fetch function') from e
202-
203-
204-
205-
206-
# Loading dataset
207-
ds: Union[list, tuple, set, None] = None
208-
try:
209-
logger.info('Attempting to interpret dataset')
210-
ds = eval(dataset)
211-
logger.debug(f'Evaluated dataset: %s' % (str(ds)[:125] + '...' if len(str(ds)) > 125 else ds))
212-
213-
if not isinstance(ds, (list, tuple, set)):
214-
logger.info('Invalid dataset literal')
215-
ds = None
216-
217-
except Exception:
218-
logger.info('Failed to interpret dataset')
219-
220-
if not ds:
221-
try:
222-
logger.info('Attempting to fetch data file')
223-
if not os.path.isfile(dataset):
224-
logger.info('Invalid file path')
225-
raise Exception('Invalid file path')
226-
227-
with open(dataset, 'r') as a:
228-
ds = [ i.endswith('\n') and i[:-2] for i in a.readlines() ]
229-
except Exception as e:
230-
logger.warning('Failed to read dataset')
231-
raise typer.BadParameter('Failed to read dataset') from e
232-
233-
logger.info('Interpreted dataset')
234-
235-
236-
# Setup
237-
logger.debug('Importing module')
238-
from ..thread import ParallelProcessing
239-
logger.info('Spawning threads... [Expected: {tcount} threads]'.format(tcount=min(len(ds), threads)))
240-
241-
newProcess = ParallelProcessing(
242-
function = f,
243-
dataset = list(ds),
244-
args = args,
245-
kwargs = kwargs,
246-
daemon = daemon,
247-
max_threads = threads
248-
)
249-
250-
logger.info('Created parallel process')
251-
logger.info('Starting parallel process')
252-
253-
start_t = time.perf_counter()
254-
newProcess.start()
255-
256-
typer.echo('Started parallel process')
257-
typer.echo('Waiting for parallel process to complete, this may take a while...')
258-
259-
with Progress(
260-
TextColumn('[bold blue]{task.description}', justify = 'right'),
261-
BarColumn(bar_width = None),
262-
'[progress.percentage]{task.percentage:>3.1f}%',
263-
'•',
264-
TimeRemainingColumn(),
265-
'•',
266-
TimeElapsedColumn()
267-
) as progress:
268-
percentage = 0
269-
job = progress.add_task('Working...', total = 100)
270-
271-
while percentage < 100:
272-
percentage = round(sum(i.progress for i in newProcess._threads) / (len(newProcess._threads) or 8), 2) * 100
273-
progress.update(job, completed = percentage)
274-
time.sleep(0.1)
275-
276-
result = newProcess.get_return_values()
277-
278-
typer.echo(f'Completed in {(time.perf_counter() - start_t):.5f}s')
279-
if fileout:
280-
typer.echo(f'Writing file to {output}...')
281-
try:
282-
with open(output, 'w') as f:
283-
json.dump(result, f, indent = 2)
284-
typer.echo(f'Wrote to file')
285-
except Exception as e:
286-
logger.error('Failed to write to file')
287-
logger.debug(str(e))
288-
289-
if stdout:
290-
typer.echo(result)

src/thread/cli/process.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
"""Parallel Processing command"""
2+
3+
import os
4+
import time
5+
import json
6+
import inspect
7+
import importlib
8+
9+
import typer
10+
import logging
11+
from typing import Union
12+
13+
from rich.progress import Progress, TextColumn, BarColumn, TimeRemainingColumn, TimeElapsedColumn
14+
from .utils import DebugOption, VerboseOption, QuietOption, verbose_args_processor, kwargs_processor
15+
16+
17+
cli = typer.Typer()
18+
logger = logging.getLogger('base')
19+
20+
21+
@cli.command(context_settings = {'allow_extra_args': True}, no_args_is_help = True)
22+
def process(
23+
ctx: typer.Context,
24+
func: str = typer.Argument(help = '(path.to.file:function_name) OR (lambda x: x)'),
25+
dataset: str = typer.Argument(help = '(path/to/file.txt) OR ([ i for i in range(2) ])'),
26+
27+
args: list[str] = typer.Option([], '--args', '-a', help = 'Arguments passed to each thread'),
28+
threads: int = typer.Option(8, '--threads', '-t', help = 'Maximum number of threads (will scale down based on dataset size)'),
29+
30+
daemon: bool = typer.Option(False, '--daemon', '-d', help = 'Threads to run in daemon mode'),
31+
output: str = typer.Option('./output.json', '--output', '-o', help = 'Output file location'),
32+
fileout: bool = typer.Option(True, '--fileout', is_flag = True, help = 'Weather to write output to a file'),
33+
stdout: bool = typer.Option(False, '--stdout', is_flag = True, help = 'Weather to print the output'),
34+
35+
debug: bool = DebugOption,
36+
verbose: bool = VerboseOption,
37+
quiet: bool = QuietOption
38+
):
39+
"""
40+
[bold]Utilise parallel processing on a dataset[/bold]
41+
42+
\b\n
43+
[bold]:glowing_star: Examples[/bold]
44+
Kwargs can be parsed by adding overflow arguments in pairs
45+
[green]$ thread process ... k1 v1 k2 v2[/green]
46+
=> kwargs = {k1: v1, k2: v2}
47+
48+
Single kwargs will be ignored
49+
[green]$ thread process ... a1[/green]
50+
=> kwargs = {}
51+
52+
"""
53+
verbose_args_processor(debug, verbose, quiet)
54+
kwargs = kwargs_processor(ctx)
55+
logger.debug('Processed kwargs: %s' % kwargs)
56+
57+
58+
# Verify output
59+
if not fileout and not stdout:
60+
raise typer.BadParameter('No output method specified')
61+
62+
if fileout and not os.path.exists('/'.join(output.split('/')[:-1])):
63+
raise typer.BadParameter('Output file directory does not exist')
64+
65+
66+
67+
68+
# Loading function
69+
f = None
70+
try:
71+
logger.info('Attempted to interpret function')
72+
f = eval(func) # I know eval is bad practice, but I have yet to find a safer replacement
73+
logger.debug(f'Evaluated function: %s' % f)
74+
75+
if not inspect.isfunction(f):
76+
logger.info('Invalid function')
77+
except Exception:
78+
logger.info('Failed to interpret function')
79+
80+
if not f:
81+
try:
82+
logger.info('Attempting to fetch function file')
83+
84+
fPath, fName = func.split(':')
85+
f = importlib.import_module(fPath).__dict__[fName]
86+
logger.debug(f'Evaluated function: %s' % f)
87+
88+
if not inspect.isfunction(f):
89+
logger.info('Not a function')
90+
raise Exception('Not a function')
91+
except Exception as e:
92+
logger.warning('Failed to fetch function')
93+
raise typer.BadParameter('Failed to fetch function') from e
94+
95+
96+
97+
98+
# Loading dataset
99+
ds: Union[list, tuple, set, None] = None
100+
try:
101+
logger.info('Attempting to interpret dataset')
102+
ds = eval(dataset)
103+
logger.debug(f'Evaluated dataset: %s' % (str(ds)[:125] + '...' if len(str(ds)) > 125 else ds))
104+
105+
if not isinstance(ds, (list, tuple, set)):
106+
logger.info('Invalid dataset literal')
107+
ds = None
108+
109+
except Exception:
110+
logger.info('Failed to interpret dataset')
111+
112+
if not ds:
113+
try:
114+
logger.info('Attempting to fetch data file')
115+
if not os.path.isfile(dataset):
116+
logger.info('Invalid file path')
117+
raise Exception('Invalid file path')
118+
119+
with open(dataset, 'r') as a:
120+
ds = [ i.endswith('\n') and i[:-2] for i in a.readlines() ]
121+
except Exception as e:
122+
logger.warning('Failed to read dataset')
123+
raise typer.BadParameter('Failed to read dataset') from e
124+
125+
logger.info('Interpreted dataset')
126+
127+
128+
# Setup
129+
logger.debug('Importing module')
130+
from ..thread import ParallelProcessing
131+
logger.info('Spawning threads... [Expected: {tcount} threads]'.format(tcount=min(len(ds), threads)))
132+
133+
newProcess = ParallelProcessing(
134+
function = f,
135+
dataset = list(ds),
136+
args = args,
137+
kwargs = kwargs,
138+
daemon = daemon,
139+
max_threads = threads
140+
)
141+
142+
logger.info('Created parallel process')
143+
logger.info('Starting parallel process')
144+
145+
start_t = time.perf_counter()
146+
newProcess.start()
147+
148+
typer.echo('Started parallel process')
149+
typer.echo('Waiting for parallel process to complete, this may take a while...')
150+
151+
with Progress(
152+
TextColumn('[bold blue]{task.description}', justify = 'right'),
153+
BarColumn(bar_width = None),
154+
'[progress.percentage]{task.percentage:>3.1f}%',
155+
'•',
156+
TimeRemainingColumn(),
157+
'•',
158+
TimeElapsedColumn()
159+
) as progress:
160+
percentage = 0
161+
job = progress.add_task('Working...', total = 100)
162+
163+
while percentage < 100:
164+
percentage = round(sum(i.progress for i in newProcess._threads) / (len(newProcess._threads) or 8), 2) * 100
165+
progress.update(job, completed = percentage)
166+
time.sleep(0.1)
167+
168+
result = newProcess.get_return_values()
169+
170+
typer.echo(f'Completed in {(time.perf_counter() - start_t):.5f}s')
171+
if fileout:
172+
typer.echo(f'Writing file to {output}...')
173+
try:
174+
with open(output, 'w') as f:
175+
json.dump(result, f, indent = 2)
176+
typer.echo(f'Wrote to file')
177+
except Exception as e:
178+
logger.error('Failed to write to file')
179+
logger.debug(str(e))
180+
181+
if stdout:
182+
typer.echo(result)

0 commit comments

Comments
 (0)