Skip to content

Commit

Permalink
Swap around resource types
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 24, 2023
1 parent 8873f02 commit f896a52
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 49 deletions.
5 changes: 3 additions & 2 deletions examples/test-client.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Amp\Parallel\Ipc;
use Amp\SignalCancellation;
use Amp\Socket\InternetAddress;
use Amp\Socket\ResourceSocket;
use Amp\Socket\SocketAddress;
use Amp\Sync\Channel;

Expand All @@ -23,8 +24,8 @@
printf("Received %s from %s\n", base64_encode($key), $uri);

$socket = Ipc\connect($uri, $key);
if (!$socket instanceof ResourceStream) {
throw new \Error("Expected instance of " . ResourceStream::class);
if (!$socket instanceof ResourceSocket) {
throw new \TypeError("Expected instance of " . ResourceStream::class);
}

$clusterServerFactory = new ClusterServerSocketFactory($socket);
Expand Down
6 changes: 2 additions & 4 deletions src/ClientSocketTransferPipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Amp\Cluster;

use Amp\ByteStream\ResourceStream;
use Amp\Cancellation;
use Amp\Closable;
use Amp\ForbidCloning;
Expand All @@ -11,7 +10,6 @@
use Amp\Serialization\SerializationException;
use Amp\Serialization\Serializer;
use Amp\Socket\ResourceSocket;
use Amp\Socket\Socket;
use Amp\Socket\SocketException;

/**
Expand All @@ -30,7 +28,7 @@ final class ClientSocketTransferPipe implements Closable
private readonly StreamResourceSendPipe $send;

public function __construct(
Socket&ResourceStream $socket,
ResourceSocket $socket,
Serializer $serializer = new NativeSerializer(),
) {
$this->receive = new StreamResourceReceivePipe($socket, $serializer);
Expand Down Expand Up @@ -66,7 +64,7 @@ public function receive(
* @throws SerializationException
* @throws SocketException
*/
public function send(Socket&ResourceStream $socket, mixed $data = null): void
public function send(ResourceSocket $socket, mixed $data = null): void
{
$resource = $socket->getResource();
if (!\is_resource($resource)) {
Expand Down
5 changes: 2 additions & 3 deletions src/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Amp\Cluster;

use Amp\ByteStream\ResourceStream;
use Amp\Cancellation;
use Amp\Cluster\Internal\ClusterLogHandler;
use Amp\Cluster\Internal\ClusterMessage;
Expand All @@ -14,8 +13,8 @@
use Amp\Pipeline\ConcurrentIterator;
use Amp\Pipeline\Queue;
use Amp\Socket\ResourceServerSocketFactory;
use Amp\Socket\ResourceSocket;
use Amp\Socket\ServerSocketFactory;
use Amp\Socket\Socket;
use Amp\Sync\Channel;
use Amp\Sync\ChannelException;
use Monolog\Handler\HandlerInterface as MonologHandler;
Expand Down Expand Up @@ -123,7 +122,7 @@ public static function awaitTermination(?Cancellation $cancellation = null): voi
/**
* @param positive-int $contextId
*/
private static function run(int $contextId, Channel $channel, Socket&ResourceStream $transferSocket): void
private static function run(int $contextId, Channel $channel, ResourceSocket $transferSocket): void
{
self::$cluster = new self($contextId, $channel, new ClusterServerSocketFactory($transferSocket));
self::$cluster->loop();
Expand Down
5 changes: 2 additions & 3 deletions src/ClusterServerSocketFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

namespace Amp\Cluster;

use Amp\ByteStream\ResourceStream;
use Amp\ByteStream\StreamChannel;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Serialization\NativeSerializer;
use Amp\Serialization\SerializationException;
use Amp\Socket\BindContext;
use Amp\Socket\ResourceServerSocket;
use Amp\Socket\ResourceSocket;
use Amp\Socket\ServerSocket;
use Amp\Socket\ServerSocketFactory;
use Amp\Socket\Socket;
use Amp\Socket\SocketAddress;
use Amp\Socket\SocketException;
use Amp\Sync\Channel;
Expand All @@ -30,7 +29,7 @@ final class ClusterServerSocketFactory implements ServerSocketFactory
/** @var StreamResourceReceivePipe<null> */
private readonly StreamResourceReceivePipe $pipe;

public function __construct(Socket&ResourceStream $socket)
public function __construct(ResourceSocket $socket)
{
$serializer = new NativeSerializer();
$this->channel = new StreamChannel($socket, $socket, $serializer);
Expand Down
5 changes: 2 additions & 3 deletions src/ClusterServerSocketProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Amp\Cluster;

use Amp\ByteStream\ResourceStream;
use Amp\ByteStream\StreamChannel;
use Amp\Cancellation;
use Amp\CancelledException;
Expand All @@ -12,7 +11,7 @@
use Amp\Serialization\NativeSerializer;
use Amp\Serialization\Serializer;
use Amp\Socket\BindContext;
use Amp\Socket\Socket;
use Amp\Socket\ResourceSocket;
use Amp\Socket\SocketAddress;
use Amp\Socket\SocketException;
use Amp\Sync\Channel;
Expand Down Expand Up @@ -45,7 +44,7 @@ public function __construct(BindContext $bindContext = new BindContext())
*
* @throws SocketException
*/
public function provideFor(Socket&ResourceStream $socket, ?Cancellation $cancellation = null): Future
public function provideFor(ResourceSocket $socket, ?Cancellation $cancellation = null): Future
{
/** @var Channel<SocketAddress|null, never> $channel */
$channel = new StreamChannel($socket, $socket, $this->serializer);
Expand Down
43 changes: 39 additions & 4 deletions src/Internal/TransferSocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
namespace Amp\Cluster\Internal;

use Amp\ByteStream\ResourceStream;
use Amp\Closable;
use Amp\Cluster\TransferredResource;
use Amp\DeferredFuture;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Socket\Socket;
use Amp\Socket\SocketException;
use Socket as SocketResource;

/** @internal */
final class TransferSocket
final class TransferSocket implements Closable
{
use ForbidCloning;
use ForbidSerialization;
Expand All @@ -20,7 +21,9 @@ final class TransferSocket

private readonly \Closure $errorHandler;

public function __construct(Socket&ResourceStream $socket)
private readonly DeferredFuture $onClose;

public function __construct(ResourceStream $socket)
{
if (!\extension_loaded('sockets')) {
throw new \Error('ext-sockets is required for ' . self::class);
Expand All @@ -37,7 +40,39 @@ public function __construct(Socket&ResourceStream $socket)
}

$this->socket = $socketResource;
$this->errorHandler = static fn () => true;
$this->errorHandler = $errorHandler = static fn () => true;
$this->onClose = new DeferredFuture();

$this->onClose(static function () use ($socketResource, $errorHandler): void {
\set_error_handler($errorHandler);
try {
\socket_close($socketResource);
} finally {
\restore_error_handler();
}
});
}

public function __destruct()
{
$this->close();
}

public function close(): void
{
if (!$this->onClose->isComplete()) {
$this->onClose->complete();
}
}

public function isClosed(): bool
{
return $this->onClose->isComplete();
}

public function onClose(\Closure $onClose): void
{
$this->onClose->getFuture()->finally($onClose);
}

/**
Expand Down
6 changes: 3 additions & 3 deletions src/Internal/cluster-runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

namespace Amp\Cluster\Internal;

use Amp\ByteStream\ResourceStream;
use Amp\Cluster\Cluster;
use Amp\Cluster\Watcher;
use Amp\Future;
use Amp\Parallel\Ipc;
use Amp\Socket\ResourceSocket;
use Amp\Sync\Channel;
use Amp\TimeoutCancellation;
use function Amp\async;
Expand Down Expand Up @@ -44,8 +44,8 @@
}

try {
if (!$transferSocket instanceof ResourceStream) {
throw new \TypeError('Socket connector must return an instance of ' . ResourceStream::class
if (!$transferSocket instanceof ResourceSocket) {
throw new \TypeError('Socket connector must return an instance of ' . ResourceSocket::class
. ' in order to be used to transfer other sockets');
}

Expand Down
23 changes: 12 additions & 11 deletions src/StreamResourceReceivePipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@ final class StreamResourceReceivePipe implements Closable
use ForbidCloning;
use ForbidSerialization;

private readonly Internal\TransferSocket $transferSocket;

/** @var Suspension<(\Closure():never)|null>|null */
private ?Suspension $waiting = null;

/** @var \SplQueue<TransferredResource<string>> */
private readonly \SplQueue $receiveQueue;

public function __construct(
private readonly Socket&ResourceStream $socket,
ResourceStream $resourceStream,
private readonly Serializer $serializer,
) {
$transferSocket = new Internal\TransferSocket($socket);
$this->transferSocket = $transferSocket = new Internal\TransferSocket($resourceStream);
$this->receiveQueue = $receiveQueue = new \SplQueue();

$streamResource = $socket->getResource();
$streamResource = $resourceStream->getResource();
if (!\is_resource($streamResource)) {
throw new SocketException('The provided socket has already been closed');
}
Expand All @@ -48,12 +50,11 @@ public function __construct(
static function (string $callbackId, $stream) use (
&$suspension,
$transferSocket,
$socket,
$receiveQueue,
): void {
try {
if (\feof($stream)) {
$socket->close();
$transferSocket->close();
$suspension?->resume(static fn () => throw new SocketException(
'The transfer socket closed while waiting to receive a socket',
));
Expand All @@ -67,7 +68,7 @@ static function (string $callbackId, $stream) use (
$suspension?->resume();
}
} catch (\Throwable $exception) {
$socket->close();
$transferSocket->close();
$suspension?->resume(static fn () => throw new SocketException(
'The transfer socket threw an exception: ' . $exception->getMessage(),
previous: $exception,
Expand All @@ -78,7 +79,7 @@ static function (string $callbackId, $stream) use (
},
);

$this->socket->onClose(static function () use (&$suspension, $onReadable): void {
$this->transferSocket->onClose(static function () use (&$suspension, $onReadable): void {
EventLoop::cancel($onReadable);
$suspension?->resume(static fn () => throw new SocketException('The transfer socket closed unexpectedly'));
});
Expand All @@ -91,17 +92,17 @@ public function __destruct()

public function close(): void
{
$this->socket->close();
$this->transferSocket->close();
}

public function isClosed(): bool
{
return $this->socket->isClosed();
return $this->transferSocket->isClosed();
}

public function onClose(\Closure $onClose): void
{
$this->socket->onClose($onClose);
$this->transferSocket->onClose($onClose);
}

/**
Expand All @@ -117,7 +118,7 @@ public function receive(?Cancellation $cancellation = null): ?TransferredResourc
throw new PendingReadError();
}

if ($this->socket->isClosed()) {
if ($this->transferSocket->isClosed()) {
throw new SocketException('The transfer socket has been closed');
}

Expand Down
18 changes: 8 additions & 10 deletions src/StreamResourceSendPipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Amp\ForbidSerialization;
use Amp\Serialization\SerializationException;
use Amp\Serialization\Serializer;
use Amp\Socket\Socket;
use Amp\Socket\SocketException;
use Revolt\EventLoop;
use Revolt\EventLoop\Suspension;
Expand All @@ -29,13 +28,13 @@ final class StreamResourceSendPipe implements Closable
private readonly string $onWritable;

public function __construct(
private readonly Socket&ResourceStream $socket,
ResourceStream $resourceStream,
private readonly Serializer $serializer,
) {
$this->transferSocket = $transferSocket = new Internal\TransferSocket($socket);
$this->transferSocket = $transferSocket = new Internal\TransferSocket($resourceStream);
$this->transferQueue = $transferQueue = new \SplQueue();

$streamResource = $socket->getResource();
$streamResource = $resourceStream->getResource();
if (!\is_resource($streamResource)) {
throw new SocketException('The provided socket has already been closed');
}
Expand All @@ -44,11 +43,10 @@ public function __construct(
$streamResource,
static function (string $callbackId, $stream) use (
$transferSocket,
$socket,
$transferQueue,
): void {
if (\feof($stream)) {
$socket->close();
$transferSocket->close();
return;
}

Expand Down Expand Up @@ -77,7 +75,7 @@ static function (string $callbackId, $stream) use (
},
));

$this->socket->onClose(static function () use ($transferQueue, $onWritable): void {
$this->transferSocket->onClose(static function () use ($transferQueue, $onWritable): void {
EventLoop::cancel($onWritable);

while (!$transferQueue->isEmpty()) {
Expand All @@ -97,17 +95,17 @@ public function __destruct()

public function close(): void
{
$this->socket->close();
$this->transferSocket->close();
}

public function isClosed(): bool
{
return $this->socket->isClosed();
return $this->transferSocket->isClosed();
}

public function onClose(\Closure $onClose): void
{
$this->socket->onClose($onClose);
$this->transferSocket->onClose($onClose);
}

/**
Expand Down
Loading

0 comments on commit f896a52

Please sign in to comment.