grpc-resolver

在 gRPC-Go 中,预留的 resolver.Resolver 接口 定义了 自定义解析器(Resolver) 需要实现的基本方法。gRPC 使用 resolver.Resolver 来解析目标地址(如 custom:///service),并动态提供服务器地址列表。


1. resolver.Resolver 预留接口

gRPC 预留的 resolver.Resolver 接口定义如下(google.golang.org/grpc/resolver):

package resolver

// Resolver 定义了解析 gRPC 目标地址的接口
type Resolver interface {
    ResolveNow(ResolveNowOptions) // 立即解析服务地址
    Close()                       // 关闭解析器,释放资源
}

// 立即解析的选项(目前为空)
type ResolveNowOptions struct{}

🔹 Resolver 关键方法

方法 作用
ResolveNow(ResolveNowOptions) 触发 服务地址解析,用于 监听变更
Close() 关闭解析器,释放资源

2. resolver.Builder 接口(解析器构造器)

在 gRPC 中,所有的解析器都需要先通过 resolver.Builder 进行注册:

package resolver

// Builder 是解析器的工厂接口
type Builder interface {
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) // 创建解析器
    Scheme() string // 返回解析器的 scheme(如 "consul", "etcd")
}

🔹 Builder 关键方法

方法 作用
Build(target, cc, opts) 创建解析器实例,用于 解析 gRPC 地址
Scheme() 返回解析器的 scheme,如 "consul""etcd""custom"

3. resolver.ClientConn 预留接口

resolver.ClientConn 用于通知 gRPC 负载均衡器 解析后的服务地址:

package resolver

// ClientConn 是 Resolver 和 gRPC 负载均衡之间的通信接口
type ClientConn interface {
    UpdateState(State) error  // 更新 gRPC 服务器地址列表
    ReportError(error)        // 通知 gRPC 解析错误
    NewAddress([]Address)     // 过时方法(建议用 UpdateState)
    NewServiceConfig(string)  // 过时方法
}

🔹 ClientConn 关键方法

方法 作用
UpdateState(State) 更新 解析出的服务器列表
ReportError(error) 解析失败时通知 gRPC

4. 解析器的 resolver.State 结构

resolver.Resolver 解析完 gRPC 目标地址后,需要将解析结果传递给 gRPC 负载均衡器,格式如下:

package resolver

// State 代表解析后的 gRPC 服务器状态
type State struct {
    Addresses []Address // 解析出的服务器地址
}

示例:

state := resolver.State{
    Addresses: []resolver.Address{
        {Addr: "127.0.0.1:50051"},
        {Addr: "127.0.0.1:50052"},
    },
}
cc.UpdateState(state) // 更新解析结果

5. 组合:完整的 gRPC 自定义解析器

根据 gRPC 预留的 resolver.Resolver 接口,我们可以实现一个 自定义的 Consul 解析器

完整代码示例

package main

import (
    "fmt"
    "github.com/hashicorp/consul/api"
    "log"
    "net"
    "sync"

    "context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/resolver"
    "test/proto"
)

// 自定义解析器的 scheme
const myScheme = "consul"

var consulAddress string = "127.0.0.1:8500"

// 构造器(Builder)实现 `resolver.Builder`
type myResolverBuilder struct{}

// Target url;
func (b *myResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    log.Printf("解析目标: %s", target.Endpoint())
    r := &myResolver{
        target: target,
        cc:     cc,
    }
    r.ResolveNow(resolver.ResolveNowOptions{}) // 立即解析
    return r, nil
}

func (b *myResolverBuilder) Scheme() string {
    return myScheme
}

// 解析器(Resolver)实现 `resolver.Resolver`
type myResolver struct {
    target resolver.Target
    cc     resolver.ClientConn
    mu     sync.Mutex
}

// 解析服务地址
func (r *myResolver) ResolveNow(opts resolver.ResolveNowOptions) {
    r.mu.Lock()
    defer r.mu.Unlock()

    // 假设这里从 Consul 获取服务地址
    addresses, err := queryConsul(consulAddress, r.target.Endpoint())
    if err != nil {
        panic(err)
    }

    // 更新 gRPC 服务器地址
    r.cc.UpdateState(resolver.State{Addresses: addresses})
    log.Println("解析器更新完成")
}

// 关闭解析器
func (r *myResolver) Close() {}

// 注册解析器
func init() {
    consulAddress, _ = GetLocalIP()
    consulAddress = fmt.Sprintf("%s:8500", consulAddress)
    resolver.Register(&myResolverBuilder{})
}

