grpc-4models

  1. gRPC的四种模式
  2. Go代码演示如下
    1. proto文件
    2. Server
    3. Client

gRPC的四种模式

  • 简单模式(Unary RPC:客户端发送一次请求,服务端返回一次响应)
    • image-20241004161135738
  • 服务端流模式(Server streaming:客户端发送一次请求,服务端返回一个响应流)
    • image-20241004161151963
  • 客户端流模式(Client streaming:客户端发送一个请求流,服务端返回一次响应)
    • image-20241004161156411
  • 双向流模式(Bidirectional streaming:客户端和服务端各自独立地发送消息流)
    • image-20241004161201663

Go代码演示如下

proto文件

syntax = "proto3";

option go_package=".;proto";

service Greeter {
  // One RPC per gRPC streaming mode; all of them exchange StreamReqData / StreamResData.
  rpc GetStream(StreamReqData) returns (stream StreamResData); // server-streaming mode: one request, a stream of responses
  rpc PutStream(stream StreamReqData) returns (StreamResData); // client-streaming mode: a stream of requests, one response
  rpc AllStream(stream StreamReqData) returns (stream StreamResData); // bidirectional-streaming mode: both sides stream
}


// StreamReqData is the client-to-server message used by every Greeter RPC.
message StreamReqData {
  string data = 1; // string payload carried by the request
}

// StreamResData is the server-to-client message used by every Greeter RPC.
message StreamResData {
  string data = 1; // string payload carried by the response
}

Server

package main

import (
    "fmt"
    "go-grpc/stream_grpc_test/proto"
    "google.golang.org/grpc"
    "io"
    "log"
    "net"
    "sync"
    "time"
)

// PORT is the TCP address passed to net.Listen by main; the empty host part
// means the server listens on port 8888 of all local interfaces.
const PORT = ":8888"

// server is the Greeter service implementation that main registers with the
// gRPC server. It carries no state of its own.
type server struct {
    // Embedded generated type from the proto package (defined outside this
    // file); presumably supplies defaults for unimplemented RPCs — confirm
    // against the generated code.
    proto.UnimplementedGreeterServer
}

// GetStream implements the server-streaming RPC: for one client request it
// sends eleven timestamped messages, one per second, and then ends the stream
// by returning nil.
//
// The pause between messages is interruptible: if the stream's context is
// cancelled (client went away, deadline expired) the handler returns the
// context error immediately instead of sleeping and sending into a dead
// stream. A failed Send is returned as-is. No pause is taken after the final
// message, so the stream closes as soon as the last message has been sent.
func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
    log.Printf("Received GetStream request: %v", req.Data)

    // Number of messages pushed to the client before the stream is closed.
    const messages = 11

    ctx := res.Context()
    for i := 0; i < messages; i++ {
        if i > 0 {
            // Pace the stream, but wake up right away on cancellation.
            select {
            case <-ctx.Done():
                log.Printf("GetStream cancelled: %v", ctx.Err())
                return ctx.Err()
            case <-time.After(time.Second):
            }
        }

        msg := &proto.StreamResData{Data: fmt.Sprintf("Current time: %v", time.Now())}
        if err := res.Send(msg); err != nil {
            log.Printf("Error sending stream data: %v", err)
            return err
        }
    }

    log.Println("Stream completed.")
    return nil
}

// PutStream implements the client-streaming RPC. It logs every message the
// client sends until the client half-closes the stream (Recv reports io.EOF),
// then answers with a single summary message via SendAndClose. Any other
// receive error is logged and returned to the gRPC runtime.
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
    log.Println("PutStream started")
    for {
        msg, recvErr := cliStr.Recv()
        switch {
        case recvErr == io.EOF:
            // The client is done sending: reply once and finish the RPC.
            log.Println("Client finished sending stream data.")
            summary := &proto.StreamResData{Data: "Received all client stream data."}
            return cliStr.SendAndClose(summary)
        case recvErr != nil:
            log.Printf("Error receiving client stream data: %v", recvErr)
            return recvErr
        }
        log.Printf("Received from client: %v", msg.Data)
    }
}

