mateusjunges/confluent-schema-registry-api

一个PHP 7.4+库,用于消费Confluent Schema Registry REST API。它提供低级函数来创建PSR-7兼容的请求,同时也提供高级抽象以简化开发者体验。

9.0.0 2023-10-06 16:05 UTC

This package is auto-updated.

Last update: 2024-09-06 18:17:07 UTC


README

schema-registry-ci Actions Status Maintainability Test Coverage Latest Stable Version Total Downloads License

一个PHP 7.4+库,用于消费Confluent Schema Registry REST API。它提供低级函数来创建PSR-7兼容的请求,同时也提供高级抽象以简化开发者体验。

内容

要求

硬依赖

可选依赖

安装

此库通过composer安装。

composer require "flix-tech/confluent-schema-registry-api=^7.4"

注意

如果您仍在运行低于5.0.3的版本,我们建议立即更新,因为存在一个与异常处理的严重错误。

兼容性

此库遵循严格的语义版本控制,因此您可以期望任何次要和补丁版本都是兼容的,而主要版本升级将在UPGRADE.md文件中发布不兼容性。

使用

异步API

接口声明

示例PromisingRegistry

<?php

use GuzzleHttp\Client;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;

$registry = new PromisingRegistry(
    new Client(['base_uri' => 'registry.example.com'])
);

// Register a schema with a subject
$schema = AvroSchema::parse('{"type": "string"}');

// The promise will either contain a schema id as int when fulfilled,
// or a SchemaRegistryException instance when rejected.
// If the subject does not exist, it will be created implicitly
$promise = $registry->register('test-subject', $schema);

// If you want to resolve the promise, you might either get the value or an instance of a SchemaRegistryException
// It is more like an Either Monad, since returning Exceptions from rejection callbacks will throw them.
// We want to leave that decision to the user of the lib.
// TODO: Maybe return an Either Monad instead
$promise = $promise->then(
    static function ($schemaIdOrSchemaRegistryException) {
        if ($schemaIdOrSchemaRegistryException instanceof SchemaRegistryException) {
            throw $schemaIdOrSchemaRegistryException;
        }
        
        return $schemaIdOrSchemaRegistryException;
    }
);

// Resolve the promise
$schemaId = $promise->wait();

// Get a schema by schema id
$promise = $registry->schemaForId($schemaId);
// As above you could add additional callbacks to the promise
$schema = $promise->wait();

// Get the version of a schema for a given subject.
$version = $registry->schemaVersion(
    'test-subject',
    $schema
)->wait();

// You can also get a schema by subject and version
$schema = $registry->schemaForSubjectAndVersion('test-subject', $version)->wait();

// You can additionally just query for the currently latest schema within a subject.
// *NOTE*: Once you requested this it might not be the latest version anymore.
$latestSchema = $registry->latestVersion('test-subject')->wait();

// Sometimes you want to find out the global schema id for a given schema
$schemaId = $registry->schemaId('test-subject', $schema)->wait();

同步API

接口声明

示例BlockingRegistry

<?php

use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;

$registry = new BlockingRegistry(
    new PromisingRegistry(
        new Client(['base_uri' => 'registry.example.com'])
    )
);

// What the blocking registry does is actually resolving the promises
// with `wait` and adding a throwing rejection callback.
$schema = AvroSchema::parse('{"type": "string"}');

// This will be an int, and not a promise
$schemaId = $registry->register('test-subject', $schema);

缓存

存在一个CachedRegistry,它接受一个CacheAdapter和一个Registry。它支持异步和同步API。

注意

从库的4.x版本开始,CacheAdapterInterface的API已更改,以便可以通过给定方案的哈希值缓存方案ID。

示例

<?php

use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\Cache\DoctrineCacheAdapter;
use Doctrine\Common\Cache\ArrayCache;
use GuzzleHttp\Client;

$asyncApi = new PromisingRegistry(
    new Client(['base_uri' => 'registry.example.com'])
);

$syncApi = new BlockingRegistry($asyncApi);

$doctrineCachedSyncApi = new CachedRegistry(
    $asyncApi,
    new DoctrineCacheAdapter(
        new ArrayCache()
    )
);

// All adapters support both APIs, for async APIs additional fulfillment callbacks will be registered.
$avroObjectCachedAsyncApi = new CachedRegistry(
    $syncApi,
    new AvroObjectCacheAdapter()
);

// NEW in version 4.x, passing in custom hash functions to cache schema ids via the schema hash
// By default the following function is used internally
$defaultHashFunction = static function (AvroSchema $schema) {
   return md5((string) $schema); 
};

// You can also define your own hash callable
$sha1HashFunction = static function (AvroSchema $schema) {
   return sha1((string) $schema); 
};

// Pass the hash function as optional 3rd parameter to the CachedRegistry constructor
$avroObjectCachedAsyncApi = new CachedRegistry(
    $syncApi,
    new AvroObjectCacheAdapter(),
    $sha1HashFunction
);

低级API

存在一个低级API,它提供简单的函数,返回注册表不同端点的PSR-7请求对象。有关更多信息,请参阅Requests/Functions

还有请求使用方案注册表的新的DELETE API。

测试

此库使用Makefile来运行测试套件,并需要docker

您可以通过将variables.mk.dist复制到variables.mk并按需更改来设置默认变量。

构建本地docker镜像

PHP_VERSION=7.3 XDEBUG_VERSION=2.9.8 make docker

单元测试、编码规范和静态分析

PHP_VERSION=7.3 make ci-local

集成测试

此库使用docker-compose配置来启动用于集成测试的方案注册表,因此需要从版本1.18.x开始的docker-compose来运行这些测试。

可以使用以下环境变量控制平台
CONFLUENT_VERSION=latest
CONFLUENT_NETWORK_SUBNET=172.68.0.0/24
SCHEMA_REGISTRY_IPV4=172.68.0.103
KAFKA_BROKER_IPV4=172.68.0.102
ZOOKEEPER_IPV4=172.68.0.101
使用特定版本构建confluent平台并运行集成测试
CONFLUENT_VERSION=5.2.3 make platform
make phpunit-integration
make clean

贡献

要为此库做出贡献,请遵循以下工作流程

  • 分叉存储库
  • 创建功能分支
  • 在工作上
  • 运行测试以验证测试是否通过
  • 向上游打开PR
  • 为为开源做出贡献而感到高兴!