Skip to content

[5.4] Test and fix RedisQueue for phpredis #17853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ services:
before_install:
- if [[ $TRAVIS_PHP_VERSION != 7.1 ]] ; then phpenv config-rm xdebug.ini; fi
- echo "extension = memcached.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini
- echo "extension = redis.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini
- travis_retry composer self-update

install:
Expand Down
6 changes: 3 additions & 3 deletions src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public function later($delay, $job, $data = '', $queue = null)
protected function laterRaw($delay, $payload, $queue = null)
{
$this->getConnection()->zadd(
$this->getQueue($queue).':delayed', $this->availableAt($delay), $payload
$this->getQueue($queue).':delayed', [$payload => $this->availableAt($delay)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary.

);

return Arr::get(json_decode($payload, true), 'id');
Expand Down Expand Up @@ -212,7 +212,7 @@ protected function retrieveNextJob($queue)
* Delete a reserved job from the queue.
*
* @param string $queue
* @param \Illuminate\Queues\Jobs\RedisJob $job
* @param \Illuminate\Queue\Jobs\RedisJob $job
* @return void
*/
public function deleteReserved($queue, $job)
Expand All @@ -224,7 +224,7 @@ public function deleteReserved($queue, $job)
* Delete a reserved job from the reserved queue and release it.
*
* @param string $queue
* @param \Illuminate\Queues\Jobs\RedisJob $job
* @param \Illuminate\Queue\Jobs\RedisJob $job
* @param int $delay
* @return void
*/
Expand Down
27 changes: 21 additions & 6 deletions tests/Cache/RedisCacheIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,46 @@ public function tearDown()
$this->tearDownRedis();
}

public function testRedisCacheAddTwice()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testRedisCacheAddTwice($driver)
{
$store = new RedisStore($this->redis);
$store = new RedisStore($this->redis[$driver]);
$repository = new Repository($store);
$this->assertTrue($repository->add('k', 'v', 60));
$this->assertFalse($repository->add('k', 'v', 60));
$this->assertGreaterThan(3500, $this->redis[$driver]->connection()->ttl('k'));
}

/**
* Breaking change.
*
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testRedisCacheAddFalse()
public function testRedisCacheAddFalse($driver)
{
$store = new RedisStore($this->redis);
$store = new RedisStore($this->redis[$driver]);
$repository = new Repository($store);
$repository->forever('k', false);
$this->assertFalse($repository->add('k', 'v', 60));
$this->assertEquals(-1, $this->redis[$driver]->connection()->ttl('k'));
}

/**
* Breaking change.
*
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testRedisCacheAddNull()
public function testRedisCacheAddNull($driver)
{
$store = new RedisStore($this->redis);
$store = new RedisStore($this->redis[$driver]);
$repository = new Repository($store);
$repository->forever('k', null);
$this->assertFalse($repository->add('k', 'v', 60));
Expand Down
10 changes: 6 additions & 4 deletions tests/Queue/QueueRedisQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ public function testDelayedPushProperlyPushesJobOntoRedis()
$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('zadd')->once()->with(
'queues:default:delayed',
2,
json_encode(['displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0])
[
json_encode(['displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]) => 2,
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary.

);

$id = $queue->later(1, 'foo', ['data']);
Expand All @@ -50,8 +51,9 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('zadd')->once()->with(
'queues:default:delayed',
2,
json_encode(['displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0])
[
json_encode(['displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]) => 2,
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary.

);

$queue->later($date, 'foo', ['data']);
Expand Down
133 changes: 100 additions & 33 deletions tests/Queue/RedisQueueIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ public function setUp()
Carbon::setTestNow();
parent::setUp();
$this->setUpRedis();

$this->queue = new RedisQueue($this->redis);
$this->queue->setContainer(m::mock(Container::class));
}

public function tearDown()
Expand All @@ -37,8 +34,15 @@ public function tearDown()
m::close();
}

public function testExpiredJobsArePopped()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testExpiredJobsArePopped($driver)
{
$this->setQueue($driver);

$jobs = [
new RedisQueueIntegrationTestJob(0),
new RedisQueueIntegrationTestJob(1),
Expand All @@ -56,12 +60,19 @@ public function testExpiredJobsArePopped()
$this->assertEquals($jobs[3], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
$this->assertNull($this->queue->pop());

$this->assertEquals(1, $this->redis->connection()->zcard('queues:default:delayed'));
$this->assertEquals(3, $this->redis->connection()->zcard('queues:default:reserved'));
$this->assertEquals(1, $this->redis[$driver]->connection()->zcard('queues:default:delayed'));
$this->assertEquals(3, $this->redis[$driver]->connection()->zcard('queues:default:reserved'));
}

public function testPopProperlyPopsJobOffOfRedis()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testPopProperlyPopsJobOffOfRedis($driver)
{
$this->setQueue($driver);

// Push an item into queue
$job = new RedisQueueIntegrationTestJob(10);
$this->queue->push($job);
Expand All @@ -79,17 +90,23 @@ public function testPopProperlyPopsJobOffOfRedis()
$this->assertEquals($redisJob->getJobId(), json_decode($redisJob->getReservedJob())->id);

// Check reserved queue
$this->assertEquals(1, $this->redis->connection()->zcard('queues:default:reserved'));
$result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]);
$this->assertEquals(1, $this->redis[$driver]->connection()->zcard('queues:default:reserved'));
$result = $this->redis[$driver]->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);
$reservedJob = array_keys($result)[0];
$score = $result[$reservedJob];
$this->assertLessThanOrEqual($score, $before + 60);
$this->assertGreaterThanOrEqual($score, $after + 60);
$this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command));
}

public function testPopProperlyPopsDelayedJobOffOfRedis()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testPopProperlyPopsDelayedJobOffOfRedis($driver)
{
$this->setQueue($driver);
// Push an item into queue
$job = new RedisQueueIntegrationTestJob(10);
$this->queue->later(-10, $job);
Expand All @@ -100,18 +117,23 @@ public function testPopProperlyPopsDelayedJobOffOfRedis()
$after = time();

// Check reserved queue
$this->assertEquals(1, $this->redis->connection()->zcard('queues:default:reserved'));
$result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]);
$this->assertEquals(1, $this->redis[$driver]->connection()->zcard('queues:default:reserved'));
$result = $this->redis[$driver]->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);
$reservedJob = array_keys($result)[0];
$score = $result[$reservedJob];
$this->assertLessThanOrEqual($score, $before + 60);
$this->assertGreaterThanOrEqual($score, $after + 60);
$this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command));
}

public function testPopPopsDelayedJobOffOfRedisWhenExpireNull()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testPopPopsDelayedJobOffOfRedisWhenExpireNull($driver)
{
$this->queue = new RedisQueue($this->redis, 'default', null, null);
$this->queue = new RedisQueue($this->redis[$driver], 'default', null, null);
$this->queue->setContainer(m::mock(Container::class));

// Push an item into queue
Expand All @@ -124,18 +146,23 @@ public function testPopPopsDelayedJobOffOfRedisWhenExpireNull()
$after = time();

// Check reserved queue
$this->assertEquals(1, $this->redis->connection()->zcard('queues:default:reserved'));
$result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]);
$this->assertEquals(1, $this->redis[$driver]->connection()->zcard('queues:default:reserved'));
$result = $this->redis[$driver]->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);
$reservedJob = array_keys($result)[0];
$score = $result[$reservedJob];
$this->assertLessThanOrEqual($score, $before);
$this->assertGreaterThanOrEqual($score, $after);
$this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command));
}

public function testNotExpireJobsWhenExpireNull()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testNotExpireJobsWhenExpireNull($driver)
{
$this->queue = new RedisQueue($this->redis, 'default', null, null);
$this->queue = new RedisQueue($this->redis[$driver], 'default', null, null);
$this->queue->setContainer(m::mock(Container::class));

// Make an expired reserved job
Expand All @@ -155,8 +182,8 @@ public function testNotExpireJobsWhenExpireNull()
$after = time();

// Check reserved queue
$this->assertEquals(2, $this->redis->connection()->zcard('queues:default:reserved'));
$result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]);
$this->assertEquals(2, $this->redis[$driver]->connection()->zcard('queues:default:reserved'));
$result = $this->redis[$driver]->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);

foreach ($result as $payload => $score) {
$command = unserialize(json_decode($payload)->data->command);
Expand All @@ -172,9 +199,14 @@ public function testNotExpireJobsWhenExpireNull()
}
}

public function testExpireJobsWhenExpireSet()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testExpireJobsWhenExpireSet($driver)
{
$this->queue = new RedisQueue($this->redis, 'default', null, 30);
$this->queue = new RedisQueue($this->redis[$driver], 'default', null, 30);
$this->queue->setContainer(m::mock(Container::class));

// Push an item into queue
Expand All @@ -187,17 +219,24 @@ public function testExpireJobsWhenExpireSet()
$after = time();

// Check reserved queue
$this->assertEquals(1, $this->redis->connection()->zcard('queues:default:reserved'));
$result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]);
$this->assertEquals(1, $this->redis[$driver]->connection()->zcard('queues:default:reserved'));
$result = $this->redis[$driver]->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);
$reservedJob = array_keys($result)[0];
$score = $result[$reservedJob];
$this->assertLessThanOrEqual($score, $before + 30);
$this->assertGreaterThanOrEqual($score, $after + 30);
$this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command));
}

public function testRelease()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testRelease($driver)
{
$this->setQueue($driver);

//push a job into queue
$job = new RedisQueueIntegrationTestJob(30);
$this->queue->push($job);
Expand All @@ -210,9 +249,9 @@ public function testRelease()
$after = time();

//check the content of delayed queue
$this->assertEquals(1, $this->redis->connection()->zcard('queues:default:delayed'));
$this->assertEquals(1, $this->redis[$driver]->connection()->zcard('queues:default:delayed'));

$results = $this->redis->connection()->zrangebyscore('queues:default:delayed', -INF, INF, ['WITHSCORES' => true]);
$results = $this->redis[$driver]->connection()->zrangebyscore('queues:default:delayed', -INF, INF, ['withscores' => true]);

$payload = array_keys($results)[0];

Expand All @@ -230,8 +269,14 @@ public function testRelease()
$this->assertNull($this->queue->pop());
}

public function testReleaseInThePast()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testReleaseInThePast($driver)
{
$this->setQueue($driver);
$job = new RedisQueueIntegrationTestJob(30);
$this->queue->push($job);

Expand All @@ -242,8 +287,15 @@ public function testReleaseInThePast()
$this->assertInstanceOf(RedisJob::class, $this->queue->pop());
}

public function testDelete()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testDelete($driver)
{
$this->setQueue($driver);

$job = new RedisQueueIntegrationTestJob(30);
$this->queue->push($job);

Expand All @@ -252,15 +304,21 @@ public function testDelete()

$redisJob->delete();

$this->assertEquals(0, $this->redis->connection()->zcard('queues:default:delayed'));
$this->assertEquals(0, $this->redis->connection()->zcard('queues:default:reserved'));
$this->assertEquals(0, $this->redis->connection()->llen('queues:default'));
$this->assertEquals(0, $this->redis[$driver]->connection()->zcard('queues:default:delayed'));
$this->assertEquals(0, $this->redis[$driver]->connection()->zcard('queues:default:reserved'));
$this->assertEquals(0, $this->redis[$driver]->connection()->llen('queues:default'));

$this->assertNull($this->queue->pop());
}

public function testSize()
/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testSize($driver)
{
$this->setQueue($driver);
$this->assertEquals(0, $this->queue->size());
$this->queue->push(new RedisQueueIntegrationTestJob(1));
$this->assertEquals(1, $this->queue->size());
Expand All @@ -273,6 +331,15 @@ public function testSize()
$job->delete();
$this->assertEquals(2, $this->queue->size());
}

/**
* @param string $driver
*/
private function setQueue($driver)
{
$this->queue = new RedisQueue($this->redis[$driver]);
$this->queue->setContainer(m::mock(Container::class));
}
}

class RedisQueueIntegrationTestJob
Expand Down
Loading