mkcg/php-query-model

一种可查询和滚动任何类型数据(SQL、搜索引擎、HTTP API、CSV、...)的通用模型,并可通过ETL将它们推送到任何地方。

0.9.14 2020-08-11 15:10 UTC

README

一个简单的多数据库库,用于在多种搜索引擎上搜索内容,并将这些结果聚合到面向文档的结构中。

该库定义了一个名为MKCG\Model\DBAL\QueryEngine的类,用于使用不同的驱动构建文档。

它还定义了一个简单的ETL,以便能够轻松有效地在不同数据源之间同步内容。

引擎API

QueryEngine API定义了两个方法:query()scroll()

每个方法都可以使用提供的\MKCG\Model\Model和适当的\MKCG\Model\DBAL\QueryCriteria获取并构建文档。然而,scroll()方法返回一个\Generator,并在内部执行多个批量操作,以有效地滚动大型集合。

示例

$model = Schema\User::make('default', 'user')
    ->with(Schema\Address::make())
    ->with(Schema\Post::make());

$criteria = (new QueryCriteria())
    ->forCollection('user')
        ->addFilter('status', FilterInterface::FILTER_IN, [ 2 , 3 , 5 , 7 ])
        ->addFilter('registered_at', FilterInterface::FILTER_GREATER_THAN_EQUAL, '2000-01-01')
        ->addSort('firstname', 'ASC')
        ->addSort('lastname', 'ASC')
        ->setLimit(10)
    ->forCollection('addresses')
        ->setLimitByParent(2)
    ->forCollection('posts')
        ->addFilter('title', FilterInterface::FILTER_FULLTEXT_MATCH, 'ab')
;

$users = $engine->query($model, $criteria);

echo json_encode($users->getContent(), JSON_PRETTY_PRINT) . "\n";
echo "\nFound : " . $users->getCount() . " users\n";


$iterator = $engine->scroll($model, $criteria);

foreach ($iterator as $user) {
    echo json_encode($user, JSON_PRETTY_PRINT) . "\n";
}

Example

驱动定义

每个驱动负责在单个数据源(数据库、HTTP API、本地文件、...)上执行查询,并且必须

  • 实现注册在QueryEngine中的MKCG\Model\DBAL\Drivers\DriverInterface
  • 内部

示例

use MKCG\Model\DBAL\QueryEngine;
use MKCG\Model\DBAL\Drivers;

$mongoClient = new MongoDB\Client('mongodb://root:password@mongodb');

$redisClient = new \Predis\Client([
    'scheme' => 'tcp',
    'host' => 'redisearch',
    'port' => 6379
]);

$sqlConnection = \Doctrine\DBAL\DriverManager::getConnection([
    'user' => 'root',
    'password' => 'root',
    'host' => 'mysql',
    'driver' => 'pdo_mysql',
]);

$engine = (new QueryEngine('mysql'))
    ->registerDriver(new Drivers\Doctrine($sqlConnection), 'mysql')
    ->registerDriver(new Drivers\CsvReader($fixturePath), 'csv')
    ->registerDriver(new Drivers\RssReader(new Adapters\Guzzle), 'rss')
    ->registerDriver(new Drivers\SitemapReader(new Adapters\Guzzle), 'sitemap')
    ->registerDriver(new Drivers\Http(new Adapters\Guzzle), 'http')
    ->registerDriver(new Drivers\HttpRobot(new Adapters\Guzzle), 'http_robot')
    ->registerDriver(new Drivers\MongoDB($mongoClient), 'mongodb')

运行时行为

驱动

驱动支持的功能

查询条件选项

基于HTTP的驱动

  • HTTP
  • HttpRobot
  • RssReader
  • SitemapReader
  • Elasticsearch

基于结果的过滤驱动

  • CsvReader
  • RssReader
  • SitemapReader

当同时提供url_generatorurl时,则仅使用url_generator

过滤器

常量由接口MKCG\Model\DBAL\FilterInterface定义

