Featured image of post 微服务之服务注册和服务发现篇

微服务之服务注册和服务发现篇

服务注册与发现就是保证当服务上下线发生变更时,服务消费者和服务提供者能够保持正常通信。

有了服务注册和发现机制,消费者不需要知道具体服务提供者的真实物理地址就可以进行调用,也无须知道具体有多少个服务者可用;而服务提供者只需要注册到注册中心,就可以对外提供服务,在对外服务时不需要知道具体是哪些服务调用了自己。

RPC 配置

Name: user.rpc
ListenOn: 0.0.0.0:8081
Etcd:
  Hosts:
  - 127.0.0.1:2379
  Key: user.rpc

被调方-服务注册

  • mall/user/rpc/user.go 源码如下
package main

import (
	"flag"
	"fmt"

	"go-zero-demo-rpc/mall/user/rpc/internal/config"
	"go-zero-demo-rpc/mall/user/rpc/internal/server"
	"go-zero-demo-rpc/mall/user/rpc/internal/svc"
	"go-zero-demo-rpc/mall/user/rpc/types/user"

	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/core/service"
	"github.com/zeromicro/go-zero/zrpc"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)

var configFile = flag.String("f", "etc/user.yaml", "the config file")

func main() {
    flag.Parse()
    
    var c config.Config
    conf.MustLoad(*configFile, &c)
    ctx := svc.NewServiceContext(c)
    svr := server.NewUserServer(ctx)
    
    s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
        user.RegisterUserServer(grpcServer, svr)
    
        if c.Mode == service.DevMode || c.Mode == service.TestMode {
            reflection.Register(grpcServer)
        }
    })
    defer s.Stop()
    
    fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
    s.Start()
}

  • MustNewServer内部实现调用了NewServer方法, 这里我们关注NewServer通过internal.NewRpcPubServer方法实例化了internal.Server
if c.HasEtcd() {
    server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, serverOptions...)
    if err != nil {
        return nil, err
    }
}
  • internal.NewRpcPubServer
// NewRpcPubServer returns a Server.
func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, opts ...ServerOption) (Server, error) {
	registerEtcd := func() error {
		pubListenOn := figureOutListenOn(listenOn)
		var pubOpts []discov.PubOption
		if etcd.HasAccount() {
			pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
		}
		if etcd.HasTLS() {
			pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
				etcd.CACertFile, etcd.InsecureSkipVerify))
		}
		pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
		return pubClient.KeepAlive()
	}
	server := keepAliveServer{
		registerEtcd: registerEtcd,
		Server:       NewRpcServer(listenOn, opts...),
	}

	return server, nil
}
  • internal.NewRpcPubServer中的figureOutListenOn是获取自己的地址,用于注册
    • listenOn就是配置文件中的listenOn
    • 如果不是:分隔, 直接返回
    • 如果不是0.0.0.0的注册, 也返回
    • 如果是0.0.0.0的寻找到自己的内网地址
func figureOutListenOn(listenOn string) string {
	fields := strings.Split(listenOn, ":")
	if len(fields) == 0 {
		return listenOn
	}

	host := fields[0]
	if len(host) > 0 && host != allEths {
		return listenOn
	}

	ip := os.Getenv(envPodIp)
	if len(ip) == 0 {
		ip = netx.InternalIp()
	}
	if len(ip) == 0 {
		return listenOn
	}

	return strings.Join(append([]string{ip}, fields[1:]...), ":")
}
  • internal.NewRpcPubServer中的registerEtcd会调用Publisher.KeepAlive方法
// KeepAlive keeps key:value alive.
func (p *Publisher) KeepAlive() error {
    // 这里获取 etcd 的连接
    cli, err := internal.GetRegistry().GetConn(p.endpoints)
    if err != nil {
        return err
    }
    
    p.lease, err = p.register(cli)
    if err != nil {
        return err
    }
    
    proc.AddWrapUpListener(func() {
        p.Stop()
    })
    
    return p.keepAliveAsync(cli)
}
  • p.register这里把自己注册到服务中
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {

    // 这里新建一个租约
    resp, err := client.Grant(client.Ctx(), TimeToLive)
    if err != nil {
        return clientv3.NoLease, err
    }
    
    // 得到租约的 ID
    lease := resp.ID
    
    // 这里拼接出实际存储的 key
    if p.id > 0 {
        p.fullKey = makeEtcdKey(p.key, p.id)
    } else {
        p.fullKey = makeEtcdKey(p.key, int64(lease))
    }

    // p.value 是前面的 figureOutListenOn 方法获取到自己的地址
	_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

    return lease, err
}
  • 注册完之后, keepAliveAsync开了一个协程保活这个服务
  • 当这个服务意外宕机时, 就不会再向etcd保活, etcd就会删除这个key
  • 注册好的服务如图

调用方-服务发现

  • order/api/order.go 源码如下
package main

import (
	"flag"
	"fmt"

	"go-zero-demo-rpc/order/api/internal/config"
	"go-zero-demo-rpc/order/api/internal/handler"
	"go-zero-demo-rpc/order/api/internal/svc"

	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/rest"
)

var configFile = flag.String("f", "etc/order.yaml", "the config file")