// 从 Consul 查询 `serviceName` 并返回可用的 gRPC 服务器列表
func queryConsul(consulAddress string, serviceName string) ([]resolver.Address, error) {
    // 创建 Consul 客户端
    consulConfig := api.DefaultConfig()
    consulConfig.Address = consulAddress
    client, err := api.NewClient(consulConfig)
    if err != nil {
        return nil, err
    }

    // 查询 Consul 中健康的 gRPC 服务
    services, _, err := client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, err
    }

    // 转换为 gRPC `resolver.Address`
    var addresses []resolver.Address
    for _, service := range services {
        addr := service.Service.Address
        port := service.Service.Port
        addresses = append(addresses, resolver.Address{
            Addr:       addr + ":" + fmt.Sprintf("%d", port),
            ServerName: serviceName})
    }

    // 如果没有可用服务,返回错误
    if len(addresses) == 0 {
        return nil, fmt.Errorf("未找到任何健康的 %s 服务", serviceName)
    }

    for _, address := range addresses {
        fmt.Println(address)
    }

    return addresses, nil
}

// 获取本机 IPv4 地址(第一个非 127.0.0.1 的)
func GetLocalIP() (string, error) {
    interfaces, err := net.Interfaces()
    if err != nil {
        return "", err
    }

    for _, iface := range interfaces {
        // 忽略未启用的网卡
        if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
            continue
        }

        addrs, err := iface.Addrs()
        if err != nil {
            continue
        }

        for _, addr := range addrs {
            ipNet, ok := addr.(*net.IPNet)
            if !ok || ipNet.IP.IsLoopback() || ipNet.IP.To4() == nil {
                continue
            }
            return ipNet.IP.String(), nil // 找到第一个 IPv4 地址直接返回
        }
    }

    return "", fmt.Errorf("no valid IPv4 address found")
}

func main() {
    // 使用自定义解析器
    conn, err := grpc.Dial(
        "consul:///mx_shop_users",
        grpc.WithInsecure(),
        grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    )
    if err != nil {
        log.Fatalf("连接 gRPC 服务器失败: %v", err)
    }
    defer conn.Close()

    log.Println("gRPC 连接成功")
    UserSrvClient := proto.NewUserClient(conn)
    rsp, err := UserSrvClient.GetUserList(context.Background(), &proto.PageInfo{
        Pn:    1,
        PSize: 2,
    })
    if err != nil {
        panic(err)
    }
    for index, data := range rsp.Data {
        fmt.Println(index, data)
    }
}

6. 运行流程

  1. grpc.Dial("custom:///myservice") 触发 gRPC 解析器。
  2. gRPC 调用 customResolverBuilder.Build() 创建解析器。
  3. ResolveNow() 解析目标地址,返回 服务器列表(如 127.0.0.1:50051127.0.0.1:50052)。
  4. UpdateState() 更新服务器地址列表,gRPC 负载均衡器 开始均衡请求
  5. 所有 gRPC 请求 都会自动负载均衡到解析出来的服务器。

7. 实验

  • user.proto
syntax = "proto3";

import "google/protobuf/empty.proto";

option go_package = ".;proto";

service User {
//  传递一些信息进来
  rpc GetUserList(PageInfo) returns (UserListResponse); // 用户的列表
  rpc GetUserByMobile(MobileRequest) returns (UserInfoResponse); // 通过mobile查询用户
  rpc GetUserById(IdRequest) returns (UserInfoResponse); // 通过id查询用户
  rpc CreateUser(CreateUserInfo) returns (UserInfoResponse); // 添加用户
  rpc UpdateUser(UpdateUserInfo) returns (google.protobuf.Empty); // 更新用户
  rpc CheckPassWord(PasswordCheckInfo) returns (CheckResponse);// 检查
}


message PasswordCheckInfo {
  string password = 1;
  string encryptedPassword = 2;
}
message CheckResponse {
  bool success = 1;
}


message PageInfo {
  uint32 pn = 1; // 页码
  uint32 pSize = 2; // 每一页的大小
}

message UserInfoResponse {
  int32 id = 1;
  string password = 2;
  string mobile = 3;
  string nickName = 4;
  uint64 birthDay = 5;
  string gender = 6;
  int32 role = 7;
}

message UserListResponse {
  int32 total = 1;
  repeated UserInfoResponse data = 2;
}

message MobileRequest {
  string mobile = 1;
}

message IdRequest {
  int32 id = 1;
}

message CreateUserInfo {
  string nickName = 1;
  string password = 2;
  string mobile = 3;
}

message UpdateUserInfo {
  int32 id = 1;
  string nickName = 2;
  string gender = 3;
  uint64 birthday = 4;
}
  • main.go
package main

import (
    "fmt"
    "github.com/hashicorp/consul/api"
    "log"
    "net"
    "sync"

    "context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/resolver"
    "test/proto"
)

