Skip to content

Inline 'addNew[...]' methods to MqttClient to reduce Repository interface #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 0 additions & 52 deletions src/Contracts/Repository.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,6 @@ public function countTopicSubscriptions(): int;
*/
public function addTopicSubscription(TopicSubscription $subscription): void;

/**
* Adds a new topic subscription with the given settings to the repository.
*
* @param string $topic
* @param callable $callback
* @param int $messageId
* @param int $qualityOfService
* @return TopicSubscription
*/
public function addNewTopicSubscription(string $topic, callable $callback, int $messageId, int $qualityOfService): TopicSubscription;

/**
* Get all topic subscriptions with the given message identifier.
*
Expand Down Expand Up @@ -118,26 +107,6 @@ public function countPendingPublishMessages(): int;
*/
public function addPendingPublishedMessage(PublishedMessage $message): void;

/**
* Adds a new pending published message with the given settings to the repository.
*
* @param int $messageId
* @param string $topic
* @param string $message
* @param int $qualityOfService
* @param bool $retain
* @param DateTime|null $sentAt
* @return PublishedMessage
*/
public function addNewPendingPublishedMessage(
int $messageId,
string $topic,
string $message,
int $qualityOfService,
bool $retain,
DateTime $sentAt = null
): PublishedMessage;

/**
* Gets a pending published message with the given message identifier, if found.
*
Expand Down Expand Up @@ -189,16 +158,6 @@ public function countPendingUnsubscribeRequests(): int;
*/
public function addPendingUnsubscribeRequest(UnsubscribeRequest $request): void;

/**
* Adds a new pending unsubscribe request with the given settings to the repository.
*
* @param int $messageId
* @param string $topic
* @param DateTime|null $sentAt
* @return UnsubscribeRequest
*/
public function addNewPendingUnsubscribeRequest(int $messageId, string $topic, DateTime $sentAt = null): UnsubscribeRequest;

/**
* Gets a pending unsubscribe request with the given message identifier, if found.
*
Expand Down Expand Up @@ -241,17 +200,6 @@ public function countPendingPublishConfirmations(): int;
*/
public function addPendingPublishConfirmation(PublishedMessage $message): void;

/**
* Adds a new pending publish confirmation with the given settings to the repository.
*
* @param int $messageId
* @param string $topic
* @param string $message
* @return PublishedMessage
* @throws PendingPublishConfirmationAlreadyExistsException
*/
public function addNewPendingPublishConfirmation(int $messageId, string $topic, string $message): PublishedMessage;

/**
* Gets a pending publish confirmation with the given message identifier, if found.
*
Expand Down
17 changes: 12 additions & 5 deletions src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,9 @@ public function publish(string $topic, string $message, int $qualityOfService =

if ($qualityOfService > self::QOS_AT_MOST_ONCE) {
$messageId = $this->repository->newMessageId();
$this->repository->addNewPendingPublishedMessage($messageId, $topic, $message, $qualityOfService, $retain);

$pendingMessage = new PublishedMessage($messageId, $topic, $message, $qualityOfService, $retain);
$this->repository->addPendingPublishedMessage($pendingMessage);
}

$this->publishMessage($topic, $message, $qualityOfService, $retain, $messageId);
Expand Down Expand Up @@ -629,7 +631,8 @@ public function subscribe(string $topic, callable $callback, int $qualityOfServi
'qos' => $qualityOfService,
]);

$this->repository->addNewTopicSubscription($topic, $callback, $messageId, $qualityOfService);
$pendingMessage = new TopicSubscription($topic, $callback, $messageId, $qualityOfService);
$this->repository->addTopicSubscription($pendingMessage);

$this->writeToSocket($data);
}
Expand Down Expand Up @@ -659,7 +662,8 @@ public function unsubscribe(string $topic): void
'topic' => $topic,
]);

$this->repository->addNewPendingUnsubscribeRequest($messageId, $topic);
$pendingMessage = new UnsubscribeRequest($messageId, $topic);
$this->repository->addPendingUnsubscribeRequest($pendingMessage);

$this->writeToSocket($data);
}
Expand Down Expand Up @@ -817,11 +821,14 @@ protected function handleMessage(Message $message): void
if ($message->getQualityOfService() === self::QOS_EXACTLY_ONCE) {
try {
$this->sendPublishReceived($message->getMessageId());
$this->repository->addNewPendingPublishConfirmation(
$pendingMessage = new PublishedMessage(
$message->getMessageId(),
$message->getTopic(),
$message->getContent()
$message->getContent(),
2,
false
);
$this->repository->addPendingPublishConfirmation($pendingMessage);
} catch (PendingPublishConfirmationAlreadyExistsException $e) {
// We already received and processed this message, therefore we do not respond
// with a receipt a second time and wait for the release instead.
Expand Down
80 changes: 0 additions & 80 deletions src/Repositories/BaseRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,6 @@ abstract class BaseRepository
*/
abstract public function addTopicSubscription(TopicSubscription $subscription): void;