func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)

	server := rest.MustNewServer(c.RestConf)
	defer server.Stop()

	ctx := svc.NewServiceContext(c)
	handler.RegisterHandlers(server, ctx)

	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
	server.Start()
}
  • svc.NewServiceContext方法内部又调用了zrpc.MustNewClient, zrpc.MustNewClient主要实现在zrpc.NewClient
func NewServiceContext(c config.Config) *ServiceContext {
	return &ServiceContext{
		Config:  c,
		UserRpc: user.NewUser(zrpc.MustNewClient(c.UserRpc)),
	}
}
  • 最后实际调用了internal.NewClient去实例化rpc client
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
	var opts []ClientOption
	if c.HasCredential() {
		opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
			App:   c.App,
			Token: c.Token,
		})))
	}
	if c.NonBlock {
		opts = append(opts, WithNonBlock())
	}
	if c.Timeout > 0 {
		opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
	}

	opts = append(opts, options...)

	target, err := c.BuildTarget()
	if err != nil {
		return nil, err
	}

	client, err := internal.NewClient(target, opts...)
	if err != nil {
		return nil, err
	}

	return &RpcClient{
		client: client,
	}, nil
}
  • zrpc/internal/client.go文件里, 包含一个init方法, 这里就是实际发现服务的地方, 在这里注册服务发现者
func init() {
	resolver.Register()
}
  • resolver.Register方法实现
package resolver

import (
	"github.com/zeromicro/go-zero/zrpc/resolver/internal"
)

// Register registers schemes defined zrpc.
// Keep it in a separated package to let third party register manually.
func Register() {
	internal.RegisterResolver()
}
  • 最后又回到interval包的internal.RegisterResolver方法, 这里我们关注etcdResolverBuilder
func RegisterResolver() {
	resolver.Register(&directResolverBuilder)
	resolver.Register(&discovResolverBuilder)
	resolver.Register(&etcdResolverBuilder)
	resolver.Register(&k8sResolverBuilder)
}
  • etcdBuilder的内嵌了discovBuilder结构体,
    • Build方法调用过程:
    • 实例化服务端: internal.NewClient->client.dial->grpc.DialContext
    • 由于etcdresolver.BuildDiscovTarget生成的taget所以是类似这样子的: discov://127.0.0.1:2379/user.rpc
    • 解析服务发现:ClientConn.parseTargetAndFindResolver->grpc.parseTarget->ClientConn.getResolver
    • 然后在grpc.newCCResolverWrapper调用resolver.Builder.Build方法去发现服务
  • 我们着重关注discovBuilder.Build方法
type etcdBuilder struct {
	discovBuilder
}


type discovBuilder struct{}

func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
	resolver.Resolver, error) {
	hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {
		return r == EndpointSepChar
	})
	sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
	if err != nil {
		return nil, err
	}

	update := func() {
		var addrs []resolver.Address
		for _, val := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{
				Addr: val,
			})
		}
		if err := cc.UpdateState(resolver.State{
			Addresses: addrs,
		}); err != nil {
			logx.Error(err)
		}
	}
	sub.AddListener(update)
	update()

	return &nopResolver{cc: cc}, nil
}

func (b *discovBuilder) Scheme() string {
	return DiscovScheme
}
  • discov.NewSubscriber方法调用internal.GetRegistry().Monitor最后调用Registry.monitor方法进行监视
    • cluster.getClient拿到etcd连接
    • cluster.load作为第一次载入数据
    • cluster.watchwatch监听etcd前缀key的改动
func (c *cluster) monitor(key string, l UpdateListener) error {
	c.lock.Lock()
	c.listeners[key] = append(c.listeners[key], l)
	c.lock.Unlock()

	cli, err := c.getClient()
	if err != nil {
		return err
	}

	c.load(cli, key)
	c.watchGroup.Run(func() {
		c.watch(cli, key)
	})

	return nil
}
  • 如下图是cluster.load的实现, 就是根据前缀拿到user.prc服务注册的所有地址

Q

  • 为什么不用Redis做注册中心(反正只是把被调方的地址存储, 过期 Redis也能胜任), 找了很久找到这个说法

简单从以下几个方面说一下瑞迪斯为啥在微服务中不能取代 etcd:

1、redis 没有版本的概念,历史版本数据在大规模微服务中非常有必要,对于状态回滚和故障排查,甚至定锅都很重要

2、redis 的注册和发现目前只能通过 pub 和 sub 来实现,这两个命令完全不能满足生产环境的要求,具体原因可以 gg 或看源码实现

3、etcd 在 2.+版本时,watch 到数据官方文档均建议再 get 一次,因为会存在数据延迟,3.+版本不再需要,可想 redis 的 pub 和 sub 能否达到此种低延迟的要求

4、楼主看到的微服务架构应该都是将 etcd 直接暴露给 client 和 server 的,etcd 的性能摆在那,能够承受多少的 c/s 直连呢,更好的做法应该是对 etcd 做一层保护,当然这种做法会损失一些功能

5、redis 和 etcd 的集群实现方案是不一致的,etcd 采用的是 raft 协议,一主多从,只能写主,底层采用 boltdb 作为 k/v 存储,直接落盘

6、redis 的持久化方案有 aof 和 rdb,这两种方案在宕机的时候都或多或少的会丢失数据