rpcx 官方技术博客

啥是Oneshot?

2023.11.02

Oneshot在英文中是“一击”或“一枪”的意思, 形容词的意思是“一次性的”,名词的意思是“一次性用品”,在Rust生态圈中尝尝用作一次性使用的并发原语,在Go生态圈中很少使用。

这次我把Oneshot的概念引入到微服务框架里面,用来指代客户端对服务器一次性的调用,不需要等待服务器的返回。

为啥要引入 Oneshot ?

在微服务框架中,客户端调用服务端的时候,一般都是需要等待服务端的返回的,这样才能保证调用的结果是正确的,但是有时候,我们并不关心服务端的返回,比如,我们只需要把客户端的当前日志上传都服务器,传失败也没关系,服务器没有正确保存也无所谓,因为下一秒可能新的日志就会上传了,客户端丢一次两次数据不会影响业务,在这种情况下,我们就可以使用Oneshot,这样可以减少客户端的等待时间,提高客户端的吞吐量。

比如在我们的监控系统中,容许客户端丢失一些数据,因为我们的监控系统是基于时间序列的,如果客户端丢失了一个点数据,我们也不太关心,因为我们关注的是监控的趋势,这要保证一段时间内监控曲线能够画出来,看到变化趋势就好。

在rpcx的实现中,它的协议已经支持Oneway的方式了,它的实际功能就是Oneshot的功能,只不过当时我还接触到Oneshot的概念,借鉴的是微博Motan架构的Oneway的概念。但是在rpcx的客户端中,没有很好的专门为Oneshot设计的接口,你可能需要传入给XClient.Go一个空的reply,隐式的表明这次调用Oneway方式。这一次我需要在工作中明确要使用Oneshot的功能,所以我在rpcx的客户端中加入了Oneshot的支持。

其实修改起来也特别的简单,借助于rpcx灵活的架构和Oneway方式的已经支持,我们可以很容易的实现Oneshot的功能。

首先我们为XClient接口增加一个Oneshot方法的定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type XClient interface {
	// 新增加的Oneshot方法
	Oneshot(ctx context.Context, serviceMethod string, args interface{}) error

	Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
	Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
	Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
	Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
	Inform(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) ([]Receipt, error)
	SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
	SendFile(ctx context.Context, fileName string, rateInBytesPerSecond int64, meta map[string]string) error
	DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, meta map[string]string) error
	Stream(ctx context.Context, meta map[string]string) (net.Conn, error)
	Close() error
}

Oneshot方法类似Call方法,你需要传入上下文ctx,要调用的方法名serviceMethod和参数args,但是不需要传入reply参数,因为Oneshot不需要等待服务器的返回。如果发送失败,会返回一个error。

然后我们在XClient的实现类xClient中实现Oneshot方法:

1
2
3
4
5
6
// Oneshot invokes the named function, ** DOEST NOT ** wait for it to complete, and returns immediately.
func (c *xClient) Oneshot(ctx context.Context, serviceMethod string, args interface{}) error {
	_, err := c.Go(ctx, serviceMethod, args, nil, nil)

	return err
}

其实就是一行,利用Go方法就实现了OneshotOneshot调用Go方法是,传入的reply的值为nil, rpcx就聪明的知道客户端不需要返回值,然后把消息标记成Oneway模式,不需要等待服务器的返回,所以这个方法也不需要返回的*Call, 一切都不需要。

一个例子

既然实现了一个新的功能,那么我们一般的操作就是会在项目rpcx-examples中增加这个功能的例子。

服务

服务器你可以实现一个Onershot的服务器,也就是不需要设置Reply,你也可以设置Reply,但是服务器端不会把这个Reply返回给客户端,而是直接丢弃了。

这里我们实现一个既支持Oneshot又支持普通调用的服务器,它的功能是计算两个数的乘积,给客户端返回结果还是不返回结果,依赖客户端的调用方式。

这个例子也是我们众多的例子中同样的一个服务端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
	"context"
	"flag"
	"fmt"

	example "github.com/rpcxio/rpcx-examples"
	"github.com/smallnest/rpcx/server"
)

var (
	addr = flag.String("addr", "localhost:8972", "server address")
)

// 乘法服务
type Arith struct{}

func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error {
	reply.C = args.A * args.B
	fmt.Println("C=", reply.C)
	return nil
}

func main() {
	flag.Parse()

    // 创建一个新的rpcx服务
	s := server.NewServer()
    // 注册乘法服务
	s.RegisterName("Arith", new(Arith), "")
    // 开始服务
	err := s.Serve("tcp", *addr)
	if err != nil {
		panic(err)
	}
}

客户端

客户端的代码也很简单,我们可以实现一个Oneshot的客户端,它把两个乘数扔给服务器,不管服务器的计算和计算结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
	"context"
	"flag"
	"log"
	"time"

	"github.com/smallnest/rpcx/protocol"

	example "github.com/rpcxio/rpcx-examples"
	"github.com/smallnest/rpcx/client"
)

var (
    // 服务地址
	addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
	flag.Parse()

    // 不需要注册中心,直连模式
	d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
	opt := client.DefaultOption
	opt.SerializeType = protocol.JSON

    // 创建一个客户端
	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
	defer xclient.Close()

	args := example.Args{
		A: 10,
		B: 20,
	}

    // 一直发送乘法问题给服务器
	for {
		err := xclient.Oneshot(context.Background(), "Mul", args)
		if err != nil {
			log.Fatalf("failed to call: %v", err)
		}

		log.Printf("send the problem %d * %d to server", args.A, args.B)
		time.Sleep(time.Second)
	}

}