Open
Description
Extending predis pipeline, I was able to simultanously send the same command to multiple Redis instances and wait for the responses and process them in PHP to form the final result (map-reduce). As far as I know, this wasn't possible with phpredis. I am wondering if you can officially add such a feature to php-redis-client as well.
Here is my predis code and an example test:
class TwoStepPipeline extends Pipeline
{
/**
* @var ConnectionInterface
*/
protected $waitingConnection;
/**
* @var \SplQueue
*/
protected $waitingCommands;
/**
* Flush the queue without waiting for the responses.
*
* @inheritdoc
*/
protected function executePipeline(ConnectionInterface $connection, \SplQueue $commands)
{
if (! $this->waitingCommands) {
$this->waitingCommands = new \SplQueue();
}
foreach ($commands as $command) {
$connection->writeRequest($command);
$this->waitingCommands->enqueue($command);
}
$this->waitingConnection = $connection;
return [];
}
/**
* Read the responses of the last flushed queue.
*
* @return array
*/
public function wait()
{
$responses = array();
$exceptions = $this->throwServerExceptions();
while (!$this->waitingCommands->isEmpty()) {
$command = $this->waitingCommands->dequeue();
$response = $this->waitingConnection->readResponse($command);
if (!$response instanceof ResponseInterface) {
$responses[] = $command->parseResponse($response);
} elseif ($response instanceof ErrorResponseInterface && $exceptions) {
$this->exception($this->waitingConnection, $response);
} else {
$responses[] = $response;
}
}
$this->waitingConnection = null;
return $responses;
}
}
public function test_two_step_pipeline()
{
$shards = ['redis-0', 'redis-1', 'redis-2', 'redis-3', 'redis-4', 'redis-5', 'redis-6', 'redis-7'];
$pipelines = [];
//Write to all pipes
foreach ($shards as $shard) {
$pipeline = new TwoStepPipeline(Redis::connection($shard)->client());
$pipeline->execute(function ($redis) {
$redis->set('key1', 'value1');
$redis->set('key2', 'value2');
});
$pipeline->execute(function ($redis) {
$redis->set('key3', 'value3');
$redis->set('key4', 'value4');
});
$pipelines[] = $pipeline;
}
// Wait for all the responses
foreach ($pipelines as $pipeline) {
$pipeline->wait();
}
foreach ($shards as $shard) {
$this->assertEquals('value1', Redis::connection($shard)->get('key1'));
$this->assertEquals('value2', Redis::connection($shard)->get('key2'));
$this->assertEquals('value3', Redis::connection($shard)->get('key3'));
$this->assertEquals('value4', Redis::connection($shard)->get('key4'));
}
}
Our story: In our production use-case, the commands to be run in parallel need some heavy cpu-bound operations. For us, this was the only way to utilize the resources in order to reduce the time needed to process each request. Now that predis is not maintained, we are kind of in trouble if we can't migrate to a new solution.
Thanks in advance for your help and guides.
Metadata
Metadata
Assignees
Labels
No labels