-
Notifications
You must be signed in to change notification settings - Fork 74
Initial Asyncio Module PR [1/6] #741
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #741 +/- ##
==========================================
- Coverage 95.33% 93.48% -1.86%
==========================================
Files 378 389 +11
Lines 21992 24404 +2412
==========================================
+ Hits 20967 22815 +1848
- Misses 1025 1589 +564 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
I've added the diff between modules in this PR vs their counterparts in the old API to the PR description. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had a quick look.
For others: probably easier to use https://difftastic.wilfred.me.uk/introduction.html to make life easier.
Haven't studied the reactor in-detail and associated logic in detail. From the diff it looked like the changes were relatively minor, albeit many of them.
I can review those in-depth if needed (or you cannot find people).
hazelcast/asyncio/client.py
Outdated
| _CLIENT_ID = AtomicInteger() | ||
|
|
||
| @classmethod | ||
| async def create_and_start(cls, config: Config = None, **kwargs) -> "HazelcastClient": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General note -- I'm sure there are others.
If we're trying to get this API nice w.r.t. typing as well then this will probably show some error by default as None is not applicable for Config. Might be more prominent here as I think this is entry point into client creation.
Same for __init__ -- config: Config | None.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be missing something in the diff: in client.py there's a load of documentation -- is it elsewhere, or why omit it for this one? (most likely doc applicable from __init__)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re: Docs, I mentioned that in the PR description:
I didn't include the API docs, in order to make the PR smaller. I'll add them in another PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you use better diff tool it doesn't make a difference. Give that one I mentioned a go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can review those in-depth if needed (or you cannot find people).
Yes, please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least with PyCharm, the user gets the correct type annotation.
There is this project in case you'd like to try the asyncio module:
https://github.com/yuce/hazelcast-asyncio-sample
In any case, I pushed a PR that adds explicit |None:
baa3bc1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PyCharm may be more forgiving -- I've never used it. pyright or pyrefly are good tools to use to determine compliance to the type invariants specified using type annotations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We check the typings with mypy.
| raise | ||
| _logger.info("Client started") | ||
|
|
||
| async def get_map(self, name: str) -> Map[KeyType, ValueType]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intention is only to support also VC in near (immediate) term?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Map is the only proxy included, in order to make the PR small. Other proxies, except VC, may be excluded from the beta release.
| } | ||
|
|
||
|
|
||
| class ProxyManager: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to others: is in proxy/__init__.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the diff that I provided in the description I've shown that, but maybe it wasn't easy to notice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diff was not good.
| from hazelcast.internal.asyncio_connection import Connection | ||
| from hazelcast.core import Address | ||
|
|
||
| _BUFFER_SIZE = 128000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Best to define centrally if possible given it's used across reactors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think they should be independent with each other.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other constant hasn't been changed for ~4 years, same value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's not a big probability that it will change.
But I would either have to import it from hazelcast.reactor, or refactor the code so both the asyncore and the asyncio reactor imported it from a common module.
That either introduces a dependency between those reactor modules, or require changes in the "old" Python code, which I tried to avoid.
| def shutdown(self): | ||
| if not self._is_live: | ||
| return | ||
| # TODO: cancel tasks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
compared to reactor.py is this correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start, shutdown are not necessary for the AsyncioReactor.
Removed them in the 3rd PR with this commit:
58783dc
| _CLIENT_ID = AtomicInteger() | ||
|
|
||
| @classmethod | ||
| async def create_and_start(cls, config: Config | None = None, **kwargs) -> "HazelcastClient": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also StartNewClientAsync in .net. Also, it's initialized over a factory class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i would choose the name as new_hazelcast_client to be more consistent and easily understandable by any existing Hazelcast users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| self._cluster_connect_timeout_text, | ||
| self._max_backoff, | ||
| ) | ||
| time.sleep(sleep_time) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
asyncio.sleep ? called on line 550
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've fixed the WaitStrategy in 91bf1d1
|
|
||
| def callback(future): | ||
| try: | ||
| schema = future.result() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does .result() block if it's not ready? is it not something like await future, then future.result() or schema = await future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it doesn't block.
Trying to get the result when !future.done() raises an exception.
The fetch_schema_future.add_done_callback(callback) a few lines below makes sure that callback receives a done future.
ihsandemir
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will continue to review.
| @@ -0,0 +1,2 @@ | |||
| from hazelcast.asyncio.client import HazelcastClient | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you will follow on this way of exposing the API, no need to keep the client.py under this folder but put under internal, right?
| _CLIENT_ID = AtomicInteger() | ||
|
|
||
| @classmethod | ||
| async def create_and_start(cls, config: Config | None = None, **kwargs) -> "HazelcastClient": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i would choose the name as new_hazelcast_client to be more consistent and easily understandable by any existing Hazelcast users.
| await client._start() | ||
| return client | ||
|
|
||
| def __init__(self, config: Config | None = None, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is documentation at the hazelcast/client.py, would you like to duplicate it here as well?
"""The client can be configured either by:
- providing a configuration object as the first parameter of the
constructor
.. code:: python
from hazelcast import HazelcastClient
from hazelcast.config import Config
config = Config()
config.cluster_name = "a-cluster"
client = HazelcastClient(config)
- passing configuration options as keyword arguments
.. code:: python
from hazelcast import HazelcastClient
client = HazelcastClient(
cluster_name="a-cluster",
)
See the :class:`hazelcast.config.Config` documentation for the possible
configuration options.
Args:
config: Optional configuration object.
**kwargs: Optional keyword arguments of the client configuration.
"""
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed all documentation intentionally to keep the size of the PR smaller.
I'll put all the docs back in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually this made the review worse since I compare the 2 files and I see a lot of diff due to the documentations. Why not just put them in here. The PR is already over certain number of lines, it should not matter if that is 5K vs 7k :) and it will ease the review by comparison of the files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation will be added in a separate PR, and will be reviewed by the docs team.
|
@ihsandemir #741 (comment) Let's discuss about that on the TDD. |
| self._cluster_view_listener.start() | ||
| await self._connection_manager.start(self._load_balancer) | ||
| sync_start = not self._config.async_start | ||
| if sync_start: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does sync_start mean in this context? Didn't we seperated the client already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sync_start controls whether the client returns the control to the user after connecting to all members, or not. It's not related to asyncio. Async start with the asyncio client is currently not supported.
|
|
||
| @property | ||
| def name(self) -> str: | ||
| return self._name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I remember correctly, the missing docs will be part of next PRs to reduce line of changes here.
| ) | ||
|
|
||
|
|
||
| class TopicMessage(typing.Generic[MessageType]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it have any support?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it doesn't yet.
Removed unnecessary code from that file at: 2128f5e
| ) | ||
| return await self._invoke(request, handler) | ||
|
|
||
| async def flush(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why force_unlock and lock are removed?
| for partition_id, entry_list in partition_map.items(): | ||
| request = map_put_all_codec.encode_request( | ||
| self.name, entry_list, False | ||
| ) # TODO trigger map loader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: While put_all triggers map loader, set_all doesn't trigger that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is from the old API.
| def __repr__(self) -> str: | ||
| return '%s(name="%s")' % (type(self).__name__, self.name) | ||
|
|
||
| def _invoke(self, request, response_handler=_no_op_response_handler) -> asyncio.Future: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not put async def but return future? i thought you would not need to use futures when you have asyncio.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is asyncio.Future, not hazelcast.future.Future.
| self._invocation_service.invoke(invocation) | ||
| return invocation.future | ||
|
|
||
| async def _ainvoke_on_partition( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why have this function and also have _invoke_on_partition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a helper.
| _CLIENT_PUBLIC_ENDPOINT_QUALIFIER = EndpointQualifier(ProtocolType.CLIENT, "public") | ||
|
|
||
|
|
||
| class WaitStrategy: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why change from internal to public access?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's in the hazelcast.internal package, so it is internal.
| try: | ||
| # TODO: creating the task may not throw the exception | ||
| # TODO: protect the loop against exceptions, so all handlers run | ||
| maybe_coro = on_connection_opened(connection) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why was this? Which path it returns as non-coroutine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| @unittest.skipIf( | ||
| compare_client_version("4.2.1") < 0, "Tests the features added in 4.2.1 version of the client" | ||
| ) | ||
| class AuthenticationTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what was the reason for IsolatedAsyncioTestCase?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise async code cannot be used in tests.
ihsandemir
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at code coverage report #741 (comment), new code is around 76% covered. Will you complete it to 100%?
There are 5 more PRs that port more tests. |
This is the initial asyncio support.
asynciomodule which contains public asyncio APIinternalmodule andinternal/asyncio_modules, which contains the private asyncio API/implementation.tests/integration/asyncio/authentication_teststests/integration/asyncio/backup_acks_teststests/integration/asyncio/client_test(one test is not ported, due to its Topic DDS dependency)tests/integration/asyncio/proxy/map_testMost of the code in this PR was duplicated to the
internalmodule by prefixing them withasyncio_. For exampleROOT/cluster.pywas duplicated/modifed asinternal/asyncio_cluster.pyHere is the diff between modules in this PR vs their counterparts in the old API:
https://gist.github.com/yuce/56e79a29a1d4d1d996788381d489c0a4