Skip to content

Commit

Permalink
Do not return Future from provideFor()
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 28, 2023
1 parent df06ba0 commit db45c4d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 32 deletions.
2 changes: 1 addition & 1 deletion examples/test-server.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
$socket = $ipcHub->accept($key);

echo "Providing servers for process\n";
$provider->provideFor($socket);
async($provider->provideFor(...), $socket);

$processes[] = $process;
}
Expand Down
5 changes: 1 addition & 4 deletions src/ClusterWatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private function startWorker(int $id): ContextClusterWorker
$deferredCancellation,
$id,
): void {
$futures = [$this->provider->provideFor($socket)];
async($this->provider->provideFor(...), $socket, $deferredCancellation->getCancellation())->ignore();

try {
try {
Expand All @@ -229,9 +229,6 @@ private function startWorker(int $id): ContextClusterWorker
$context->close();
}

// Wait for the STDIO streams to be consumed and closed.
Future\await($futures);

if ($this->running) {
$this->workers[$id] = $this->startWorker($this->nextId++);
}
Expand Down
45 changes: 18 additions & 27 deletions src/ServerSocketPipeProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
use Amp\ByteStream\StreamChannel;
use Amp\ByteStream\WritableBuffer;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\Cluster\Internal\StreamResourceSendPipe;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Future;
use Amp\Serialization\NativeSerializer;
use Amp\Serialization\Serializer;
use Amp\Socket\BindContext;
Expand All @@ -20,7 +18,6 @@
use Amp\Sync\Channel;
use Amp\Sync\ChannelException;
use const Amp\Process\IS_WINDOWS;
use function Amp\async;

final class ServerSocketPipeProvider
{
Expand All @@ -43,44 +40,38 @@ public function __construct(BindContext $bindContext = new BindContext())
}

/**
* @return Future<void>
*
* @throws SocketException
*/
public function provideFor(ReadableStream&ResourceStream $stream, ?Cancellation $cancellation = null): Future
public function provideFor(ReadableStream&ResourceStream $stream, ?Cancellation $cancellation = null): void
{
/** @var Channel<SocketAddress|null, never> $channel */
$channel = new StreamChannel($stream, new WritableBuffer(), $this->serializer);
/** @var StreamResourceSendPipe<SocketAddress> $pipe */
$pipe = new StreamResourceSendPipe($stream, $this->serializer);

$servers = &$this->servers;
$bindContext = $this->bindContext;
return async(static function () use (&$servers, $channel, $pipe, $bindContext, $cancellation): void {
try {
while ($address = $channel->receive($cancellation)) {
/** @psalm-suppress DocblockTypeContradiction Extra manual check to enforce docblock types. */
if (!$address instanceof SocketAddress) {
throw new \ValueError(\sprintf(
try {
while ($address = $channel->receive($cancellation)) {
/** @psalm-suppress DocblockTypeContradiction Extra manual check to enforce docblock types. */
if (!$address instanceof SocketAddress) {
throw new \ValueError(
\sprintf(
'Expected only instances of %s on channel; do not use the given socket outside %s',
SocketAddress::class,
self::class,
));
}
)
);
}

$uri = (string) $address;
$server = $servers[$uri] ??= self::bind($uri, $bindContext);
$uri = (string) $address;
$server = $this->servers[$uri] ??= self::bind($uri, $this->bindContext);

$pipe->send($server, $address);
}
} catch (CancelledException) {
// Providing cancelled by $cancellation.
} catch (ChannelException) {
// Sending context closed the channel abruptly.
} finally {
$pipe->close();
$pipe->send($server, $address);
}
});
} catch (ChannelException $exception) {
throw new SocketException('Provider channel closed: ' . $exception->getMessage(), previous: $exception);
} finally {
$pipe->close();
}
}

/**
Expand Down

0 comments on commit db45c4d

Please sign in to comment.