malkusch/lock

互斥锁库,用于实现代码的独占执行。

v2.2.1 2022-04-26 09:25 UTC

This package is auto-updated.

Last update: 2024-08-26 15:01:51 UTC


README

需求 | 安装 | 使用 | 许可和作者 | 捐赠

php-lock/lock

Latest Stable Version Total Downloads Latest Unstable Version Build Status License

此库有助于在并发情况下执行关键代码。

php-lock/lock 遵循语义化版本控制。更多关于 semver.org 的信息。

需求

  • PHP 7.1 或更高版本
  • 可选 nrk/predis 以使用 Predis 锁。
  • 可选的 php-pcntl 扩展,以在 CLI 脚本中启用使用 flock() 的锁定,而不需要忙等待。
  • 可选的 flock()ext-redisext-pdo_mysqlext-pdo_sqliteext-pdo_pgsqlext-memcached 可用作锁的后端。下面有示例。
  • 如果使用 ext-redis 进行锁定,并且配置为使用 igbinary 进行序列化或 lzf 进行压缩,则还需要安装 ext-igbinary 和/或 ext-lzf

安装

Composer

要使用此库,请在您的仓库根目录中运行以下终端命令。

composer require "malkusch/lock"

使用

此库使用命名空间 malkusch\lock

互斥锁

malkusch\lock\mutex\Mutex 类是一个抽象类,并为该库提供基础 API。

Mutex::synchronized()

malkusch\lock\mutex\Mutex::synchronized() 执行代码。此方法保证代码只能由一个进程一次性执行。其他进程必须等待互斥锁可用。关键代码可能抛出异常,这将释放锁。

此方法返回给定的可调用返回的任何内容。返回值不被检查,因此用户需要决定是否将返回值 falsenull 视为失败操作。

示例

$newBalance = $mutex->synchronized(function () use (
    $bankAccount,
    $amount
): int {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException('You have no credit.');
    }
    $bankAccount->setBalance($balance);

    return $balance;
});

Mutex::check()

malkusch\lock\mutex\Mutex::check() 设置一个可调用的函数,该函数将在调用 malkusch\lock\util\DoubleCheckedLocking::then() 时执行,并执行双检查锁定模式,其中它的返回值决定是否需要获取锁以及执行同步代码。

有关该功能的更详细解释,请参阅 https://en.wikipedia.org/wiki/Double-checked_locking

如果检查的可调用函数返回 false,则不会获取锁,并且不会执行同步代码。在这种情况下,malkusch\lock\util\DoubleCheckedLocking::then() 方法也会返回 false,以指示检查在获取锁之前或之后都没有通过。

如果检查的可调用函数返回除了 false 之外的其他值,则 malkusch\lock\util\DoubleCheckedLocking::then() 方法将尝试获取锁,并在成功后再次执行检查。只有当检查第二次返回除了 false 之外的其他值时,传递给 then() 的同步代码可调用才会执行。在这种情况下,then() 的返回值将是给定的可调用返回的任何值,因此由用户自行返回 falsenull 来指示操作失败,因为库不会检查此返回值。

示例

$newBalance = $mutex->check(function () use ($bankAccount, $amount): bool {
    return $bankAccount->getBalance() >= $amount;
})->then(function () use ($bankAccount, $amount): int {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    $bankAccount->setBalance($balance);

    return $balance;
});

if (false === $newBalance) {
    if ($balance < 0) {
        throw new \DomainException('You have no credit.');
    }
}

在释放锁异常后提取代码结果

基于 malkush\lock\mutex\LockMutex 的互斥锁实现,在锁释放问题时将抛出 malkusch\lock\exception\LockReleaseException 异常,但此时同步代码块已经执行。为了读取代码结果(或抛出的异常),LockReleaseException 提供了提取它的方法。

示例

try {
    // or $mutex->check(...)
    $result = $mutex->synchronized(function () {
        if (someCondition()) {
            throw new \DomainException();
        }

        return "result";
    });
} catch (LockReleaseException $unlockException) {
    if ($unlockException->getCodeException() !== null) {
        $codeException = $unlockException->getCodeException()
        // do something with the code exception
    } else {
        $code_result = $unlockException->getCodeResult();
        // do something with the code result
    }

    // deal with LockReleaseException or propagate it
    throw $unlockException;
}

实现

因为 malkusch\lock\mutex\Mutex 类是一个抽象类,您可以选择提供的实现之一或创建/扩展自己的实现。

CASMutex

