grpc-resolver

在 gRPC-Go 中,预留的 resolver.Resolver 接口 定义了 自定义解析器(Resolver) 需要实现的基本方法。gRPC 使用 resolver.Resolver 来解析目标地址(如 custom:///service),并动态提供服务器地址列表。


1. resolver.Resolver 预留接口

gRPC 预留的 resolver.Resolver 接口定义如下(google.golang.org/grpc/resolver):

package resolver

// Resolver 定义了解析 gRPC 目标地址的接口
type Resolver interface {
    ResolveNow(ResolveNowOptions) // 立即解析服务地址
    Close()                       // 关闭解析器,释放资源
}

// 立即解析的选项(目前为空)
type ResolveNowOptions struct{}

🔹 Resolver 关键方法

方法 作用
ResolveNow(ResolveNowOptions) 触发 服务地址解析,用于 监听变更
Close() 关闭解析器,释放资源

2. resolver.Builder 接口(解析器构造器)

在 gRPC 中,所有的解析器都需要先通过 resolver.Builder 进行注册:

package resolver

// Builder 是解析器的工厂接口
type Builder interface {
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) // 创建解析器
    Scheme() string // 返回解析器的 scheme(如 "consul", "etcd")
}

Builder 关键方法

方法 作用
Build(target, cc, opts) 创建解析器实例,用于 解析 gRPC 地址
Scheme() 返回解析器的 scheme,如 "consul""etcd""custom"

3. resolver.ClientConn 预留接口

resolver.ClientConn 用于通知 gRPC 负载均衡器 解析后的服务地址:

package resolver

// ClientConn 是 Resolver 和 gRPC 负载均衡之间的通信接口
type ClientConn interface {
    UpdateState(State) error  // 更新 gRPC 服务器地址列表
    ReportError(error)        // 通知 gRPC 解析错误
    NewAddress([]Address)     // 过时方法(建议用 UpdateState)
    NewServiceConfig(string)  // 过时方法
}

ClientConn 关键方法

方法 作用
UpdateState(State) 更新 解析出的服务器列表
ReportError(error) 解析失败时通知 gRPC

4. 解析器的 resolver.State 结构

resolver.Resolver 解析完 gRPC 目标地址后,需要将解析结果传递给 gRPC 负载均衡器,格式如下:

package resolver

// State 代表解析后的 gRPC 服务器状态
type State struct {
    Addresses []Address // 解析出的服务器地址
}

示例:

state := resolver.State{
    Addresses: []resolver.Address{
        {Addr: "127.0.0.1:50051"},
        {Addr: "127.0.0.1:50052"},
    },
}
cc.UpdateState(state) // 更新解析结果

5. 组合:完整的 gRPC 自定义解析器

根据 gRPC 预留的 resolver.Resolver 接口,我们可以实现一个 自定义的 Consul 解析器

完整代码示例

package main

import (
	"fmt"
	"github.com/hashicorp/consul/api"
	"log"
	"net"
	"sync"

	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/resolver"
	"test/proto"
)

// 自定义解析器的 scheme
const myScheme = "consul"

var consulAddress string = "127.0.0.1:8500"

// 构造器(Builder)实现 `resolver.Builder`
type myResolverBuilder struct{}

// Target url;
func (b *myResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	log.Printf("解析目标: %s", target.Endpoint())
	r := &myResolver{
		target: target,
		cc:     cc,
	}
	r.ResolveNow(resolver.ResolveNowOptions{}) // 立即解析
	return r, nil
}

func (b *myResolverBuilder) Scheme() string {
	return myScheme
}

// 解析器(Resolver)实现 `resolver.Resolver`
type myResolver struct {
	target resolver.Target
	cc     resolver.ClientConn
	mu     sync.Mutex
}

// 解析服务地址
func (r *myResolver) ResolveNow(opts resolver.ResolveNowOptions) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// 假设这里从 Consul 获取服务地址
	addresses, err := queryConsul(consulAddress, r.target.Endpoint())
	if err != nil {
		panic(err)
	}

	// 更新 gRPC 服务器地址
	r.cc.UpdateState(resolver.State{Addresses: addresses})
	log.Println("解析器更新完成")
}

// 关闭解析器
func (r *myResolver) Close() {}

// 注册解析器
func init() {
	consulAddress, _ = GetLocalIP()
	consulAddress = fmt.Sprintf("%s:8500", consulAddress)
	resolver.Register(&myResolverBuilder{})
}

