| 
1 | 1 | import json  | 
 | 2 | +import logging  | 
2 | 3 | import pathlib  | 
3 | 4 | import typing  | 
4 | 5 | 
 
  | 
 | 
14 | 15 |     type RecordType = dict[str, typing.Any]  | 
15 | 16 | 
 
  | 
# Shape of the raw event dict produced by chalice.app.SQSEvent.to_dict():
# a top-level "Records" list of individual SQS record dicts.
SQSEventType = typing.TypedDict("SQSEventType", {"Records": list[RecordType]})
# A worker callable receives a single SQS record and returns a
# JSON-serializable result dict.
WorkerType = typing.Callable[[RecordType], dict[str, typing.Any]]
 | 19 | + | 
 | 20 | +logger = logging.getLogger(__name__)  | 
 | 21 | +workers: dict[str, WorkerType] = {}  | 
 | 22 | + | 
 | 23 | +for _workers in typing.cast(  | 
 | 24 | +    list[list[WorkerType]],  | 
 | 25 | +    import_util.auto_import_patterns(pattern="workers", file_prefix="", dir=pathlib.Path(__file__).parent),  | 
 | 26 | +):  | 
 | 27 | +    _func_names = {worker.__name__ for worker in _workers}  | 
 | 28 | +    if _duplicated := _func_names & workers.keys():  | 
 | 29 | +        raise ValueError(f"Worker {_duplicated} is already registered")  | 
 | 30 | + | 
 | 31 | +    workers.update({worker.__name__: worker for worker in _workers})  | 
 | 32 | + | 
 | 33 | + | 
 | 34 | +def _sqs_handler(event: chalice.app.SQSEvent) -> list[dict[str, typing.Any]]:  | 
 | 35 | +    parsed_event: SQSEventType = event.to_dict()  | 
 | 36 | +    logger.info(f"{parsed_event=}")  | 
 | 37 | + | 
 | 38 | +    results: list[dict[str, typing.Any]] = []  | 
 | 39 | +    for record in parsed_event["Records"]:  | 
 | 40 | +        try:  | 
 | 41 | +            worker_name = json.loads(record["body"])["worker"]  | 
 | 42 | +            results.append(workers[worker_name](record))  | 
 | 43 | +        except Exception as e:  | 
 | 44 | +            logger.error(f"Failed to handle event: {record}", exc_info=e)  | 
 | 45 | +            results.append({"error": "Failed to handle event"})  | 
 | 46 | + | 
 | 47 | +    logger.info(f"{results=}")  | 
 | 48 | +    return results  | 
18 | 49 | 
 
  | 
19 | 50 | 
 
  | 
def register_worker(app: chalice.app.Chalice) -> None:
    """Attach the shared SQS handler to *app*'s configured queue.

    Args:
        app: The Chalice application to register the handler on.
    """
    # on_sqs_message returns a decorator; apply it explicitly to the
    # module-level handler instead of using decorator syntax.
    subscribe = app.on_sqs_message(queue=config_module.config.infra.queue_name)
    subscribe(_sqs_handler)