Skip to content

Commit

Permalink
Tests and bugfix for the case where the dispatcher has finished befor…
Browse files Browse the repository at this point in the history
…e the worker started
  • Loading branch information
mnapoli committed Sep 25, 2013
1 parent 3ae580d commit 6804e5b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 23 deletions.
10 changes: 7 additions & 3 deletions src/MyCLabs/Work/Dispatcher/RabbitMQWorkDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,16 @@ public function runBackground(
$replyExchange = null;
$replyQueue = null;
if ($waitForResult) {
// Create a temporary exchange and queue for communicating with the worker
// Create a temporary exchange (durable, autodelete) for communicating with the worker
$replyExchange = uniqid('tmp');
$this->channel->exchange_declare($replyExchange, 'fanout');
$this->channel->exchange_declare($replyExchange, 'fanout', false, true, true);
// Create and bind a queue for the dispatcher (our queue) (exclusive queue)
list($replyQueue, ,) = $this->channel->queue_declare('', false, false, true);
$this->channel->queue_bind($replyQueue, $replyExchange);
$messageOptions['reply_to'] = $replyExchange;
// Create and bind a queue for the worker (durable non-exclusive queue)
list($workerReplyQueue, ,) = $this->channel->queue_declare('', false, true, false);
$this->channel->queue_bind($workerReplyQueue, $replyExchange);
$messageOptions['reply_to'] = $replyExchange . ';' . $workerReplyQueue;
}

$message = new AMQPMessage(serialize($task), $messageOptions);
Expand Down
25 changes: 7 additions & 18 deletions src/MyCLabs/Work/Worker/RabbitMQWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use Exception;
use MyCLabs\Work\Task\Task;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Message\AMQPMessage;

/**
Expand Down Expand Up @@ -66,13 +68,11 @@ private function taskHandler(AMQPMessage $message)
/** @var AMQPChannel $channel */
$channel = $message->delivery_info['channel'];

// Listen to the "reply_to" exchange
// Listen to the "reply_to" queue
$replyExchange = null;
$replyQueue = null;
if ($message->has('reply_to')) {
$replyExchange = $message->get('reply_to');
list($replyQueue, ,) = $this->channel->queue_declare('', false, false, true);
$this->channel->queue_bind($replyQueue, $replyExchange);
list($replyExchange, $replyQueue) = explode(';', $message->get('reply_to'));
}

/** @var Task $task */
Expand Down Expand Up @@ -125,8 +125,6 @@ private function taskHandler(AMQPMessage $message)
*/
private function notifyDispatcher($exchange, $queue, $messageContent)
{
$dispatcherNotified = false;

// We put in the queue that we finished
$this->channel->basic_publish(new AMQPMessage($messageContent), $exchange);

Expand All @@ -138,20 +136,11 @@ private function notifyDispatcher($exchange, $queue, $messageContent)
return false;
}

// If the first message of the queue is a "timeout" message from the emitter
if ($message->body == 'timeout') {
// Delete the temporary exchange
$this->channel->exchange_delete($exchange);
$dispatcherNotified = false;
}

// If the first message of the queue is our message, we can die in peace
if ($message->body == $messageContent) {
// Do not delete the temp exchange: still used by the app
$dispatcherNotified = true;
}
// (else it would be the "timeout" message from the dispatcher)
$dispatcherNotified = ($message->body == $messageContent);

// Delete the temporary queue
// Delete our queue
$this->channel->queue_delete($queue);

return $dispatcherNotified;
Expand Down
28 changes: 26 additions & 2 deletions tests/FunctionalTest/MyCLabs/Work/RabbitMQ/RabbitMQTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,34 @@ public function testWorkWithWaitDispatcherTimeout()
->method('onTaskException');
$worker->addEventListener($listener);

// Execute 1 task
$worker->work(1);
}

/**
* Test when the worker start after the dispatcher has emitted 1 task and timeouted.
* The problem met was that the exchange didn't exist anymore.
*/
public function testWorkerStartAfterDispatcherTimeout()
{
$worker = new RabbitMQWorker($this->channel, $this->queue);
$taskExecutor = $this->getMockForAbstractClass('MyCLabs\Work\TaskExecutor\TaskExecutor');
$worker->registerTaskExecutor('FunctionalTest\MyCLabs\Work\RabbitMQ\FakeTask', $taskExecutor);

// Run the task dispatcher and wait for it to timeout and finish
$file = __DIR__ . '/dispatch-task.php';
// The worker waits for 1ms
$wait = 0.001;
$return = exec("php $file {$this->queue} $wait");
// Check that the log is empty (no error)
$this->assertStringEqualsFile($log, '');
$this->assertEquals('', $return);

$listener = $this->getMock('MyCLabs\Work\EventListener');
$listener->expects($this->once())
->method('onTaskSuccess')
// Check that $dispatcherNotified = false
->with($this->anything(), false);
$worker->addEventListener($listener);

$worker->work(1);
}
}

0 comments on commit 6804e5b

Please sign in to comment.