rpcx 官方技术博客

什么,使用rpcx还可以上传下载大文件?

2019.07.22

绝大部分的 rpc 框架是不支持上传下载文件的。第一个原因是普通的rpc服务框架不支持流式通讯,或者是伪流式通讯,在发送数据的时候,需要把数据全部加载到内存中,然后封装成byte数组放入的request和response中,这会占用大量的内存资源。试想你要上传一个10G的葫芦娃视频,如果把这个10G的数据全部读入到内存中,对系统的资源占用就太大了。

第二个原因,rpc框架的实现中,客户端和服务器端很多时候会建立一个连接或者几个连接,服务之间的调用会共享这些连接。如果在连接上传输巨大的文件数据,势必会影响其它的服务调用,严重影响服务的质量(吞吐和延迟)。

所以很多情况下客户端和服务器端是通过其它服务进行文件的上传和下载,但是还是有些用户提出这样的需求,希望借助单一的rpc框架,能提供文件的上传和下载,比如在物联网的项目中,一些设备需要到服务器拉取配置,或者需要将设备的一些累积的统计文件上传到服务器上。

鱼和熊掌如何兼得?rpcx采用了一个巧妙的实现,既避免了降低服务的质量,还可以客户端和服务器端之间传输大文件。

FTP协议的启发,rpcx也采用独立的控制连接和数据连接方式。控制连接就是普通的rpcx服务,客户端和服务器端协商好数据,然后新建一个连接进行文件的传输。但是又和ftp协议不同,rpcx单独在服务器端开启了一个数据传输端口,客户端上传下载数据的时候会连接这个端口。

rpcx提供了一个服务器端文件传输插件: FileTransferService,你需要指定它用来做数据传输的监听地址和端口、上传文件的处理、下载文件的处理、令牌缓存的大小:

1
func NewFileTransfer(addr string, handler FileTransferHandler, downloadFileHandler DownloadFileHandler, waitNum int) *FileTransfer 

它提供了两个服务,客户端需要调用这两个服务以便获取token,并进行文件的传输:

  • TransferFile: 客户端要上传文件之前需要调用的服务
  • DownloadFile: 客户端要下载文件之前需要调用的服务

客户端获取到token之后,就可以连接服务器端的数据传输端口,进行文件上传下载了。

实际使用的时候不会那么复杂,因为rpcx已经把其中底层的交互封装好了,暴露给用户的只是两个方法:

1
2
	SendFile(ctx context.Context, fileName string, rateInBytesPerSecond int64) error
	DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer) error

上传的时候你只需配置要上传的文件的路径,客户端会读取这个文件并上传,为了避免将带宽打爆,你还可以设置上传的速率。客户端不会把文件全部读取到内存,而是使用一个buffer分片的上传文件。

下载文件时需要告诉服务器你需要下载什么文件,并且提供一个io.Writer处理接收到的文件数据。

例子

完整的例子在 filetransfer

服务器

服务器端需要注册文件传输服务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"net"

	"github.com/smallnest/rpcx/serverplugin"

	"github.com/smallnest/rpcx/server"
)

var (
	addr             = flag.String("addr", "localhost:8972", "server address")
	fileTransferAddr = flag.String("transfer-addr", "localhost:8973", "data transfer address")
)

func main() {
	flag.Parse()

	s := server.NewServer()

	p := serverplugin.NewFileTransfer(*fileTransferAddr, saveFilehandler, nil, 1000)
	serverplugin.RegisterFileTransfer(s, p)

	err := s.Serve("tcp", *addr)
	if err != nil {
		panic(err)
	}
}

func saveFilehandler(conn net.Conn, args *serverplugin.FileTransferArgs) {
	fmt.Printf("received file name: %s, size: %d\n", args.FileName, args.FileSize)
	data, err := ioutil.ReadAll(conn)
	if err != nil {
		fmt.Printf("error read: %v\n", err)
		return
	}
	fmt.Printf("file content: %s\n", data)
}

并且提供一个FileTransferHandler。当客户端上传文件的时候,服务器使用这个handler处理客户端上传的文件数据。这个handler只是把收到的文件内容输出到控制台中,它只是一个简单的演示。你可以扩展它,把内容保存到本地文件,或者解析后存入到数据库中。

这里我们没有提供DownloadFileHandler,也就是说我们不会处理客户端的下载文件请求。

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
	"context"
	"flag"
	"log"

	"github.com/smallnest/rpcx/serverplugin"

	"github.com/smallnest/rpcx/client"
)

var (
	addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
	flag.Parse()

	d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
	xclient := client.NewXClient(serverplugin.SendFileServiceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
	defer xclient.Close()

	err := xclient.SendFile(context.Background(), "abc.txt", 0)
	if err != nil {
		panic(err)
	}
	log.Println("send ok")

}

客户端调用SendFile上传本地文件abc.txt,并且不会限流。

当这个客户端运行时,可以在服务器端看到上传的文件的内容。

所以,如果你使用rpcx上传下载文件,在服务器端,你需要实现上传下载的handler。客户端如果只是上传文件,直接调用SendFile即可,如果客户端想下载文件,还需要实现一个io.Writer去保存文件数据。