Skip to content

Introduce AbstractSpinlockWithTokenMutex class #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/Exception/ExecutionOutsideLockException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Malkusch\Lock\Exception;

use Malkusch\Lock\Mutex\AbstractSpinlockMutex;
use Malkusch\Lock\Util\LockUtil;

/**
Expand All @@ -14,21 +15,21 @@
*
* Should only be used in contexts where the lock is being released.
*
* @see \Malkusch\Lock\Mutex\AbstractSpinlockMutex::unlock()
* @see AbstractSpinlockMutex::unlock()
*/
class ExecutionOutsideLockException extends LockReleaseException
{
/**
* @param float $elapsedTime Total elapsed time of the synchronized code callback execution
* @param float $timeout The lock timeout in seconds
* @param float $elapsedTime In seconds
* @param float $expireTimeout In seconds
*/
public static function create(float $elapsedTime, float $timeout): self
public static function create(float $elapsedTime, float $expireTimeout): self
{
return new self(\sprintf(
'The code executed for %s seconds. But the timeout is %s seconds. The last %s seconds were executed outside of the lock.',
'The code executed for %s seconds. But the expire timeout is %s seconds. The last %s seconds were executed outside of the lock.',
LockUtil::getInstance()->formatTimeout($elapsedTime),
LockUtil::getInstance()->formatTimeout($timeout),
LockUtil::getInstance()->formatTimeout(round($elapsedTime, 6) - round($timeout, 6))
LockUtil::getInstance()->formatTimeout($expireTimeout),
LockUtil::getInstance()->formatTimeout(round($elapsedTime, 6) - round($expireTimeout, 6))
));
}
}
2 changes: 0 additions & 2 deletions src/Mutex/AbstractLockMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
use Malkusch\Lock\Exception\LockReleaseException;