// 从 Consul 查询 `serviceName` 并返回可用的 gRPC 服务器列表
func queryConsul(consulAddress string, serviceName string) ([]resolver.Address, error) {
	// 创建 Consul 客户端
	consulConfig := api.DefaultConfig()
	consulConfig.Address = consulAddress
	client, err := api.NewClient(consulConfig)
	if err != nil {
		return nil, err
	}

	// 查询 Consul 中健康的 gRPC 服务
	services, _, err := client.Health().Service(serviceName, "", true, nil)
	if err != nil {
		return nil, err
	}

	// 转换为 gRPC `resolver.Address`
	var addresses []resolver.Address
	for _, service := range services {
		addr := service.Service.Address
		port := service.Service.Port
		addresses = append(addresses, resolver.Address{
			Addr:       addr + ":" + fmt.Sprintf("%d", port),
			ServerName: serviceName})
	}

	// 如果没有可用服务,返回错误
	if len(addresses) == 0 {
		return nil, fmt.Errorf("未找到任何健康的 %s 服务", serviceName)
	}

	for _, address := range addresses {
		fmt.Println(address)
	}

	return addresses, nil
}

// 获取本机 IPv4 地址(第一个非 127.0.0.1 的)
func GetLocalIP() (string, error) {
	interfaces, err := net.Interfaces()
	if err != nil {
		return "", err
	}

	for _, iface := range interfaces {
		// 忽略未启用的网卡
		if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
			continue
		}

		addrs, err := iface.Addrs()
		if err != nil {
			continue
		}

		for _, addr := range addrs {
			ipNet, ok := addr.(*net.IPNet)
			if !ok || ipNet.IP.IsLoopback() || ipNet.IP.To4() == nil {
				continue
			}
			return ipNet.IP.String(), nil // 找到第一个 IPv4 地址直接返回
		}
	}

	return "", fmt.Errorf("no valid IPv4 address found")
}

func main() {
	// 使用自定义解析器
	conn, err := grpc.Dial(
		"consul:///mx_shop_users",
		grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)
	if err != nil {
		log.Fatalf("连接 gRPC 服务器失败: %v", err)
	}
	defer conn.Close()

	log.Println("gRPC 连接成功")
	UserSrvClient := proto.NewUserClient(conn)
	rsp, err := UserSrvClient.GetUserList(context.Background(), &proto.PageInfo{
		Pn:    1,
		PSize: 2,
	})
	if err != nil {
		panic(err)
	}
	for index, data := range rsp.Data {
		fmt.Println(index, data)
	}
}

6. 运行流程

  1. grpc.Dial("custom:///myservice") 触发 gRPC 解析器。
  2. gRPC 调用 customResolverBuilder.Build() 创建解析器。
  3. ResolveNow() 解析目标地址,返回 服务器列表(如 127.0.0.1:50051127.0.0.1:50052)。
  4. UpdateState() 更新服务器地址列表,gRPC 负载均衡器 开始均衡请求
  5. 所有 gRPC 请求 都会自动负载均衡到解析出来的服务器。

7. 实验

  • user.proto
syntax = "proto3";

import "google/protobuf/empty.proto";

option go_package = ".;proto";

service User {
//  传递一些信息进来
  rpc GetUserList(PageInfo) returns (UserListResponse); // 用户的列表
  rpc GetUserByMobile(MobileRequest) returns (UserInfoResponse); // 通过mobile查询用户
  rpc GetUserById(IdRequest) returns (UserInfoResponse); // 通过id查询用户
  rpc CreateUser(CreateUserInfo) returns (UserInfoResponse); // 添加用户
  rpc UpdateUser(UpdateUserInfo) returns (google.protobuf.Empty); // 更新用户
  rpc CheckPassWord(PasswordCheckInfo) returns (CheckResponse);// 检查
}


message PasswordCheckInfo {
  string password = 1;
  string encryptedPassword = 2;
}
message CheckResponse {
  bool success = 1;
}


message PageInfo {
  uint32 pn = 1; // 页码
  uint32 pSize = 2; // 每一页的大小
}

message UserInfoResponse {
  int32 id = 1;
  string password = 2;
  string mobile = 3;
  string nickName = 4;
  uint64 birthDay = 5;
  string gender = 6;
  int32 role = 7;
}

message UserListResponse {
  int32 total = 1;
  repeated UserInfoResponse data = 2;
}

message MobileRequest {
  string mobile = 1;
}

message IdRequest {
  int32 id = 1;
}

message CreateUserInfo {
  string nickName = 1;
  string password = 2;
  string mobile = 3;
}

message UpdateUserInfo {
  int32 id = 1;
  string nickName = 2;
  string gender = 3;
  uint64 birthday = 4;
}
  • main.go
package main

import (
	"fmt"
	"github.com/hashicorp/consul/api"
	"log"
	"net"
	"sync"

	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/resolver"
	"test/proto"
)

