n0nag0n/simple-job-queue

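A simple library for interfacing with other job queue providers that gives you a lot of flexibility. Currently supports MySQL, PostgreSQL, SQLite3, and Beanstalkd.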

1.1.0 2024-06-14 15:11 UTC

Last update: 2024-09-14 15:52:57 UTC


README

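I wanted a simple job queue that runs on a database but can also be used with other adapters such as beanstalkd or redis when needed. I had not found a good option for a standalone job queue designed for a database.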

Installation

composer require n0nag0n/simple-job-queue

Usage

Adding a new job

MySQL

<?php

use n0nag0n\Job_Queue;

// default is mysql based job queue
$Job_Queue = new Job_Queue('mysql', [
	'mysql' => [
		'table_name' => 'new_table_name', // default is job_queue_jobs
		'use_compression' => false // default is true to use COMPRESS() and UNCOMPRESS() for payload
	]
]);

$PDO = new PDO('mysql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));

PostgreSQL

<?php

use n0nag0n\Job_Queue;

// select the PostgreSQL-backed job queue (the default is mysql)
$Job_Queue = new Job_Queue('pgsql', [
	'pgsql' => [
		'table_name' => 'new_table_name', // default is job_queue_jobs
	]
]);

$PDO = new PDO('pgsql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));

SQLite3

<?php

use n0nag0n\Job_Queue;

// select the SQLite-backed job queue (the default is mysql)
$Job_Queue = new Job_Queue('sqlite', [
	'sqlite' => [
		'table_name' => 'new_table_name' // default is job_queue_jobs
	]
]);

$PDO = new PDO('sqlite:example.db');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));

Beanstalkd

<?php

use n0nag0n\Job_Queue;

// select the beanstalkd-backed job queue (the default is mysql)
$Job_Queue = new Job_Queue('beanstalkd');

$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');
$Job_Queue->addQueueConnection($pheanstalk);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));

Running a worker

See the example_worker.php file, or the example below.

<?php
	$Job_Queue = new n0nag0n\Job_Queue('mysql');
	$PDO = new PDO('mysql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
	$Job_Queue->addQueueConnection($PDO);
	$Job_Queue->watchPipeline('send_important_emails');
	while(true) {
		$job = $Job_Queue->getNextJobAndReserve();

		// adjust to whatever makes you sleep better at night (for database queues only, beanstalkd does not need this if statement)
		if(empty($job)) {
			usleep(500000);
			continue;
		}

		echo "Processing {$job['id']}\n";
		$payload = json_decode($job['payload'], true);

		try {
			$result = doSomethingThatDoesSomething($payload);

			if($result === true) {
				$Job_Queue->deleteJob($job);
			} else {
				// this takes it out of the ready queue and puts it in another queue that can be picked up and "kicked" later.
				$Job_Queue->buryJob($job);
			}
		} catch(Exception $e) {
			$Job_Queue->buryJob($job);
		}
	}
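The worker loop above sleeps for a fixed half second whenever a database queue is empty. If you would rather poll quickly while jobs are arriving and back off when the queue stays idle, one possible approach is an exponential backoff on consecutive empty polls. This is a minimal sketch: next_idle_sleep() and its default values are hypothetical and are not part of this package.

```php
<?php

// Hypothetical helper, NOT part of n0nag0n/simple-job-queue.
// Returns how long to sleep (in microseconds) after $empty_polls
// consecutive empty polls: the delay doubles each time and is
// capped at $max_us.
function next_idle_sleep(int $empty_polls, int $base_us = 100000, int $max_us = 2000000): int
{
	// Clamp the exponent so the shift stays well within integer range.
	$exponent = min(max($empty_polls - 1, 0), 10);

	return min($base_us << $exponent, $max_us);
}

// Sketch of how it could replace the fixed usleep() in the loop above:
//
// $empty_polls = 0;
// while(true) {
// 	$job = $Job_Queue->getNextJobAndReserve();
// 	if(empty($job)) {
// 		usleep(next_idle_sleep(++$empty_polls));
// 		continue;
// 	}
// 	$empty_polls = 0; // a job arrived, so poll quickly again
// 	// ... process the job as shown above ...
// }
```

With these defaults the worker waits 0.1 s after the first empty poll and never more than 2 s, so a burst of new jobs is picked up quickly while an idle queue generates far fewer queries.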

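Handling long-running processes

Supervisord is going to be your jam. There are many articles online that explain how to set it up to keep a worker process running.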
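As a starting point, a Supervisord program entry for the worker might look like the sketch below. The program name, script path, user, and log path are placeholders chosen for illustration, not values shipped with this package, so adjust them to your own setup.

```ini
; Hypothetical supervisord program entry. The paths, user, and
; program name are placeholders, not values from this package.
[program:job_queue_worker]
command=php /path/to/your/project/example_worker.php
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
autorestart=true
stopwaitsecs=30
user=www-data
stdout_logfile=/var/log/job_queue_worker.log
redirect_stderr=true
```

Here autorestart brings the worker back up if it exits, and numprocs starts several copies of the same worker so multiple jobs can be reserved in parallel.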

Testing

For the time being, the PHPUnit tests run against the sqlite3 examples.

vendor/bin/phpunit