Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from cassandra.connection import Connection, ConnectionShutdown
import threading

from cassandra.connection import Connection, ConnectionShutdown
import sys
import asyncio
import logging
import os
Expand Down Expand Up @@ -88,9 +90,11 @@ def __init__(self, *args, **kwargs):

self._connect_socket()
self._socket.setblocking(0)

self._write_queue = asyncio.Queue()
self._write_queue_lock = asyncio.Lock()
loop_args = dict()
if sys.version_info[0] == 3 and sys.version_info[1] < 10:
loop_args['loop'] = self._loop
self._write_queue = asyncio.Queue(**loop_args)
self._write_queue_lock = asyncio.Lock(**loop_args)

# see initialize_reactor -- loop is running in a separate thread, so we
# have to use a threadsafe call
Expand All @@ -108,8 +112,11 @@ def initialize_reactor(cls):
if cls._pid != os.getpid():
cls._loop = None
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)
try:
cls._loop = asyncio.get_running_loop()
except RuntimeError:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)

if not cls._loop_thread:
# daemonize so the loop will be shut down on interpreter
Expand Down Expand Up @@ -162,7 +169,7 @@ def push(self, data):
else:
chunks = [data]

if self._loop_thread.ident != get_ident():
if self._loop_thread != threading.current_thread():
asyncio.run_coroutine_threadsafe(
self._push_msg(chunks),
loop=self._loop
Expand All @@ -173,7 +180,7 @@ def push(self, data):

async def _push_msg(self, chunks):
# This lock ensures all chunks of a message are sequential in the Queue
with await self._write_queue_lock:
async with self._write_queue_lock:
for chunk in chunks:
self._write_queue.put_nowait(chunk)

Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,4 @@ def is_windows():

notwindows = unittest.skipUnless(not is_windows(), "This test is not adequate for windows")
notpypy = unittest.skipUnless(not platform.python_implementation() == 'PyPy', "This tests is not suitable for pypy")
notasyncio = unittest.skipUnless(not EVENT_LOOP_MANAGER == 'asyncio', "This tests is not suitable for EVENT_LOOP_MANAGER=asyncio")
5 changes: 2 additions & 3 deletions tests/integration/cqlengine/model/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,9 @@ class SensitiveModel(Model):
rows[-1]
rows[-1:]

# Asyncio complains loudly about old syntax on python 3.7+, so get rid of all of those
relevant_warnings = [warn for warn in w if "with (yield from lock)" not in str(warn.message)]
# ignore DeprecationWarning('The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.')
relevant_warnings = [warn for warn in w if "The loop argument is deprecated" not in str(warn.message)]

self.assertEqual(len(relevant_warnings), 4)
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[0].message))
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[1].message))
self.assertIn("ModelQuerySet indexing with negative indices support will be removed in 4.0.",
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/standard/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from cassandra import connection
from cassandra.connection import DefaultEndPoint

from tests import notwindows
from tests import notwindows, notasyncio
from tests.integration import use_singledc, get_server_versions, CASSANDRA_VERSION, \
execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \
get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, lessthanorequalcass40, \
Expand Down Expand Up @@ -1108,6 +1108,7 @@ def test_add_profile_timeout(self):
raise Exception("add_execution_profile didn't timeout after {0} retries".format(max_retry_count))

@notwindows
@notasyncio # asyncio can't do timeouts smaller than 1ms, as this test requires
def test_execute_query_timeout(self):
with TestCluster() as cluster:
session = cluster.connect(wait_for_all_pools=True)
Expand Down