驱动支持的过滤器

CUSTOM过滤器类型

可以通过向QueryCriteria实例提供callable来应用自定义过滤器

(new QueryCriteria())
    ->forCollection('order')
        ->addCallableFilter(function(Query $query, ...$arguments) {
            // do something
        })

callable的第一个参数应该是Query实例。其他参数可能根据驱动而变化。

一些驱动在获取结果时应用过滤器,并期望当过滤器不匹配时返回一个false值。在内部,它们在每个获取的结果上应用一个array_filter,在调用callable参数之前

  • CsvReader
  • RssReader
  • SitemapReader

callable的参数由驱动决定

ETL

一个极其简单的ETL定义为单个类\MKCG\Model\ETL。它可以与QueryEngine的scroll API结合使用,以转换并推送内容到不同的加载器;

示例

function pipelineEtl(QueryEngine $engine)
{
    $model = Schema\Product::make('default', 'products');
    $criteria = (new QueryCriteria())
        ->forCollection('products')
            ->addFilter('sku.color', FilterInterface::FILTER_IN, ['aqua', 'purple'])
        ;

    $iterator = $engine->scroll($model, $criteria, 100);

    $pushed = ETL::extract($engine->scroll($model, $criteria, 100), 1000, 500)
        ->transform(function($item) {
            return [
                'id' => $item['_id'],
                'sku' => $item['sku']
            ];
        })
        ->transform(function($item) {
            return $item + [
                'sku_count' => count($item['sku'] ?? [])
            ];
        })
        ->load(function(iterable $bulk) {
            echo sprintf("[ETL] Loader 1 - Loading %d elements\n", count($bulk));
        })
        ->load(function(iterable $bulk) {
            echo sprintf("[ETL] Loader 2 - Loading %d elements\n", count($bulk));
        })
        ->load(function(iterable $bulk) {
            echo sprintf("[ETL] Loader 3 - Loading %d elements\n", count($bulk));
        })
        ->run();

    echo sprintf("[ETL] Pushed %d elements\n", $pushed);
}

聚合

支持的聚合

测试和示例

虽然将使用Behat进行一些测试以发布1.0.0版本,但提供了一个完全功能的示例在examples/中,并使用不同类型的驱动构建文档。

来源:./examples

docker-compose up --build -d
docker exec -it php_query_model sh -c "cd /home/php-query-model/examples && composer install"
docker exec -it php_query_model sh -c "php /home/php-query-model/examples/index.php"

默认情况下,这只会运行两个函数(位于index.php

pipelineEtl($engine);
searchOrder($engine);
// searchProducts($engine);
// searchGithubRobot($engine);
// searchSitemaps($engine);
// searchPackages($engine);
// searchUsers($engine);
// searchHackerNews($engine);

pipelineEtl使用引擎scroll API迭代存储在MongoDB中的Product列表,并在推送内容之前应用不同的转换,使用三个加载器通过ETL组件。

searchOrder使用引擎scroll API

  • 扫描包含电商OrderCSV文件
    • 然后将相应的Product存储在MongoDB中
    • 然后将相应的客户存储为User插入到MySQL中
      • 以及他们的前两个定义的Address也存储在MySQL中
      • 以及所有他们的Post存储在MySQL中

您可能想要取消注释其他搜索函数以执行HTTP查询和获取

路线图

预期功能

进行中

待办事项

期望功能

数据库驱动

  • Algolia
  • ArangoDB
  • Cassandra
  • Illuminate\Eloquent(Laravel使用的库)
  • Neo4J
  • PostgreSQL
  • ScyllaDB
  • Solr

  • Kafka
  • MySQL binlog
  • RabbitMQ

存储

  • AWS S3
  • 文件系统
  • OpenIO

基础设施

  • AWS
  • OVH

服务

  • Cloudinary
  • Sendinblue

社交网络

  • Facebook
  • LinkedIn
  • Twitter

贡献

欢迎对任何建议或为该项目做出贡献打开合并请求。