rpcx 官方技术博客

使用Redis做服务发现

2019.10.14

Redis 是一个高性能的key/value数据库,和Memcached类似,但是它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis读写性能优异,又方便进行主从复制,横向扩展方便,在大厂中得到了广泛应用。微博曾部署了最大的redis节点,即使是现在,微博内部也部署了大量的Redis集群。

如果我们能把它作为服务发现的注册中心的话,可以利用它的高性能和容错机制,实现微服务部署中的服务发现功能。

rpcx使用libkv实现ZooKeeper、Etcd、Consul的统一访问,采用一致的代码实现ZooKeeper、Etcd和 Consul作为服务发现的注册中心。其中libkv虽然支持etcd 3.x, 但是还是使用etcd v2 API接口进行存储。我额外提供了一个插件ETCDV3,可以支持采用etcd v3 API接口支持Etcd的访问。

我倾向于采用libkv的统一访问方式提供对其它产品的支持,值的称赞的是,abronan/valkeyrie fork了libkv的库,提供了后续的维护和额外的功能,其中就提供了redis的支持。

在此基础上, rpcx提供了Redis作为服务发现注册中心的插件。

它的使用方法和Etcd、ZooKeeper、Consul插件类似。下面提供了服务端的例子和客户端的例子。

服务端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
	"flag"
	"log"
	"time"

	example "github.com/rpcx-ecosystem/rpcx-examples3"
	"github.com/smallnest/rpcx/server"
	"github.com/smallnest/rpcx/serverplugin"
)

var (
	addr      = flag.String("addr", "localhost:8972", "server address")
	redisAddr = flag.String("redisAddr", "localhost:6379", "redis address")
	basePath  = flag.String("base", "/rpcx_test", "prefix path")
)

func main() {
	flag.Parse()

	s := server.NewServer()
	addRegistryPlugin(s)

	s.RegisterName("Arith", new(example.Arith), "")
	err := s.Serve("tcp", *addr)
	if err != nil {
		panic(err)
	}
}

func addRegistryPlugin(s *server.Server) {

	r := &serverplugin.RedisRegisterPlugin{
		ServiceAddress: "tcp@" + *addr,
		RedisServers:   []string{*redisAddr},
		BasePath:       *basePath,
		UpdateInterval: time.Minute,
	}
	err := r.Start()
	if err != nil {
		log.Fatal(err)
	}
	s.Plugins.Add(r)
}

服务器配置了RedisRegisterPlugin插件,你需要指定此节点注册到注册中心的IP地址。有时候你监听的IP地址是局域网的IP地址,但是对外提供的是广播地址,这里你就可以配置和监听地址不同的地址,如果没有这个需求,你就配置成你的监听地址即可。

前面的tcp@指的是它使用TCP作为传输协议。

RedisServers 指定redis集群中的redis地址。

BasePath 是命名空间,在服务很多的情况下你可以指定不同的命名空间,方便管理。

UpdateInterval 用来设置心跳,一旦超过时间节点就会过期。

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
	"context"
	"flag"
	"log"
	"time"

	example "github.com/rpcx-ecosystem/rpcx-examples3"
	"github.com/smallnest/rpcx/client"
)

var (
	redisAddr = flag.String("redisAddr", "localhost:6379", "redis address")
	basePath  = flag.String("base", "/rpcx_test", "prefix path")
)

func main() {
	flag.Parse()

	d := client.NewRedisDiscovery(*basePath, "Arith", []string{*redisAddr}, nil)
	xclient := client.NewXClient("Arith", client.Failover, client.RoundRobin, d, client.DefaultOption)
	defer xclient.Close()

	args := &example.Args{
		A: 10,
		B: 20,
	}

	for {
		reply := &example.Reply{}
		err := xclient.Call(context.Background(), "Mul", args, reply)
		if err != nil {
			log.Printf("failed to call: %v\n", err)
			time.Sleep(5 * time.Second)
			continue
		}

		log.Printf("%d * %d = %d", args.A, args.B, reply.C)

		time.Sleep(5 * time.Second)
	}

}

客户端使用NewRedisDiscovery从Redis集群中获取服务的信息。你需要指定命名空间,服务的名称,redis集群中的redis地址。

简单设置,服务发现搞定,就这么简单就可以把你的服务迅速横向扩展开来。

更多设置

Redi支持密码设置和数据库选择。

如果你的Redis需要密码访问,你可以设置options *store.Config中的Password字段。

1
func NewRedisDiscovery(basePath string, servicePath string, etcdAddr []string, options *store.Config) ServiceDiscovery 

如果你的数据库不是第0号数据库,你可以设置options *store.Config中的Bucket字段,设置为你的数据库索引值。