real-digital/avro-serde-php

一个利用 confluent schema registry 序列化和反序列化 Avro 记录的库

1.7.3 2021-04-29 08:55 UTC

README

这是 flix-tech/avro-serde-php 的分支

php-confluent-serde Actions Status Maintainability Test Coverage Latest Stable Version Total Downloads License

动机

在序列化和反序列化消息时使用 Avro 序列化格式,尤其是在与 Confluent 平台集成时,您希望确保架构的演变不会影响下游消费者。

因此,Confluent 开发了 Schema Registry,它负责根据可配置的兼容性策略验证给定的架构演变。

不幸的是,Confluent 没有为 PHP 提供官方的 Avro SerDe 包。这个库旨在为 PHP 提供一个 Avro SerDe 库,该库实现了 Confluent wire format 并集成了 FlixTech 的 Schema Registry Client。

安装

这个库使用 PHP 的 composer 包管理器

composer require 'real-digital/avro-serde-php:^1.7'

快速入门

注意

您应该始终使用缓存的 schema registry 客户端,否则您将为每个序列化或反序列化的消息发送 HTTP 请求。

1. 创建缓存的 Schema Registry 客户端

有关缓存信息的更多信息,请参阅 Schema Registry 客户端文档

<?php

use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;

$schemaRegistryClient = new CachedRegistry(
    new PromisingRegistry(
        new Client(['base_uri' => 'registry.example.com'])
    ),
    new AvroObjectCacheAdapter()
);

2. 构建 RecordSerializer 实例

RecordSerializer 是您与该库交互的主要方式。它提供了用于 SerDe 操作的 encodeRecorddecodeMessage 方法。

<?php

use FlixTech\AvroSerializer\Objects\RecordSerializer;

/** @var \FlixTech\SchemaRegistryApi\Registry $schemaRegistry */
$recordSerializer = new RecordSerializer(
    $schemaRegistry,
    [
        // If you want to auto-register missing schemas set this to true
        RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false,
        // If you want to auto-register missing subjects set this to true
        RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false,
    ]
);

3. 编码记录

这是一个使用 RecordSerializer 在 Confluent Avro wire format 中编码消息的简单示例。

<?php

/** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */
$subject = 'my-topic-value';
$avroSchema = AvroSchema::parse('{"type": "string"}');
$record = 'Test message';

$encodedBinaryAvro = $recordSerializer->encodeRecord($subject, $avroSchema, $record);
// Send this over the wire...

4. 解码消息

这是一个使用 RecordSerializer 解码消息的简单示例。

<?php

/** @var \FlixTech\AvroSerializer\Objects\RecordSerializer $recordSerializer */
/** @var string $encodedBinaryAvro */
$record = $recordSerializer->decodeMessage($encodedBinaryAvro);

echo $record; // 'Test message'

Schema Resolvers

Schema Resolvers 负责知道哪些 Avro 架构属于哪种类型的记录。如果您想将 Avro 架构分别管理在单独的文件中,这特别有用。Schema Resolvers 使您能够与您可能在此库范围之外拥有的任何架构管理概念集成。

Schema Resolvers 接受任何类型的 $record,并尝试为其解析一个匹配的 AvroSchema 实例。

FileResolver

在即使是相当复杂的应用程序中,您也想要在 VCS 中管理您的架构,最可能的是作为 .avsc 文件。这些文件包含描述 Avro 架构的 JSON。

解析器接受一个 $baseDir,其中您想管理文件,以及一个 inflector callable,这是一个简单的函数,它接受记录作为第一个参数,以及一个表示 inflection 是否针对键架构的第二个布尔参数 $isKey

<?php

namespace MyNamespace;

use FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver;
use function get_class;use function is_object;
use function str_replace;

class MyRecord {}

$record = new MyRecord();

$baseDir = __DIR__ . '/files';

$inflector = static function ($record, bool $isKey) {
    $ext = $isKey ? '.key.avsc' : '.avsc';
    $fileName = is_object($record)
        ? str_replace('\\', '.', get_class($record))
        : 'default';
    
    return $fileName . $ext;
};


echo $inflector($record, false); // MyNamespace.MyRecord.avsc
echo $inflector($record, true); // MyNamespace.MyRecord.key.avsc

