flix-tech / avro-serde-php
一个库,用于利用 confluent schema registry 序列化和反序列化 Avro 记录
Requires
- php: ^7.3|^8.0
- beberlei/assert: ^2.9.9|~3.0
- flix-tech/avro-php: ^5.0
- flix-tech/confluent-schema-registry-api: ^8.0
- guzzlehttp/promises: ^1.4.0|^2.0.0
- widmogrod/php-functional: ^6.0
Requires (Dev)
- doctrine/annotations: ^1.11
- phpbench/phpbench: 1.0.0-alpha2
- phpunit/phpunit: ^9.4.2
- roave/security-advisories: dev-latest
- symfony/serializer: ^3.4|^4.3
- vlucas/phpdotenv: ~2.4
Suggests
- doctrine/annotations: To enable the generation of avro schemas from annotations
- symfony/serializer: To integrate avro-serde-php into symfony ecosystem
- 2.1.0
- dev-master / 2.0.x-dev
- 2.0.0
- 1.x-dev
- 1.7.2
- 1.7.1
- 1.7.0
- 1.7.0-rc1
- 1.6.1
- 1.6.0
- 1.5.0
- 1.4.0
- 1.3.0
- v1.2.0
- v1.1.0
- v1.0.0
- 0.2.0
- 0.1.0
- 0.0.1
- dev-dependabot/github_actions/dot-github/workflows/actions/download-artifact-4.1.7
- dev-ci/update-all
- dev-ft/update-versions
- dev-ft/build-env
- dev-fix/coverage-actions
- dev-fx/ci-imrpovements
- dev-fx/phpunit-action
- dev-ft/php-version
- dev-issue/31
- dev-ft-new-avro
This package is auto-updated.
Last update: 2024-09-03 21:38:35 UTC
README
动机
当使用 Avro 序列化格式进行消息序列化和反序列化时,特别是在与 Confluent 平台集成时,您希望确保模式以不影响下游消费者的方式进行演化。
因此,Confluent 开发了 Schema Registry,该注册表负责根据可配置的兼容性策略验证给定的模式演化。
不幸的是,Confluent 并未提供官方的 PHP Avro SerDe 包。本库旨在为 PHP 提供一个 Avro SerDe 库,该库实现了 Confluent wire format 并集成了 FlixTech 的 Schema Registry Client。
安装
此库使用 composer 包管理器 进行 PHP。
composer require 'flix-tech/avro-serde-php:^1.6'
快速入门
注意
您应始终使用缓存的 schema registry 客户端,否则您将为每个序列化或反序列化的消息进行 HTTP 请求。
1. 创建缓存的 Schema Registry 客户端
有关更多详细信息,请参阅 Schema Registry 客户端缓存文档。
<?php use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use GuzzleHttp\Client; $schemaRegistryClient = new CachedRegistry( new PromisingRegistry( new Client(['base_uri' => 'registry.example.com']) ), new AvroObjectCacheAdapter() );
2. 构建 RecordSerializer
实例
RecordSerializer
是您与此库交互的主要方式。它提供了用于 SerDe 操作的 encodeRecord
和 decodeMessage
方法。
<?php use FlixTech\AvroSerializer\Objects\RecordSerializer; /** @var \FlixTech\SchemaRegistryApi\Registry $schemaRegistry */ $recordSerializer = new RecordSerializer( $schemaRegistry, [ // If you want to auto-register missing schemas set this to true RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false, // If you want to auto-register missing subjects set this to true RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false, ] );
3. 编码记录
这是一个如何使用 RecordSerializer
对消息进行编码的简单示例。
<?php /** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */ $subject = 'my-topic-value'; $avroSchema = AvroSchema::parse('{"type": "string"}'); $record = 'Test message'; $encodedBinaryAvro = $recordSerializer->encodeRecord($subject, $avroSchema, $record); // Send this over the wire...
4. 解码消息
这是一个如何使用 RecordSerializer
解码消息的简单示例。
<?php /** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */ /** @var string $encodedBinaryAvro */ $record = $recordSerializer->decodeMessage($encodedBinaryAvro); echo $record; // 'Test message'
Schema Resolvers
Schema Resolvers 负责知道哪种 Avro 模式属于哪种记录类型。如果您想在单独的文件中管理您的 Avro 模式,这将特别有用。
Schema Resolvers 接受任何类型的 $record
,并尝试为它解析一个匹配的 AvroSchema
实例。
FileResolver
在更复杂的应用程序中,您想在版本控制系统中管理您的模式,最可能的是作为 .avsc
文件。这些文件包含描述 Avro 模式的 JSON。
解析器接受一个 $baseDir
,您在其中管理文件,以及一个转换函数 callable
,这是一个简单函数,它接受记录作为第一个参数,并接受一个表示转换是否针对键模式的第二个布尔参数 $isKey
。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver; use function get_class;use function is_object; use function str_replace; class MyRecord {} $record = new MyRecord(); $baseDir = __DIR__ . '/files'; $inflector = static function ($record, bool $isKey) { $ext = $isKey ? '.key.avsc' : '.avsc'; $fileName = is_object($record) ? str_replace('\\', '.', get_class($record)) : 'default'; return $fileName . $ext; }; echo $inflector($record, false); // MyNamespace.MyRecord.avsc echo $inflector($record, true); // MyNamespace.MyRecord.key.avsc $resolver = new FileResolver($baseDir, $inflector); $resolver->valueSchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.avsc $resolver->keySchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.key.avsc
CallableResolver
这是一个最简单但同时也最灵活的解析器。它只需要两个负责分别获取值或键模式的 callables
。键模式解析器是可选的。
<?php use FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver; use PHPUnit\Framework\Assert; use function Widmogrod\Functional\constt; $valueSchemaJson = ' { "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] } '; $valueSchema = AvroSchema::parse($valueSchemaJson); $resolver = new CallableResolver( constt( AvroSchema::parse($valueSchemaJson) ) ); $record = [ 'foo' => 'bar' ]; $schema = $resolver->valueSchemaFor($record); Assert::assertEquals($schema, $valueSchema);
DefinitionInterfaceResolver
这个库还提供了一个 HasSchemaDefinitionInterface
,它公开了两个静态方法
HasSchemaDefinitionInterface::valueSchemaJson
返回值的模式定义的 JSON 字符串HasSchemaDefinitionInterface::keySchemaJson
返回NULL
或键的模式定义的 JSON 字符串。
DefinitionInterfaceResolver
检查给定的记录是否实现该接口(如果没有,它将抛出 InvalidArgumentException
)并通过静态方法解析模式。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\HasSchemaDefinitionInterface; use FlixTech\AvroSerializer\Objects\SchemaResolvers\DefinitionInterfaceResolver; class MyRecord implements HasSchemaDefinitionInterface { public static function valueSchemaJson() : string { return ' { "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] } '; } public static function keySchemaJson() : ?string { return '{"type": "string"}'; } } $record = new MyRecord(); $resolver = new DefinitionInterfaceResolver(); $resolver->valueSchemaFor($record); // Will resolve from $record::valueSchemaJson(); $resolver->keySchemaFor($record); // Will resolve from $record::keySchemaJson();
ChainResolver
链解析器是一个用于组合多个解析器的有用工具。第一个能够解析模式的解析器将获胜。如果链中的任何解析器都无法确定模式,将抛出 InvalidArgumentException
。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\SchemaResolvers\ChainResolver; $record = ['foo' => 'bar']; /** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver $fileResolver */ /** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver $callableResolver */ $resolver = new ChainResolver($fileResolver, $callableResolver); // or new ChainResolver(...[$fileResolver, $callableResolver]); $resolver->valueSchemaFor($record); // Will resolve $fileResolver, then $callableResolver $resolver->keySchemaFor($record); // Will resolve $fileResolver, then $callableResolver
Symfony Serializer 集成
此库提供了与 Symfony Serializer 组件 的集成。
<?php use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder; use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory; use PHPUnit\Framework\Assert; use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer; use Symfony\Component\Serializer\Serializer; class User { /** @var string */ private $name; /** @var int */ private $age; public function __construct(string $name, int $age) { $this->name = $name; $this->age = $age; } public function getName(): string { return $this->name; } public function setName(string $name): void { $this->name = $name; } public function getAge(): int { return $this->age; } public function setAge(int $age): void { $this->age = $age; } } $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); $avroSchemaJson = '{ "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] }'; $user = new User('Thomas', 38); $normalizer = new GetSetMethodNormalizer(); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]); $serialized = $symfonySerializer->serialize( $user, AvroSerDeEncoder::FORMAT_AVRO, [ AvroSerDeEncoder::CONTEXT_ENCODE_SUBJECT => 'users-value', AvroSerDeEncoder::CONTEXT_ENCODE_WRITERS_SCHEMA => AvroSchema::parse($avroSchemaJson), ] ); $deserializedUser = $symfonySerializer->deserialize( $serialized, User::class, AvroSerDeEncoder::FORMAT_AVRO ); Assert::assertEquals($deserializedUser, $user);
名称转换器
有时您的属性名称可能与您模式中的字段名称不同。解决此问题的方法之一是使用 自定义 Serializer 注解。但是,如果您使用此库提供的注解,您可以使用我们的 名称转换器,该转换器解析这些注解并将模式字段名称与属性名称映射。
<?php use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder; use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\NameConverter\AvroNameConverter; use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory; use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer; use Symfony\Component\Serializer\Serializer; use Doctrine\Common\Annotations\AnnotationReader as DoctrineAnnotationReader; use Doctrine\Common\Annotations\AnnotationRegistry; use FlixTech\AvroSerializer\Objects\Schema\Generation\AnnotationReader; $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); AnnotationRegistry::registerLoader('class_exists'); $reader = new AnnotationReader( new DoctrineAnnotationReader() ); $nameConverter = new AvroNameConverter($reader); $normalizer = new GetSetMethodNormalizer(null, $nameConverter); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]);
模式构建器
此库还提供了使用 php 定义模式的方法,与 Java SDK 提供的 SchemaBuilder API 非常相似
<?php use FlixTech\AvroSerializer\Objects\Schema; use FlixTech\AvroSerializer\Objects\Schema\Record\FieldOption; Schema::record() ->name('object') ->namespace('org.acme') ->doc('A test object') ->aliases(['stdClass', 'array']) ->field('name', Schema::string(), FieldOption::doc('Name of the object'), FieldOption::orderDesc()) ->field('answer', Schema::int(), FieldOption::default(42), FieldOption::orderAsc(), FieldOption::aliases('wrong', 'correct')) ->field('ignore', Schema::boolean(), FieldOption::orderIgnore()) ->parse();
模式生成器
除了提供定义模式的流畅 API 之外,我们还提供了从类元数据(注解)生成模式的方法。为此,您必须安装 doctrine/annotations
包。
<?php use FlixTech\AvroSerializer\Objects\DefaultSchemaGeneratorFactory; use FlixTech\AvroSerializer\Objects\Schema\Generation\Annotations as SerDe; /** * @SerDe\AvroType("record") * @SerDe\AvroName("user") */ class User { /** * @SerDe\AvroType("string") * @var string */ private $firstName; /** * @SerDe\AvroType("string") * @var string */ private $lastName; /** * @SerDe\AvroType("int") * @var int */ private $age; public function __construct(string $firstName, string $lastName, int $age) { $this->firstName = $firstName; $this->lastName = $lastName; $this->age = $age; } public function getFirstName(): string { return $this->firstName; } public function getLastName(): string { return $this->lastName; } public function getAge(): int { return $this->age; } } $generator = DefaultSchemaGeneratorFactory::get(); $schema = $generator->generate(User::class); $avroSchema = $schema->parse();
有关可能的注解的更多示例,请参阅 测试用例。
示例
此库在 examples 文件夹中提供了一些可执行的示例。您应该查看这些示例以了解此库的工作原理。