互斥锁库,用于独占代码执行。

v2.7.2 2019-07-16 12:46 UTC

README

需求 | 安装 | 使用 | 许可证和作者 | 捐赠

php-lock/lock

Latest Stable Version Total Downloads Latest Unstable Version Build Status License

此库有助于在并发情况下执行关键代码。

php-lock/lock 遵循语义版本控制。更多请访问 semver.org

需求

  • PHP 7.1 或更高版本
  • 可选 nrk/predis 以使用 Predis 锁。
  • 可选 php-pcntl 扩展以在 CLI 脚本中使用 flock() 启用锁定而不进行忙等待。
  • 可选 flock()ext-redisext-pdo_mysqlext-pdo_sqliteext-pdo_pgsqlext-memcached 可用作锁的后端。以下为示例。
  • 如果使用 ext-redis 进行锁定,并配置为使用 igbinary 进行序列化或 lzf 进行压缩,则还需要安装 ext-igbinary 和/或 ext-lzf

安装

Composer

要在 composer 中使用此库,请在您的存储库根目录中运行以下终端命令。

composer require m_rubin_itmegastar_com/lock

使用

该软件包位于命名空间 m_rubin_itmegastar_com\lock 中。

互斥锁

malkusch\lock\mutex\Mutex 类是一个抽象类,为此库提供基本 API。

Mutex::synchronized()

malkusch\lock\mutex\Mutex::synchronized() 执行代码。此方法保证代码一次只能由一个进程执行。其他进程必须等待互斥锁可用。关键代码可能抛出异常,这也会释放锁。

此方法返回给定的可调用返回的内容。返回值未进行检查,因此用户需要决定是否将返回值 falsenull 视为失败操作。

示例

$newBalance = $mutex->synchronized(function () use ($bankAccount, $amount): int {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException('You have no credit.');
    }
    $bankAccount->setBalance($balance);

    return $balance;
});

Mutex::check()

malkusch\lock\mutex\Mutex::check() 设置一个将在 malkusch\lock\util\DoubleCheckedLocking::then() 调用时执行的可调用,并执行双重检查锁定模式,其中其返回值决定是否需要获取锁以及执行同步代码。

有关该功能的更详细说明,请参阅 https://en.wikipedia.org/wiki/Double-checked_locking

如果检查的调用返回 false,则不会获取锁,并且同步代码将不会执行。在这种情况下,malkusch\lock\util\DoubleCheckedLocking::then() 方法也将返回 false,以指示在获取锁之前或之后检查都没有通过。

如果检查的调用返回除 false 之外的值,malkusch\lock\util\DoubleCheckedLocking::then() 方法将尝试获取锁,并在成功后再次执行检查。只有当检查第二次返回除 false 之外的值时,传递给 then() 的同步代码调用才会执行。在这种情况下,then() 的返回值将是给定调用返回的任何值,因此由用户决定返回 falsenull 来指示操作失败,因为这个返回值不会被库检查。

示例

$newBalance = $mutex->check(function () use ($bankAccount, $amount): bool {
    return $bankAccount->getBalance() >= $amount;
})->then(function () use ($bankAccount, $amount): int {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    $bankAccount->setBalance($balance);

    return $balance;
});

if (false === $newBalance) {
    if ($balance < 0) {
        throw new \DomainException('You have no credit.');
    }
}

释放锁异常后的代码结果提取

基于 malkush\lock\mutex\LockMutex 的互斥锁实现,在锁释放问题时将抛出 malkusch\lock\exception\LockReleaseException,但此时同步代码块已经执行。为了读取代码结果(或抛出的异常),LockReleaseException 提供了提取它的方法。

示例

try {
    // or $mutex->check(...)
    $mutex->synchronized(function () {
        if (someCondition()) {
            throw new \DomainException();
        }

        return "result";
    });
} catch (LockReleaseException $unlock_exception) {
    if ($unlock_exception->getCodeException() !== null) {
        $code_exception = $unlock_exception->getCodeException()
        // do something with the code exception
    } else {
        $code_result = $unlock_exception->getCodeResult();
        // do something with the code result
    }
    
    // deal with LockReleaseException or propagate it
    throw $unlock_exception;
}

实现

由于 malkusch\lock\mutex\Mutex 类是一个抽象类,您可以选择提供的一种实现,或创建/扩展自己的实现。

CASMutex

CASMutex 必须与一个 比较并交换 操作一起使用。这个互斥锁是无锁的。它将重复执行代码,直到 CAS 操作成功。因此,代码应该通过调用 malkusch\lock\mutex\CASMutex::notify() 来通知互斥锁。

