rpcx 官方技术博客

rpcx支持websocket协议了

2021.04.11

当前, rpcx支持tcp、kcp、quic、unix domain、http、jsonrpc 2.0等传输协议,并没有考虑websocket的支持,原因在于考虑到微服务更多的是企业内部服务之间的通讯,不太可能暴露给浏览器,企业内部大多采用tcp的方式传输,或者udp族(kcp、quic)方式的传输,但是还是有用户提出希望能支持websocket。

我想websocket可能还是会有一些场景下使用,比如游戏开发,移动端核服务端的通讯等等,所以经过慎重考虑,我决定在 rpcx 1.6.2版本中增加对websocket的支持。

基于rpcx良好的架构设计,实现对websocket的支持并不难。

本来,rpcx对kcp、quic的支持也不复杂,原因在于rpcx的通讯采用的net.Conn的这样一个通用的接口,listent.Accept方法接受客户端的请求后返回一个net.Conn对象,后续请求的解析、处理,写回都是在这个通用接口上操作的,所以非常容易支持新的传输协议。

以前rpcx还支持其它的udp的扩展协议,比如

因为这两个协议用的人很少,所以在rpcx 1.6中把这两个协议的支持去掉了。

所以,对于websocket来说,找到一个可信赖的、合适的websocket库,可以很容易的实现基于websocket传输的微服务框架。

目前Go生态圈常用的websocket库如下:

虽然像gobwas/ws性能优良、gorilla/websocket功能齐全,但是考虑到API的便利性,我还是决定使用Go官方的x/net/websocket库,一来有官方背书,值得信赖,二来rpcx使用websocket功能比较简单,就是期望能获得一个net.Conn对象,x/net/websocket库正好能满足需求。

服务端

如果服务端采用websocket协议,那么network应该写成ws或者wss,而不是tcp或者quic

服务端会启动一个http server处理websocket请求。 默认的websocket path为 share.DefaultRPCPath,只要服务端核客户端保持一致即可。

ServeWS会传入一个*websocket.Conn参数,它实现了net.Conn接口,所以后续的处理使用通用的s.serveConn(conn)即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (s *Server) Serve(network, address string) (err error) {
	......

	if network == "ws" || network == "wss" {
		s.serveByWS(ln, "")
		return nil
	}

	......
}


func (s *Server) serveByWS(ln net.Listener, rpcPath string) {
	s.ln = ln

	if rpcPath == "" {
		rpcPath = share.DefaultRPCPath
	}
	mux := http.NewServeMux()
	mux.Handle(rpcPath, websocket.Handler(s.ServeWS))
	srv := &http.Server{Handler: mux}

	srv.Serve(ln)
}

func (s *Server) ServeWS(conn *websocket.Conn) {
	s.mu.Lock()
	s.activeConn[conn] = struct{}{}
	s.mu.Unlock()

	s.serveConn(conn)
}

服务端的改造就如此简单,接下来看看如何部署一个服务端的websocket服务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func main() {
	flag.Parse()

	s := server.NewServer()
	s.RegisterName("Arith", new(Arith), "")
	err := s.Serve("ws", *addr)
	if err != nil {
		panic(err)
	}
}

客户端

客户端最重要的是要实现一个websocket协议的建连,如果实现了这个方法,客户端的改造基本就完成了。这个方法的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func newDirectWSConn(c *Client, network, address string) (net.Conn, error) {
	if c == nil {
		return nil, errors.New("empty client")
	}
	path := c.option.RPCPath
	if path == "" {
		path = share.DefaultRPCPath
	}

	var conn net.Conn
	var err error

	// url := "ws://localhost:12345/ws"

	var url, origin string
	if network == "ws" {
		url = fmt.Sprintf("ws://%s%s", address, path)
		origin = fmt.Sprintf("http://%s", address)
	} else {
		url = fmt.Sprintf("wss://%s%s", address, path)
		origin = fmt.Sprintf("https://%s", address)
	}

	if c.option.TLSConfig != nil {
		config, err := websocket.NewConfig(url, origin)
		if err != nil {
			return nil, err
		}
		config.TlsConfig = c.option.TLSConfig
		conn, err = websocket.DialConfig(config)
	} else {
		conn, err = websocket.Dial(url, "", origin)
	}

	return conn, err
}

在建立连接的时候为wswss协议调用这个方法即可:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (c *Client) Connect(network, address string) error {
	var conn net.Conn
	var err error

	switch network {
	case "http":
		conn, err = newDirectHTTPConn(c, network, address)
	case "ws", "wss":
		conn, err = newDirectWSConn(c, network, address)
	case "kcp":
		conn, err = newDirectKCPConn(c, network, address)
    .......

客户端改造完成后,我们写一个客户端调用改车把刚才启动的websocket服务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
	flag.Parse()

	d, _ := client.NewPeer2PeerDiscovery("ws@"+*addr, "")
	opt := client.DefaultOption
	opt.SerializeType = protocol.JSON

	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
	defer xclient.Close()

	args := example.Args{
		A: 10,
		B: 20,
	}

	reply := &example.Reply{}
	err := xclient.Call(context.Background(), "Mul", args, reply)
	if err != nil {
		log.Fatalf("failed to call: %v", err)
	}

	log.Printf("%d * %d = %d", args.A, args.B, reply.C)

}

所以你看到,对于一个新的传输协议来说,如果它的连接能够遵循net.Conn接口,rpcx支持它还是很容易的。

完整的使用websocket传输协议的例子: websocket example