/**
* Locking mutex.
*
* @internal
*/
abstract class AbstractLockMutex extends AbstractMutex
Expand Down
35 changes: 17 additions & 18 deletions src/Mutex/AbstractRedlockMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,51 @@
*
* @see http://redis.io/topics/distlock
*/
abstract class AbstractRedlockMutex extends AbstractSpinlockMutex implements LoggerAwareInterface
abstract class AbstractRedlockMutex extends AbstractSpinlockWithTokenMutex implements LoggerAwareInterface
{
use LoggerAwareTrait;

/** @var array<int, TClient> */
private array $clients;

private string $token;

/**
* The Redis APIs needs to be connected. I.e. Redis::connect() was
* The Redis instance needs to be connected. I.e. Redis::connect() was
* called already.
*
* @param array<int, TClient> $clients
* @param float $acquireTimeout In seconds
* @param float $expireTimeout In seconds
*/
public function __construct(array $clients, string $name, float $acquireTimeout = 3)
public function __construct(array $clients, string $name, float $acquireTimeout = 3, float $expireTimeout = \INF)
{
parent::__construct($name, $acquireTimeout);
parent::__construct($name, $acquireTimeout, $expireTimeout);

$this->clients = $clients;
$this->logger = new NullLogger();
}

#[\Override]
protected function acquire(string $key, float $expire): bool
protected function acquireWithToken(string $key, float $expireTimeout)
{
// 1. This differs from the specification to avoid an overflow on 32-Bit systems.
$time = microtime(true);
$startTs = microtime(true);

// 2.
$acquired = 0;
$errored = 0;
$this->token = LockUtil::getInstance()->makeRandomToken();
$token = LockUtil::getInstance()->makeRandomToken();
$exception = null;
foreach ($this->clients as $index => $client) {
try {
if ($this->add($client, $key, $this->token, $expire)) {
if ($this->add($client, $key, $token, $expireTimeout)) {
++$acquired;
}
} catch (LockAcquireException $exception) {
// todo if there is only one redis server, throw immediately.
$context = [
'key' => $key,
'index' => $index,
'token' => $this->token,
'token' => $token,
'exception' => $exception,
];
$this->logger->warning('Could not set {key} = {token} at server #{index}', $context);
Expand All @@ -73,16 +72,16 @@ protected function acquire(string $key, float $expire): bool
}

// 3.
$elapsedTime = microtime(true) - $time;
$isAcquired = $this->isMajority($acquired) && $elapsedTime <= $expire;
$elapsedTime = microtime(true) - $startTs;
$isAcquired = $this->isMajority($acquired) && $elapsedTime <= $expireTimeout;

if ($isAcquired) {
// 4.
return true;
return $token;
}

// 5.
$this->release($key);
$this->releaseWithToken($key, $token);

// In addition to RedLock it's an exception if too many servers fail.
if (!$this->isMajority(count($this->clients) - $errored)) {
Expand All @@ -99,7 +98,7 @@ protected function acquire(string $key, float $expire): bool
}

#[\Override]
protected function release(string $key): bool
protected function releaseWithToken(string $key, string $token): bool
{
/*
* All Redis commands must be analyzed before execution to determine which keys the command will operate on. In
Expand All @@ -117,15 +116,15 @@ protected function release(string $key): bool
$released = 0;
foreach ($this->clients as $index => $client) {
try {
if ($this->evalScript($client, $script, [$key], [$this->token])) {
if ($this->evalScript($client, $script, [$key], [$token])) {
++$released;
}
} catch (LockReleaseException $e) {
// todo throw if there is only one redis server
$context = [
'key' => $key,
'index' => $index,
'token' => $this->token,
'token' => $token,
'exception' => $e,
];
$this->logger->warning('Could not unset {key} = {token} at server #{index}', $context);
Expand Down
33 changes: 6 additions & 27 deletions src/Mutex/AbstractSpinlockMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,22 @@

namespace Malkusch\Lock\Mutex;

use Malkusch\Lock\Exception\ExecutionOutsideLockException;
use Malkusch\Lock\Exception\LockAcquireException;
use Malkusch\Lock\Exception\LockReleaseException;
use Malkusch\Lock\Util\LockUtil;
use Malkusch\Lock\Util\Loop;

/**
* Spinlock implementation.
*
* @internal
*/
abstract class AbstractSpinlockMutex extends AbstractLockMutex
{
/** @var non-falsy-string */
private string $key;

/** In seconds */
private float $acquireTimeout;

/** The timestamp when the lock was acquired */
private ?float $acquiredTs = null;

/**
* @param float $acquireTimeout In seconds
*/
Expand All @@ -40,16 +35,7 @@ protected function lock(): void
$loop = new Loop();

$loop->execute(function () use ($loop): void {
$this->acquiredTs = microtime(true);

/*
* The expiration timeout for the lock is increased by one second
* to ensure that we delete only our keys. This will prevent the
* case that this key expires before the timeout, and another process
* acquires successfully the same key which would then be deleted
* by this process.
*/
if ($this->acquire($this->key, $this->acquireTimeout + 1)) {
if ($this->acquire($this->key)) {
$loop->end();
}
}, $this->acquireTimeout);
Expand All @@ -58,15 +44,6 @@ protected function lock(): void
#[\Override]
protected function unlock(): void
{
$elapsedTime = microtime(true) - $this->acquiredTs;
if ($elapsedTime > $this->acquireTimeout) {
throw ExecutionOutsideLockException::create($elapsedTime, $this->acquireTimeout);
}

/*
* Worst case would still be one second before the key expires.
* This guarantees that we don't delete a wrong key.
*/
if (!$this->release($this->key)) {
throw new LockReleaseException('Failed to release the lock');
}
Expand All @@ -75,17 +52,19 @@ protected function unlock(): void
/**
* Try to acquire a lock.
*
* @param float $expire In seconds
* @param non-falsy-string $key
*
* @return bool True if the lock was acquired
*
* @throws LockAcquireException An unexpected error happened
*/
abstract protected function acquire(string $key, float $expire): bool;
abstract protected function acquire(string $key): bool;

/**
* Try to release a lock.
*
* @param non-falsy-string $key
*
* @return bool True if the lock was released
*/
abstract protected function release(string $key): bool;
Expand Down
87 changes: 87 additions & 0 deletions src/Mutex/AbstractSpinlockWithTokenMutex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php

declare(strict_types=1);

namespace Malkusch\Lock\Mutex;

use Malkusch\Lock\Exception\ExecutionOutsideLockException;

/**
* Spinlock implementation with expirable resource locking.
*
* Lock is acquired with an unique token that is verified when the lock is being released.
*/
abstract class AbstractSpinlockWithTokenMutex extends AbstractSpinlockMutex
{
/** In seconds */
private float $expireTimeout;

private ?float $acquireTs = null;

/** @var non-falsy-string */
private ?string $token = null;

/**
* @param float $acquireTimeout In seconds
* @param float $expireTimeout In seconds
*/
public function __construct(string $name, float $acquireTimeout = 3, float $expireTimeout = \INF)
{
parent::__construct($name, $acquireTimeout);

$this->expireTimeout = $expireTimeout;
}

#[\Override]
protected function acquire(string $key): bool
{
$acquireTs = microtime(true);

$token = $this->acquireWithToken($key, $this->expireTimeout);

if ($token === false) {
return false;
}

$this->acquireTs = $acquireTs;
$this->token = $token;

return true;
}

#[\Override]
protected function release(string $key): bool
{
try {
return $this->releaseWithToken($key, $this->token);
} finally {
try {
$elapsedTime = microtime(true) - $this->acquireTs;
if ($elapsedTime >= $this->expireTimeout) {
throw ExecutionOutsideLockException::create($elapsedTime, $this->expireTimeout);
}
} finally {
$this->token = null;
$this->acquireTs = null;
}
}
}

/**
* Same as self::acquire() but with expire timeout and token.
*
* @param non-falsy-string $key
* @param float $expireTimeout In seconds
*
* @return non-falsy-string|false
*/
abstract protected function acquireWithToken(string $key, float $expireTimeout);

/**
* Same as self::release() but with expire timeout and token.
*
* @param non-falsy-string $key
* @param non-falsy-string $token
*/
abstract protected function releaseWithToken(string $key, string $token): bool;
}
Loading
Loading