From 8c0477315728eba8db931233c1fc8c2c941e423a Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 7 Nov 2024 14:57:12 +0100 Subject: [PATCH] Use zmq-anyio --- ipykernel/debugger.py | 4 +- ipykernel/inprocess/session.py | 2 +- ipykernel/iostream.py | 72 ++++++++++++++--------------- ipykernel/ipkernel.py | 8 ++-- ipykernel/kernelapp.py | 59 +++++------------------- ipykernel/kernelbase.py | 82 ++++++++++++++++++---------------- ipykernel/shellchannel.py | 5 ++- ipykernel/subshell.py | 9 ++-- ipykernel/subshell_manager.py | 45 ++++++++++--------- pyproject.toml | 1 + tests/conftest.py | 31 +++++-------- tests/test_async.py | 16 +++---- 12 files changed, 148 insertions(+), 186 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 780d1801..36aced05 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -241,7 +241,7 @@ async def _send_request(self, msg): self.log.debug("DEBUGPYCLIENT:") self.log.debug(self.routing_id) self.log.debug(buf) - await self.debugpy_socket.send_multipart((self.routing_id, buf)) + await self.debugpy_socket.asend_multipart((self.routing_id, buf)) async def _wait_for_response(self): # Since events are never pushed to the message_queue @@ -437,7 +437,7 @@ async def start(self): (self.shell_socket.getsockopt(ROUTING_ID)), ) - msg = await self.shell_socket.recv_multipart() + msg = await self.shell_socket.arecv_multipart() ident, msg = self.session.feed_identities(msg, copy=True) try: msg = self.session.deserialize(msg, content=True, copy=True) diff --git a/ipykernel/inprocess/session.py b/ipykernel/inprocess/session.py index 0eaed2c6..70b13574 100644 --- a/ipykernel/inprocess/session.py +++ b/ipykernel/inprocess/session.py @@ -3,7 +3,7 @@ class Session(_Session): async def recv(self, socket, copy=True): - return await socket.recv_multipart() + return await socket.arecv_multipart() def send( self, diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index d8171017..19334212 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -20,6 +20,7 @@ from typing import Any, Callable import zmq +import zmq_anyio from anyio import create_task_group, run, sleep, to_thread from jupyter_client.session import extract_header @@ -55,11 +56,11 @@ def run(self): run(self._main) async def _main(self): - async with create_task_group() as tg: + async with create_task_group() as self._task_group: for task in self._tasks: - tg.start_soon(task) + self._task_group.start_soon(task) await to_thread.run_sync(self.__stop.wait) - tg.cancel_scope.cancel() + self._task_group.cancel_scope.cancel() def stop(self): """Stop the thread. @@ -78,7 +79,7 @@ class IOPubThread: whose IO is always run in a thread. 
""" - def __init__(self, socket, pipe=False): + def __init__(self, socket: zmq_anyio.Socket, pipe=False): """Create IOPub thread Parameters @@ -91,10 +92,7 @@ def __init__(self, socket, pipe=False): """ # ensure all of our sockets as sync zmq.Sockets # don't create async wrappers until we are within the appropriate coroutines - self.socket: zmq.Socket[bytes] | None = zmq.Socket(socket) - if self.socket.context is None: - # bug in pyzmq, shadow socket doesn't always inherit context attribute - self.socket.context = socket.context # type:ignore[unreachable] + self.socket: zmq_anyio.Socket = socket self._context = socket.context self.background_socket = BackgroundSocket(self) @@ -108,14 +106,14 @@ def __init__(self, socket, pipe=False): self._event_pipe_gc_lock: threading.Lock = threading.Lock() self._event_pipe_gc_seconds: float = 10 self._setup_event_pipe() - tasks = [self._handle_event, self._run_event_pipe_gc] + tasks = [self._handle_event, self._run_event_pipe_gc, self.socket.start] if pipe: tasks.append(self._handle_pipe_msgs) self.thread = _IOPubThread(tasks) def _setup_event_pipe(self): """Create the PULL socket listening for events that should fire in this thread.""" - self._pipe_in0 = self._context.socket(zmq.PULL, socket_class=zmq.Socket) + self._pipe_in0 = self._context.socket(zmq.PULL) self._pipe_in0.linger = 0 _uuid = b2a_hex(os.urandom(16)).decode("ascii") @@ -150,7 +148,7 @@ def _event_pipe(self): except AttributeError: # new thread, new event pipe # create sync base socket - event_pipe = self._context.socket(zmq.PUSH, socket_class=zmq.Socket) + event_pipe = self._context.socket(zmq.PUSH) event_pipe.linger = 0 event_pipe.connect(self._event_interface) self._local.event_pipe = event_pipe @@ -169,30 +167,28 @@ async def _handle_event(self): Whenever *an* event arrives on the event stream, *all* waiting events are processed in order. 
""" - # create async wrapper within coroutine - pipe_in = zmq.asyncio.Socket(self._pipe_in0) - try: - while True: - await pipe_in.recv() - # freeze event count so new writes don't extend the queue - # while we are processing - n_events = len(self._events) - for _ in range(n_events): - event_f = self._events.popleft() - event_f() - except Exception: - if self.thread.__stop.is_set(): - return - raise + pipe_in = zmq_anyio.Socket(self._pipe_in0) + async with pipe_in: + try: + while True: + await pipe_in.arecv() + # freeze event count so new writes don't extend the queue + # while we are processing + n_events = len(self._events) + for _ in range(n_events): + event_f = self._events.popleft() + event_f() + except Exception: + if self.thread.__stop.is_set(): + return + raise def _setup_pipe_in(self): """setup listening pipe for IOPub from forked subprocesses""" - ctx = self._context - # use UUID to authenticate pipe messages self._pipe_uuid = os.urandom(16) - self._pipe_in1 = ctx.socket(zmq.PULL, socket_class=zmq.Socket) + self._pipe_in1 = zmq_anyio.Socket(self._context.socket(zmq.PULL)) self._pipe_in1.linger = 0 try: @@ -210,18 +206,18 @@ def _setup_pipe_in(self): async def _handle_pipe_msgs(self): """handle pipe messages from a subprocess""" # create async wrapper within coroutine - self._async_pipe_in1 = zmq.asyncio.Socket(self._pipe_in1) - try: - while True: - await self._handle_pipe_msg() - except Exception: - if self.thread.__stop.is_set(): - return - raise + async with self._pipe_in1: + try: + while True: + await self._handle_pipe_msg() + except Exception: + if self.thread.__stop.is_set(): + return + raise async def _handle_pipe_msg(self, msg=None): """handle a pipe message from a subprocess""" - msg = msg or await self._async_pipe_in1.recv_multipart() + msg = msg or await self._pipe_in1.arecv_multipart() if not self._pipe_flag or not self._is_main_process(): return if msg[0] != self._pipe_uuid: diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index 48efa6cd..4e84a4c0 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -12,7 +12,7 @@ from dataclasses import dataclass import comm -import zmq.asyncio +import zmq_anyio from anyio import TASK_STATUS_IGNORED, create_task_group, to_thread from anyio.abc import TaskStatus from IPython.core import release @@ -76,7 +76,7 @@ class IPythonKernel(KernelBase): help="Set this flag to False to deactivate the use of experimental IPython completion APIs.", ).tag(config=True) - debugpy_socket = Instance(zmq.asyncio.Socket, allow_none=True) + debugpy_socket = Instance(zmq_anyio.Socket, allow_none=True) user_module = Any() @@ -212,7 +212,7 @@ def __init__(self, **kwargs): } async def process_debugpy(self): - async with create_task_group() as tg: + async with self.debug_shell_socket, self.debugpy_socket, create_task_group() as tg: tg.start_soon(self.receive_debugpy_messages) tg.start_soon(self.poll_stopped_queue) await to_thread.run_sync(self.debugpy_stop.wait) @@ -235,7 +235,7 @@ async def receive_debugpy_message(self, msg=None): if msg is None: assert self.debugpy_socket is not None - msg = await self.debugpy_socket.recv_multipart() + msg = await self.debugpy_socket.arecv_multipart() # The first frame is the socket id, we can drop it frame = msg[1].decode("utf-8") self.log.debug("Debugpy received: %s", frame) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 55efaa8e..1cf5697b 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -18,7 +18,7 @@ from pathlib import Path import zmq -import zmq.asyncio 
+import zmq_anyio from anyio import create_task_group, run from IPython.core.application import ( # type:ignore[attr-defined] BaseIPythonApplication, @@ -325,15 +325,15 @@ def init_sockets(self): """Create a context, a session, and the kernel sockets.""" self.log.info("Starting the kernel at pid: %i", os.getpid()) assert self.context is None, "init_sockets cannot be called twice!" - self.context = context = zmq.asyncio.Context() + self.context = context = zmq.Context() atexit.register(self.close) - self.shell_socket = context.socket(zmq.ROUTER) + self.shell_socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.shell_socket.linger = 1000 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port) self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port) - self.stdin_socket = zmq.Context(context).socket(zmq.ROUTER) + self.stdin_socket = context.socket(zmq.ROUTER) self.stdin_socket.linger = 1000 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port) @@ -349,18 +349,19 @@ def init_sockets(self): def init_control(self, context): """Initialize the control channel.""" - self.control_socket = context.socket(zmq.ROUTER) + self.control_socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.control_socket.linger = 1000 self.control_port = self._bind_socket(self.control_socket, self.control_port) self.log.debug("control ROUTER Channel on port: %i" % self.control_port) - self.debugpy_socket = context.socket(zmq.STREAM) + self.debugpy_socket = zmq_anyio.Socket(context, zmq.STREAM) self.debugpy_socket.linger = 1000 - self.debug_shell_socket = context.socket(zmq.DEALER) + self.debug_shell_socket = zmq_anyio.Socket(context.socket(zmq.DEALER)) self.debug_shell_socket.linger = 1000 - if self.shell_socket.getsockopt(zmq.LAST_ENDPOINT): - self.debug_shell_socket.connect(self.shell_socket.getsockopt(zmq.LAST_ENDPOINT)) + last_endpoint = self.shell_socket.getsockopt(zmq.LAST_ENDPOINT) + if last_endpoint: + self.debug_shell_socket.connect(last_endpoint) if hasattr(zmq, "ROUTER_HANDOVER"): # set router-handover to workaround zeromq reconnect problems @@ -373,7 +374,7 @@ def init_control(self, context): def init_iopub(self, context): """Initialize the iopub channel.""" - self.iopub_socket = context.socket(zmq.PUB) + self.iopub_socket = zmq_anyio.Socket(context.socket(zmq.PUB)) self.iopub_socket.linger = 1000 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port) @@ -634,43 +635,6 @@ def configure_tornado_logger(self): handler.setFormatter(formatter) logger.addHandler(handler) - def _init_asyncio_patch(self): - """set default asyncio policy to be compatible with tornado - - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - - Support for Proactor via a background thread is available in tornado 6.1, - but it is still preferable to run the Selector in the main thread - instead of the background. - - do this as early as possible to make it a low priority and overridable - - ref: https://github.com/tornadoweb/tornado/issues/2608 - - FIXME: if/when tornado supports the defaults in asyncio without threads, - remove and bump tornado requirement for py38. - Most likely, this will mean a new Python version - where asyncio.ProactorEventLoop supports add_reader and friends. 
- - """ - if sys.platform.startswith("win"): - import asyncio - - try: - from asyncio import WindowsProactorEventLoopPolicy, WindowsSelectorEventLoopPolicy - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - def init_pdb(self): """Replace pdb with IPython's version that is interruptible. @@ -690,7 +654,6 @@ def init_pdb(self): @catch_config_error def initialize(self, argv=None): """Initialize the application.""" - self._init_asyncio_patch() super().initialize(argv) if self.subapp is not None: return diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index d496e0c9..34db2e41 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -35,6 +35,7 @@ import psutil import zmq +import zmq_anyio from anyio import TASK_STATUS_IGNORED, Event, create_task_group, sleep, to_thread from anyio.abc import TaskStatus from IPython.core.error import StdinNotImplementedError @@ -88,7 +89,7 @@ class Kernel(SingletonConfigurable): session = Instance(Session, allow_none=True) profile_dir = Instance("IPython.core.profiledir.ProfileDir", allow_none=True) - shell_socket = Instance(zmq.asyncio.Socket, allow_none=True) + shell_socket = Instance(zmq_anyio.Socket, allow_none=True) implementation: str implementation_version: str @@ -96,7 +97,7 @@ class Kernel(SingletonConfigurable): _is_test = Bool(False) - control_socket = Instance(zmq.asyncio.Socket, allow_none=True) + control_socket = Instance(zmq_anyio.Socket, allow_none=True) control_tasks: t.Any = List() debug_shell_socket = Any() @@ -267,7 +268,7 @@ async def process_control_message(self, msg=None): assert self.session is not None assert self.control_thread is None or threading.current_thread() == self.control_thread - msg = msg or await self.control_socket.recv_multipart() + msg = msg or await self.control_socket.arecv_multipart() idents, msg = self.session.feed_identities(msg) try: msg = self.session.deserialize(msg, content=True) @@ -364,26 +365,29 @@ async def shell_channel_thread_main(self): assert self.shell_channel_thread is not None assert threading.current_thread() == self.shell_channel_thread - try: - while True: - msg = await self.shell_socket.recv_multipart(copy=False) - # deserialize only the header to get subshell_id - # Keep original message to send to subshell_id unmodified. - _, msg2 = self.session.feed_identities(msg, copy=False) - try: - msg3 = self.session.deserialize(msg2, content=False, copy=False) - subshell_id = msg3["header"].get("subshell_id") - - # Find inproc pair socket to use to send message to correct subshell. - socket = self.shell_channel_thread.manager.get_shell_channel_socket(subshell_id) - assert socket is not None - socket.send_multipart(msg, copy=False) - except Exception: - self.log.error("Invalid message", exc_info=True) # noqa: G201 - except BaseException: - if self.shell_stop.is_set(): - return - raise + async with self.shell_socket: + try: + while True: + msg = await self.shell_socket.arecv_multipart(copy=False) + # deserialize only the header to get subshell_id + # Keep original message to send to subshell_id unmodified. 
+ _, msg2 = self.session.feed_identities(msg, copy=False) + try: + msg3 = self.session.deserialize(msg2, content=False, copy=False) + subshell_id = msg3["header"].get("subshell_id") + + # Find inproc pair socket to use to send message to correct subshell. + socket = self.shell_channel_thread.manager.get_shell_channel_socket( + subshell_id + ) + assert socket is not None + socket.send_multipart(msg, copy=False) + except Exception: + self.log.error("Invalid message", exc_info=True) # noqa: G201 + except BaseException: + if self.shell_stop.is_set(): + return + raise async def shell_main(self, subshell_id: str | None): """Main loop for a single subshell.""" @@ -411,13 +415,15 @@ async def shell_main(self, subshell_id: str | None): async def process_shell(self, socket=None): # socket=None is valid if kernel subshells are not supported. - try: - while True: - await self.process_shell_message(socket=socket) - except BaseException: - if self.shell_stop.is_set(): - return - raise + _socket = self.shell_socket if socket is None else socket + async with _socket: + try: + while True: + await self.process_shell_message(socket=socket) + except BaseException: + if self.shell_stop.is_set(): + return + raise async def process_shell_message(self, msg=None, socket=None): # If socket is None kernel subshells are not supported so use socket=shell_socket. @@ -435,8 +441,8 @@ async def process_shell_message(self, msg=None, socket=None): assert socket is None socket = self.shell_socket - no_msg = msg is None if self._is_test else not await socket.poll(0) - msg = msg or await socket.recv_multipart(copy=False) + no_msg = msg is None if self._is_test else not await socket.apoll(0) + msg = msg or await socket.arecv_multipart(copy=False) received_time = time.monotonic() copy = not isinstance(msg[0], zmq.Message) @@ -490,7 +496,7 @@ async def process_shell_message(self, msg=None, socket=None): try: result = handler(socket, idents, msg) if inspect.isawaitable(result): - await result + result = await result except Exception: self.log.error("Exception in message handler:", exc_info=True) # noqa: G201 except KeyboardInterrupt: @@ -509,7 +515,7 @@ async def process_shell_message(self, msg=None, socket=None): self._publish_status("idle", "shell") async def control_main(self): - async with create_task_group() as tg: + async with self.control_socket, create_task_group() as tg: for task in self.control_tasks: tg.start_soon(task) tg.start_soon(self.process_control) @@ -1077,7 +1083,7 @@ async def create_subshell_request(self, socket, ident, parent) -> None: # Request is passed to shell channel thread to process. other_socket = self.shell_channel_thread.manager.get_control_other_socket() await other_socket.send_json({"type": "create"}) - reply = await other_socket.recv_json() + reply = await other_socket.arecv_json() self.session.send(socket, "create_subshell_reply", reply, parent, ident) @@ -1099,7 +1105,7 @@ async def delete_subshell_request(self, socket, ident, parent) -> None: # Request is passed to shell channel thread to process. other_socket = self.shell_channel_thread.manager.get_control_other_socket() await other_socket.send_json({"type": "delete", "subshell_id": subshell_id}) - reply = await other_socket.recv_json() + reply = await other_socket.arecv_json() self.session.send(socket, "delete_subshell_reply", reply, parent, ident) @@ -1113,8 +1119,8 @@ async def list_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. 
# Request is passed to shell channel thread to process. other_socket = self.shell_channel_thread.manager.get_control_other_socket() - await other_socket.send_json({"type": "list"}) - reply = await other_socket.recv_json() + await other_socket.asend_json({"type": "list"}) + reply = await other_socket.arecv_json() self.session.send(socket, "list_subshell_reply", reply, parent, ident) diff --git a/ipykernel/shellchannel.py b/ipykernel/shellchannel.py index bc0459c4..b62c0cf7 100644 --- a/ipykernel/shellchannel.py +++ b/ipykernel/shellchannel.py @@ -1,5 +1,6 @@ """A thread for a shell channel.""" -import zmq.asyncio +import zmq +import zmq_anyio from .subshell_manager import SubshellManager from .thread import SHELL_CHANNEL_THREAD_NAME, BaseThread @@ -11,7 +12,7 @@ class ShellChannelThread(BaseThread): Communicates with shell/subshell threads via pairs of ZMQ inproc sockets. """ - def __init__(self, context: zmq.asyncio.Context, shell_socket: zmq.asyncio.Socket, **kwargs): + def __init__(self, context: zmq.Context, shell_socket: zmq_anyio.Socket, **kwargs): """Initialize the thread.""" super().__init__(name=SHELL_CHANNEL_THREAD_NAME, **kwargs) self._manager: SubshellManager | None = None diff --git a/ipykernel/subshell.py b/ipykernel/subshell.py index 18e15ab3..69afd1e2 100644 --- a/ipykernel/subshell.py +++ b/ipykernel/subshell.py @@ -2,7 +2,8 @@ from threading import current_thread -import zmq.asyncio +import zmq +import zmq_anyio from .thread import BaseThread @@ -15,16 +16,16 @@ def __init__(self, subshell_id: str, **kwargs): super().__init__(name=f"subshell-{subshell_id}", **kwargs) # Inproc PAIR socket, for communication with shell channel thread. - self._pair_socket: zmq.asyncio.Socket | None = None + self._pair_socket: zmq_anyio.Socket | None = None - async def create_pair_socket(self, context: zmq.asyncio.Context, address: str) -> None: + async def create_pair_socket(self, context: zmq.Context, address: str) -> None: """Create inproc PAIR socket, for communication with shell channel thread. Should be called from this thread, so usually via add_task before the thread is started. """ assert current_thread() == self - self._pair_socket = context.socket(zmq.PAIR) + self._pair_socket = zmq_anyio.Socket(context, zmq.PAIR) self._pair_socket.connect(address) def run(self) -> None: diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 805d6f81..b1df696b 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -10,7 +10,7 @@ from threading import Lock, current_thread, main_thread import zmq -import zmq.asyncio +import zmq_anyio from anyio import create_memory_object_stream, create_task_group from .subshell import SubshellThread @@ -20,7 +20,7 @@ @dataclass class Subshell: thread: SubshellThread - shell_channel_socket: zmq.asyncio.Socket + shell_channel_socket: zmq_anyio.Socket class SubshellManager: @@ -38,10 +38,10 @@ class SubshellManager: against multiple subshells attempting to send at the same time. 
""" - def __init__(self, context: zmq.asyncio.Context, shell_socket: zmq.asyncio.Socket): + def __init__(self, context: zmq.Context, shell_socket: zmq_anyio.Socket): assert current_thread() == main_thread() - self._context: zmq.asyncio.Context = context + self._context: zmq.Context = context self._shell_socket = shell_socket self._cache: dict[str, Subshell] = {} self._lock_cache = Lock() @@ -83,10 +83,10 @@ def close(self) -> None: break self._stop_subshell(subshell) - def get_control_other_socket(self) -> zmq.asyncio.Socket: + def get_control_other_socket(self) -> zmq_anyio.Socket: return self._control_other_socket - def get_other_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: + def get_other_socket(self, subshell_id: str | None) -> zmq_anyio.Socket: """Return the other inproc pair socket for a subshell. This socket is accessed from the subshell thread. @@ -98,7 +98,7 @@ def get_other_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: assert socket is not None return socket - def get_shell_channel_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: + def get_shell_channel_socket(self, subshell_id: str | None) -> zmq_anyio.Socket: """Return the shell channel inproc pair socket for a subshell. This socket is accessed from the shell channel thread. @@ -124,9 +124,9 @@ async def listen_from_control(self, subshell_task: t.Any) -> None: socket = self._control_shell_channel_socket while True: - request = await socket.recv_json() # type: ignore[misc] + request = await socket.arecv_json() # type: ignore[misc] reply = await self._process_control_request(request, subshell_task) - await socket.send_json(reply) # type: ignore[func-returns-value] + await socket.asend_json(reply) # type: ignore[func-returns-value] async def listen_from_subshells(self) -> None: """Listen for reply messages on inproc sockets of all subshells and resend @@ -159,10 +159,10 @@ def subshell_id_from_thread_id(self, thread_id: int) -> str | None: def _create_inproc_pair_socket( self, name: str | None, shell_channel_end: bool - ) -> zmq.asyncio.Socket: + ) -> zmq_anyio.Socket: """Create and return a single ZMQ inproc pair socket.""" address = self._get_inproc_socket_address(name) - socket = self._context.socket(zmq.PAIR) + socket = zmq_anyio.Socket(self._context, zmq.PAIR) if shell_channel_end: socket.bind(address) else: @@ -208,7 +208,7 @@ def _get_inproc_socket_address(self, name: str | None) -> str: full_name = f"subshell-{name}" if name else "subshell" return f"inproc://{full_name}" - def _get_shell_channel_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: + def _get_shell_channel_socket(self, subshell_id: str | None) -> zmq_anyio.Socket: if subshell_id is None: return self._parent_shell_channel_socket with self._lock_cache: @@ -230,16 +230,17 @@ async def _listen_for_subshell_reply(self, subshell_id: str | None) -> None: shell_channel_socket = self._get_shell_channel_socket(subshell_id) - try: - while True: - msg = await shell_channel_socket.recv_multipart(copy=False) - with self._lock_shell_socket: - await self._shell_socket.send_multipart(msg) - except BaseException: - if not self._is_subshell(subshell_id): - # Subshell no longer exists so exit gracefully - return - raise + async with shell_channel_socket: + try: + while True: + msg = await shell_channel_socket.arecv_multipart(copy=False) + with self._lock_shell_socket: + await self._shell_socket.asend_multipart(msg) + except BaseException: + if not self._is_subshell(subshell_id): + # Subshell no longer exists so exit gracefully 
+ return + raise async def _process_control_request( self, request: dict[str, t.Any], subshell_task: t.Any diff --git a/pyproject.toml b/pyproject.toml index 675d9d87..16f30287 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "psutil>=5.7", "packaging>=22", "anyio>=4.2.0", + "zmq-anyio >=0.2.1", ] [project.urls] diff --git a/tests/conftest.py b/tests/conftest.py index 2c266555..3a74dc57 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,3 @@ -import asyncio import logging import os from math import inf @@ -7,8 +6,8 @@ import pytest import zmq -import zmq.asyncio -from anyio import create_memory_object_stream, create_task_group +import zmq_anyio +from anyio import create_memory_object_stream, create_task_group, sleep from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from jupyter_client.session import Session @@ -23,10 +22,9 @@ resource = None # type:ignore -@pytest.fixture() -def anyio_backend(): - return "asyncio" - +# @pytest.fixture +# def anyio_backend(): +# return 'asyncio' pytestmark = pytest.mark.anyio @@ -46,11 +44,6 @@ def anyio_backend(): resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard)) -# Enforce selector event loop on Windows. -if os.name == "nt": - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type:ignore - - class TestSession(Session): """A session that copies sent messages to an internal stream, so that they can be accessed later. @@ -77,21 +70,21 @@ def send(self, socket, *args, **kwargs): class KernelMixin: - shell_socket: zmq.asyncio.Socket - control_socket: zmq.asyncio.Socket + shell_socket: zmq_anyio.Socket + control_socket: zmq_anyio.Socket stop: Callable[[], None] log = logging.getLogger() def _initialize(self): self._is_test = True - self.context = context = zmq.asyncio.Context() - self.iopub_socket = context.socket(zmq.PUB) - self.stdin_socket = context.socket(zmq.ROUTER) + self.context = context = zmq.Context() + self.iopub_socket = zmq_anyio.Socket(context.socket(zmq.PUB)) + self.stdin_socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.test_sockets = [self.iopub_socket] for name in ["shell", "control"]: - socket = context.socket(zmq.ROUTER) + socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.test_sockets.append(socket) setattr(self, f"{name}_socket", socket) @@ -142,7 +135,7 @@ def _prep_msg(self, *args, **kwargs): async def _wait_for_msg(self): while not self._reply: - await asyncio.sleep(0.1) + await sleep(0.1) _, msg = self.session.feed_identities(self._reply) return self.session.deserialize(msg) diff --git a/tests/test_async.py b/tests/test_async.py index a40db4a0..f1d91c5e 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -8,6 +8,8 @@ from .test_message_spec import validate_message from .utils import TIMEOUT, execute, flush_channels, start_new_kernel +pytestmark = pytest.mark.anyio + KC = KM = None @@ -30,24 +32,22 @@ def test_async_await(): assert content["status"] == "ok", content -# FIXME: @pytest.mark.parametrize("asynclib", ["asyncio", "trio", "curio"]) @pytest.mark.skipif(os.name == "nt", reason="Cannot interrupt on Windows") -@pytest.mark.parametrize("asynclib", ["asyncio"]) -def test_async_interrupt(asynclib, request): +def test_async_interrupt(anyio_backend, request): assert KC is not None assert KM is not None try: - __import__(asynclib) + __import__(anyio_backend) except ImportError: - pytest.skip("Requires %s" % asynclib) - request.addfinalizer(lambda: execute("%autoawait asyncio", KC)) + 
pytest.skip("Requires %s" % anyio_backend) + request.addfinalizer(lambda: execute(f"%autoawait {anyio_backend}", KC)) flush_channels(KC) - msg_id, content = execute("%autoawait " + asynclib, KC) + msg_id, content = execute(f"%autoawait {anyio_backend}", KC) assert content["status"] == "ok", content flush_channels(KC) - msg_id = KC.execute(f"print('begin'); import {asynclib}; await {asynclib}.sleep(5)") + msg_id = KC.execute(f"print('begin'); import {anyio_backend}; await {anyio_backend}.sleep(5)") busy = KC.get_iopub_msg(timeout=TIMEOUT) validate_message(busy, "status", msg_id) assert busy["content"]["execution_state"] == "busy"
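
Note (illustrative, not part of the patch): the changes above all follow the same zmq-anyio pattern: keep a plain blocking zmq.Context and zmq.Socket, wrap the socket in zmq_anyio.Socket, enter it with `async with` (or schedule its `start` task, as iostream.py does), and call the `a`-prefixed coroutine methods (arecv_multipart, asend_multipart, apoll, arecv_json, asend_json) instead of zmq.asyncio's awaitable recv/send. Below is a minimal sketch of that pattern, assuming only the zmq-anyio calls already used in this diff (socket wrapping, `async with`, arecv_multipart/asend_multipart) plus anyio's `run` entry point; the function name `echo_once` and the TCP port are made up for illustration.

    import zmq
    import zmq_anyio
    from anyio import run


    async def echo_once() -> None:
        # Plain blocking context and socket, as in kernelapp.init_sockets above.
        ctx = zmq.Context()
        sock = zmq_anyio.Socket(ctx.socket(zmq.ROUTER))
        sock.linger = 1000
        sock.bind("tcp://127.0.0.1:5555")  # hypothetical port, for illustration only
        # Entering the async context makes the a*-coroutines usable,
        # mirroring `async with self.shell_socket:` in kernelbase.py above.
        async with sock:
            ident, *frames = await sock.arecv_multipart()
            await sock.asend_multipart([ident, *frames])
        ctx.destroy()


    if __name__ == "__main__":
        run(echo_once)

The same wrapping works for the inproc PAIR sockets (subshell_manager.py) and for sockets built directly as zmq_anyio.Socket(context, zmq.PAIR) (subshell.py); the patch uses whichever form is closer to the pre-existing code.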