rpcx 官方技术博客

细说 XClient 的那些方法

2023.11.02

继续我们的rpcx挖掘宝藏之旅,这次我们来看看XClient的哪些方法,以及它们的作用。

在上一篇Oneshot的文章中,我们其实列举了XClient的主要的服务调用方法,目前一共十个方法。十个方法就是十个葫芦娃,各有各的能耐,各有各的用处,下面我们来一一介绍。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type XClient interface {
	Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
	Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
	Oneshot(ctx context.Context, serviceMethod string, args interface{}) error
	Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
	Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
	Inform(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) ([]Receipt, error)
	SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
	SendFile(ctx context.Context, fileName string, rateInBytesPerSecond int64, meta map[string]string) error
	DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, meta map[string]string) error
	Stream(ctx context.Context, meta map[string]string) (net.Conn, error)
}

这些方法也不是凭空想象出来的,都是用户在实际的项目中提出的需求。

Call

最简单的最常用的调用方法,客户端调用它的时候,传入参数,阻塞直到服务端返回结果或者超时,然后获得reply值或者error:

1
2
    reply := &example.Reply{}
	err := xclient.Call(context.Background(), "Mul", args, reply)

这个方法是阻塞的,这液意味着调用者goroutine会被block,不过这个方法也是最常用的方法,因为大部分的调用都是需要等待服务端的返回的。

完整例子:102basic

Go

Go是异步的方法调用,这意味着调用者goroutine不会被block,它会立即返回,但是它会返回一个Call对象,你可以通过这个对象来获取服务端的返回结果或者错误。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    reply := &example.Reply{}
    call, err := xclient.Go(context.Background(), "Mul", args, reply, nil)
    if err != nil {
        log.Fatalf("failed to call: %v", err)
    }

    replyCall := <-call.Done
    if replyCall.Error != nil {
        log.Fatalf("failed to call: %v", replyCall.Error)
    }

以上代码居然是 chatgpt 自动生成出来的,太强大了。你如果在使用rpcx做开发,也会享受到这种自动生成rpcx代码的乐趣。

完整例子:client_async

Oneshot

在上一篇Oneshot中我们已经介绍过它了。

客户端发送请求,不等待不需要而且服务端也不会返回结果,这样可以提高客户端的吞吐量。

1
    err := xclient.Oneshot(context.Background(), "Mul", args)

完整例子:oneshot

Broadcast

Broadcast实现了广播的功能。

客户端调用服务的时候,请求会发送给所有的客户端。

1
2
    reply := &example.Reply{}
    err := xclient.Broadcast(context.Background(), "Mul", args, reply)

问题来了:

  1. 如果某个节点执行失败怎么办? 不会影响其他节点。会记录到error值中。
  2. 如果某个节点执行时间太长怎么办? 请设置timeout。
  3. reply是谁的的返回结果? 第一个成功返回的结果。
  4. 失败模式和选择模式怎么设置? 随便设置,反正在这种调用下不起作用。

完整的例子: broadcast

Fork

类似广播模式,客户端将请求群发给所有的服务节点,但是与广播模式不同的是,只要有一个任意节点返回成功的结果,就直接返回。

1
2
    reply := &example.Reply{}
    err := xclient.Fork(context.Background(), "Mul", args, reply)

这种模式属于财大气粗的场景,广撒网,只要有一个节点返回成功的结果就行了。

完整例子:fork

Inform

通知模式类似广播模式,但是它返回每一个服务节点的返回结果,而不像广播模式只返回其中的一个成功结果。

1
2
    reply := &example.Reply{}
    receipts, err := xclient.Inform(context.Background(), "Mul", args, reply)

SendRaw

rpcx提供了一种底层的能力,你可以发送你自己组装好的rpcx消息,也就是rpcx客户端和服务端通讯用的原始的消息。

1
    m, reply, err := xclient.SendRaw(context.Background(), "Mul", args, reply)

如果不是使用rpcx做自己更强大的微服务框架,你几乎不会用到它。

SendFile

rpcx还提供了便利的发送文件的能力。主要用于文件传输。

1
    err := xclient.SendFile(context.Background(), "test.txt", 1024, nil)

rpcx客户端和服务端会协商生成一个新的通道,不会占用服务调用的连接。

它提供了限流的功能,你可以限制传输的带宽。

完整例子:sendfile

DownloadFile

DownloadFileSendFile的反向操作,用于下载文件。

1
    err := xclient.DownloadFile(context.Background(), "test.txt", os.Stdout, nil)

它也是和服务端协商了一个新的通道,不影响服务调用的连接。

完整例子:downloadfile

Stream

客户端和服务端会协商一个专门用来流式通讯的通道,这个通道不会影响服务调用的连接。

这需要客户端和服务器都要支持流式通讯。

流式通讯的好处可以利用“零拷贝”的技术,更高性能的传输文件。

1
    conn, err := xclient.Stream(context.Background(), nil)