hx / interop
让所有语言互相通信
Requires
- php: ^8
Requires (Dev)
- phpunit/phpunit: ^9
README
简单进程间消息通信的层次。
Go实现是其他实现的参考。
协议
Interop依赖于进程间的消息交换。消息由MIME风格的头部和体组成。
Content-Type: application/json
Content-Length: 14
{"foo":"bar"}
所有头部都是可选的。在没有Content-Length
头部的情况下,双行断行用作消息终止符。
RPC
在客户端/服务器配对中,客户端进程可以使用ID和类来发起RPC请求。ID应该在每个会话中是唯一的。客户端负责生成ID。类应该告诉服务器如何处理请求。
Interop-Rpc-Class: ping
Interop-Rpc-Id: 1
Are you there?
服务器应该发送带有相同ID头部的响应。
Interop-Rpc-Id: 1
Yes I am!
服务器可以在发送响应之前发送其他消息(包括事件和/或对其他请求的响应),因此客户端应该等待带有相同ID的消息。
服务器可以在任何时间向客户端发送事件。事件就是一个没有ID的消息。事件应该有类头部,告知客户端如何处理它们。
实现原则
消息
每种语言的基本类型是Message
,它应该有headers
和body
。
头部是MIME头部的集合。它们应该以不区分大小写的方式使用。
体是原始二进制,因此适合传输压缩数据、视频流等。
读取器/写入器
每种语言都有读取器和写入器接口,它们围绕本地的IO原语来包装,以便可以直接从文件(例如STDIN/STDOUT)、套接字、管道、缓冲区等读取/写入消息。
连接
连接是读取器和写入器的组合,应该代表进程的双向接口;例如,STDIO、TCP套接字或子进程的管道。
管道
管道简单地允许你读取写入到管道中的任何消息。管道可以根据实现阻塞或具有缓冲区。
关闭
读取器、写入器和连接通常不能直接关闭。它们的底层IO流应该被关闭。
然而,管道不代表IO原语,因此可以直接关闭。
RPC
RPC服务器和客户端围绕连接提供核心RPC(或任何其他请求-响应)功能。
客户端可以增强常规消息,添加所需的ID和类头部以进行RPC,并等待服务器向它们发送响应,通过ID将响应与请求匹配。客户端还可以监听服务器发送的特定类别的事件,为这些事件运行处理器。
服务器可以配置为对特定类别的消息使用不同的处理器,允许将响应无缝地返回给客户端。
由于客户端和服务器都是“监听”的(客户端监听事件,服务器监听请求),它们具有不同的阻塞行为,这取决于可用的工具。在大多数情况下,进程将能够等待客户端或服务器关闭或出错。
并发/并行
读取器和写入器使用互斥锁来确保它们一次只读取或写入一个消息,无论有多少线程访问它们。因此,涉及消息移动的操作通常是线程安全的。
客户端事件处理器和服务器请求处理器通常尽可能异步处理,以最小化对IO流的阻塞。
示例
在下面的示例中,每种语言都实现了相同的客户端/服务器组合。
客户端和服务器进程通过名为 a
和 b
的两个 FIFO 进行通信。客户端向 a
写入,从 b
读取,而服务器则向 b
写入,从 a
读取。
客户端向服务器发送单个 RPC 请求 countdown
,其中包含一个整数值的头部 ticks
。服务器响应 countdown
,通过每秒向客户端发送一个 tick
事件,直到请求的 ticks
头部中的数字。
客户端通过将 Tick n
写入其 STDOUT 来处理 tick
事件。在接收到最后的 tick
后,它们关闭 FIFO a
,这将导致服务器退出,进而导致客户端退出。
这些示例都可以在 examples 目录下运行。
Go
服务器
import ( "github.com/hx/interop/interop" "os" "strconv" "time" ) func main() { reader, _ := os.OpenFile("a", os.O_RDONLY, 0) writer, _ := os.OpenFile("b", os.O_WRONLY|os.O_SYNC, 0) server := interop.NewRpcServer(interop.BuildConn(reader, writer)) server.HandleClassName("countdown", interop.ResponderFunc(func(request interop.Message, _ *interop.MessageBuilder) { num, _ := strconv.Atoi(request.GetHeader("ticks")) for i := 1; i <= num; i++ { time.Sleep(time.Second) event := interop.NewRpcMessage("tick") event.SetContent(interop.ContentTypeJSON, i) server.Send(event) } })) server.Run() }
客户端
import ( "fmt" "github.com/hx/interop/interop" "os" ) func main() { writer, _ := os.OpenFile("a", os.O_WRONLY|os.O_SYNC, 0) reader, _ := os.OpenFile("b", os.O_RDONLY, 0) client := interop.NewRpcClient(interop.BuildConn(reader, writer)) client.Events.HandleClassName("tick", interop.HandlerFunc(func(event interop.Message) error { i := 0 interop.ContentTypeJSON.DecodeTo(event, &i) fmt.Println("Tick", i) if i == 5 { writer.Close() } return nil })) client.Start() client.Send(interop.NewRpcMessage("countdown").AddHeader("ticks", "5")) client.Wait() }
Ruby
服务器
require 'interop' reader = File.open('a', 'r') writer = File.open('b', 'w') server = Hx::Interop::RPC::Server.new(reader, writer) server.on 'countdown' do |request| request['ticks'].to_i.times do |i| sleep 1 server.send 'tick', Hx::Interop::ContentType::JSON.encode(i + 1) end nil end server.wait
客户端
require 'interop' writer = File.open('a', 'w') reader = File.open('b', 'r') client = Hx::Interop::RPC::Client.new(reader, writer) client.on 'tick' do |event| i = event.decode puts "Tick #{i}" writer.close if i == 5 end client.call :countdown, ticks: 5 client.wait
PHP
服务器
$reader = fopen('a', 'r'); $writer = fopen('b', 'w'); $server = new Hx\Interop\RPC\Server($reader, $writer); $server->on('countdown', function (Hx\Interop\Message $message) use($server) { $num = (int) $message['ticks']; for ($i = 1; $i <= $num; $i++) { sleep(1); $server->send('tick', Hx\Interop\Message::json($i)); } }); $server->wait();
客户端
$writer = fopen('a', 'w'); $reader = fopen('b', 'r'); $client = new Hx\Interop\RPC\Client($reader, $writer); $client->on('tick', function (Hx\Interop\Message $message) use ($writer) { $num = json_decode($message->body); echo "Tick $num\n"; if ($num === 5) { fclose($writer); } }); $client->call('countdown', ['ticks' => 5]); $client->wait();
JavaScript
待办事项