amphp/sync

基于 Amp 和 Revolt 的 PHP 非阻塞同步原语。

维护者

详细信息

github.com/amphp/sync

主页

源代码

问题

资助包维护!
amphp

安装数: 19,814,160

依赖者: 35

建议者: 0

安全: 0

星标: 158

关注者: 8

分支: 12

开放问题: 0

v2.3.0 2024-08-03 19:31 UTC

README

AMPHP 是一组为 PHP 设计的事件驱动库,考虑到纤维和并发。具体而言,amphp/sync 提供了同步原语,如锁和信号量,用于异步和并发编程。

Latest Release MIT License

安装

此包可以作为 Composer 依赖项安装。

composer require amphp/sync

使用方法

管理并发时,人类是薄弱环节;因此,amphp/sync 提供了抽象来隐藏一些复杂性。

互斥锁

可以使用 Amp\Sync\synchronized() 和任何 Mutex 实现来实现互斥锁,或者通过手动使用 Mutex 实例来获取锁。

只要不使用 Lock::release() 或垃圾回收释放最终的 Lock 对象,锁的持有者就可以在所有其他执行相同代码的各方在执行之前获取锁的情况下,独占运行一些代码。

function writeExclusively(Amp\Sync\Mutex $mutex, string $filePath, string $data) {
    $lock = $mutex->acquire();
    
    try {
        Amp\File\write($filePath, $data);
    } finally {
        $lock->release();
    }
}
function writeExclusively(Amp\Sync\Mutex $mutex, string $filePath, string $data) {
    Amp\Sync\synchronized($mutex, fn () => Amp\File\write($filePath, $data));
}

信号量

除了互斥锁之外,信号量是另一种同步原语。

它们不是为单个实体提供独占访问权限,而是为 N 个实体提供同时访问权限。这使得它们非常适合控制并发,例如,限制 HTTP 客户端只能并发发送 X 个请求,以免 HTTP 服务器过载。

Mutex 类似,可以使用 Semaphore::acquire() 获取 Lock 实例。请参阅 Mutex 文档以获取更多使用文档,因为它们基本上是等效的,除了 Mutex 总是具有正好一个实体的信号量之外。

在许多情况下,可以使用 amphp/pipeline 来代替直接使用信号量。

包裹

包裹用于在多个执行上下文之间同步访问值,例如多个协程或多个进程。下面的示例演示了使用 LocalParcel 在两个协程之间共享一个整数。

use Amp\Future;
use Amp\Sync\LocalMutex;
use Amp\Sync\LocalParcel;
use function Amp\async;
use function Amp\delay;

$parcel = new LocalParcel(new LocalMutex(), 42);

$future1 = async(function () use ($parcel): void {
    echo "Coroutine 1 started\n";

    $result = $parcel->synchronized(function (int $value): int {
        delay(1); // Delay for 1s to simulate I/O.
        return $value * 2;
    });

    echo "Value after access in coroutine 1: ", $result, "\n";
});

$future2 = async(function () use ($parcel): void {
    echo "Coroutine 2 started\n";

    $result = $parcel->synchronized(function (int $value): int {
        delay(1); // Delay again in this coroutine.
        return $value + 8;
    });

    echo "Value after access in coroutine 2: ", $result, "\n";
});

Future\await([$future1, $future2]); // Wait until both coroutines complete.

通道

通道用于在执行上下文之间发送数据,例如多个协程或多个进程。下面的示例在两个协程之间共享两个 Channel。这些通道是连接的。在通道上发送的数据在配对的通道上接收,反之亦然。

use Amp\Future;
use function Amp\async;
use function Amp\delay;

[$left, $right] = createChannelPair();

$future1 = async(function () use ($left): void {
    echo "Coroutine 1 started\n";
    delay(1); // Delay to simulate I/O.
    $left->send(42);
    $received = $left->receive();
    echo "Received ", $received, " in coroutine 1\n";
});

$future2 = async(function () use ($right): void {
    echo "Coroutine 2 started\n";
    $received = $right->receive();
    echo "Received ", $received, " in coroutine 2\n";
    delay(1); // Delay to simulate I/O.
    $right->send($received * 2);
});

Future\await([$future1, $future2]); // Wait until both coroutines complete.

进程间共享数据

要在 PHP 中在进程之间共享数据,数据必须是可序列化的并使用外部存储或 IPC(进程间通信)通道。

外部存储中的包裹

SharedMemoryParcel 使用共享内存与 PosixSemaphore 结合使用,并通过 SemaphoreMutex 包装(尽管可以使用其他跨上下文互斥锁实现,例如在 amphp/redis 中的 RedisMutex)。

注意 ext-shmopext-sysvmsg 分别是使用 SharedMemoryParcelPosixSemaphore 的必要扩展。

amphp/redis 提供了 RedisParcel 用于在 Redis 中存储共享数据。

通过管道创建通道

通过在进程之间添加序列化(原生 PHP 序列化、JSON 序列化等)层,可以在进程间创建通道。

amphp/byte-stream 中的 StreamChannel 可以从任何 ReadableStreamWritableStream 创建通道。这允许从各种流源(如套接字或进程管道)创建通道。

amphp/parallel 中的 ProcessContext 实现了 Channel 以在父进程和子进程之间发送数据。

amphp/parallel 中的任务 Execution 对象包含一个 Channel,用于在任务运行和提交任务的进程之间发送数据。

并发方法

假设您有一份要抓取的 URL 列表,我们将讨论几种可能的方法。为了简单起见,我们将假设已经存在一个 fetch 函数,它接受一个 URL 并返回 HTTP 状态码(这是我们这些例子中想知道的所有内容)。

方法 1:顺序执行

使用非阻塞 I/O 的简单循环,但在抓取单个 URL 时没有并发;一旦第一个请求完成,就开始第二个请求。

$urls = [...];

$results = [];

foreach ($urls as $url) {
    $results[$url] = fetch($url);
}

var_dump($results);

方法 2:全部并发

几乎相同的循环,但一次性等待所有操作;立即启动所有请求。当 URL 数量太多时可能不可行。

$urls = [...];

$results = [];

foreach ($urls as $url) {
    $results[$url] = Amp\async(fetch(...), $url);
}

$results = Amp\Future\await($results);

var_dump($results);

方法 3:并发块

将工作分成十个块;块内的所有请求都并发执行,但每个块按顺序执行,因此每个块的执行时间取决于最慢的响应;一旦前十个请求完成,就开始第 11 个请求。

$urls = [...];

$results = [];

foreach (\array_chunk($urls, 10) as $chunk) {
    $futures = [];

    foreach ($chunk as $url) {
        $futures[$url] = Amp\async(fetch(...), $url);
    }

    $results = \array_merge($results, Amp\Future\await($futures));
}

var_dump($results);

方法 4:并发迭代器

amphp/pipeline 库提供了并发迭代器,可以用于在多个纤维中并发处理和消耗数据。

use Amp\Pipeline\Pipeline;
use function Amp\delay;

$urls = [...];

$results = Pipeline::fromIterable($urls)
    ->concurrent(10) // Process up to 10 URLs concurrently
    ->unordered() // Results may arrive out of order
    ->map(fetch(...)) // Map each URL to fetch(...)
    ->toArray();

var_dump($results);

有关使用管道进行并发的更多信息,请参阅 amphp/pipeline 的文档。

版本管理

amphp/sync 遵循与所有其他 amphp 包相同的 semver 语义版本规范。

安全

如果您发现任何安全相关的问题,请使用私有安全问题报告器,而不是使用公共问题跟踪器。

许可协议

MIT 许可协议 (MIT)。有关更多信息,请参阅 LICENSE