Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
!/extensions/available/.gitkeep
!/extensions/enabled/.gitkeep

/tmp/events.bin
/tmp/cache/di_*
/tmp/plugin_artifacts/*
/tmp/cache/database/*
Expand Down
2 changes: 2 additions & 0 deletions chandler-example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ chandler:
pass: "word"
addr: "noreply@example.com"
ssl: true

redisUrl: "tcp://10.0.0.1:6379"
48 changes: 46 additions & 2 deletions chandler/Signaling/SignalManager.php
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
<?php declare(strict_types=1);
namespace Chandler\Signaling;
use Chandler\Patterns\TSimpleSingleton;
use Predis\Client as RedisClient;

/**
* Signal manager (singleton).
* Signals are events, that are meant to be recieved by end user.
*
* @author kurotsun <celestine@vriska.ru>
* @author Vladimir Barinov <veselcraft@icloud.com>
*/
class SignalManager
{
Expand Down Expand Up @@ -43,7 +45,7 @@ private function __construct()
* @return array|null Array of events if there are any, null otherwise
*/
private function eventFor(int $for): ?array
{
{
$since = $this->since - 1;
$statement = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $since ORDER BY since DESC");
$event = $statement->fetch(\PDO::FETCH_LAZY);
Expand All @@ -66,8 +68,41 @@ private function eventFor(int $for): ?array
*/
function listen(\Closure $callback, int $for, int $time = 25): void
{
try {
$redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"], ['read_write_timeout' => $time]);

// We will catch the old message first
$oldEvent = $this->eventFor($for);

if ($oldEvent) {
list($id, $evt) = $oldEvent;
$id = crc32((string)$id);
$callback($evt, $id);
}

// And then we will subscribe to user's channel
$subscriber = $redisClient->pubSubLoop();
$subscriber->subscribe('im'.$for);

foreach($subscriber as $event) {
if ($event->kind == 'message' && $event->channel == 'im'.$for) {
list($id, $evt) = json_decode($event->payload);
$id = crc32((string)$id);
$evt = unserialize(hex2bin($evt));
$callback($evt, $id);
}
}

// On timeout we're returning nothing
exit("[]");
}
catch (Exception $e)
{
error_log("Couldn't connect to Redis server, fallback to old sqlite method. Exception Message: ".$e->getMessage());
}

$this->since = time() - 1;
for($i = 0; $i < $time; $i++) {
for($i = 0; $i < ($time / 5); $i++) {
sleep(1);

$event = $this->eventFor($for);
Expand Down Expand Up @@ -131,7 +166,16 @@ function triggerEvent(object $event, int $for): bool
$event = bin2hex(serialize($event));
$since = time();

// add it to the history
$this->connection->query("INSERT INTO pool VALUES (NULL, $since, $for, '$event')");
$id = $this->connection->lastInsertId();

try {
$redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"]);
$redisClient->publish('im'.$for, json_encode([$id, $event]));
} catch (Exception $e) {
error_log("Couldn't connect to Redis server and push the event. Exception Message: ".$e->getMessage());
}
return true;
}

Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"symfony/yaml": "^5.3",
"guzzlehttp/guzzle": "^6.0",
"wildbit/postmark-php": "^4.0",
"tracy/tracy": "^2.10"
"tracy/tracy": "^2.10",
"predis/predis": "^3.2"
},
"suggest": {
"ext-yaml": "for faster yaml parsing"
Expand Down