appserver-io/concurrency

它引入了抽象服务和对象,这些服务和对象提供了处理线程安全、并发和数据共享的简单方法。

0.3.0 2015-06-24 14:46 UTC

This package is auto-updated.

Last update: 2024-09-18 03:17:38 UTC


README

Latest Stable Version Total Downloads License Build Status Code Coverage Code Quality

这是什么?

并发工具包引入了抽象服务和对象,这些服务和对象提供了使用PHP-Userland编写的线程安全、并发编程和数据共享的简单处理方法。

为什么?

如果您正在实现多线程功能,您总是面临着相同的问题,称为竞态条件、死锁等。当在线程之间共享简单数据或甚至是普通的PHP对象时,您必须确保在代码的任何逻辑中一切都是同步的(线程安全)。这个工具包主要帮助您通过提供用PHP-Userland编写的抽象服务和对象来避免这些问题。

如何使用?

此工具包处于早期开发阶段。请谨慎使用。

ExecutorService

它允许您将普通的PHP对象持久化作为单例存储在内存中,可在代码的任何地方访问,即使在不同的线程上下文中。您可以使用简单的注解,如方法docblocks中的@Synchronized@Asynchronous,轻松地将这些普通PHP对象线程安全或调用某些异步方法。

工作原理

如果您想在项目中使用并发工具包,您必须将其添加到composer依赖项composer.json中,然后执行composer update

{
    "require": {
        "appserver-io/concurrency": "~0.2"
    },
}

首先,您必须在主PHP脚本main.php中初始化ExecutorService

<?php

define(AUTOLOADER, 'vendor/autoload.php');
require_once(AUTOLOADER);

// init executor service
\AppserverIo\Concurrecy\ExecutorService::__init(AUTOLOADER);

假设您想构建一个简单的存储PHP对象来在多线程实现中共享数据。存储对象可能看起来像这样。创建Storage.php并添加

<?php

class Storage
{
    public $data = array();

    /**
     * @Synchronized
     */
    public function all() {
        return $this->data;
    }

    /**
     * @Synchronized
     */
    public function set($key, $value) {
        $this->data[$key] = $value;
    }

    /**
     * @Synchronized
     */
    public function get($key) {
        if ($this->has($key)) {
            return $this->data[$key];
        }
    }

    /**
     * @Synchronized
     */
    public function has($key) {
        return isset($this->data[$key]);
    }

    /**
     * @Synchronized
     */
    public function del($key) {
        unset($this->data[$key]);
    }

    /**
     * @Synchronized
     */
    public function inc($key) {
        if ($this->has($key)) {
            ++$this->data[$key];
        }
    }

    /**
     * @Asynchronous
     */
    public function dump() {
        echo var_export($this->all(), true) . PHP_EOL;
    }
}

也许您已经注意到了我们在这里使用的方法docblock注解。标记有@Synchronized的方法不能同时被多次调用。这意味着当使用all()set()get()del()inc()时,$data数组始终是同步的。使用@Asynchronous的方法是异步调用的。因此,如果调用dump()方法,它将输出$data数组,而不会阻塞主逻辑的执行。

为了使用存储对象,我们需要创建一个多线程任务模拟,该模拟应代表多线程业务逻辑。因此,创建Task.php并添加

<?php
class Task extends Thread {

    public static function simulate($maxThreads) {
        $t = array();
        for ($i=0; $i<$maxThreads; $i++) {
            $t[$i] = new self();
            $t[$i]->start(PTHREADS_INHERIT_ALL | PTHREADS_ALLOW_GLOBALS);
        }
        for ($i=0; $i<$maxThreads; $i++) {
            $t[$i]->join();
        }
    }

    public function run() {
        // get the storage object
        $storage = \AppserverIo\Concurrency\ExecutorService::__getEntity('data');
        // add thread signature
        $storage->set($this->getThreadId(), __METHOD__);
        // increase internal counter
        $storage->inc('counter');
    }
}

现在,将所有这些合并到主脚本main.php

<?php

define('AUTOLOADER', 'vendor/autoload.php');
require_once(AUTOLOADER);
require_once('Storage.php');
require_once('Task.php');

use \AppserverIo\Concurrency\ExecutorService as ExS;

// init executor service
ExS\Core::init(AUTOLOADER);

// create storage instance with alias data
$data = ExS\Core::newFromEntity('Storage', 'data');

// preinit counter
$data->set('counter', 0);

// simulate multithreaded tasks
Task::simulate(10);

// dump data async
$data->dump();

echo 'finished' . PHP_EOL;

// shutdown executor service and its entities
ExS\Core::shutdown();

如果您使用线程安全的编译版PHP版本调用main.php,其中已安装pthreads扩展,例如由appserver.io运行时提供,则结果应如下所示

$ /opt/appserver/bin/php bootstrap.php 
finished
array (
  'counter' => 10,
  140470559000320 => 'Task::run',
  140470550284032 => 'Task::run',
  140470541891328 => 'Task::run',
  140470327965440 => 'Task::run',
  140470319572736 => 'Task::run',
  140470311180032 => 'Task::run',
  140470302787328 => 'Task::run',
  140470294394624 => 'Task::run',
  140470286001920 => 'Task::run',
  140470277609216 => 'Task::run',
)

执行器服务有一些内部函数,如以下所述

问题

为了集中精力,我们希望收集有关此包的所有问题,并在主项目存储库的问题跟踪器中。请将原始存储库作为问题标题的第一个元素引用,例如:[appserver-io/<ORIGINATING_REPO>] 我遇到的问题