在 gRPC-Go 中,预留的 resolver.Resolver
接口 定义了 自定义解析器(Resolver) 需要实现的基本方法。gRPC 使用 resolver.Resolver
来解析目标地址(如 custom:///service
),并动态提供服务器地址列表。
1. resolver.Resolver
预留接口
gRPC 预留的 resolver.Resolver
接口定义如下(google.golang.org/grpc/resolver
):
package resolver
// Resolver 定义了解析 gRPC 目标地址的接口
type Resolver interface {
ResolveNow(ResolveNowOptions) // 立即解析服务地址
Close() // 关闭解析器,释放资源
}
// 立即解析的选项(目前为空)
type ResolveNowOptions struct{}
🔹 Resolver
关键方法
方法 | 作用 |
---|---|
ResolveNow(ResolveNowOptions) |
触发 服务地址解析,用于 监听变更 |
Close() |
关闭解析器,释放资源 |
2. resolver.Builder
接口(解析器构造器)
在 gRPC 中,所有的解析器都需要先通过 resolver.Builder
进行注册:
package resolver
// Builder 是解析器的工厂接口
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) // 创建解析器
Scheme() string // 返回解析器的 scheme(如 "consul", "etcd")
}
🔹 Builder
关键方法
方法 | 作用 |
---|---|
Build(target, cc, opts) |
创建解析器实例,用于 解析 gRPC 地址 |
Scheme() |
返回解析器的 scheme,如 "consul" 、"etcd" 、"custom" |
3. resolver.ClientConn
预留接口
resolver.ClientConn
用于通知 gRPC 负载均衡器 解析后的服务地址:
package resolver
// ClientConn 是 Resolver 和 gRPC 负载均衡之间的通信接口
type ClientConn interface {
UpdateState(State) error // 更新 gRPC 服务器地址列表
ReportError(error) // 通知 gRPC 解析错误
NewAddress([]Address) // 过时方法(建议用 UpdateState)
NewServiceConfig(string) // 过时方法
}
🔹 ClientConn
关键方法
方法 | 作用 |
---|---|
UpdateState(State) |
更新 解析出的服务器列表 |
ReportError(error) |
解析失败时通知 gRPC |
4. 解析器的 resolver.State
结构
当 resolver.Resolver
解析完 gRPC 目标地址后,需要将解析结果传递给 gRPC 负载均衡器,格式如下:
package resolver
// State 代表解析后的 gRPC 服务器状态
type State struct {
Addresses []Address // 解析出的服务器地址
}
示例:
state := resolver.State{
Addresses: []resolver.Address{
{Addr: "127.0.0.1:50051"},
{Addr: "127.0.0.1:50052"},
},
}
cc.UpdateState(state) // 更新解析结果
5. 组合:完整的 gRPC 自定义解析器
根据 gRPC 预留的 resolver.Resolver
接口,我们可以实现一个 自定义的 Consul 解析器:
完整代码示例
package main
import (
"fmt"
"github.com/hashicorp/consul/api"
"log"
"net"
"sync"
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
"test/proto"
)
// 自定义解析器的 scheme
const myScheme = "consul"
var consulAddress string = "127.0.0.1:8500"
// 构造器(Builder)实现 `resolver.Builder`
type myResolverBuilder struct{}
// Target url;
func (b *myResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
log.Printf("解析目标: %s", target.Endpoint())
r := &myResolver{
target: target,
cc: cc,
}
r.ResolveNow(resolver.ResolveNowOptions{}) // 立即解析
return r, nil
}
func (b *myResolverBuilder) Scheme() string {
return myScheme
}
// 解析器(Resolver)实现 `resolver.Resolver`
type myResolver struct {
target resolver.Target
cc resolver.ClientConn
mu sync.Mutex
}
// 解析服务地址
func (r *myResolver) ResolveNow(opts resolver.ResolveNowOptions) {
r.mu.Lock()
defer r.mu.Unlock()
// 假设这里从 Consul 获取服务地址
addresses, err := queryConsul(consulAddress, r.target.Endpoint())
if err != nil {
panic(err)
}
// 更新 gRPC 服务器地址
r.cc.UpdateState(resolver.State{Addresses: addresses})
log.Println("解析器更新完成")
}
// 关闭解析器
func (r *myResolver) Close() {}
// 注册解析器
func init() {
consulAddress, _ = GetLocalIP()
consulAddress = fmt.Sprintf("%s:8500", consulAddress)
resolver.Register(&myResolverBuilder{})
}
// 从 Consul 查询 `serviceName` 并返回可用的 gRPC 服务器列表
func queryConsul(consulAddress string, serviceName string) ([]resolver.Address, error) {
// 创建 Consul 客户端
consulConfig := api.DefaultConfig()
consulConfig.Address = consulAddress
client, err := api.NewClient(consulConfig)
if err != nil {
return nil, err
}
// 查询 Consul 中健康的 gRPC 服务
services, _, err := client.Health().Service(serviceName, "", true, nil)
if err != nil {
return nil, err
}
// 转换为 gRPC `resolver.Address`
var addresses []resolver.Address
for _, service := range services {
addr := service.Service.Address
port := service.Service.Port
addresses = append(addresses, resolver.Address{
Addr: addr + ":" + fmt.Sprintf("%d", port),
ServerName: serviceName})
}
// 如果没有可用服务,返回错误
if len(addresses) == 0 {
return nil, fmt.Errorf("未找到任何健康的 %s 服务", serviceName)
}
for _, address := range addresses {
fmt.Println(address)
}
return addresses, nil
}
// 获取本机 IPv4 地址(第一个非 127.0.0.1 的)
func GetLocalIP() (string, error) {
interfaces, err := net.Interfaces()
if err != nil {
return "", err
}
for _, iface := range interfaces {
// 忽略未启用的网卡
if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
continue
}
addrs, err := iface.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
ipNet, ok := addr.(*net.IPNet)
if !ok || ipNet.IP.IsLoopback() || ipNet.IP.To4() == nil {
continue
}
return ipNet.IP.String(), nil // 找到第一个 IPv4 地址直接返回
}
}
return "", fmt.Errorf("no valid IPv4 address found")
}
func main() {
// 使用自定义解析器
conn, err := grpc.Dial(
"consul:///mx_shop_users",
grpc.WithInsecure(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
if err != nil {
log.Fatalf("连接 gRPC 服务器失败: %v", err)
}
defer conn.Close()
log.Println("gRPC 连接成功")
UserSrvClient := proto.NewUserClient(conn)
rsp, err := UserSrvClient.GetUserList(context.Background(), &proto.PageInfo{
Pn: 1,
PSize: 2,
})
if err != nil {
panic(err)
}
for index, data := range rsp.Data {
fmt.Println(index, data)
}
}
6. 运行流程
grpc.Dial("custom:///myservice")
触发 gRPC 解析器。- gRPC 调用
customResolverBuilder.Build()
创建解析器。 ResolveNow()
解析目标地址,返回 服务器列表(如127.0.0.1:50051
、127.0.0.1:50052
)。UpdateState()
更新服务器地址列表,gRPC 负载均衡器 开始均衡请求。- 所有 gRPC 请求 都会自动负载均衡到解析出来的服务器。
7. 实验
- user.proto
syntax = "proto3";
import "google/protobuf/empty.proto";
option go_package = ".;proto";
service User {
// 传递一些信息进来
rpc GetUserList(PageInfo) returns (UserListResponse); // 用户的列表
rpc GetUserByMobile(MobileRequest) returns (UserInfoResponse); // 通过mobile查询用户
rpc GetUserById(IdRequest) returns (UserInfoResponse); // 通过id查询用户
rpc CreateUser(CreateUserInfo) returns (UserInfoResponse); // 添加用户
rpc UpdateUser(UpdateUserInfo) returns (google.protobuf.Empty); // 更新用户
rpc CheckPassWord(PasswordCheckInfo) returns (CheckResponse);// 检查
}
message PasswordCheckInfo {
string password = 1;
string encryptedPassword = 2;
}
message CheckResponse {
bool success = 1;
}
message PageInfo {
uint32 pn = 1; // 页码
uint32 pSize = 2; // 每一页的大小
}
message UserInfoResponse {
int32 id = 1;
string password = 2;
string mobile = 3;
string nickName = 4;
uint64 birthDay = 5;
string gender = 6;
int32 role = 7;
}
message UserListResponse {
int32 total = 1;
repeated UserInfoResponse data = 2;
}
message MobileRequest {
string mobile = 1;
}
message IdRequest {
int32 id = 1;
}
message CreateUserInfo {
string nickName = 1;
string password = 2;
string mobile = 3;
}
message UpdateUserInfo {
int32 id = 1;
string nickName = 2;
string gender = 3;
uint64 birthday = 4;
}
- main.go
package main
import (
"fmt"
"github.com/hashicorp/consul/api"
"log"
"net"
"sync"
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
"test/proto"
)
// 自定义解析器的 scheme
const myScheme = "consul"
var consulAddress string = "127.0.0.1:8500"
// 构造器(Builder)实现 `resolver.Builder`
type myResolverBuilder struct{}
// Target url;
func (b *myResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
log.Printf("解析目标: %s", target.Endpoint())
r := &myResolver{
target: target,
cc: cc,
}
r.ResolveNow(resolver.ResolveNowOptions{}) // 立即解析
return r, nil
}
func (b *myResolverBuilder) Scheme() string {
return myScheme
}
// 解析器(Resolver)实现 `resolver.Resolver`
type myResolver struct {
target resolver.Target
cc resolver.ClientConn
mu sync.Mutex
}
// 解析服务地址
func (r *myResolver) ResolveNow(opts resolver.ResolveNowOptions) {
r.mu.Lock()
defer r.mu.Unlock()
// 假设这里从 Consul 获取服务地址
addresses, err := queryConsul(consulAddress, r.target.Endpoint())
if err != nil {
panic(err)
}
// 更新 gRPC 服务器地址
r.cc.UpdateState(resolver.State{Addresses: addresses})
log.Println("解析器更新完成")
}
// 关闭解析器
func (r *myResolver) Close() {}
// 注册解析器
func init() {
consulAddress, _ = GetLocalIP()
consulAddress = fmt.Sprintf("%s:8500", consulAddress)
resolver.Register(&myResolverBuilder{})
}
// 从 Consul 查询 `serviceName` 并返回可用的 gRPC 服务器列表
func queryConsul(consulAddress string, serviceName string) ([]resolver.Address, error) {
// 创建 Consul 客户端
consulConfig := api.DefaultConfig()
consulConfig.Address = consulAddress
client, err := api.NewClient(consulConfig)
if err != nil {
return nil, err
}
// 查询 Consul 中健康的 gRPC 服务
services, _, err := client.Health().Service(serviceName, "", true, nil)
if err != nil {
return nil, err
}
// 转换为 gRPC `resolver.Address`
var addresses []resolver.Address
for _, service := range services {
addr := service.Service.Address
port := service.Service.Port
addresses = append(addresses, resolver.Address{
Addr: addr + ":" + fmt.Sprintf("%d", port),
ServerName: serviceName})
}
// 如果没有可用服务,返回错误
if len(addresses) == 0 {
return nil, fmt.Errorf("未找到任何健康的 %s 服务", serviceName)
}
for _, address := range addresses {
fmt.Println(address)
}
return addresses, nil
}
// 获取本机 IPv4 地址(第一个非 127.0.0.1 的)
func GetLocalIP() (string, error) {
interfaces, err := net.Interfaces()
if err != nil {
return "", err
}
for _, iface := range interfaces {
// 忽略未启用的网卡
if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
continue
}
addrs, err := iface.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
ipNet, ok := addr.(*net.IPNet)
if !ok || ipNet.IP.IsLoopback() || ipNet.IP.To4() == nil {
continue
}
return ipNet.IP.String(), nil // 找到第一个 IPv4 地址直接返回
}
}
return "", fmt.Errorf("no valid IPv4 address found")
}
func main() {
// 使用自定义解析器
conn, err := grpc.Dial(
"consul:///mx_shop_users",
grpc.WithInsecure(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
if err != nil {
log.Fatalf("连接 gRPC 服务器失败: %v", err)
}
defer conn.Close()
log.Println("gRPC 连接成功")
UserSrvClient := proto.NewUserClient(conn)
rsp, err := UserSrvClient.GetUserList(context.Background(), &proto.PageInfo{
Pn: 1,
PSize: 2,
})
if err != nil {
panic(err)
}
for index, data := range rsp.Data {
fmt.Println(index, data)
}
}
是有负载均衡的
9. 总结
✅ gRPC 预留的 resolver.Resolver
接口 主要用于 解析目标地址,并提供服务器列表。
✅ 核心接口
- **
resolver.Resolver
**:解析地址并返回服务器列表 - **
resolver.Builder
**:创建解析器 - **
resolver.ClientConn
**:通知 gRPC 解析结果 - **
resolver.State
**:存储解析后的服务器地址
✅ 自定义 gRPC 解析器适用场景
- Consul、Etcd、Kubernetes 服务发现
- 动态解析 gRPC 服务器
- 自定义负载均衡策略
🚀 这样,你的 gRPC 应用就能动态解析服务,并自动负载均衡!态解析并连接多个后端服务器!**