Oneshot在英文中是“一击”或“一枪”的意思, 形容词的意思是“一次性的”,名词的意思是“一次性用品”,在Rust生态圈中尝尝用作一次性使用的并发原语,在Go生态圈中很少使用。
这次我把Oneshot
的概念引入到微服务框架里面,用来指代客户端对服务器一次性的调用,不需要等待服务器的返回。
为啥要引入 Oneshot
?
在微服务框架中,客户端调用服务端的时候,一般都是需要等待服务端的返回的,这样才能保证调用的结果是正确的,但是有时候,我们并不关心服务端的返回,比如,我们只需要把客户端的当前日志上传都服务器,传失败也没关系,服务器没有正确保存也无所谓,因为下一秒可能新的日志就会上传了,客户端丢一次两次数据不会影响业务,在这种情况下,我们就可以使用Oneshot,这样可以减少客户端的等待时间,提高客户端的吞吐量。
比如在我们的监控系统中,容许客户端丢失一些数据,因为我们的监控系统是基于时间序列的,如果客户端丢失了一个点数据,我们也不太关心,因为我们关注的是监控的趋势,这要保证一段时间内监控曲线能够画出来,看到变化趋势就好。
在rpcx的实现中,它的协议已经支持Oneway
的方式了,它的实际功能就是Oneshot
的功能,只不过当时我还接触到Oneshot
的概念,借鉴的是微博Motan架构的Oneway
的概念。但是在rpcx的客户端中,没有很好的专门为Oneshot
设计的接口,你可能需要传入给XClient.Go
一个空的reply,隐式的表明这次调用Oneway
方式。这一次我需要在工作中明确要使用Oneshot
的功能,所以我在rpcx的客户端中加入了Oneshot
的支持。
其实修改起来也特别的简单,借助于rpcx灵活的架构和Oneway
方式的已经支持,我们可以很容易的实现Oneshot
的功能。
首先我们为XClient
接口增加一个Oneshot
方法的定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
type XClient interface {
// 新增加的Oneshot方法
Oneshot(ctx context.Context, serviceMethod string, args interface{}) error
Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Inform(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) ([]Receipt, error)
SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
SendFile(ctx context.Context, fileName string, rateInBytesPerSecond int64, meta map[string]string) error
DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, meta map[string]string) error
Stream(ctx context.Context, meta map[string]string) (net.Conn, error)
Close() error
}
|
Oneshot
方法类似Call
方法,你需要传入上下文ctx
,要调用的方法名serviceMethod
和参数args
,但是不需要传入reply
参数,因为Oneshot
不需要等待服务器的返回。如果发送失败,会返回一个error。
然后我们在XClient
的实现类xClient
中实现Oneshot
方法:
1
2
3
4
5
6
|
// Oneshot invokes the named function, ** DOEST NOT ** wait for it to complete, and returns immediately.
func (c *xClient) Oneshot(ctx context.Context, serviceMethod string, args interface{}) error {
_, err := c.Go(ctx, serviceMethod, args, nil, nil)
return err
}
|
其实就是一行,利用Go
方法就实现了Oneshot
。
Oneshot
调用Go
方法是,传入的reply
的值为nil, rpcx就聪明的知道客户端不需要返回值,然后把消息标记成Oneway
模式,不需要等待服务器的返回,所以这个方法也不需要返回的*Call
, 一切都不需要。
一个例子
既然实现了一个新的功能,那么我们一般的操作就是会在项目rpcx-examples中增加这个功能的例子。
服务
服务器你可以实现一个Onershot
的服务器,也就是不需要设置Reply
,你也可以设置Reply
,但是服务器端不会把这个Reply
返回给客户端,而是直接丢弃了。
这里我们实现一个既支持Oneshot
又支持普通调用的服务器,它的功能是计算两个数的乘积,给客户端返回结果还是不返回结果,依赖客户端的调用方式。
这个例子也是我们众多的例子中同样的一个服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package main
import (
"context"
"flag"
"fmt"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
// 乘法服务
type Arith struct{}
func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error {
reply.C = args.A * args.B
fmt.Println("C=", reply.C)
return nil
}
func main() {
flag.Parse()
// 创建一个新的rpcx服务
s := server.NewServer()
// 注册乘法服务
s.RegisterName("Arith", new(Arith), "")
// 开始服务
err := s.Serve("tcp", *addr)
if err != nil {
panic(err)
}
}
|
客户端
客户端的代码也很简单,我们可以实现一个Oneshot
的客户端,它把两个乘数扔给服务器,不管服务器的计算和计算结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
package main
import (
"context"
"flag"
"log"
"time"
"github.com/smallnest/rpcx/protocol"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
// 服务地址
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
// 不需要注册中心,直连模式
d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
opt := client.DefaultOption
opt.SerializeType = protocol.JSON
// 创建一个客户端
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
defer xclient.Close()
args := example.Args{
A: 10,
B: 20,
}
// 一直发送乘法问题给服务器
for {
err := xclient.Oneshot(context.Background(), "Mul", args)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("send the problem %d * %d to server", args.A, args.B)
time.Sleep(time.Second)
}
}
|