mateusjunges / avro-serde-php
一个库,用于使用 confluent schema registry 序列化和反序列化 Avro 记录
Requires
- php: ^8.0
- beberlei/assert: ^2.9.9|~3.0
- flix-tech/avro-php: ^5.0
- guzzlehttp/promises: ^2.0
- mateusjunges/confluent-schema-registry-api: ^9.0
- widmogrod/php-functional: ^6.0
Requires (Dev)
- doctrine/annotations: ^1.11
- phpbench/phpbench: 1.0.0-alpha2
- phpunit/phpunit: ^9.4.2
- roave/security-advisories: dev-latest
- symfony/serializer: ^3.4|^4.3
- vlucas/phpdotenv: ~2.4
Suggests
- doctrine/annotations: To enable the generation of avro schemas from annotations
- symfony/serializer: To integrate avro-serde-php into symfony ecosystem
This package is auto-updated.
Last update: 2024-09-06 18:14:25 UTC
README
动机
当使用 Avro 序列化格式进行消息序列化和反序列化时,尤其是在与 Confluent 平台集成时,您需要确保架构的演进不会影响到下游消费者。
因此,Confluent 开发了Confluent的Schema Registry,该注册表负责根据可配置的兼容性策略验证给定的架构演进。
不幸的是,Confluent 并未为 PHP 提供官方的 Avro SerDe 包。这个库旨在为 PHP 提供一个 Avro SerDe 库,该库实现了Confluent wire format并集成了 FlixTech 的Schema Registry Client。
安装
此库使用 PHP 的composer 包管理器。
composer require 'flix-tech/avro-serde-php:^1.6'
快速入门
注意
您应该始终使用缓存的 schema registry 客户端,否则您将为每个序列化或反序列化的消息进行 HTTP 请求。
1. 创建缓存的 Schema Registry 客户端
有关缓存的更多信息,请参阅Schema Registry 客户端文档。
<?php use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use GuzzleHttp\Client; $schemaRegistryClient = new CachedRegistry( new PromisingRegistry( new Client(['base_uri' => 'registry.example.com']) ), new AvroObjectCacheAdapter() );
2. 构建 RecordSerializer
实例
RecordSerializer
是您与该库交互的主要方式。它为 SerDe 操作提供了 encodeRecord
和 decodeMessage
方法。
<?php use FlixTech\AvroSerializer\Objects\RecordSerializer; /** @var \FlixTech\SchemaRegistryApi\Registry $schemaRegistry */ $recordSerializer = new RecordSerializer( $schemaRegistry, [ // If you want to auto-register missing schemas set this to true RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false, // If you want to auto-register missing subjects set this to true RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false, ] );
3. 编码记录
这是一个如何使用 RecordSerializer
编码消息的简单示例。
<?php /** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */ $subject = 'my-topic-value'; $avroSchema = AvroSchema::parse('{"type": "string"}'); $record = 'Test message'; $encodedBinaryAvro = $recordSerializer->encodeRecord($subject, $avroSchema, $record); // Send this over the wire...
4. 解码消息
这是一个如何使用 RecordSerializer
解码消息的简单示例。
<?php /** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */ /** @var string $encodedBinaryAvro */ $record = $recordSerializer->decodeMessage($encodedBinaryAvro); echo $record; // 'Test message'
架构解析器
架构解析器负责知道哪个 Avro 架构属于哪种类型的记录。如果您想将 Avro 架构存储在单独的文件中,这尤其有用。架构解析器使您能够与该库范围之外的任何架构管理概念集成。
架构解析器接受任何类型的 $record
并尝试为其解析一个匹配的 AvroSchema
实例。
文件解析器
在即使是中等复杂的应用中,您也想在 VCS 中管理您的架构,最有可能的是作为 .avsc
文件。这些文件包含描述 Avro 架构的 JSON。
解析器接受一个 $baseDir
,您想要在其中管理文件,以及一个转换器 callable
,这是一个简单函数,它接受记录作为第一个参数,并接受一个表示转换是否针对键架构的第二个布尔参数 $isKey
。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver; use function get_class;use function is_object; use function str_replace; class MyRecord {} $record = new MyRecord(); $baseDir = __DIR__ . '/files'; $inflector = static function ($record, bool $isKey) { $ext = $isKey ? '.key.avsc' : '.avsc'; $fileName = is_object($record) ? str_replace('\\', '.', get_class($record)) : 'default'; return $fileName . $ext; }; echo $inflector($record, false); // MyNamespace.MyRecord.avsc echo $inflector($record, true); // MyNamespace.MyRecord.key.avsc $resolver = new FileResolver($baseDir, $inflector); $resolver->valueSchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.avsc $resolver->keySchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.key.avsc
CallableResolver
这是一个最简单但也是最灵活的解析器。它只需要两个负责分别获取值或键模式的 callables
。键模式解析器是可选的。
<?php use FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver; use PHPUnit\Framework\Assert; use function Widmogrod\Functional\constt; $valueSchemaJson = ' { "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] } '; $valueSchema = AvroSchema::parse($valueSchemaJson); $resolver = new CallableResolver( constt( AvroSchema::parse($valueSchemaJson) ) ); $record = [ 'foo' => 'bar' ]; $schema = $resolver->valueSchemaFor($record); Assert::assertEquals($schema, $valueSchema);
DefinitionInterfaceResolver
这个库还提供了一个 HasSchemaDefinitionInterface
,它公开了两个静态方法
HasSchemaDefinitionInterface::valueSchemaJson
返回值的模式定义的 JSON 字符串HasSchemaDefinitionInterface::keySchemaJson
返回NULL
或键的模式定义的 JSON 字符串。
DefinitionInterfaceResolver
检查给定的记录是否实现该接口(如果没有,它将抛出 InvalidArgumentException
),并通过静态方法解析模式。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\HasSchemaDefinitionInterface; use FlixTech\AvroSerializer\Objects\SchemaResolvers\DefinitionInterfaceResolver; class MyRecord implements HasSchemaDefinitionInterface { public static function valueSchemaJson() : string { return ' { "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] } '; } public static function keySchemaJson() : ?string { return '{"type": "string"}'; } } $record = new MyRecord(); $resolver = new DefinitionInterfaceResolver(); $resolver->valueSchemaFor($record); // Will resolve from $record::valueSchemaJson(); $resolver->keySchemaFor($record); // Will resolve from $record::keySchemaJson();
ChainResolver
链式解析器是组合多个解析器的有用工具。第一个能够解析模式解析器将获胜。如果链中的任何解析器都无法确定模式,则抛出 InvalidArgumentException
。
<?php namespace MyNamespace; use FlixTech\AvroSerializer\Objects\SchemaResolvers\ChainResolver; $record = ['foo' => 'bar']; /** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver $fileResolver */ /** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver $callableResolver */ $resolver = new ChainResolver($fileResolver, $callableResolver); // or new ChainResolver(...[$fileResolver, $callableResolver]); $resolver->valueSchemaFor($record); // Will resolve $fileResolver, then $callableResolver $resolver->keySchemaFor($record); // Will resolve $fileResolver, then $callableResolver
Symfony 序列化器集成
此库提供了与 Symfony 序列化器组件 的集成。
<?php use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder; use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory; use PHPUnit\Framework\Assert; use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer; use Symfony\Component\Serializer\Serializer; class User { /** @var string */ private $name; /** @var int */ private $age; public function __construct(string $name, int $age) { $this->name = $name; $this->age = $age; } public function getName(): string { return $this->name; } public function setName(string $name): void { $this->name = $name; } public function getAge(): int { return $this->age; } public function setAge(int $age): void { $this->age = $age; } } $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); $avroSchemaJson = '{ "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] }'; $user = new User('Thomas', 38); $normalizer = new GetSetMethodNormalizer(); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]); $serialized = $symfonySerializer->serialize( $user, AvroSerDeEncoder::FORMAT_AVRO, [ AvroSerDeEncoder::CONTEXT_ENCODE_SUBJECT => 'users-value', AvroSerDeEncoder::CONTEXT_ENCODE_WRITERS_SCHEMA => AvroSchema::parse($avroSchemaJson), ] ); $deserializedUser = $symfonySerializer->deserialize( $serialized, User::class, AvroSerDeEncoder::FORMAT_AVRO ); Assert::assertEquals($deserializedUser, $user);
名称转换器
有时您的属性名称可能与您的模式中的字段名称不同。解决此问题的选项之一是使用 自定义序列化器注解。但是,如果您正在使用此库提供的注解,您可以使用我们的 名称转换器,该转换器解析这些注解并将模式字段名称与属性名称映射。
<?php use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder; use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\NameConverter\AvroNameConverter; use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory; use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer; use Symfony\Component\Serializer\Serializer; use Doctrine\Common\Annotations\AnnotationReader as DoctrineAnnotationReader; use Doctrine\Common\Annotations\AnnotationRegistry; use FlixTech\AvroSerializer\Objects\Schema\Generation\AnnotationReader; $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); AnnotationRegistry::registerLoader('class_exists'); $reader = new AnnotationReader( new DoctrineAnnotationReader() ); $nameConverter = new AvroNameConverter($reader); $normalizer = new GetSetMethodNormalizer(null, $nameConverter); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]);
模式构建器
此库还提供了使用 PHP 定义模式的方法,与 Java SDK 提供的 SchemaBuilder API 非常相似
<?php use FlixTech\AvroSerializer\Objects\Schema; use FlixTech\AvroSerializer\Objects\Schema\Record\FieldOption; Schema::record() ->name('object') ->namespace('org.acme') ->doc('A test object') ->aliases(['stdClass', 'array']) ->field('name', Schema::string(), FieldOption::doc('Name of the object'), FieldOption::orderDesc()) ->field('answer', Schema::int(), FieldOption::default(42), FieldOption::orderAsc(), FieldOption::aliases('wrong', 'correct')) ->field('ignore', Schema::boolean(), FieldOption::orderIgnore()) ->parse();
模式生成器
除了提供用于定义模式的流畅 API 外,我们还提供了从类元数据(注解)生成模式的方法。为此,您必须安装 doctrine/annotations
包。
<?php use FlixTech\AvroSerializer\Objects\DefaultSchemaGeneratorFactory; use FlixTech\AvroSerializer\Objects\Schema\Generation\Annotations as SerDe; /** * @SerDe\AvroType("record") * @SerDe\AvroName("user") */ class User { /** * @SerDe\AvroType("string") * @var string */ private $firstName; /** * @SerDe\AvroType("string") * @var string */ private $lastName; /** * @SerDe\AvroType("int") * @var int */ private $age; public function __construct(string $firstName, string $lastName, int $age) { $this->firstName = $firstName; $this->lastName = $lastName; $this->age = $age; } public function getFirstName(): string { return $this->firstName; } public function getLastName(): string { return $this->lastName; } public function getAge(): int { return $this->age; } } $generator = DefaultSchemaGeneratorFactory::get(); $schema = $generator->generate(User::class); $avroSchema = $schema->parse();
有关可能注解的更多示例,请参阅 测试用例。
示例
此库在 示例 文件夹中提供了一些可执行的示例。您应该查看它们以了解此库的工作方式。