mateusjunges / confluent-schema-registry-api
一个PHP 7.4+库,用于消费Confluent Schema Registry REST API。它提供低级函数来创建PSR-7兼容的请求,同时也提供高级抽象以简化开发者体验。
Requires
- php: ^8.0
- ext-curl: *
- ext-json: *
- beberlei/assert: ~2.7|~3.0
- flix-tech/avro-php: ^5.0
- guzzlehttp/guzzle: ^7.0
- guzzlehttp/promises: ^2.0
- guzzlehttp/psr7: ^1.7|^2.1
Requires (Dev)
- doctrine/cache: ~1.3
- phpunit/phpunit: ^9.4.2
- psr/cache: ^1.0
- psr/simple-cache: ^1.0
- raphhh/trex-reflection: ~1.0
- symfony/cache: ^5.1
Suggests
- doctrine/cache: If you want to use the DoctrineCacheAdapter
- psr/cache: If you want to use the CacheItemPoolAdapter
- psr/simple-cache: If you want to use the SimpleCacheAdapter
This package is auto-updated.
Last update: 2024-09-06 18:17:07 UTC
README
一个PHP 7.4+库,用于消费Confluent Schema Registry REST API。它提供低级函数来创建PSR-7兼容的请求,同时也提供高级抽象以简化开发者体验。
内容
要求
硬依赖
可选依赖
安装
此库通过composer
安装。
composer require "flix-tech/confluent-schema-registry-api=^7.4"
注意
如果您仍在运行低于
5.0.3
的版本,我们建议立即更新,因为存在一个与异常处理的严重错误。
兼容性
此库遵循严格的语义版本控制,因此您可以期望任何次要和补丁版本都是兼容的,而主要版本升级将在UPGRADE.md文件中发布不兼容性。
使用
异步API
示例PromisingRegistry
<?php use GuzzleHttp\Client; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException; $registry = new PromisingRegistry( new Client(['base_uri' => 'registry.example.com']) ); // Register a schema with a subject $schema = AvroSchema::parse('{"type": "string"}'); // The promise will either contain a schema id as int when fulfilled, // or a SchemaRegistryException instance when rejected. // If the subject does not exist, it will be created implicitly $promise = $registry->register('test-subject', $schema); // If you want to resolve the promise, you might either get the value or an instance of a SchemaRegistryException // It is more like an Either Monad, since returning Exceptions from rejection callbacks will throw them. // We want to leave that decision to the user of the lib. // TODO: Maybe return an Either Monad instead $promise = $promise->then( static function ($schemaIdOrSchemaRegistryException) { if ($schemaIdOrSchemaRegistryException instanceof SchemaRegistryException) { throw $schemaIdOrSchemaRegistryException; } return $schemaIdOrSchemaRegistryException; } ); // Resolve the promise $schemaId = $promise->wait(); // Get a schema by schema id $promise = $registry->schemaForId($schemaId); // As above you could add additional callbacks to the promise $schema = $promise->wait(); // Get the version of a schema for a given subject. $version = $registry->schemaVersion( 'test-subject', $schema )->wait(); // You can also get a schema by subject and version $schema = $registry->schemaForSubjectAndVersion('test-subject', $version)->wait(); // You can additionally just query for the currently latest schema within a subject. // *NOTE*: Once you requested this it might not be the latest version anymore. $latestSchema = $registry->latestVersion('test-subject')->wait(); // Sometimes you want to find out the global schema id for a given schema $schemaId = $registry->schemaId('test-subject', $schema)->wait();
同步API
示例BlockingRegistry
<?php use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use GuzzleHttp\Client; $registry = new BlockingRegistry( new PromisingRegistry( new Client(['base_uri' => 'registry.example.com']) ) ); // What the blocking registry does is actually resolving the promises // with `wait` and adding a throwing rejection callback. $schema = AvroSchema::parse('{"type": "string"}'); // This will be an int, and not a promise $schemaId = $registry->register('test-subject', $schema);
缓存
存在一个CachedRegistry
,它接受一个CacheAdapter
和一个Registry
。它支持异步和同步API。
注意
从库的4.x版本开始,
CacheAdapterInterface
的API已更改,以便可以通过给定方案的哈希值缓存方案ID。
示例
<?php use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use FlixTech\SchemaRegistryApi\Registry\Cache\DoctrineCacheAdapter; use Doctrine\Common\Cache\ArrayCache; use GuzzleHttp\Client; $asyncApi = new PromisingRegistry( new Client(['base_uri' => 'registry.example.com']) ); $syncApi = new BlockingRegistry($asyncApi); $doctrineCachedSyncApi = new CachedRegistry( $asyncApi, new DoctrineCacheAdapter( new ArrayCache() ) ); // All adapters support both APIs, for async APIs additional fulfillment callbacks will be registered. $avroObjectCachedAsyncApi = new CachedRegistry( $syncApi, new AvroObjectCacheAdapter() ); // NEW in version 4.x, passing in custom hash functions to cache schema ids via the schema hash // By default the following function is used internally $defaultHashFunction = static function (AvroSchema $schema) { return md5((string) $schema); }; // You can also define your own hash callable $sha1HashFunction = static function (AvroSchema $schema) { return sha1((string) $schema); }; // Pass the hash function as optional 3rd parameter to the CachedRegistry constructor $avroObjectCachedAsyncApi = new CachedRegistry( $syncApi, new AvroObjectCacheAdapter(), $sha1HashFunction );
低级API
存在一个低级API,它提供简单的函数,返回注册表不同端点的PSR-7请求对象。有关更多信息,请参阅Requests/Functions。
还有请求使用方案注册表的新的DELETE
API。
测试
此库使用Makefile
来运行测试套件,并需要docker
。
您可以通过将variables.mk.dist
复制到variables.mk
并按需更改来设置默认变量。
构建本地docker镜像
PHP_VERSION=7.3 XDEBUG_VERSION=2.9.8 make docker
单元测试、编码规范和静态分析
PHP_VERSION=7.3 make ci-local
集成测试
此库使用docker-compose
配置来启动用于集成测试的方案注册表,因此需要从版本1.18.x开始的docker-compose
来运行这些测试。
可以使用以下环境变量控制平台
CONFLUENT_VERSION=latest
CONFLUENT_NETWORK_SUBNET=172.68.0.0/24
SCHEMA_REGISTRY_IPV4=172.68.0.103
KAFKA_BROKER_IPV4=172.68.0.102
ZOOKEEPER_IPV4=172.68.0.101
使用特定版本构建confluent平台并运行集成测试
CONFLUENT_VERSION=5.2.3 make platform make phpunit-integration make clean
贡献
要为此库做出贡献,请遵循以下工作流程
- 分叉存储库
- 创建功能分支
- 在工作上
- 运行测试以验证测试是否通过
- 向上游打开PR
- 为为开源做出贡献而感到高兴!