Skip to content

Feature: Stream.pmap, parallele map for streaming data#7

Open
dboyliao wants to merge 3 commits intod2207197:masterfrom
dboyliao:feature/strm_pmap
Open

Feature: Stream.pmap, parallele map for streaming data#7
dboyliao wants to merge 3 commits intod2207197:masterfrom
dboyliao:feature/strm_pmap

Conversation

@dboyliao
Copy link
Copy Markdown
Contributor

Testing script updated and pass

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
'''
if processes is None:
processes = os.cpu_count()
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這應該是 mp.Pool 內建就會做的事。應該維持 None 即可

'''
if processes is None:
processes = os.cpu_count()
pool_chunk_size = max(chunk_size // processes, 1)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看不懂爲何要除以 processes 數量。系統容許的 queue 大小應該跟 processes 個數無關。

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我怕 queue 爆掉而已~
所以加了個每個 chunk 會是多少這件事~

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我改成就傳給 Pool 好了~

if not data:
break
yield from pool.imap(f, data, pool_chunk_size)
return Stream(iter(gen()))
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要用 @as_stream 包裝比較好。
@as_stream 處理了不少事情,包含 StreamTable 與 Stream 的相容。這樣可以保證 Stream 所有 method 都可以在 StreamTable使用。

另外 iter() 應該是多餘的。

@d2207197
Copy link
Copy Markdown
Owner

@dboyliao ☝️ 👆

@dboyliao
Copy link
Copy Markdown
Contributor Author

Orz 我現在才看到~
github 的 notification 我都當沒看到~XDDD

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants