flix-tech/confluent-schema-registry-api

一个用于消费 Confluent Schema Registry REST API 的 PHP 7.4+ 库。它提供底层函数来创建 PSR-7 兼容的请求,同时也提供高级抽象以简化开发者体验。

8.1.0 2024-01-26 13:22 UTC

README

schema-registry-ci Actions Status Maintainability Test Coverage Latest Stable Version Total Downloads License

一个用于消费 Confluent Schema Registry REST API 的 PHP 7.4+ 库。它提供底层函数来创建 PSR-7 兼容的请求,同时也提供高级抽象以简化开发者体验。

内容

需求

硬依赖

可选依赖

安装

此库通过 composer 安装。

composer require "flix-tech/confluent-schema-registry-api=^7.4"

注意

如果您仍在运行低于 5.0.3 的版本,我们建议立即更新,因为存在一个与异常处理的严重错误。

兼容性

此库遵循严格的语义版本控制,因此您可以期望任何次要和补丁版本都是兼容的,而主要版本升级将会有不兼容性,这些不兼容性将在 UPGRADE.md 文件中发布。

使用方法

异步 API

接口声明

示例 PromisingRegistry

<?php

use GuzzleHttp\Client;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;

$registry = new PromisingRegistry(
    new Client(['base_uri' => 'registry.example.com'])
);

// Register a schema with a subject
$schema = AvroSchema::parse('{"type": "string"}');

// The promise will either contain a schema id as int when fulfilled,
// or a SchemaRegistryException instance when rejected.
// If the subject does not exist, it will be created implicitly
$promise = $registry->register('test-subject', $schema);

// If you want to resolve the promise, you might either get the value or an instance of a SchemaRegistryException
// It is more like an Either Monad, since returning Exceptions from rejection callbacks will throw them.
// We want to leave that decision to the user of the lib.
// TODO: Maybe return an Either Monad instead
$promise = $promise->then(
    static function ($schemaIdOrSchemaRegistryException) {
        if ($schemaIdOrSchemaRegistryException instanceof SchemaRegistryException) {
            throw $schemaIdOrSchemaRegistryException;
        }
        
        return $schemaIdOrSchemaRegistryException;
    }
);

// Resolve the promise
$schemaId = $promise->wait();

// Get a schema by schema id
$promise = $registry->schemaForId($schemaId);
// As above you could add additional callbacks to the promise
$schema = $promise->wait();

// Get the version of a schema for a given subject.
$version = $registry->schemaVersion(
    'test-subject',
    $schema
)->wait();

// You can also get a schema by subject and version
$schema = $registry->schemaForSubjectAndVersion('test-subject', $version)->wait();

// You can additionally just query for the currently latest schema within a subject.
// *NOTE*: Once you requested this it might not be the latest version anymore.
$latestSchema = $registry->latestVersion('test-subject')->wait();

// Sometimes you want to find out the global schema id for a given schema
$schemaId = $registry->schemaId('test-subject', $schema)->wait();

同步 API

接口声明

示例 BlockingRegistry

<?php

use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;

$registry = new BlockingRegistry(
    new PromisingRegistry(
        new Client(['base_uri' => 'registry.example.com'])
    )
);

// What the blocking registry does is actually resolving the promises
// with `wait` and adding a throwing rejection callback.
$schema = AvroSchema::parse('{"type": "string"}');

// This will be an int, and not a promise
$schemaId = $registry->register('test-subject', $schema);

缓存

有一个 CachedRegistry 接受一个 CacheAdapter 和一个 Registry。它支持异步和同步 API。

注意

从本库的 4.x 版本开始,CacheAdapterInterface 的 API 已经更改,以便可以通过给定模式的哈希值缓存模式 ID。

示例

<?php

use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\Cache\DoctrineCacheAdapter;
use Doctrine\Common\Cache\ArrayCache;
use GuzzleHttp\Client;

$asyncApi = new PromisingRegistry(
    new Client(['base_uri' => 'registry.example.com'])
);

$syncApi = new BlockingRegistry($asyncApi);

$doctrineCachedSyncApi = new CachedRegistry(
    $asyncApi,
    new DoctrineCacheAdapter(
        new ArrayCache()
    )
);

// All adapters support both APIs, for async APIs additional fulfillment callbacks will be registered.
$avroObjectCachedAsyncApi = new CachedRegistry(
    $syncApi,
    new AvroObjectCacheAdapter()
);

// NEW in version 4.x, passing in custom hash functions to cache schema ids via the schema hash
// By default the following function is used internally
$defaultHashFunction = static function (AvroSchema $schema) {
   return md5((string) $schema); 
};

// You can also define your own hash callable
$sha1HashFunction = static function (AvroSchema $schema) {
   return sha1((string) $schema); 
};

// Pass the hash function as optional 3rd parameter to the CachedRegistry constructor
$avroObjectCachedAsyncApi = new CachedRegistry(
    $syncApi,
    new AvroObjectCacheAdapter(),
    $sha1HashFunction
);

底层 API

有一个低级 API,它提供了简单的函数,返回注册表不同端点的 PSR-7 请求对象。有关更多信息,请参阅 Requests/Functions

还有使用模式注册表的新 DELETE API 的请求。

测试

此库使用 Makefile 来运行测试套件,并需要 docker

您可以通过将 variables.mk.dist 复制到 variables.mk 并更改它们来设置默认变量。

构建本地 Docker 镜像

PHP_VERSION=7.3 XDEBUG_VERSION=2.9.8 make docker

单元测试、编码标准和静态分析

PHP_VERSION=7.3 make ci-local

集成测试

此库使用 docker-compose 配置来启动用于集成测试的模式注册表,因此需要从版本 1.18.x 开始的 docker-compose 来运行这些测试。

可以通过以下环境变量控制平台
CONFLUENT_VERSION=latest
CONFLUENT_NETWORK_SUBNET=172.68.0.0/24
SCHEMA_REGISTRY_IPV4=172.68.0.103
KAFKA_BROKER_IPV4=172.68.0.102
ZOOKEEPER_IPV4=172.68.0.101
使用特定版本构建 confluent 平台并运行集成测试
CONFLUENT_VERSION=5.2.3 make platform
make phpunit-integration
make clean

贡献

为了为此库做出贡献,请遵循以下工作流程

  • 分支存储库
  • 创建功能分支
  • 在功能上工作
  • 运行测试以验证测试是否通过
  • 向上游打开 PR
  • 为为开源项目做出贡献而感到高兴!