Skip to content

Added Config Options for Username and Prefix #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 37 additions & 31 deletions Classes/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Neos\Flow\Utility\Algorithms;
use Flowpack\JobQueue\Common\Exception as JobQueueException;
use Redis;

/**
* A queue implementation using Redis as the queue backend
Expand All @@ -27,37 +28,37 @@ class RedisQueue implements QueueInterface
* @var string
*/
protected $name;

/**
* @var \Redis
*/
protected $client;

/**
* @var integer
*/
protected $defaultTimeout = 30;

/**
* @var array
*/
protected $clientOptions;

/**
* @var float
*/
protected $reconnectDelay = 1.0;

/**
* @var float
*/
protected $reconnectDecay = 1.5;

/**
* @var float
*/
protected $maxReconnectDelay = 30.0;

/**
* @param string $name
* @param array $options
Expand All @@ -75,15 +76,15 @@ public function __construct($name, array $options = [])
throw new JobQueueException('Could not connect to Redis', 1467382685);
}
}

/**
* @inheritdoc
*/
public function getName(): string
{
return $this->name;
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -96,11 +97,11 @@ public function submit($payload, array $options = []): string
if ($idStored === 0) {
throw new JobQueueException(sprintf('Duplicate message id: "%s"', $messageId), 1470656350);
}

$this->client->lPush("queue:{$this->name}:messages", $messageId);
return $messageId;
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -122,7 +123,7 @@ public function waitAndTake(?int $timeout = null): ?Message
}
return $message;
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -139,7 +140,7 @@ public function waitAndReserve(?int $timeout = null): ?Message
}
return $this->getMessageById($messageId);
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -148,12 +149,12 @@ public function release(string $messageId, array $options = []): void
{
$this->checkClientConnection();
$this->client->multi()
->lRem("queue:{$this->name}:processing", $messageId, 0)
->hIncrBy("queue:{$this->name}:releases", $messageId, 1)
->lPush("queue:{$this->name}:messages", $messageId)
->exec();
->lRem("queue:{$this->name}:processing", $messageId, 0)
->hIncrBy("queue:{$this->name}:releases", $messageId, 1)
->lPush("queue:{$this->name}:messages", $messageId)
->exec();
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -166,7 +167,7 @@ public function abort(string $messageId): void
$this->client->lPush("queue:{$this->name}:failed", $messageId);
}
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -179,7 +180,7 @@ public function finish(string $messageId): bool
$this->client->hDel("queue:{$this->name}:releases", $messageId);
return $numberOfRemoved > 0;
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -198,7 +199,7 @@ public function peek(int $limit = 1): array
}
return $messages;
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -208,7 +209,7 @@ public function countReady(): int
$this->checkClientConnection();
return $this->client->lLen("queue:{$this->name}:messages");
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -218,7 +219,7 @@ public function countReserved(): int
$this->checkClientConnection();
return $this->client->lLen("queue:{$this->name}:processing");
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -228,7 +229,7 @@ public function countFailed(): int
$this->checkClientConnection();
return $this->client->lLen("queue:{$this->name}:failed");
}

/**
* @return void
* @throws JobQueueException
Expand All @@ -237,7 +238,7 @@ public function setUp(): void
{
$this->checkClientConnection();
}

/**
* @inheritdoc
* @throws JobQueueException
Expand All @@ -247,7 +248,7 @@ public function flush(): void
$this->checkClientConnection();
$this->client->flushDB();
}

/**
* @param string $messageId
* @return Message
Expand All @@ -261,7 +262,7 @@ protected function getMessageById(string $messageId): ?Message
$numberOfReleases = (integer)$this->client->hGet("queue:{$this->name}:releases", $messageId);
return new Message($messageId, json_decode($encodedPayload, true), $numberOfReleases);
}

/**
* Check if the Redis client connection is still up and reconnect if Redis was disconnected
*
Expand All @@ -285,7 +286,7 @@ protected function checkClientConnection()
}
}
}

/**
* Connect the Redis client
*
Expand All @@ -299,21 +300,26 @@ protected function connectClient()
$host = isset($this->clientOptions['host']) ? $this->clientOptions['host'] : '127.0.0.1';
$port = isset($this->clientOptions['port']) ? $this->clientOptions['port'] : 6379;
$password = isset($this->clientOptions['password']) ? $this->clientOptions['password'] : '';
$username = isset($this->clientOptions['username']) ? $this->clientOptions['username'] : 'default';
$prefix = isset($this->clientOptions['prefix']) ? $this->clientOptions['prefix'] : '';
$database = isset($this->clientOptions['database']) ? $this->clientOptions['database'] : 0;
// The connection read timeout should be higher than the timeout for blocking operations!
$timeout = isset($this->clientOptions['timeout']) ? $this->clientOptions['timeout'] : round($this->defaultTimeout * 1.5);

$connected = $this->client->connect($host, $port, $timeout);
if ($connected) {
if ($password !== '') {
$authSuccess = $this->client->auth($password);
$authSuccess = $this->client->auth([$username,$password]);
if ($authSuccess !== true) {
throw new JobQueueException('Redis authentication failed.', 1536735535);
}
}
if($prefix !== ""){
$this->client->setOption(Redis::OPT_PREFIX, $prefix);
}
$connected = $this->client->select($database);
}

// Break the cycle that could cause a high CPU load
if (!$connected) {
usleep($this->reconnectDelay * 1e6);
Expand Down
14 changes: 14 additions & 0 deletions Configuration/Settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,17 @@ Flowpack:
port: 6379
password: 'My_Secret_Password'
database: 15
# Set password in your redis config
usernamePasswordProtectedClient:
host: 127.0.0.1
port: 6379
username: 'My_User'
password: 'My_Secret_Password'
database: 15
prefix:
host: 127.0.0.1
port: 6379
prefix: 'my_prefix:'
username: 'default'
password: 'My_Secret_Password'
database: 15
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ Flowpack:
host: 127.0.0.1
port: 6379
database: 15
username: 'some username'
password: 'some long secret'
prefix: 'some redis prefix'
defaultTimeout: 20
```

Expand Down