Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add return type hints on the public API #107

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions src/Bunny/Async/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public function __destruct()
/**
* Initializes instance.
*/
protected function init()
protected function init(): void
{
parent::init();
$this->flushWriteBufferPromise = null;
Expand All @@ -106,7 +106,7 @@ protected function init()
*
* @param float $maxSeconds
*/
public function run($maxSeconds = null)
public function run($maxSeconds = null): void
{
if ($maxSeconds !== null) {
$this->stopTimer = $this->eventLoop->addTimer($maxSeconds, function () {
Expand All @@ -120,7 +120,7 @@ public function run($maxSeconds = null)
/**
* Calls {@link eventLoop}'s stop() method.
*/
public function stop()
public function stop(): void
{
if ($this->stopTimer) {
$this->eventLoop->cancelTimer($this->stopTimer);
Expand All @@ -143,10 +143,8 @@ protected function feedReadBuffer()
*
* - Calls {@link eventLoops}'s addWriteStream() with client's stream.
* - Consecutive calls will return the same instance of promise.
*
* @return Promise\PromiseInterface
*/
protected function flushWriteBuffer()
protected function flushWriteBuffer(): Promise\PromiseInterface
{
if ($this->flushWriteBufferPromise) {
return $this->flushWriteBufferPromise;
Expand Down Expand Up @@ -179,10 +177,8 @@ protected function flushWriteBuffer()
* Connects to AMQP server.
*
* Calling connect() multiple times will result in error.
*
* @return Promise\PromiseInterface
*/
public function connect()
public function connect(): Promise\PromiseInterface
{
if ($this->state !== ClientStateEnum::NOT_CONNECTED) {
return Promise\reject(new ClientException("Client already connected/connecting."));
Expand Down Expand Up @@ -233,9 +229,8 @@ public function connect()
*
* @param int $replyCode
* @param string $replyText
* @return Promise\PromiseInterface
*/
public function disconnect($replyCode = 0, $replyText = "")
public function disconnect($replyCode = 0, $replyText = ""): Promise\PromiseInterface
{
if ($this->state === ClientStateEnum::DISCONNECTING) {
return $this->disconnectPromise;
Expand Down Expand Up @@ -289,15 +284,15 @@ public function disconnect($replyCode = 0, $replyText = "")
*
* @param callable $callback
*/
public function addAwaitCallback(callable $callback)
public function addAwaitCallback(callable $callback): void
{
$this->awaitCallbacks[] = $callback;
}

/**
* {@link eventLoop}'s read stream callback notifying client that data from server arrived.
*/
public function onDataAvailable()
public function onDataAvailable(): void
{
$this->read();

Expand Down Expand Up @@ -327,7 +322,7 @@ public function onDataAvailable()
/**
* Callback when heartbeat timer timed out.
*/
public function onHeartbeat()
public function onHeartbeat(): void
{
$now = microtime(true);
$nextHeartbeat = ($this->lastWrite ?: $now) + $this->options["heartbeat"];
Expand All @@ -345,5 +340,4 @@ public function onHeartbeat()
$this->heartbeatTimer = $this->eventLoop->addTimer($nextHeartbeat - $now, [$this, "onHeartbeat"]);
}
}

}
33 changes: 11 additions & 22 deletions src/Bunny/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,30 +111,24 @@ public function __construct(AbstractClient $client, $channelId)

/**
* Returns underlying client instance.
*
* @return AbstractClient
*/
public function getClient()
public function getClient(): AbstractClient
{
return $this->client;
}

/**
* Returns channel id.
*
* @return int
*/
public function getChannelId()
public function getChannelId(): int
{
return $this->channelId;
}

/**
* Returns the channel mode.
*
* @return int
*/
public function getMode()
public function getMode(): int
{
return $this->mode;
}
Expand All @@ -143,9 +137,8 @@ public function getMode()
* Listener is called whenever 'basic.return' frame is received with arguments (Message $returnedMessage, MethodBasicReturnFrame $frame)
*
* @param callable $callback
* @return $this
*/
public function addReturnListener(callable $callback)
public function addReturnListener(callable $callback): self
{
$this->removeReturnListener($callback); // remove if previously added to prevent calling multiple times
$this->returnCallbacks[] = $callback;
Expand All @@ -156,9 +149,8 @@ public function addReturnListener(callable $callback)
* Removes registered return listener. If the callback is not registered, this is noop.
*
* @param callable $callback
* @return $this
*/
public function removeReturnListener(callable $callback)
public function removeReturnListener(callable $callback): self
{
foreach ($this->returnCallbacks as $k => $v) {
if ($v === $callback) {
Expand All @@ -173,9 +165,8 @@ public function removeReturnListener(callable $callback)
* Listener is called whenever 'basic.ack' or 'basic.nack' is received.
*
* @param callable $callback
* @return $this
*/
public function addAckListener(callable $callback)
public function addAckListener(callable $callback): self
{
if ($this->mode !== ChannelModeEnum::CONFIRM) {
throw new ChannelException("Ack/nack listener can be added when channel in confirm mode.");
Expand All @@ -190,9 +181,8 @@ public function addAckListener(callable $callback)
* Removes registered ack/nack listener. If the callback is not registered, this is noop.
*
* @param callable $callback
* @return $this
*/
public function removeAckListener(callable $callback)
public function removeAckListener(callable $callback): self
{
if ($this->mode !== ChannelModeEnum::CONFIRM) {
throw new ChannelException("Ack/nack listener can be removed when channel in confirm mode.");
Expand All @@ -214,9 +204,8 @@ public function removeAckListener(callable $callback)
*
* @param int $replyCode
* @param string $replyText
* @return PromiseInterface
*/
public function close($replyCode = 0, $replyText = "")
public function close($replyCode = 0, $replyText = ""): PromiseInterface
{
if ($this->state === ChannelStateEnum::CLOSED) {
throw new ChannelException("Trying to close already closed channel #{$this->channelId}.");
Expand Down Expand Up @@ -283,7 +272,7 @@ public function consume(callable $callback, $queue = "", $consumerTag = "", $noL
* @param bool $nowait
* @param array $arguments
*/
public function run(callable $callback, $queue = "", $consumerTag = "", $noLocal = false, $noAck = false, $exclusive = false, $nowait = false, $arguments = [])
public function run(callable $callback, $queue = "", $consumerTag = "", $noLocal = false, $noAck = false, $exclusive = false, $nowait = false, $arguments = []): void
{
$response = $this->consume($callback, $queue, $consumerTag, $noLocal, $noAck, $exclusive, $nowait, $arguments);

Expand Down Expand Up @@ -558,7 +547,7 @@ private function enterConfirmMode(callable $callback = null)
*
* @param AbstractFrame $frame
*/
public function onFrameReceived(AbstractFrame $frame)
public function onFrameReceived(AbstractFrame $frame): void
{
if ($this->state === ChannelStateEnum::ERROR) {
throw new ChannelException("Channel in error state.");
Expand Down Expand Up @@ -704,7 +693,7 @@ public function onFrameReceived(AbstractFrame $frame)
/**
* Callback after content body has been completely received.
*/
protected function onBodyComplete()
protected function onBodyComplete(): void
{
if ($this->returnFrame) {
$content = $this->bodyBuffer->consume($this->bodyBuffer->getLength());
Expand Down
19 changes: 7 additions & 12 deletions src/Bunny/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,17 @@ public function __destruct()

/**
* Reads data from stream to {@link readBuffer}.
*
* @return boolean
*/
protected function feedReadBuffer()
protected function feedReadBuffer(): bool
{
$this->read();
return true;
}

/**
* Writes all data from {@link writeBuffer} to stream.
*
* @return boolean
*/
protected function flushWriteBuffer()
protected function flushWriteBuffer(): bool
{
while (!$this->writeBuffer->isEmpty()) {
$this->write();
Expand All @@ -93,9 +89,8 @@ protected function flushWriteBuffer()
* Synchronously connects to AMQP server.
*
* @throws \Exception
* @return self
*/
public function connect()
public function connect(): self
{
if ($this->state !== ClientStateEnum::NOT_CONNECTED) {
throw new ClientException("Client already connected/connecting.");
Expand Down Expand Up @@ -130,9 +125,9 @@ public function connect()
*
* @param int $replyCode
* @param string $replyText
* @return Promise\PromiseInterface
* @return
*/
public function disconnect($replyCode = 0, $replyText = "")
public function disconnect($replyCode = 0, $replyText = ""): Promise\PromiseInterface
{
if ($this->state === ClientStateEnum::DISCONNECTING) {
return $this->disconnectPromise;
Expand Down Expand Up @@ -169,7 +164,7 @@ public function disconnect($replyCode = 0, $replyText = "")
*
* @param float $maxSeconds
*/
public function run($maxSeconds = null)
public function run($maxSeconds = null): void
{
if (!$this->isConnected()) {
throw new ClientException("Client has to be connected.");
Expand Down Expand Up @@ -263,7 +258,7 @@ public function run($maxSeconds = null)
/**
* Stops client's event loop.
*/
public function stop()
public function stop(): void
{
$this->running = false;
}
Expand Down