由于互斥锁在执行关键代码时持续执行,因此在 CAS 操作成功之前不得有任何副作用。

示例

$mutex = new CASMutex();
$mutex->synchronized(function () use ($memcached, $mutex, $amount): void {
    $balance = $memcached->get("balance", null, $casToken);
    $balance -= $amount;
    if (!$memcached->cas($casToken, "balance", $balance)) {
        return;

    }
    $mutex->notify();
});

FlockMutex

FlockMutex 是基于 flock() 的锁实现。

示例

$mutex = new FlockMutex(fopen(__FILE__, "r"));
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");

    }
    $bankAccount->setBalance($balance);
});

支持超时作为可选的第二个参数。如果可能,则使用 ext-pcntl 扩展,否则使用忙等待。

MemcachedMutex

MemcachedMutex 是一个自旋锁实现,它使用 Memcached API

示例

$memcache = new \Memcached();
$memcache->addServer("localhost", 11211);

$mutex = new MemcachedMutex("balance", $memcache);
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");

    }
    $bankAccount->setBalance($balance);
});

PHPRedisMutex

PHPRedisMutexRedLock 的分布式锁实现,它使用 phpredis 扩展

此实现需要至少 phpredis-2.2.4

如果与 Redis 服务器集群一起使用,只要大多数服务器仍然工作,获取和释放锁将继续正常工作。

示例

$redis = new Redis();
$redis->connect("localhost");

$mutex = new PHPRedisMutex([$redis], "balance");
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");

    }
    $bankAccount->setBalance($balance);
});

PredisMutex

PredisMutex 是基于 RedLock 的分布式锁实现,它使用 Predis API

示例

$redis = new Client("redis://");

$mutex = new PredisMutex([$redis], "balance");
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");

    }
    $bankAccount->setBalance($balance);
});

SemaphoreMutex

《SemaphoreMutex》是一种基于信号量的锁实现。

示例

$semaphore = sem_get(ftok(__FILE__, "a"));
$mutex     = new SemaphoreMutex($semaphore);
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");

    }
    $bankAccount->setBalance($balance);
});

TransactionalMutex

《TransactionalMutex》将序列化委托给DBS。独占代码在事务中执行。您需要设置正确的隔离级别。然而,如果事务失败(即抛出了PDOException),则代码将在新事务中再次执行。因此,代码除了SQL语句外不应有任何副作用。另外,隔离级别应保持不变以重复事务。如果代码抛出异常,则事务将回滚,并且不会再次重放。

示例

$mutex = new TransactionalMutex($pdo);
$mutex->synchronized(function () use ($pdo, $accountId, $amount) {
    $select = $pdo->prepare("SELECT balance FROM account WHERE id = ? FOR UPDATE");
    $select->execute([$accountId]);
    $balance = $select->fetchColumn();

    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");

    }
    $pdo->prepare("UPDATE account SET balance = ? WHERE id = ?")
        ->execute([$balance, $accountId]);
});

MySQLMutex

《MySQLMutex》使用MySQL的GET_LOCK函数。

它支持超时。如果与数据库服务器的连接丢失或中断,则锁将自动释放。

请注意,在MySQL 5.7.5之前,您不能使用嵌套锁,任何新的锁都将静默释放已持有的锁。您可能应该避免在MySQL版本< 5.7.5上使用此互斥锁。

$pdo = new PDO("mysql:host=localhost;dbname=test", "username");

$mutex = new MySQLMutex($pdo, "balance", 15);
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");

    }
    $bankAccount->setBalance($balance);
});

PgAdvisoryLockMutex

《PgAdvisoryLockMutex》使用PostgreSQL的建议锁定函数。

提供命名锁。PostgreSQL锁定函数需要整数,但转换是自动处理的。

不支持超时。如果与数据库服务器的连接丢失或中断,则锁将自动释放。

$pdo = new PDO("pgsql:host=localhost;dbname=test;", "username");

$mutex = new PgAdvisoryLockMutex($pdo, "balance");
$mutex->synchronized(function () use ($bankAccount, $amount) {
    $balance = $bankAccount->getBalance();
    $balance -= $amount;
    if ($balance < 0) {
        throw new \DomainException("You have no credit.");

    }
    $bankAccount->setBalance($balance);
});

许可和作者

此项目免费且受WTFPL许可。对此项目负责的是Willem Stuursma-Ruwen willem@stuursma.name

捐赠

如果您喜欢这个项目并且愿意慷慨捐赠,请在这里捐赠一些比特币:1P5FAZ4QhXCuwYPnLZdk3PJsqePbu1UDDA