// AllStream implements the bidirectional-streaming RPC. Two goroutines run
// concurrently on the same stream: one logs every message received from the
// client until the client half-closes (io.EOF), the other sends eleven
// numbered messages to the client, one per second.
//
// The handler waits for both goroutines and returns the first failure it
// observed (receive error first, then send error), so a broken stream is
// reported to the client as a failed RPC rather than as success. The sender
// also stops as soon as the stream's context is cancelled.
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
    log.Println("AllStream started")

    var (
        wg sync.WaitGroup
        // Each error is written by exactly one goroutine before its
        // wg.Done and read only after wg.Wait, so no extra locking is needed.
        recvErr error
        sendErr error
    )
    wg.Add(2)

    // Receive the client's stream.
    go func() {
        defer wg.Done()
        for {
            data, err := allStr.Recv()
            if err == io.EOF {
                log.Println("Client finished sending in AllStream.")
                return
            }
            if err != nil {
                log.Printf("Error receiving from client: %v", err)
                recvErr = err
                return
            }
            log.Printf("Received from client: %v", data.Data)
        }
    }()

    // Send data to the client.
    go func() {
        defer wg.Done()
        ctx := allStr.Context()
        for i := 0; i <= 10; i++ {
            if i > 0 {
                // Pace the stream, but give up immediately on cancellation.
                select {
                case <-ctx.Done():
                    log.Printf("AllStream send cancelled: %v", ctx.Err())
                    sendErr = ctx.Err()
                    return
                case <-time.After(time.Second):
                }
            }

            data := &proto.StreamResData{
                Data: fmt.Sprintf("Server data %d", i),
            }
            if err := allStr.Send(data); err != nil {
                log.Printf("Error sending to client: %v", err)
                sendErr = err
                return
            }
        }
        log.Println("Server finished sending in AllStream.")
    }()

    wg.Wait()
    if recvErr != nil {
        return recvErr
    }
    return sendErr
}

// main starts the demo gRPC server: it binds the TCP address in PORT,
// registers the Greeter implementation and serves until Serve fails.
// Any listen or serve failure terminates the process via log.Fatalf.
func main() {
    // Bind the TCP listener.
    listener, listenErr := net.Listen("tcp", PORT)
    if listenErr != nil {
        log.Fatalf("Failed to listen: %v", listenErr)
    }
    defer listener.Close()

    // Build the gRPC server and attach the Greeter service to it.
    srv := grpc.NewServer()
    proto.RegisterGreeterServer(srv, &server{})

    // Serve blocks for the lifetime of the server.
    log.Printf("gRPC server listening on %s", PORT)
    serveErr := srv.Serve(listener)
    if serveErr != nil {
        log.Fatalf("Failed to serve: %v", serveErr)
    }
}

Client

package main

import (
    "context"
    "fmt"
    "go-grpc/stream_grpc_test/proto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "io"
    "log"
    "sync"
    "time"
)

// createConnection opens a client connection to the demo server at
// 127.0.0.1:8888 using insecure (plaintext) transport credentials. It exists
// so the three client demos do not repeat the dial code. On failure it
// returns a nil connection together with the dial error.
func createConnection() (*grpc.ClientConn, error) {
    const target = "127.0.0.1:8888"
    plaintext := grpc.WithTransportCredentials(insecure.NewCredentials())

    cc, dialErr := grpc.Dial(target, plaintext)
    if dialErr != nil {
        return nil, dialErr
    }
    return cc, nil
}

