AWS SQS 传输(基于 enqueue/sqs 进行分支和改进)

2.0.1 2022-04-24 09:52 UTC

README

Software license Version Build status Coverage

配置

"require": {
        "micronative/sqs": "^2.0.0"
},
"repositories": [
    { "type": "vcs", "url": "https://github.com/micronative/sqs" }
],

运行

composer require micronative/sqs:2.0.0

描述

此项目是从 enqueue/sqs 分支出来的,并进行了以下改进:

  • 将所有类移动到 src 目录
  • 将测试重命名为 tests
  • 将示例移动到 tests 目录
  • 更改命名空间为 Micronative\Sqs

SqsProducer->send()

public function send(Destination $destination, Message $message): void
    {
        InvalidDestinationException::assertDestinationInstanceOf($destination, SqsDestination::class);
        InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);

        $body = $message->getBody();
        if (empty($body)) {
            throw new InvalidMessageException('The message body must be a non-empty string.');
        }

        $arguments = [
            '@region' => $destination->getRegion(),
            'MessageBody' => $body,
            'QueueUrl' => $this->context->getQueueUrl($destination),
        ];

        if (null !== $this->deliveryDelay) {
            $arguments['DelaySeconds'] = (int) $this->deliveryDelay / 1000;
        }

        if ($message->getDelaySeconds()) {
            $arguments['DelaySeconds'] = $message->getDelaySeconds();
        }

        if ($message->getMessageDeduplicationId()) {
            $arguments['MessageDeduplicationId'] = $message->getMessageDeduplicationId();
        }

        if ($message->getMessageGroupId()) {
            $arguments['MessageGroupId'] = $message->getMessageGroupId();
        }

        if ($message->getHeaders()) {
            $arguments['MessageAttributes']['Headers'] = [
                'DataType' => 'String',
                'StringValue' => json_encode([$message->getHeaders()]),
            ];
        }
        
        if ($message->getProperties()) {
            foreach ($message->getProperties() as $name => $value) {
                $arguments['MessageAttributes'][$name] = ['DataType' => 'String', 'StringValue' => $value];
            }
        }

        $result = $this->context->getSqsClient()->sendMessage($arguments);

        if (false == $result->hasKey('MessageId')) {
            throw new \RuntimeException('Message was not sent');
        }

        $message->setMessageId($result['MessageId']);
    }

SqsConsumer->covertMessage()

protected function convertMessage(array $sqsMessage): SqsMessage
    {
        $message = $this->context->createMessage();

        $message->setBody($sqsMessage['Body']);
        $message->setReceiptHandle($sqsMessage['ReceiptHandle']);

        if (isset($sqsMessage['Attributes'])) {
            $message->setAttributes($sqsMessage['Attributes']);

            if (isset($sqsMessage['Attributes']['MessageDeduplicationId'])) {
                $message->setMessageDeduplicationId($sqsMessage['Attributes']['MessageDeduplicationId']);
            }

            if (isset($sqsMessage['Attributes']['MessageGroupId'])) {
                $message->setMessageGroupId($sqsMessage['Attributes']['MessageGroupId']);
            }
        }

        if (isset($sqsMessage['Attributes']['ApproximateReceiveCount'])) {
            $message->setRedelivered(((int) $sqsMessage['Attributes']['ApproximateReceiveCount']) > 1);
        }

        if (isset($sqsMessage['MessageAttributes'])) {
            foreach ($sqsMessage['MessageAttributes'] as $name => $attribute) {
                if ($name == 'Headers') {
                    $headers = json_decode($attribute['StringValue'], true);
                    $message->setHeaders($headers);
                } else {
                    $message->setProperty($name, $attribute['StringValue']);
                }
            }
        }

        if (isset($sqsMessage['MessageId'])) {
            $message->setMessageId($sqsMessage['MessageId']);
        }

        return $message;
    }