From 5e3919f486a3d077883acbce742d991b07371b0a Mon Sep 17 00:00:00 2001 From: Fabian Gruber Date: Wed, 12 Feb 2025 10:06:33 +0100 Subject: [PATCH] feat(publisher): add ping method to publisher --- src/Queue/Broker/AMQP.php | 13 +++++++++---- src/Queue/Broker/Redis.php | 5 +++++ src/Queue/Publisher.php | 7 +++++++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 0bd745e..6613f1c 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -128,6 +128,11 @@ public function close(): void } } + public function ping(): bool + { + return $this->withChannel(fn (AMQPChannel $channel) => $channel->is_open()); + } + public function enqueue(Queue $queue, array $payload): bool { $payload = [ @@ -172,10 +177,10 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int } /** - * @param callable(AMQPChannel $channel): void $callback + * @param callable(AMQPChannel $channel): mixed $callback * @throws \Exception */ - private function withChannel(callable $callback): void + private function withChannel(callable $callback): mixed { $createChannel = function (): AMQPChannel { $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost, heartbeat: $this->heartbeat); @@ -194,13 +199,13 @@ private function withChannel(callable $callback): void } try { - $callback($this->channel); + return $callback($this->channel); } catch (\Throwable $th) { // createChannel() might throw, in that case set the channel to `null` first. $this->channel = null; // try creating a new connection once, if this still fails, throw the error $this->channel = $createChannel(); - $callback($this->channel); + return $callback($this->channel); } } } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 34ee22e..e255ff4 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -94,6 +94,11 @@ public function close(): void $this->closed = true; } + public function ping(): bool + { + return $this->connection->ping(); + } + public function enqueue(Queue $queue, array $payload): bool { $payload = [ diff --git a/src/Queue/Publisher.php b/src/Queue/Publisher.php index da07481..3f9c321 100644 --- a/src/Queue/Publisher.php +++ b/src/Queue/Publisher.php @@ -4,6 +4,13 @@ interface Publisher { + /** + * Checks if the publisher can reach the queue. + * + * @return bool + */ + public function ping(): bool; + /** * Publishes a new message onto the queue. *