/**
* Adds a topic subscription to the repository.
*
* @param string $topic
* @param callable $callback
* @param int $messageId
* @param int $qualityOfService
* @return TopicSubscription
*/
public function addNewTopicSubscription(string $topic, callable $callback, int $messageId, int $qualityOfService): TopicSubscription
{
$subscription = new TopicSubscription($topic, $callback, $messageId, $qualityOfService);

$this->addTopicSubscription($subscription);

return $subscription;
}

/**
* Adds a pending published message to the repository.
*
Expand All @@ -51,33 +33,6 @@ public function addNewTopicSubscription(string $topic, callable $callback, int $
*/
abstract public function addPendingPublishedMessage(PublishedMessage $message): void;

/**
* Adds a new pending published message with the given settings to the repository.
*
* @param int $messageId
* @param string $topic
* @param string $message
* @param int $qualityOfService
* @param bool $retain
* @param DateTime|null $sentAt
* @return PublishedMessage
*/
public function addNewPendingPublishedMessage(
int $messageId,
string $topic,
string $message,
int $qualityOfService,
bool $retain,
DateTime $sentAt = null
): PublishedMessage
{
$message = new PublishedMessage($messageId, $topic, $message, $qualityOfService, $retain, $sentAt);

$this->addPendingPublishedMessage($message);

return $message;
}

/**
* Adds a pending unsubscribe request to the repository.
*
Expand All @@ -86,23 +41,6 @@ public function addNewPendingPublishedMessage(
*/
abstract public function addPendingUnsubscribeRequest(UnsubscribeRequest $request): void;

/**
* Adds a new pending unsubscribe request with the given settings to the repository.
*
* @param int $messageId
* @param string $topic
* @param DateTime|null $sentAt
* @return UnsubscribeRequest
*/
public function addNewPendingUnsubscribeRequest(int $messageId, string $topic, DateTime $sentAt = null): UnsubscribeRequest
{
$request = new UnsubscribeRequest($messageId, $topic, $sentAt);

$this->addPendingUnsubscribeRequest($request);

return $request;
}

/**
* Adds a pending publish confirmation to the repository.
*
Expand All @@ -111,22 +49,4 @@ public function addNewPendingUnsubscribeRequest(int $messageId, string $topic, D
* @throws PendingPublishConfirmationAlreadyExistsException
*/
abstract public function addPendingPublishConfirmation(PublishedMessage $message): void;

/**
* Adds a new pending publish confirmation with the given settings to the repository.
*
* @param int $messageId
* @param string $topic
* @param string $message
* @return PublishedMessage
* @throws PendingPublishConfirmationAlreadyExistsException
*/
public function addNewPendingPublishConfirmation(int $messageId, string $topic, string $message): PublishedMessage
{
$message = new PublishedMessage($messageId, $topic, $message, 2, false);

$this->addPendingPublishConfirmation($message);

return $message;
}
}