// stream_server demonstrates the server-streaming mode from the client side:
// it sends one request ("DJ") to GetStream and prints every message the
// server streams back until the server closes the stream (io.EOF).
// Connection, call and receive failures terminate the process via log.Fatalf.
func stream_server() {
    conn, connErr := createConnection()
    if connErr != nil {
        log.Fatalf("Failed to create connection: %v", connErr)
    }
    defer conn.Close()

    request := &proto.StreamReqData{Data: "DJ"}
    stream, callErr := proto.NewGreeterClient(conn).GetStream(context.Background(), request)
    if callErr != nil {
        log.Fatalf("Error calling GetStream: %v", callErr)
    }

    for {
        msg, recvErr := stream.Recv()
        switch {
        case recvErr == io.EOF:
            // The server finished the stream normally.
            log.Println("Stream ended")
            return
        case recvErr != nil:
            log.Fatalf("Error receiving stream: %v", recvErr)
        }
        fmt.Println(msg)
    }
}

// stream_client demonstrates the client-streaming mode from the client side:
// it sends eleven timestamped messages to PutStream, one per second, then
// half-closes the stream and waits for the server's single reply, which is
// logged. Connection, send and close failures terminate the process via
// log.Fatalf.
func stream_client() {
    conn, err := createConnection()
    if err != nil {
        log.Fatalf("Failed to create connection: %v", err)
    }
    defer conn.Close()

    c := proto.NewGreeterClient(conn)
    puts, err := c.PutStream(context.Background())
    if err != nil {
        log.Fatalf("Error starting PutStream: %v", err)
    }

    for i := 0; i <= 10; i++ {
        data := &proto.StreamReqData{
            Data: fmt.Sprintf("Client time: %v", time.Now()),
        }
        if err := puts.Send(data); err != nil {
            log.Fatalf("Error sending to stream: %v", err)
        }
        time.Sleep(time.Second)
    }

    // Half-close the stream and read the server's one response. The reply is
    // the whole result of a client-streaming call, so surface it rather than
    // throwing it away.
    reply, err := puts.CloseAndRecv()
    if err != nil {
        log.Fatalf("Error closing client stream: %v", err)
    }
    log.Printf("Server reply: %v", reply.Data)
}

// stream_all demonstrates the bidirectional-streaming mode from the client
// side. After opening AllStream it runs two goroutines on the same stream:
// one sends eleven numbered messages (one per second) and then half-closes
// the stream, the other prints every message received from the server until
// the server ends the stream (io.EOF). The function returns once both
// goroutines have finished.
//
// Connection and call-setup failures terminate the process via log.Fatalf;
// errors inside the goroutines are logged and end only that goroutine.
func stream_all() {
    conn, err := createConnection()
    if err != nil {
        log.Fatalf("Failed to create connection: %v", err)
    }
    defer conn.Close()

    c := proto.NewGreeterClient(conn)
    allStr, err := c.AllStream(context.Background())
    if err != nil {
        log.Fatalf("Error starting AllStream: %v", err)
    }

    wg := sync.WaitGroup{}
    wg.Add(2)

    // Send data.
    go func() {
        defer wg.Done()
        for i := 0; i <= 10; i++ {
            err := allStr.Send(&proto.StreamReqData{Data: fmt.Sprintf("Client send %v", i)})
            if err != nil {
                log.Printf("Error sending stream data: %v", err)
                break
            }
            time.Sleep(time.Second)
        }
        // Once sending is over, CloseSend tells the server no more messages
        // are coming. Its error is reported instead of being dropped.
        if err := allStr.CloseSend(); err != nil {
            log.Printf("Error closing send direction: %v", err)
        }
    }()

    // Receive data.
    go func() {
        defer wg.Done()
        for {
            data, err := allStr.Recv()
            if err == io.EOF { // the server has finished its stream
                log.Println("Stream ended")
                break
            }
            if err != nil {
                log.Printf("Error receiving stream data: %v", err)
                break
            }
            fmt.Println("Received from server:", data.Data)
        }
    }()

    wg.Wait()
}

// main is the client entry point; it runs only the bidirectional-streaming
// demo (stream_all).
func main() {
    // Exercise the bidirectional-streaming mode.
    stream_all()
}
github