real-digital / avro-serde-php
一个利用 confluent schema registry 序列化和反序列化 Avro 记录的库
Requires
- php: ^7.3|^8.0
- beberlei/assert: ^2.9.9|~3.0
- flix-tech/avro-php: ^4.1.0
- flix-tech/confluent-schema-registry-api: ^7.3.1
- guzzlehttp/promises: ^1.4.0
- widmogrod/php-functional: ^6.0
Requires (Dev)
- doctrine/annotations: ^1.11
- phpbench/phpbench: 1.0.0-alpha2
- phpunit/phpunit: ^8.2.3|^9.4.2
- symfony/serializer: ^3.4|^4.3
- vlucas/phpdotenv: ~2.4
Suggests
- doctrine/annotations: To enable the generation of avro schemas from annotations
- symfony/serializer: To integrate avro-serde-php into symfony ecosystem
This package is auto-updated.
Last update: 2024-08-29 05:44:00 UTC
README
这是 flix-tech/avro-serde-php 的分支
动机
在序列化和反序列化消息时使用 Avro 序列化格式,尤其是在与 Confluent 平台集成时,您希望确保架构的演变不会影响下游消费者。
因此,Confluent 开发了 Schema Registry,它负责根据可配置的兼容性策略验证给定的架构演变。
不幸的是,Confluent 没有为 PHP 提供官方的 Avro SerDe 包。这个库旨在为 PHP 提供一个 Avro SerDe 库,该库实现了 Confluent wire format 并集成了 FlixTech 的 Schema Registry Client。
安装
这个库使用 PHP 的 composer 包管理器。
composer require 'real-digital/avro-serde-php:^1.7'
快速入门
注意
您应该始终使用缓存的 schema registry 客户端,否则您将为每个序列化或反序列化的消息发送 HTTP 请求。
1. 创建缓存的 Schema Registry 客户端
有关缓存信息的更多信息,请参阅 Schema Registry 客户端文档。
<?php use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use GuzzleHttp\Client; $schemaRegistryClient = new CachedRegistry( new PromisingRegistry( new Client(['base_uri' => 'registry.example.com']) ), new AvroObjectCacheAdapter() );
2. 构建 RecordSerializer 实例
RecordSerializer 是您与该库交互的主要方式。它提供了用于 SerDe 操作的 encodeRecord 和 decodeMessage 方法。
<?php use FlixTech\AvroSerializer\Objects\RecordSerializer; /** @var \FlixTech\SchemaRegistryApi\Registry $schemaRegistry */ $recordSerializer = new RecordSerializer( $schemaRegistry, [ // If you want to auto-register missing schemas set this to true RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false, // If you want to auto-register missing subjects set this to true RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false, ] );
3. 编码记录
这是一个使用 RecordSerializer 在 Confluent Avro wire format 中编码消息的简单示例。
<?php /** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */ $subject = 'my-topic-value'; $avroSchema = AvroSchema::parse('{"type": "string"}'); $record = 'Test message'; $encodedBinaryAvro = $recordSerializer->encodeRecord($subject, $avroSchema, $record); // Send this over the wire...
4. 解码消息
这是一个使用 RecordSerializer 解码消息的简单示例。
<?php /** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */ /** @var string $encodedBinaryAvro */ $record = $recordSerializer->decodeMessage($encodedBinaryAvro); echo $record; // 'Test message'
Schema Resolvers
Schema Resolvers 负责知道哪些 Avro 架构属于哪种类型的记录。如果您想将 Avro 架构分别管理在单独的文件中,这特别有用。Schema Resolvers 使您能够与您可能在此库范围之外拥有的任何架构管理概念集成。
Schema Resolvers 接受任何类型的 $record,并尝试为其解析一个匹配的 AvroSchema 实例。
FileResolver
在即使是相当复杂的应用程序中,您也想要在 VCS 中管理您的架构,最可能的是作为 .avsc 文件。这些文件包含描述 Avro 架构的 JSON。
解析器接受一个 $baseDir,其中您想管理文件,以及一个 inflector callable,这是一个简单的函数,它接受记录作为第一个参数,以及一个表示 inflection 是否针对键架构的第二个布尔参数 $isKey。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver; use function get_class;use function is_object; use function str_replace; class MyRecord {} $record = new MyRecord(); $baseDir = __DIR__ . '/files'; $inflector = static function ($record, bool $isKey) { $ext = $isKey ? '.key.avsc' : '.avsc'; $fileName = is_object($record) ? str_replace('\\', '.', get_class($record)) : 'default'; return $fileName . $ext; }; echo $inflector($record, false); // MyNamespace.MyRecord.avsc echo $inflector($record, true); // MyNamespace.MyRecord.key.avsc $resolver = new FileResolver($baseDir, $inflector); $resolver->valueSchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.avsc $resolver->keySchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.key.avsc
CallableResolver
这是一个最简单但也最灵活的解析器。它只接受两个 callables,分别负责获取值架构或键架构。键架构解析器是可选的。
<?php use FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver; use PHPUnit\Framework\Assert; use function Widmogrod\Functional\constt; $valueSchemaJson = ' { "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] } '; $valueSchema = AvroSchema::parse($valueSchemaJson); $resolver = new CallableResolver( constt( AvroSchema::parse($valueSchemaJson) ) ); $record = [ 'foo' => 'bar' ]; $schema = $resolver->valueSchemaFor($record); Assert::assertEquals($schema, $valueSchema);
DefinitionInterfaceResolver
该库还提供了一个暴露两个静态方法的HasSchemaDefinitionInterface接口。
HasSchemaDefinitionInterface::valueSchemaJson返回值模式的JSON字符串。HasSchemaDefinitionInterface::keySchemaJson返回NULL或者键模式的JSON字符串。
DefinitionInterfaceResolver检查给定的记录是否实现了该接口(如果没有,将抛出InvalidArgumentException),并通过静态方法解决模式。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\HasSchemaDefinitionInterface; use FlixTech\AvroSerializer\Objects\SchemaResolvers\DefinitionInterfaceResolver; class MyRecord implements HasSchemaDefinitionInterface { public static function valueSchemaJson() : string { return ' { "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] } '; } public static function keySchemaJson() : ?string { return '{"type": "string"}'; } } $record = new MyRecord(); $resolver = new DefinitionInterfaceResolver(); $resolver->valueSchemaFor($record); // Will resolve from $record::valueSchemaJson(); $resolver->keySchemaFor($record); // Will resolve from $record::keySchemaJson();
链式解析器
链式解析器是组合多个解析器的有用工具。能够解析模式的第一位解析器将获胜。如果链中的解析器都无法确定模式,则抛出InvalidArgumentException。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\SchemaResolvers\ChainResolver; $record = ['foo' => 'bar']; /** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver $fileResolver */ /** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver $callableResolver */ $resolver = new ChainResolver($fileResolver, $callableResolver); // or new ChainResolver(...[$fileResolver, $callableResolver]); $resolver->valueSchemaFor($record); // Will resolve $fileResolver, then $callableResolver $resolver->keySchemaFor($record); // Will resolve $fileResolver, then $callableResolver
Symfony序列化器集成
该库提供了与Symfony序列化器组件的集成。
<?php use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder; use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory; use PHPUnit\Framework\Assert; use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer; use Symfony\Component\Serializer\Serializer; class User { /** @var string */ private $name; /** @var int */ private $age; public function __construct(string $name, int $age) { $this->name = $name; $this->age = $age; } public function getName(): string { return $this->name; } public function setName(string $name): void { $this->name = $name; } public function getAge(): int { return $this->age; } public function setAge(int $age): void { $this->age = $age; } } $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); $avroSchemaJson = '{ "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] }'; $user = new User('Thomas', 38); $normalizer = new GetSetMethodNormalizer(); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]); $serialized = $symfonySerializer->serialize( $user, AvroSerDeEncoder::FORMAT_AVRO, [ AvroSerDeEncoder::CONTEXT_ENCODE_SUBJECT => 'users-value', AvroSerDeEncoder::CONTEXT_ENCODE_WRITERS_SCHEMA => AvroSchema::parse($avroSchemaJson), ] ); $deserializedUser = $symfonySerializer->deserialize( $serialized, User::class, AvroSerDeEncoder::FORMAT_AVRO ); Assert::assertEquals($deserializedUser, $user);
名称转换器
有时您的属性名称可能与模式中的字段名称不同。解决此问题的方法之一是使用自定义序列化器注解。但是,如果您使用该库提供的注解,可以使用我们的名称转换器,该转换器解析这些注解并将模式字段名称与属性名称映射。
<?php use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder; use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\NameConverter\AvroNameConverter; use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory; use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer; use Symfony\Component\Serializer\Serializer; use Doctrine\Common\Annotations\AnnotationReader as DoctrineAnnotationReader; use Doctrine\Common\Annotations\AnnotationRegistry; use FlixTech\AvroSerializer\Objects\Schema\Generation\AnnotationReader; $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); AnnotationRegistry::registerLoader('class_exists'); $reader = new AnnotationReader( new DoctrineAnnotationReader() ); $nameConverter = new AvroNameConverter($reader); $normalizer = new GetSetMethodNormalizer(null, $nameConverter); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]);
模式构建器
该库还提供了使用PHP定义模式的方法,这与Java SDK提供的SchemaBuilder API非常相似。
<?php use FlixTech\AvroSerializer\Objects\Schema; use FlixTech\AvroSerializer\Objects\Schema\Record\FieldOption; Schema::record() ->name('object') ->namespace('org.acme') ->doc('A test object') ->aliases(['stdClass', 'array']) ->field('name', Schema::string(), FieldOption::doc('Name of the object'), FieldOption::orderDesc()) ->field('answer', Schema::int(), FieldOption::default(42), FieldOption::orderAsc(), FieldOption::aliases('wrong', 'correct')) ->field('ignore', Schema::boolean(), FieldOption::orderIgnore()) ->parse();
模式生成器
除了提供定义模式的流畅API外,我们还提供了从类元数据(注解)生成模式的方法。为此,您必须安装doctrine/annotations包。
<?php use FlixTech\AvroSerializer\Objects\DefaultSchemaGeneratorFactory; use FlixTech\AvroSerializer\Objects\Schema\Generation\Annotations as SerDe; /** * @SerDe\AvroType("record") * @SerDe\AvroName("user") */ class User { /** * @SerDe\AvroType("string") * @var string */ private $firstName; /** * @SerDe\AvroType("string") * @var string */ private $lastName; /** * @SerDe\AvroType("int") * @var int */ private $age; public function __construct(string $firstName, string $lastName, int $age) { $this->firstName = $firstName; $this->lastName = $lastName; $this->age = $age; } public function getFirstName(): string { return $this->firstName; } public function getLastName(): string { return $this->lastName; } public function getAge(): int { return $this->age; } } $generator = DefaultSchemaGeneratorFactory::get(); $schema = $generator->generate(User::class); $avroSchema = $schema->parse();
有关可能注解的更多示例,请参阅测试用例。
示例
该库在examples文件夹中提供了一些可执行的示例。您应该查看这些示例,以了解该库的工作方式。