Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(zjow): add middleware for ape-x structure pipeline #696

Merged
merged 14 commits into from
Aug 11, 2023

Conversation

zjowowen
Copy link
Collaborator

@zjowowen zjowowen commented Aug 1, 2023

Add middleware priority_calculator for calculate priority in collector.
Add middleware PeriodicalModelExchanger for better control model send and receive process.

@zjowowen zjowowen added the enhancement New feature or request label Aug 1, 2023
@zjowowen zjowowen self-assigned this Aug 1, 2023
@PaParaZz1 PaParaZz1 added the efficiency optimization Efficiency optimization (time, memory and so on) label Aug 1, 2023
@zjowowen zjowowen changed the title feature(zjow): add middleware for apex-x structure pipeline feature(zjow): add middleware for ape-x structure pipeline Aug 2, 2023
@@ -54,7 +54,10 @@ def __init__(

def push(self, chain: Callable, data: Any, meta: Optional[dict] = None, *args, **kwargs) -> BufferedData:
if meta is None:
meta = {'priority': self.max_priority}
if 'priority' in data:
meta = {'priority': data['priority'].item()}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do item operation in original priority producer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

meta = {'priority': self.max_priority}
if 'priority' in data:
meta = {'priority': data['priority'].item()}
else:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add unittest for this new if-else branch

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Parallel.runner(n_parallel_workers=2, startup_interval=0)(periodical_model_exchanger_main)


if __name__ == "__main__":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unittest doesn't need this entry

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


priority = func_for_priority_calculation(ctx.trajectories)
for i in range(len(priority)):
ctx.trajectories[i]['priority'] = torch.tensor(priority[i], dtype=torch.float32).unsqueeze(-1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't transform it to tensor

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


def _priority_calculator(ctx: "OnlineRLContext") -> None:

priority = func_for_priority_calculation(ctx.trajectories)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

priority_calculation_fn

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

from ding.framework import OnlineRLContext


def priority_calculator(func_for_priority_calculation: Callable) -> Callable:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add unittest for this file

self._model_loader = model_loader
self._event_name = event_name
self._period = period
self.mode = mode
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not underline here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


def _cache_state_dict(self, msg: Dict[str, Any]):
# msg: Dict {'id':id,'model':state_dict: Union[object, Storage]}
print(f"node_id[{task.router.node_id}] get model msg")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use logging

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@codecov
Copy link

codecov bot commented Aug 11, 2023

Codecov Report

Merging #696 (f6dda29) into main (6b188c9) will decrease coverage by 1.27%.
Report is 13 commits behind head on main.
The diff coverage is 50.71%.

❗ Current head f6dda29 differs from pull request most recent head e1a7f84. Consider uploading reports for the commit e1a7f84 to get more accurate results

@@            Coverage Diff             @@
##             main     #696      +/-   ##
==========================================
- Coverage   82.06%   80.79%   -1.27%     
==========================================
  Files         586      597      +11     
  Lines       47515    49631    +2116     
==========================================
+ Hits        38991    40101    +1110     
- Misses       8524     9530    +1006     
Flag Coverage Δ
unittests 80.79% <50.71%> (-1.27%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Changed Coverage Δ
ding/entry/utils.py 73.17% <ø> (ø)
ding/framework/middleware/functional/enhancer.py 39.65% <0.00%> (ø)
ding/policy/edac.py 18.26% <ø> (ø)
ding/policy/mbpolicy/mbsac.py 95.75% <ø> (ø)
...r/collector/battle_interaction_serial_evaluator.py 93.79% <ø> (ø)
ding/entry/serial_entry_mbrl.py 75.18% <4.00%> (-16.48%) ⬇️
ding/policy/dqn.py 80.21% <6.25%> (-7.13%) ⬇️
ding/framework/middleware/distributer.py 16.37% <12.34%> (-64.13%) ⬇️
ding/worker/replay_buffer/naive_buffer.py 75.31% <13.95%> (-13.47%) ⬇️
ding/policy/mbpolicy/utils.py 25.00% <14.28%> (-75.00%) ⬇️
... and 42 more

... and 223 files with indirect coverage changes

@PaParaZz1 PaParaZz1 merged commit d905ca8 into opendilab:main Aug 11, 2023
15 of 18 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
efficiency optimization Efficiency optimization (time, memory and so on) enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants