hx/interop

让所有语言互相通信

维护者

详细信息

github.com/hx/interop

源代码

问题

安装: 2

依赖: 0

建议者: 0

安全: 0

星星: 0

关注者: 2

分支: 1

开放问题: 0

语言:Go

v0.5.0 2023-02-08 05:17 UTC

This package is auto-updated.

Last update: 2024-09-15 11:49:16 UTC


README

简单进程间消息通信的层次。

Go实现是其他实现的参考。

协议

Interop依赖于进程间的消息交换。消息由MIME风格的头部和体组成。

Content-Type: application/json
Content-Length: 14

{"foo":"bar"}

所有头部都是可选的。在没有Content-Length头部的情况下,双行断行用作消息终止符。

RPC

在客户端/服务器配对中,客户端进程可以使用ID和类来发起RPC请求。ID应该在每个会话中是唯一的。客户端负责生成ID。类应该告诉服务器如何处理请求。

Interop-Rpc-Class: ping
Interop-Rpc-Id: 1

Are you there?

服务器应该发送带有相同ID头部的响应。

Interop-Rpc-Id: 1

Yes I am!

服务器可以在发送响应之前发送其他消息(包括事件和/或对其他请求的响应),因此客户端应该等待带有相同ID的消息。

服务器可以在任何时间向客户端发送事件。事件就是一个没有ID的消息。事件应该有类头部,告知客户端如何处理它们。

实现原则

消息

每种语言的基本类型是Message,它应该有headersbody

头部是MIME头部的集合。它们应该以不区分大小写的方式使用。

体是原始二进制,因此适合传输压缩数据、视频流等。

读取器/写入器

每种语言都有读取器和写入器接口,它们围绕本地的IO原语来包装,以便可以直接从文件(例如STDIN/STDOUT)、套接字、管道、缓冲区等读取/写入消息。

连接

连接是读取器和写入器的组合,应该代表进程的双向接口;例如,STDIO、TCP套接字或子进程的管道。

管道

管道简单地允许你读取写入到管道中的任何消息。管道可以根据实现阻塞或具有缓冲区。

关闭

读取器、写入器和连接通常不能直接关闭。它们的底层IO流应该被关闭。

然而,管道不代表IO原语,因此可以直接关闭。

RPC

RPC服务器和客户端围绕连接提供核心RPC(或任何其他请求-响应)功能。

客户端可以增强常规消息,添加所需的ID和类头部以进行RPC,并等待服务器向它们发送响应,通过ID将响应与请求匹配。客户端还可以监听服务器发送的特定类别的事件,为这些事件运行处理器。

服务器可以配置为对特定类别的消息使用不同的处理器,允许将响应无缝地返回给客户端。

由于客户端和服务器都是“监听”的(客户端监听事件,服务器监听请求),它们具有不同的阻塞行为,这取决于可用的工具。在大多数情况下,进程将能够等待客户端或服务器关闭或出错。

并发/并行

读取器和写入器使用互斥锁来确保它们一次只读取或写入一个消息,无论有多少线程访问它们。因此,涉及消息移动的操作通常是线程安全的。

客户端事件处理器和服务器请求处理器通常尽可能异步处理,以最小化对IO流的阻塞。

示例

在下面的示例中,每种语言都实现了相同的客户端/服务器组合。

客户端和服务器进程通过名为 ab 的两个 FIFO 进行通信。客户端向 a 写入,从 b 读取,而服务器则向 b 写入,从 a 读取。

客户端向服务器发送单个 RPC 请求 countdown,其中包含一个整数值的头部 ticks。服务器响应 countdown,通过每秒向客户端发送一个 tick 事件,直到请求的 ticks 头部中的数字。

客户端通过将 Tick n 写入其 STDOUT 来处理 tick 事件。在接收到最后的 tick 后,它们关闭 FIFO a,这将导致服务器退出,进而导致客户端退出。

这些示例都可以在 examples 目录下运行。

Go

服务器

import (
    "github.com/hx/interop/interop"
    "os"
    "strconv"
    "time"
)

func main() {
    reader, _ := os.OpenFile("a", os.O_RDONLY, 0)
    writer, _ := os.OpenFile("b", os.O_WRONLY|os.O_SYNC, 0)

    server := interop.NewRpcServer(interop.BuildConn(reader, writer))

    server.HandleClassName("countdown", interop.ResponderFunc(func(request interop.Message, _ *interop.MessageBuilder) {
        num, _ := strconv.Atoi(request.GetHeader("ticks"))
        for i := 1; i <= num; i++ {
            time.Sleep(time.Second)
            event := interop.NewRpcMessage("tick")
            event.SetContent(interop.ContentTypeJSON, i)
            server.Send(event)
        }
    }))

    server.Run()
}

客户端

import (
    "fmt"
    "github.com/hx/interop/interop"
    "os"
)

func main() {
    writer, _ := os.OpenFile("a", os.O_WRONLY|os.O_SYNC, 0)
    reader, _ := os.OpenFile("b", os.O_RDONLY, 0)

    client := interop.NewRpcClient(interop.BuildConn(reader, writer))

    client.Events.HandleClassName("tick", interop.HandlerFunc(func(event interop.Message) error {
        i := 0
        interop.ContentTypeJSON.DecodeTo(event, &i)
        fmt.Println("Tick", i)
        if i == 5 {
            writer.Close()
        }
        return nil
    }))

    client.Start()

    client.Send(interop.NewRpcMessage("countdown").AddHeader("ticks", "5"))

    client.Wait()
}

Ruby

服务器

require 'interop'

reader = File.open('a', 'r')
writer = File.open('b', 'w')

server = Hx::Interop::RPC::Server.new(reader, writer)

server.on 'countdown' do |request|
  request['ticks'].to_i.times do |i|
    sleep 1
    server.send 'tick', Hx::Interop::ContentType::JSON.encode(i + 1)
  end
  nil
end

server.wait

客户端

require 'interop'

writer = File.open('a', 'w')
reader = File.open('b', 'r')

client = Hx::Interop::RPC::Client.new(reader, writer)

client.on 'tick' do |event|
  i = event.decode
  puts "Tick #{i}"
  writer.close if i == 5
end

client.call :countdown, ticks: 5

client.wait

PHP

服务器

$reader = fopen('a', 'r');
$writer = fopen('b', 'w');

$server = new Hx\Interop\RPC\Server($reader, $writer);

$server->on('countdown', function (Hx\Interop\Message $message) use($server) {
    $num = (int) $message['ticks'];
    for ($i = 1; $i <= $num; $i++) {
        sleep(1);
        $server->send('tick', Hx\Interop\Message::json($i));
    }
});

$server->wait();

客户端

$writer = fopen('a', 'w');
$reader = fopen('b', 'r');

$client = new Hx\Interop\RPC\Client($reader, $writer);

$client->on('tick', function (Hx\Interop\Message $message) use ($writer) {
    $num = json_decode($message->body);
    echo "Tick $num\n";
    if ($num === 5) {
        fclose($writer);
    }
});

$client->call('countdown', ['ticks' => 5]);

$client->wait();

JavaScript

待办事项