Skip to content

WIP: fix subscriptions #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions src/MQTTClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,8 @@ protected function handlePublishedMessage(string $buffer, int $qualityOfServiceL
$topic = substr($buffer, 2, $topicLength);
$message = substr($buffer, ($topicLength + 2));

$this->logger->debug("GOT PUBLISH MESSAGE");

if ($qualityOfServiceLevel > 0) {
if (strlen($message) < 2) {
$this->logger->error(sprintf(
Expand Down Expand Up @@ -1224,17 +1226,10 @@ protected function deliverPublishedMessage(string $topic, string $message, int $
]);

foreach ($subscribers as $subscriber) {
if ($subscriber->getQualityOfServiceLevel() > $qualityOfServiceLevel) {
// At this point we need to assume that this subscriber does not want to receive
// the message, but maybe there are other subscribers waiting for the message.
continue;
}

try {
call_user_func($subscriber->getCallback(), $topic, $message, $retained);
} catch (\Throwable $e) {
// We ignore errors produced by custom callbacks.
}
// We deliver the message to the first subscriber which matches this topic,
// if there are overlapping topics the message will be delivered multiple times by the broker!
call_user_func($subscriber->getCallback(), $topic, $message, $retained);
break;
}
}

Expand Down Expand Up @@ -1297,10 +1292,6 @@ protected function sendPublishComplete(int $messageId): void
*/
protected function republishPendingMessages(): void
{
$this->logger->debug('Re-publishing pending messages to MQTT broker.', [
'broker' => sprintf('%s:%s', $this->host, $this->port),
]);

/** @noinspection PhpUnhandledExceptionInspection */
$dateTime = (new DateTime())->sub(new DateInterval('PT' . $this->settings->getResendTimeout() . 'S'));
$messages = $this->repository->getPendingPublishedMessagesLastSentBefore($dateTime);
Expand Down Expand Up @@ -1333,10 +1324,6 @@ protected function republishPendingMessages(): void
*/
protected function republishPendingUnsubscribeRequests(): void
{
$this->logger->debug('Re-sending pending unsubscribe requests to MQTT broker.', [
'broker' => sprintf('%s:%s', $this->host, $this->port),
]);

/** @noinspection PhpUnhandledExceptionInspection */
$dateTime = (new DateTime())->sub(new DateInterval('PT' . $this->settings->getResendTimeout() . 'S'));
$requests = $this->repository->getPendingUnsubscribeRequestsLastSentBefore($dateTime);
Expand Down
40 changes: 30 additions & 10 deletions src/Repositories/MemoryRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@
*/
class MemoryRepository implements Repository
{
/** @var SplObjectStorage|TopicSubscription[] */
private $topicSubscriptions;
/** @var array<int, TopicSubscription> */
private $topicSubscriptions = array();

/** @var array<int> */
private $topicSubscriptionsQueue = array();

/** @var int */
private $topicSubscriptionsPtr = 0;

/** @var SplObjectStorage|PublishedMessage[] */
private $pendingPublishedMessages;
Expand All @@ -38,7 +44,6 @@ class MemoryRepository implements Repository
*/
public function __construct()
{
$this->topicSubscriptions = new SplObjectStorage();
$this->pendingPublishedMessages = new SplObjectStorage();
$this->pendingUnsubscribeRequests = new SplObjectStorage();
$this->pendingPublishConfirmations = new SplObjectStorage();
Expand All @@ -52,7 +57,7 @@ public function __construct()
*/
public function countTopicSubscriptions(): int
{
return $this->topicSubscriptions->count();
return count($this->topicSubscriptions);
}

/**
Expand All @@ -63,7 +68,13 @@ public function countTopicSubscriptions(): int
*/
public function addTopicSubscription(TopicSubscription $subscription): void
{
$this->topicSubscriptions->attach($subscription);
// Finds the next available subscription id (FIXME)
while (isset($this->topicSubscriptions[$this->topicSubscriptionsPtr])) {
$this->topicSubscriptionsPtr++;
}

$this->topicSubscriptions[$this->topicSubscriptionsPtr] = $subscription;
$this->topicSubscriptionsQueue[] = $this->topicSubscriptionsPtr;
}

/**
Expand Down Expand Up @@ -113,13 +124,21 @@ public function getTopicSubscriptionsMatchingTopic(string $topic): array
{
$result = [];

foreach ($this->topicSubscriptions as $subscription) {
for ($i = 0; $i < count($this->topicSubscriptionsQueue); $i++) {
$subscriptionId = $this->topicSubscriptionsQueue[$i];
$subscription = $this->topicSubscriptions[$subscriptionId];

if (preg_match($subscription->getRegexifiedTopic(), $topic)) {
$result[] = $subscription;
// We have a match! Move this to the bottom
$this->topicSubscriptionsQueue[] = $subscriptionId;
unset($this->topicSubscriptionsQueue[$i]);
$this->topicSubscriptionsQueue = array_values($this->topicSubscriptionsQueue);

return array($subscription);
}
}

return $result;
return array();
}

/**
Expand All @@ -134,10 +153,11 @@ public function removeTopicSubscription(string $topic): bool
{
$result = false;

foreach ($this->topicSubscriptions as $subscription) {
foreach ($this->topicSubscriptions as $id => $subscription) {
if ($subscription->getTopic() === $topic) {
$this->topicSubscriptions->detach($subscription);
unset($this->topicSubscriptions[$topic]);
$result = true;
// FIXME: broken
break;
}
}
Expand Down