Skip to content

distributed breaks with msgpack 1.0.0rc1 #3491

Closed
@sk1p

Description

@sk1p

Just for information, distributed breaks with the 1.0.0 release candidate of msgpack. To reproduce:

  1. pip install distributed 'msgpack==1.0.0rc1'
  2. start a local dask cluster
  3. connect client and call Client.run(some_function)
In [2]: Client("tcp://...:8786")                                     
Out[2]: <Client: 'tcp://...:8786' processes=1 threads=8, memory=16.46 GB>

In [3]: c = Client("tcp://...:8786")                                 

In [4]: c.run(lambda: "meh")                                                    
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-4-5ff942d48f6b> in <module>
----> 1 c.run(lambda: "meh")

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/client.py in run(self, function, *args, **kwargs)
   2384         >>> c.run(print_state, wait=False)  # doctest: +SKIP
   2385         """
-> 2386         return self.sync(self._run, function, *args, **kwargs)
   2387 
   2388     def run_coroutine(self, function, *args, **kwargs):

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    765         else:
    766             return sync(
--> 767                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    768             )
    769 

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    343     if error[0]:
    344         typ, exc, tb = error[0]
--> 345         raise exc.with_traceback(tb)
    346     else:
    347         return result[0]

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/utils.py in f()
    327             if callback_timeout is not None:
    328                 future = asyncio.wait_for(future, callback_timeout)
--> 329             result[0] = yield future
    330         except Exception as exc:
    331             error[0] = sys.exc_info()

~/.virtualenvs/ddtest/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/client.py in _run(self, function, nanny, workers, wait, *args, **kwargs)
   2313             ),
   2314             workers=workers,
-> 2315             nanny=nanny,
   2316         )
   2317         results = {}

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    755             name, comm.name = comm.name, "ConnectionPool." + key
    756             try:
--> 757                 result = await send_recv(comm=comm, op=key, **kwargs)
    758             finally:
    759                 self.pool.reuse(self.addr, comm)

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    538         await comm.write(msg, serializers=serializers, on_error="raise")
    539         if reply:
--> 540             response = await comm.read(deserializers=deserializers)
    541         else:
    542             response = None

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    210             try:
    211                 msg = await from_frames(
--> 212                     frames, deserialize=self.deserialize, deserializers=deserializers
    213                 )
    214             except EOFError:

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/comm/utils.py in from_frames(frames, deserialize, deserializers)
     67         res = await offload(_from_frames)
     68     else:
---> 69         res = _from_frames()
     70 
     71     return res

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/comm/utils.py in _from_frames()
     53         try:
     54             return protocol.loads(
---> 55                 frames, deserialize=deserialize, deserializers=deserializers
     56             )
     57         except EOFError:

~/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/protocol/core.py in loads(frames, deserialize, deserializers)
    104 
    105         header = frames.pop()
--> 106         header = msgpack.loads(header, use_list=False, **msgpack_opts)
    107         keys = header["keys"]
    108         headers = header["headers"]

msgpack/_unpacker.pyx in msgpack._cmsgpack.unpackb()

ValueError: tuple is not allowed for map key

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/alex/.virtualenvs/ddtest/lib/python3.7/site-packages/distributed/protocol/core.py", line 106, in loads
    header = msgpack.loads(header, use_list=False, **msgpack_opts)
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: tuple is not allowed for map key

cc @Brow71189

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions