From db45c4d4052954296c78d4a89187fdbf7abb5ad8 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 28 Dec 2023 15:12:12 -0600 Subject: [PATCH] Do not return Future from provideFor() --- examples/test-server.php | 2 +- src/ClusterWatcher.php | 5 +--- src/ServerSocketPipeProvider.php | 45 +++++++++++++------------------- 3 files changed, 20 insertions(+), 32 deletions(-) diff --git a/examples/test-server.php b/examples/test-server.php index 666e0c1..cb7f480 100644 --- a/examples/test-server.php +++ b/examples/test-server.php @@ -31,7 +31,7 @@ $socket = $ipcHub->accept($key); echo "Providing servers for process\n"; - $provider->provideFor($socket); + async($provider->provideFor(...), $socket); $processes[] = $process; } diff --git a/src/ClusterWatcher.php b/src/ClusterWatcher.php index b4ea65e..14398d9 100644 --- a/src/ClusterWatcher.php +++ b/src/ClusterWatcher.php @@ -204,7 +204,7 @@ private function startWorker(int $id): ContextClusterWorker $deferredCancellation, $id, ): void { - $futures = [$this->provider->provideFor($socket)]; + async($this->provider->provideFor(...), $socket, $deferredCancellation->getCancellation())->ignore(); try { try { @@ -229,9 +229,6 @@ private function startWorker(int $id): ContextClusterWorker $context->close(); } - // Wait for the STDIO streams to be consumed and closed. - Future\await($futures); - if ($this->running) { $this->workers[$id] = $this->startWorker($this->nextId++); } diff --git a/src/ServerSocketPipeProvider.php b/src/ServerSocketPipeProvider.php index 70d7198..ed3c3e4 100644 --- a/src/ServerSocketPipeProvider.php +++ b/src/ServerSocketPipeProvider.php @@ -7,11 +7,9 @@ use Amp\ByteStream\StreamChannel; use Amp\ByteStream\WritableBuffer; use Amp\Cancellation; -use Amp\CancelledException; use Amp\Cluster\Internal\StreamResourceSendPipe; use Amp\ForbidCloning; use Amp\ForbidSerialization; -use Amp\Future; use Amp\Serialization\NativeSerializer; use Amp\Serialization\Serializer; use Amp\Socket\BindContext; @@ -20,7 +18,6 @@ use Amp\Sync\Channel; use Amp\Sync\ChannelException; use const Amp\Process\IS_WINDOWS; -use function Amp\async; final class ServerSocketPipeProvider { @@ -43,44 +40,38 @@ public function __construct(BindContext $bindContext = new BindContext()) } /** - * @return Future - * * @throws SocketException */ - public function provideFor(ReadableStream&ResourceStream $stream, ?Cancellation $cancellation = null): Future + public function provideFor(ReadableStream&ResourceStream $stream, ?Cancellation $cancellation = null): void { /** @var Channel $channel */ $channel = new StreamChannel($stream, new WritableBuffer(), $this->serializer); /** @var StreamResourceSendPipe $pipe */ $pipe = new StreamResourceSendPipe($stream, $this->serializer); - $servers = &$this->servers; - $bindContext = $this->bindContext; - return async(static function () use (&$servers, $channel, $pipe, $bindContext, $cancellation): void { - try { - while ($address = $channel->receive($cancellation)) { - /** @psalm-suppress DocblockTypeContradiction Extra manual check to enforce docblock types. */ - if (!$address instanceof SocketAddress) { - throw new \ValueError(\sprintf( + try { + while ($address = $channel->receive($cancellation)) { + /** @psalm-suppress DocblockTypeContradiction Extra manual check to enforce docblock types. */ + if (!$address instanceof SocketAddress) { + throw new \ValueError( + \sprintf( 'Expected only instances of %s on channel; do not use the given socket outside %s', SocketAddress::class, self::class, - )); - } + ) + ); + } - $uri = (string) $address; - $server = $servers[$uri] ??= self::bind($uri, $bindContext); + $uri = (string) $address; + $server = $this->servers[$uri] ??= self::bind($uri, $this->bindContext); - $pipe->send($server, $address); - } - } catch (CancelledException) { - // Providing cancelled by $cancellation. - } catch (ChannelException) { - // Sending context closed the channel abruptly. - } finally { - $pipe->close(); + $pipe->send($server, $address); } - }); + } catch (ChannelException $exception) { + throw new SocketException('Provider channel closed: ' . $exception->getMessage(), previous: $exception); + } finally { + $pipe->close(); + } } /**