// 自定义解析器的 scheme
const myScheme = "consul"

var consulAddress string = "127.0.0.1:8500"

// 构造器(Builder)实现 `resolver.Builder`
type myResolverBuilder struct{}

// Target url;
func (b *myResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    log.Printf("解析目标: %s", target.Endpoint())
    r := &myResolver{
        target: target,
        cc:     cc,
    }
    r.ResolveNow(resolver.ResolveNowOptions{}) // 立即解析
    return r, nil
}

func (b *myResolverBuilder) Scheme() string {
    return myScheme
}

// 解析器(Resolver)实现 `resolver.Resolver`
type myResolver struct {
    target resolver.Target
    cc     resolver.ClientConn
    mu     sync.Mutex
}

// 解析服务地址
func (r *myResolver) ResolveNow(opts resolver.ResolveNowOptions) {
    r.mu.Lock()
    defer r.mu.Unlock()

    // 假设这里从 Consul 获取服务地址
    addresses, err := queryConsul(consulAddress, r.target.Endpoint())
    if err != nil {
        panic(err)
    }

    // 更新 gRPC 服务器地址
    r.cc.UpdateState(resolver.State{Addresses: addresses})
    log.Println("解析器更新完成")
}

// 关闭解析器
func (r *myResolver) Close() {}

// 注册解析器
func init() {
    consulAddress, _ = GetLocalIP()
    consulAddress = fmt.Sprintf("%s:8500", consulAddress)
    resolver.Register(&myResolverBuilder{})
}

// 从 Consul 查询 `serviceName` 并返回可用的 gRPC 服务器列表
func queryConsul(consulAddress string, serviceName string) ([]resolver.Address, error) {
    // 创建 Consul 客户端
    consulConfig := api.DefaultConfig()
    consulConfig.Address = consulAddress
    client, err := api.NewClient(consulConfig)
    if err != nil {
        return nil, err
    }

    // 查询 Consul 中健康的 gRPC 服务
    services, _, err := client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, err
    }

    // 转换为 gRPC `resolver.Address`
    var addresses []resolver.Address
    for _, service := range services {
        addr := service.Service.Address
        port := service.Service.Port
        addresses = append(addresses, resolver.Address{
            Addr:       addr + ":" + fmt.Sprintf("%d", port),
            ServerName: serviceName})
    }

    // 如果没有可用服务,返回错误
    if len(addresses) == 0 {
        return nil, fmt.Errorf("未找到任何健康的 %s 服务", serviceName)
    }

    for _, address := range addresses {
        fmt.Println(address)
    }

    return addresses, nil
}

// 获取本机 IPv4 地址(第一个非 127.0.0.1 的)
func GetLocalIP() (string, error) {
    interfaces, err := net.Interfaces()
    if err != nil {
        return "", err
    }

    for _, iface := range interfaces {
        // 忽略未启用的网卡
        if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
            continue
        }

        addrs, err := iface.Addrs()
        if err != nil {
            continue
        }

        for _, addr := range addrs {
            ipNet, ok := addr.(*net.IPNet)
            if !ok || ipNet.IP.IsLoopback() || ipNet.IP.To4() == nil {
                continue
            }
            return ipNet.IP.String(), nil // 找到第一个 IPv4 地址直接返回
        }
    }

    return "", fmt.Errorf("no valid IPv4 address found")
}

func main() {
    // 使用自定义解析器
    conn, err := grpc.Dial(
        "consul:///mx_shop_users",
        grpc.WithInsecure(),
        grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    )
    if err != nil {
        log.Fatalf("连接 gRPC 服务器失败: %v", err)
    }
    defer conn.Close()

    log.Println("gRPC 连接成功")
    UserSrvClient := proto.NewUserClient(conn)
    rsp, err := UserSrvClient.GetUserList(context.Background(), &proto.PageInfo{
        Pn:    1,
        PSize: 2,
    })
    if err != nil {
        panic(err)
    }
    for index, data := range rsp.Data {
        fmt.Println(index, data)
    }
}

是有负载均衡的

9. 总结

gRPC 预留的 resolver.Resolver 接口 主要用于 解析目标地址,并提供服务器列表

核心接口

  • **resolver.Resolver**:解析地址并返回服务器列表
  • **resolver.Builder**:创建解析器
  • **resolver.ClientConn**:通知 gRPC 解析结果
  • **resolver.State**:存储解析后的服务器地址

自定义 gRPC 解析器适用场景

  • Consul、Etcd、Kubernetes 服务发现
  • 动态解析 gRPC 服务器
  • 自定义负载均衡策略

🚀 这样,你的 gRPC 应用就能动态解析服务,并自动负载均衡!态解析并连接多个后端服务器!**

github