**CASMutex** 必须与一个 比较并交换 操作一起使用。这个互斥锁是无锁的。它将重复执行代码,直到比较并交换操作成功。因此,代码应该通过调用 malkusch\lock\mutex\CASMutex::notify() 来通知互斥锁。

因为互斥锁在执行关键代码时持续执行,所以在比较并交换操作未成功之前,它不能有任何副作用。

示例

$mutex = new CASMutex();
$mutex->synchronized(function () use ($memcached, $mutex, $amount): void {
    $balance = $memcached->get("balance", null, $casToken);
    $balance -= $amount;
    if (!$memcached->cas($casToken, "balance", $balance)) {
        return;
    }
    $mutex->notify();
});

FlockMutex

**FlockMutex** 是基于 flock() 的锁实现。

示例

$mutex = new FlockMutex(fopen(__FILE__, "r"));
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");
    }
    $bankAccount->setBalance($balance);
});

支持超时,作为可选的第二个参数。如果可能,则使用 ext-pcntl 扩展,否则使用忙等待。

MemcachedMutex

**MemcachedMutex** 是一个自旋锁实现,它使用 Memcached API

示例

$memcache = new \Memcached();
$memcache->addServer("localhost", 11211);

$mutex = new MemcachedMutex("balance", $memcache);
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");
    }
    $bankAccount->setBalance($balance);
});

PHPRedisMutex

**PHPRedisMutex** 是基于 RedLock 的分布式锁实现,它使用 phpredis 扩展

此实现需要至少 phpredis-2.2.4

如果与 Redis 服务器集群一起使用,只要大多数服务器仍然工作,获取和释放锁将继续正常工作。

示例

$redis = new Redis();
$redis->connect("localhost");

$mutex = new PHPRedisMutex([$redis], "balance");
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");
    }
    $bankAccount->setBalance($balance);
});

PredisMutex

**PredisMutex** 是基于 RedLock 的分布式锁实现,它使用 Predis API

示例

$redis = new Client("redis://localhost");

$mutex = new PredisMutex([$redis], "balance");
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");
    }
    $bankAccount->setBalance($balance);
});

SemaphoreMutex

**SemaphoreMutex** 是基于 信号量 的锁实现。

示例

$semaphore = sem_get(ftok(__FILE__, "a"));
$mutex = new SemaphoreMutex($semaphore);
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");
    }
    $bankAccount->setBalance($balance);
});

TransactionalMutex

事务互斥锁将序列化委托给DBS。独占代码在事务中执行。设置正确的隔离级别由您决定。然而,如果事务失败(即抛出了PDOException),则代码将在新的事务中再次执行。因此,代码除了SQL语句外不应有其他副作用。此外,隔离级别应保留用于重复事务。如果代码抛出异常,事务将回滚而不会再次重播。

示例

$mutex = new TransactionalMutex($pdo);
$mutex->synchronized(function () use ($pdo, $accountId, $amount) {
    $select = $pdo->prepare(
        "SELECT balance FROM account WHERE id = ? FOR UPDATE"
    );
    $select->execute([$accountId]);
    $balance = $select->fetchColumn();

    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");
    }
    $pdo->prepare("UPDATE account SET balance = ? WHERE id = ?")
        ->execute([$balance, $accountId]);
});

MySQLMutex

MySQL互斥锁使用MySQL的GET_LOCK函数。

它支持超时。如果与数据库服务器的连接丢失或中断,则锁将自动释放。

注意,在MySQL 5.7.5之前,您不能使用嵌套锁,任何新的锁都将静默释放已经持有的锁。您可能应该避免在MySQL版本 < 5.7.5上使用此互斥锁。

$pdo = new PDO("mysql:host=localhost;dbname=test", "username");

$mutex = new MySQLMutex($pdo, "balance", 15);
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");
    }
    $bankAccount->setBalance($balance);
});

PgAdvisoryLockMutex

PgAdvisoryLockMutex使用PostgreSQL的advisory locking函数。

提供命名锁。PostgreSQL锁定函数需要整数,但转换将自动处理。

不支持超时。如果与数据库服务器的连接丢失或中断,则锁将自动释放。

$pdo = new PDO("pgsql:host=localhost;dbname=test;", "username");

$mutex = new PgAdvisoryLockMutex($pdo, "balance");
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");
    }
    $bankAccount->setBalance($balance);
});

许可和作者

此项目是免费的,并受WTFPL许可。此项目的负责人是Willem Stuursma-Ruwen,电子邮件:[email protected]

捐赠

如果您喜欢此项目并感到慷慨,请在此处捐赠一些比特币:1P5FAZ4QhXCuwYPnLZdk3PJsqePbu1UDDA