$resolver = new FileResolver($baseDir, $inflector);

$resolver->valueSchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.avsc
$resolver->keySchemaFor($record); // This will load from $baseDir . '/' . MyNamespace.MyRecord.key.avsc

CallableResolver

这是一个最简单但也最灵活的解析器。它只接受两个 callables,分别负责获取值架构或键架构。键架构解析器是可选的。

<?php

use FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver;
use PHPUnit\Framework\Assert;
use function Widmogrod\Functional\constt;

$valueSchemaJson = '
{
  "type": "record",
  "name": "user",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
';
$valueSchema = AvroSchema::parse($valueSchemaJson);

$resolver = new CallableResolver(
    constt(
        AvroSchema::parse($valueSchemaJson)
    )
);

$record = [ 'foo' => 'bar' ];

$schema = $resolver->valueSchemaFor($record);

Assert::assertEquals($schema, $valueSchema);

DefinitionInterfaceResolver

该库还提供了一个暴露两个静态方法的HasSchemaDefinitionInterface接口。

  • HasSchemaDefinitionInterface::valueSchemaJson返回值模式的JSON字符串。
  • HasSchemaDefinitionInterface::keySchemaJson返回NULL或者键模式的JSON字符串。

DefinitionInterfaceResolver检查给定的记录是否实现了该接口(如果没有,将抛出InvalidArgumentException),并通过静态方法解决模式。

<?php

namespace MyNamespace;

use FlixTech\AvroSerializer\Objects\HasSchemaDefinitionInterface;
use FlixTech\AvroSerializer\Objects\SchemaResolvers\DefinitionInterfaceResolver;

class MyRecord implements HasSchemaDefinitionInterface {
    public static function valueSchemaJson() : string
    {
        return '
               {
                 "type": "record",
                 "name": "user",
                 "fields": [
                   {"name": "name", "type": "string"},
                   {"name": "age", "type": "int"}
                 ]
               }
               ';
    }
    
    public static function keySchemaJson() : ?string
    {
        return '{"type": "string"}';
    }
}

$record = new MyRecord();

$resolver = new DefinitionInterfaceResolver();

$resolver->valueSchemaFor($record); // Will resolve from $record::valueSchemaJson();
$resolver->keySchemaFor($record); // Will resolve from $record::keySchemaJson();

链式解析器

链式解析器是组合多个解析器的有用工具。能够解析模式的第一位解析器将获胜。如果链中的解析器都无法确定模式,则抛出InvalidArgumentException

<?php

namespace MyNamespace;

use FlixTech\AvroSerializer\Objects\SchemaResolvers\ChainResolver;

$record = ['foo' => 'bar'];

/** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\FileResolver $fileResolver */
/** @var \FlixTech\AvroSerializer\Objects\SchemaResolvers\CallableResolver $callableResolver */

$resolver = new ChainResolver($fileResolver, $callableResolver);
// or new ChainResolver(...[$fileResolver, $callableResolver]);

$resolver->valueSchemaFor($record); // Will resolve $fileResolver, then $callableResolver
$resolver->keySchemaFor($record); // Will resolve $fileResolver, then $callableResolver

Symfony序列化器集成

该库提供了与Symfony序列化器组件的集成。

<?php

use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder;
use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory;
use PHPUnit\Framework\Assert;
use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer;
use Symfony\Component\Serializer\Serializer;

class User
{
    /** @var string */
    private $name;

    /** @var int */
    private $age;

    public function __construct(string $name, int $age)
    {
        $this->name = $name;
        $this->age = $age;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getAge(): int
    {
        return $this->age;
    }

    public function setAge(int $age): void
    {
        $this->age = $age;
    }
}

$recordSerializer = DefaultRecordSerializerFactory::get(
    getenv('SCHEMA_REGISTRY_HOST')
);

$avroSchemaJson = '{
  "type": "record",
  "name": "user",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}';

$user = new User('Thomas', 38);

$normalizer = new GetSetMethodNormalizer();
$encoder = new AvroSerDeEncoder($recordSerializer);

$symfonySerializer = new Serializer([$normalizer], [$encoder]);

$serialized = $symfonySerializer->serialize(
    $user,
    AvroSerDeEncoder::FORMAT_AVRO,
    [
        AvroSerDeEncoder::CONTEXT_ENCODE_SUBJECT => 'users-value',
        AvroSerDeEncoder::CONTEXT_ENCODE_WRITERS_SCHEMA => AvroSchema::parse($avroSchemaJson),
    ]
);

$deserializedUser = $symfonySerializer->deserialize(
    $serialized,
    User::class,
    AvroSerDeEncoder::FORMAT_AVRO
);

Assert::assertEquals($deserializedUser, $user);

名称转换器

有时您的属性名称可能与模式中的字段名称不同。解决此问题的方法之一是使用自定义序列化器注解。但是,如果您使用该库提供的注解,可以使用我们的名称转换器,该转换器解析这些注解并将模式字段名称与属性名称映射。

<?php

use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\AvroSerDeEncoder;
use FlixTech\AvroSerializer\Integrations\Symfony\Serializer\NameConverter\AvroNameConverter;
use FlixTech\AvroSerializer\Objects\DefaultRecordSerializerFactory;
use Symfony\Component\Serializer\Normalizer\GetSetMethodNormalizer;
use Symfony\Component\Serializer\Serializer;
use Doctrine\Common\Annotations\AnnotationReader as DoctrineAnnotationReader;
use Doctrine\Common\Annotations\AnnotationRegistry;
use FlixTech\AvroSerializer\Objects\Schema\Generation\AnnotationReader;

$recordSerializer = DefaultRecordSerializerFactory::get(
    getenv('SCHEMA_REGISTRY_HOST')
);

AnnotationRegistry::registerLoader('class_exists');

$reader = new AnnotationReader(
    new DoctrineAnnotationReader()
);

$nameConverter = new AvroNameConverter($reader);

$normalizer = new GetSetMethodNormalizer(null, $nameConverter);
$encoder = new AvroSerDeEncoder($recordSerializer);

$symfonySerializer = new Serializer([$normalizer], [$encoder]);

模式构建器

该库还提供了使用PHP定义模式的方法,这与Java SDK提供的SchemaBuilder API非常相似。

<?php

use FlixTech\AvroSerializer\Objects\Schema;
use FlixTech\AvroSerializer\Objects\Schema\Record\FieldOption;

Schema::record()
    ->name('object')
    ->namespace('org.acme')
    ->doc('A test object')
    ->aliases(['stdClass', 'array'])
    ->field('name', Schema::string(), FieldOption::doc('Name of the object'), FieldOption::orderDesc())
    ->field('answer', Schema::int(), FieldOption::default(42), FieldOption::orderAsc(), FieldOption::aliases('wrong', 'correct'))
    ->field('ignore', Schema::boolean(), FieldOption::orderIgnore())
    ->parse();

模式生成器

除了提供定义模式的流畅API外,我们还提供了从类元数据(注解)生成模式的方法。为此,您必须安装doctrine/annotations包。

<?php

use FlixTech\AvroSerializer\Objects\DefaultSchemaGeneratorFactory;
use FlixTech\AvroSerializer\Objects\Schema\Generation\Annotations as SerDe;

/**
 * @SerDe\AvroType("record")
 * @SerDe\AvroName("user")
 */
class User
{
    /**
     * @SerDe\AvroType("string")
     * @var string
     */
    private $firstName;

    /**
     * @SerDe\AvroType("string")
     * @var string
     */
    private $lastName;

    /**
     * @SerDe\AvroType("int")
     * @var int
     */
    private $age;

    public function __construct(string $firstName, string $lastName, int $age)
    {
        $this->firstName = $firstName;
        $this->lastName = $lastName;
        $this->age = $age;
    }

    public function getFirstName(): string
    {
        return $this->firstName;
    }

    public function getLastName(): string
    {
        return $this->lastName;
    }

    public function getAge(): int
    {
        return $this->age;
    }
}

$generator = DefaultSchemaGeneratorFactory::get();

$schema = $generator->generate(User::class);
$avroSchema = $schema->parse();

有关可能注解的更多示例,请参阅测试用例

示例

该库在examples文件夹中提供了一些可执行的示例。您应该查看这些示例,以了解该库的工作方式。