// 自定义解析器的 scheme
const myScheme = "consul"

var consulAddress string = "127.0.0.1:8500"

// 构造器(Builder)实现 `resolver.Builder`
type myResolverBuilder struct{}

// Target url;
func (b *myResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	log.Printf("解析目标: %s", target.Endpoint())
	r := &myResolver{
		target: target,
		cc:     cc,
	}
	r.ResolveNow(resolver.ResolveNowOptions{}) // 立即解析
	return r, nil
}

func (b *myResolverBuilder) Scheme() string {
	return myScheme
}

// 解析器(Resolver)实现 `resolver.Resolver`
type myResolver struct {
	target resolver.Target
	cc     resolver.ClientConn
	mu     sync.Mutex
}

// 解析服务地址
func (r *myResolver) ResolveNow(opts resolver.ResolveNowOptions) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// 假设这里从 Consul 获取服务地址
	addresses, err := queryConsul(consulAddress, r.target.Endpoint())
	if err != nil {
		panic(err)
	}

	// 更新 gRPC 服务器地址
	r.cc.UpdateState(resolver.State{Addresses: addresses})
	log.Println("解析器更新完成")
}

// 关闭解析器
func (r *myResolver) Close() {}

// 注册解析器
func init() {
	consulAddress, _ = GetLocalIP()
	consulAddress = fmt.Sprintf("%s:8500", consulAddress)
	resolver.Register(&myResolverBuilder{})
}

// 从 Consul 查询 `serviceName` 并返回可用的 gRPC 服务器列表
func queryConsul(consulAddress string, serviceName string) ([]resolver.Address, error) {
	// 创建 Consul 客户端
	consulConfig := api.DefaultConfig()
	consulConfig.Address = consulAddress
	client, err := api.NewClient(consulConfig)
	if err != nil {
		return nil, err
	}

	// 查询 Consul 中健康的 gRPC 服务
	services, _, err := client.Health().Service(serviceName, "", true, nil)
	if err != nil {
		return nil, err
	}

	// 转换为 gRPC `resolver.Address`
	var addresses []resolver.Address
	for _, service := range services {
		addr := service.Service.Address
		port := service.Service.Port
		addresses = append(addresses, resolver.Address{
			Addr:       addr + ":" + fmt.Sprintf("%d", port),
			ServerName: serviceName})
	}

	// 如果没有可用服务,返回错误
	if len(addresses) == 0 {
		return nil, fmt.Errorf("未找到任何健康的 %s 服务", serviceName)
	}

	for _, address := range addresses {
		fmt.Println(address)
	}

	return addresses, nil
}

// 获取本机 IPv4 地址(第一个非 127.0.0.1 的)
func GetLocalIP() (string, error) {
	interfaces, err := net.Interfaces()
	if err != nil {
		return "", err
	}

	for _, iface := range interfaces {
		// 忽略未启用的网卡
		if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
			continue
		}

		addrs, err := iface.Addrs()
		if err != nil {
			continue
		}

		for _, addr := range addrs {
			ipNet, ok := addr.(*net.IPNet)
			if !ok || ipNet.IP.IsLoopback() || ipNet.IP.To4() == nil {
				continue
			}
			return ipNet.IP.String(), nil // 找到第一个 IPv4 地址直接返回
		}
	}

	return "", fmt.Errorf("no valid IPv4 address found")
}

func main() {
	// 使用自定义解析器
	conn, err := grpc.Dial(
		"consul:///mx_shop_users",
		grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)
	if err != nil {
		log.Fatalf("连接 gRPC 服务器失败: %v", err)
	}
	defer conn.Close()

	log.Println("gRPC 连接成功")
	UserSrvClient := proto.NewUserClient(conn)
	rsp, err := UserSrvClient.GetUserList(context.Background(), &proto.PageInfo{
		Pn:    1,
		PSize: 2,
	})
	if err != nil {
		panic(err)
	}
	for index, data := range rsp.Data {
		fmt.Println(index, data)
	}
}

是有负载均衡的

9. 总结

gRPC 预留的 resolver.Resolver 接口 主要用于 解析目标地址,并提供服务器列表

核心接口

  • resolver.Resolver:解析地址并返回服务器列表
  • resolver.Builder:创建解析器
  • resolver.ClientConn:通知 gRPC 解析结果
  • resolver.State:存储解析后的服务器地址

自定义 gRPC 解析器适用场景

  • Consul、Etcd、Kubernetes 服务发现
  • 动态解析 gRPC 服务器
  • 自定义负载均衡策略

**这样,你的 gRPC 应用就能动态解析服务,并自动负载均衡!**态解析并连接多个后端服务器!

github