Skip to content

Commit

Permalink
Merge pull request #621 from Fuyukai/pipes
Browse files Browse the repository at this point in the history
Add support for pipes.
  • Loading branch information
Fuyukai authored Aug 29, 2018
2 parents fc7c7e0 + aaad5b5 commit fbb4543
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 0 deletions.
Empty file added trio/_subprocess/__init__.py
Empty file.
120 changes: 120 additions & 0 deletions trio/_subprocess/unix_pipes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import fcntl
import os
from typing import Tuple

from .. import _core, BrokenStreamError
from .._abc import SendStream, ReceiveStream

__all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"]


class _PipeMixin:
def __init__(self, pipefd: int):
if not isinstance(pipefd, int):
raise TypeError(
"{0.__class__.__name__} needs a pipe fd".format(self)
)

self._pipe = pipefd
self._closed = False

flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL)
fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK)

def _close(self):
if self._closed:
return

self._closed = True
os.close(self._pipe)

async def aclose(self):
# XX: This would be in _close, but this can only be used from an
# async context.
_core.notify_fd_close(self._pipe)
self._close()
await _core.checkpoint()

def fileno(self) -> int:
"""Gets the file descriptor for this pipe."""
return self._pipe

def __del__(self):
self._close()


class PipeSendStream(_PipeMixin, SendStream):
"""Represents a send stream over an os.pipe object."""

async def send_all(self, data: bytes):
# we have to do this no matter what
await _core.checkpoint()
if self._closed:
raise _core.ClosedResourceError("this pipe is already closed")

if not data:
return

length = len(data)
# adapted from the SocketStream code
with memoryview(data) as view:
total_sent = 0
while total_sent < length:
with view[total_sent:] as remaining:
try:
total_sent += os.write(self._pipe, remaining)
except BrokenPipeError as e:
await _core.checkpoint()
raise BrokenStreamError from e
except BlockingIOError:
await self.wait_send_all_might_not_block()

async def wait_send_all_might_not_block(self) -> None:
if self._closed:
await _core.checkpoint()
raise _core.ClosedResourceError("This pipe is already closed")

try:
await _core.wait_writable(self._pipe)
except BrokenPipeError as e:
# kqueue: raises EPIPE on wait_writable instead
# of sending, which is annoying
# also doesn't checkpoint so we have to do that
# ourselves here too
await _core.checkpoint()
raise BrokenStreamError from e


class PipeReceiveStream(_PipeMixin, ReceiveStream):
"""Represents a receive stream over an os.pipe object."""

async def receive_some(self, max_bytes: int) -> bytes:
if self._closed:
await _core.checkpoint()
raise _core.ClosedResourceError("this pipe is already closed")

if not isinstance(max_bytes, int):
await _core.checkpoint()
raise TypeError("max_bytes must be integer >= 1")

if max_bytes < 1:
await _core.checkpoint()
raise ValueError("max_bytes must be integer >= 1")

while True:
try:
await _core.checkpoint_if_cancelled()
data = os.read(self._pipe, max_bytes)
except BlockingIOError:
await _core.wait_readable(self._pipe)
else:
await _core.cancel_shielded_checkpoint()
break

return data


async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]:
"""Makes a new pair of pipes."""
(r, w) = os.pipe()
return PipeSendStream(w), PipeReceiveStream(r)
Empty file.
148 changes: 148 additions & 0 deletions trio/tests/subprocess/test_unix_pipes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import errno
import select

import os
import pytest

from trio._core.tests.tutil import gc_collect_harder
from ... import _core
from ...testing import (wait_all_tasks_blocked, check_one_way_stream)

posix = os.name == "posix"

pytestmark = pytest.mark.skipif(
not posix, reason="pipes are only supported on posix"
)

if posix:
from ..._subprocess.unix_pipes import (
PipeSendStream, PipeReceiveStream, make_pipe
)


async def test_send_pipe():
r, w = os.pipe()
send = PipeSendStream(w)
assert send.fileno() == w
await send.send_all(b"123")
assert (os.read(r, 8)) == b"123"

os.close(r)
os.close(w)
send._closed = True


async def test_receive_pipe():
r, w = os.pipe()
recv = PipeReceiveStream(r)
assert (recv.fileno()) == r
os.write(w, b"123")
assert (await recv.receive_some(8)) == b"123"

os.close(r)
os.close(w)
recv._closed = True


async def test_pipes_combined():
write, read = await make_pipe()
count = 2**20

async def sender():
big = bytearray(count)
await write.send_all(big)

async def reader():
await wait_all_tasks_blocked()
received = 0
while received < count:
received += len(await read.receive_some(4096))

assert received == count

async with _core.open_nursery() as n:
n.start_soon(sender)
n.start_soon(reader)

await read.aclose()
await write.aclose()


async def test_send_on_closed_pipe():
write, read = await make_pipe()
await write.aclose()

with pytest.raises(_core.ClosedResourceError):
await write.send_all(b"123")

await read.aclose()


async def test_pipe_errors():
with pytest.raises(TypeError):
PipeReceiveStream(None)

with pytest.raises(ValueError):
await PipeReceiveStream(0).receive_some(0)


async def test_del():
w, r = await make_pipe()
f1, f2 = w.fileno(), r.fileno()
del w, r
gc_collect_harder()

with pytest.raises(OSError) as excinfo:
os.close(f1)
assert excinfo.value.errno == errno.EBADF

with pytest.raises(OSError) as excinfo:
os.close(f2)
assert excinfo.value.errno == errno.EBADF


async def test_async_with():
w, r = await make_pipe()
async with w, r:
pass

assert w._closed
assert r._closed

with pytest.raises(OSError) as excinfo:
os.close(w.fileno())
assert excinfo.value.errno == errno.EBADF

with pytest.raises(OSError) as excinfo:
os.close(r.fileno())
assert excinfo.value.errno == errno.EBADF


async def make_clogged_pipe():
s, r = await make_pipe()
try:
while True:
# We want to totally fill up the pipe buffer.
# This requires working around a weird feature that POSIX pipes
# have.
# If you do a write of <= PIPE_BUF bytes, then it's guaranteed
# to either complete entirely, or not at all. So if we tried to
# write PIPE_BUF bytes, and the buffer's free space is only
# PIPE_BUF/2, then the write will raise BlockingIOError... even
# though a smaller write could still succeed! To avoid this,
# make sure to write >PIPE_BUF bytes each time, which disables
# the special behavior.
# For details, search for PIPE_BUF here:
# http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html

# for the getattr:
# https://bitbucket.org/pypy/pypy/issues/2876/selectpipe_buf-is-missing-on-pypy3
buf_size = getattr(select, "PIPE_BUF", 8192)
os.write(s.fileno(), b"x" * buf_size * 2)
except BlockingIOError:
pass
return s, r


async def test_pipe_fully():
await check_one_way_stream(make_pipe, make_clogged_pipe)

0 comments on commit fbb4543

Please sign in to comment.