Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature request] Running commands in async mode #76

Open
halaei opened this issue Aug 3, 2020 · 1 comment
Open

[Feature request] Running commands in async mode #76

halaei opened this issue Aug 3, 2020 · 1 comment

Comments

@halaei
Copy link

halaei commented Aug 3, 2020

Extending predis pipeline, I was able to simultanously send the same command to multiple Redis instances and wait for the responses and process them in PHP to form the final result (map-reduce). As far as I know, this wasn't possible with phpredis. I am wondering if you can officially add such a feature to php-redis-client as well.
Here is my predis code and an example test:

class TwoStepPipeline extends Pipeline
{
    /**
     * @var ConnectionInterface
     */
    protected $waitingConnection;

    /**
     * @var \SplQueue
     */
    protected $waitingCommands;

    /**
     * Flush the queue without waiting for the responses.
     *
     * @inheritdoc
     */
    protected function executePipeline(ConnectionInterface $connection, \SplQueue $commands)
    {
        if (! $this->waitingCommands) {
            $this->waitingCommands = new \SplQueue();
        }

        foreach ($commands as $command) {
            $connection->writeRequest($command);
            $this->waitingCommands->enqueue($command);
        }

        $this->waitingConnection = $connection;

        return [];
    }

    /**
     * Read the responses of the last flushed queue.
     *
     * @return array
     */
    public function wait()
    {
        $responses = array();
        $exceptions = $this->throwServerExceptions();

        while (!$this->waitingCommands->isEmpty()) {
            $command = $this->waitingCommands->dequeue();
            $response = $this->waitingConnection->readResponse($command);

            if (!$response instanceof ResponseInterface) {
                $responses[] = $command->parseResponse($response);
            } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
                $this->exception($this->waitingConnection, $response);
            } else {
                $responses[] = $response;
            }
        }

        $this->waitingConnection = null;

        return $responses;
    }
}

    public function test_two_step_pipeline()
    {
        $shards = ['redis-0', 'redis-1', 'redis-2', 'redis-3', 'redis-4', 'redis-5', 'redis-6', 'redis-7'];
        $pipelines = [];
        //Write to all pipes
        foreach ($shards as $shard) {
            $pipeline = new TwoStepPipeline(Redis::connection($shard)->client());
            $pipeline->execute(function ($redis) {
                $redis->set('key1', 'value1');
                $redis->set('key2', 'value2');
            });
            $pipeline->execute(function ($redis) {
                $redis->set('key3', 'value3');
                $redis->set('key4', 'value4');
            });
            $pipelines[] = $pipeline;
        }
        // Wait for all the responses
        foreach ($pipelines as $pipeline) {
            $pipeline->wait();
        }
        foreach ($shards as $shard) {
            $this->assertEquals('value1', Redis::connection($shard)->get('key1'));
            $this->assertEquals('value2', Redis::connection($shard)->get('key2'));
            $this->assertEquals('value3', Redis::connection($shard)->get('key3'));
            $this->assertEquals('value4', Redis::connection($shard)->get('key4'));
        }
    }

Our story: In our production use-case, the commands to be run in parallel need some heavy cpu-bound operations. For us, this was the only way to utilize the resources in order to reduce the time needed to process each request. Now that predis is not maintained, we are kind of in trouble if we can't migrate to a new solution.

Thanks in advance for your help and guides.

@cheprasov
Copy link
Owner

Hi @halaei
It is an interesting idea, I will think about it.
Currently, try to look at this library https://packagist.org/packages/cheprasov/php-parallel
The library allows allows to run multiple operations parallel in different thread. Maybe it would be helpful.
Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants