Skip to content

Commit c9984c5

Browse files
authored
Inline 'addNew(...)' methods to MqttClient to reduce Repository interface (#35)
1 parent 50004bd commit c9984c5

File tree

3 files changed

+12
-137
lines changed

3 files changed

+12
-137
lines changed

src/Contracts/Repository.php

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,6 @@ public function countTopicSubscriptions(): int;
5858
*/
5959
public function addTopicSubscription(TopicSubscription $subscription): void;
6060

61-
/**
62-
* Adds a new topic subscription with the given settings to the repository.
63-
*
64-
* @param string $topic
65-
* @param callable $callback
66-
* @param int $messageId
67-
* @param int $qualityOfService
68-
* @return TopicSubscription
69-
*/
70-
public function addNewTopicSubscription(string $topic, callable $callback, int $messageId, int $qualityOfService): TopicSubscription;
71-
7261
/**
7362
* Get all topic subscriptions with the given message identifier.
7463
*
@@ -118,26 +107,6 @@ public function countPendingPublishMessages(): int;
118107
*/
119108
public function addPendingPublishedMessage(PublishedMessage $message): void;
120109

121-
/**
122-
* Adds a new pending published message with the given settings to the repository.
123-
*
124-
* @param int $messageId
125-
* @param string $topic
126-
* @param string $message
127-
* @param int $qualityOfService
128-
* @param bool $retain
129-
* @param DateTime|null $sentAt
130-
* @return PublishedMessage
131-
*/
132-
public function addNewPendingPublishedMessage(
133-
int $messageId,
134-
string $topic,
135-
string $message,
136-
int $qualityOfService,
137-
bool $retain,
138-
DateTime $sentAt = null
139-
): PublishedMessage;
140-
141110
/**
142111
* Gets a pending published message with the given message identifier, if found.
143112
*
@@ -189,16 +158,6 @@ public function countPendingUnsubscribeRequests(): int;
189158
*/
190159
public function addPendingUnsubscribeRequest(UnsubscribeRequest $request): void;
191160

192-
/**
193-
* Adds a new pending unsubscribe request with the given settings to the repository.
194-
*
195-
* @param int $messageId
196-
* @param string $topic
197-
* @param DateTime|null $sentAt
198-
* @return UnsubscribeRequest
199-
*/
200-
public function addNewPendingUnsubscribeRequest(int $messageId, string $topic, DateTime $sentAt = null): UnsubscribeRequest;
201-
202161
/**
203162
* Gets a pending unsubscribe request with the given message identifier, if found.
204163
*
@@ -241,17 +200,6 @@ public function countPendingPublishConfirmations(): int;
241200
*/
242201
public function addPendingPublishConfirmation(PublishedMessage $message): void;
243202

244-
/**
245-
* Adds a new pending publish confirmation with the given settings to the repository.
246-
*
247-
* @param int $messageId
248-
* @param string $topic
249-
* @param string $message
250-
* @return PublishedMessage
251-
* @throws PendingPublishConfirmationAlreadyExistsException
252-
*/
253-
public function addNewPendingPublishConfirmation(int $messageId, string $topic, string $message): PublishedMessage;
254-
255203
/**
256204
* Gets a pending publish confirmation with the given message identifier, if found.
257205
*

src/MqttClient.php

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,9 @@ public function publish(string $topic, string $message, int $qualityOfService =
537537

538538
if ($qualityOfService > self::QOS_AT_MOST_ONCE) {
539539
$messageId = $this->repository->newMessageId();
540-
$this->repository->addNewPendingPublishedMessage($messageId, $topic, $message, $qualityOfService, $retain);
540+
541+
$pendingMessage = new PublishedMessage($messageId, $topic, $message, $qualityOfService, $retain);
542+
$this->repository->addPendingPublishedMessage($pendingMessage);
541543
}
542544

543545
$this->publishMessage($topic, $message, $qualityOfService, $retain, $messageId);
@@ -629,7 +631,8 @@ public function subscribe(string $topic, callable $callback, int $qualityOfServi
629631
'qos' => $qualityOfService,
630632
]);
631633

632-
$this->repository->addNewTopicSubscription($topic, $callback, $messageId, $qualityOfService);
634+
$pendingMessage = new TopicSubscription($topic, $callback, $messageId, $qualityOfService);
635+
$this->repository->addTopicSubscription($pendingMessage);
633636

634637
$this->writeToSocket($data);
635638
}
@@ -659,7 +662,8 @@ public function unsubscribe(string $topic): void
659662
'topic' => $topic,
660663
]);
661664

662-
$this->repository->addNewPendingUnsubscribeRequest($messageId, $topic);
665+
$pendingMessage = new UnsubscribeRequest($messageId, $topic);
666+
$this->repository->addPendingUnsubscribeRequest($pendingMessage);
663667

664668
$this->writeToSocket($data);
665669
}
@@ -817,11 +821,14 @@ protected function handleMessage(Message $message): void
817821
if ($message->getQualityOfService() === self::QOS_EXACTLY_ONCE) {
818822
try {
819823
$this->sendPublishReceived($message->getMessageId());
820-
$this->repository->addNewPendingPublishConfirmation(
824+
$pendingMessage = new PublishedMessage(
821825
$message->getMessageId(),
822826
$message->getTopic(),
823-
$message->getContent()
827+
$message->getContent(),
828+
2,
829+
false
824830
);
831+
$this->repository->addPendingPublishConfirmation($pendingMessage);
825832
} catch (PendingPublishConfirmationAlreadyExistsException $e) {
826833
// We already received and processed this message, therefore we do not respond
827834
// with a receipt a second time and wait for the release instead.

src/Repositories/BaseRepository.php

Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,6 @@ abstract class BaseRepository
2525
*/
2626
abstract public function addTopicSubscription(TopicSubscription $subscription): void;
2727

28-
/**
29-
* Adds a topic subscription to the repository.
30-
*
31-
* @param string $topic
32-
* @param callable $callback
33-
* @param int $messageId
34-
* @param int $qualityOfService
35-
* @return TopicSubscription
36-
*/
37-
public function addNewTopicSubscription(string $topic, callable $callback, int $messageId, int $qualityOfService): TopicSubscription
38-
{
39-
$subscription = new TopicSubscription($topic, $callback, $messageId, $qualityOfService);
40-
41-
$this->addTopicSubscription($subscription);
42-
43-
return $subscription;
44-
}
45-
4628
/**
4729
* Adds a pending published message to the repository.
4830
*
@@ -51,33 +33,6 @@ public function addNewTopicSubscription(string $topic, callable $callback, int $
5133
*/
5234
abstract public function addPendingPublishedMessage(PublishedMessage $message): void;
5335

54-
/**
55-
* Adds a new pending published message with the given settings to the repository.
56-
*
57-
* @param int $messageId
58-
* @param string $topic
59-
* @param string $message
60-
* @param int $qualityOfService
61-
* @param bool $retain
62-
* @param DateTime|null $sentAt
63-
* @return PublishedMessage
64-
*/
65-
public function addNewPendingPublishedMessage(
66-
int $messageId,
67-
string $topic,
68-
string $message,
69-
int $qualityOfService,
70-
bool $retain,
71-
DateTime $sentAt = null
72-
): PublishedMessage
73-
{
74-
$message = new PublishedMessage($messageId, $topic, $message, $qualityOfService, $retain, $sentAt);
75-
76-
$this->addPendingPublishedMessage($message);
77-
78-
return $message;
79-
}
80-
8136
/**
8237
* Adds a pending unsubscribe request to the repository.
8338
*
@@ -86,23 +41,6 @@ public function addNewPendingPublishedMessage(
8641
*/
8742
abstract public function addPendingUnsubscribeRequest(UnsubscribeRequest $request): void;
8843

89-
/**
90-
* Adds a new pending unsubscribe request with the given settings to the repository.
91-
*
92-
* @param int $messageId
93-
* @param string $topic
94-
* @param DateTime|null $sentAt
95-
* @return UnsubscribeRequest
96-
*/
97-
public function addNewPendingUnsubscribeRequest(int $messageId, string $topic, DateTime $sentAt = null): UnsubscribeRequest
98-
{
99-
$request = new UnsubscribeRequest($messageId, $topic, $sentAt);
100-
101-
$this->addPendingUnsubscribeRequest($request);
102-
103-
return $request;
104-
}
105-
10644
/**
10745
* Adds a pending publish confirmation to the repository.
10846
*
@@ -111,22 +49,4 @@ public function addNewPendingUnsubscribeRequest(int $messageId, string $topic, D
11149
* @throws PendingPublishConfirmationAlreadyExistsException
11250
*/
11351
abstract public function addPendingPublishConfirmation(PublishedMessage $message): void;
114-
115-
/**
116-
* Adds a new pending publish confirmation with the given settings to the repository.
117-
*
118-
* @param int $messageId
119-
* @param string $topic
120-
* @param string $message
121-
* @return PublishedMessage
122-
* @throws PendingPublishConfirmationAlreadyExistsException
123-
*/
124-
public function addNewPendingPublishConfirmation(int $messageId, string $topic, string $message): PublishedMessage
125-
{
126-
$message = new PublishedMessage($messageId, $topic, $message, 2, false);
127-
128-
$this->addPendingPublishConfirmation($message);
129-
130-
return $message;
131-
}
13252
}

0 commit comments

Comments
 (0)