@@ -26,6 +26,7 @@ def __init__(self, cvar, *, loop=None):
2626 self .pipe_ctx = {0 , 1 , 2 }
2727 self .pipe_connection_lost_fut = asyncio .Future (loop = loop )
2828 self .process_exited_fut = asyncio .Future (loop = loop )
29+ self .error_received_fut = asyncio .Future (loop = loop )
2930 self .connection_lost_ctx = None
3031 self .done = asyncio .Future (loop = loop )
3132
@@ -77,6 +78,14 @@ def buffer_updated(self, nbytes):
7778 )
7879
7980
81+ class _DatagramProtocol (_BaseProtocol , asyncio .DatagramProtocol ):
82+ def datagram_received (self , data , addr ):
83+ self .data_received_fut .set_result (self .cvar .get ())
84+
85+ def error_received (self , exc ):
86+ self .error_received_fut .set_result (self .cvar .get ())
87+
88+
8089class _SubprocessProtocol (_BaseProtocol , asyncio .SubprocessProtocol ):
8190 def pipe_data_received (self , fd , data ):
8291 self .data_received_fut .set_result (self .cvar .get ())
@@ -703,6 +712,45 @@ async def test():
703712
704713 self .loop .run_until_complete (test ())
705714
715+ def test_datagram_protocol (self ):
716+ cvar = contextvars .ContextVar ('cvar' , default = 'outer' )
717+ proto = _DatagramProtocol (cvar , loop = self .loop )
718+ server_addr = ('127.0.0.1' , 8888 )
719+ client_addr = ('127.0.0.1' , 0 )
720+
721+ async def run ():
722+ self .assertEqual (cvar .get (), 'outer' )
723+ cvar .set ('inner' )
724+
725+ def close ():
726+ cvar .set ('closing' )
727+ proto .transport .close ()
728+
729+ try :
730+ await self .loop .create_datagram_endpoint (
731+ lambda : proto , local_addr = server_addr )
732+ inner = await proto .connection_made_fut
733+ self .assertEqual (inner , "inner" )
734+
735+ s = socket .socket (socket .AF_INET , type = socket .SOCK_DGRAM )
736+ s .bind (client_addr )
737+ s .sendto (b'data' , server_addr )
738+ inner = await proto .data_received_fut
739+ self .assertEqual (inner , "inner" )
740+
741+ self .loop .call_soon (close )
742+ await proto .done
743+ if self .implementation != 'asyncio' :
744+ # bug in asyncio
745+ self .assertEqual (proto .connection_lost_ctx , "inner" )
746+ finally :
747+ proto .transport .close ()
748+ s .close ()
749+ # let transports close
750+ await asyncio .sleep (0.1 )
751+
752+ self .loop .run_until_complete (run ())
753+
706754
707755class Test_UV_Context (_ContextBaseTests , tb .UVTestCase ):
708756 pass
0 commit comments