grpc-4models

  1. gRPC的四种模式
  2. Go代码演示如下
    1. proto文件
    2. Server
    3. Client

gRPC的四种模式

  • 简单模式
    • image-20241004161135738
  • 服务端模式
    • image-20241004161151963
  • 客户端模式
    • image-20241004161156411
  • 双向模式
    • image-20241004161201663

Go代码演示如下

proto文件

syntax = "proto3";

option go_package=".;proto";

service Greeter {
  // 使用服务端地流模式
  rpc GetStream(StreamReqData) returns (stream StreamResData); // 服务端流模式
  rpc PutStream(stream StreamReqData) returns (StreamResData); // 客户端流模式
  rpc AllStream(stream StreamReqData) returns (stream StreamResData); // 双向流模式
}


message StreamReqData {
  string data = 1;
}

message StreamResData {
  string data = 1;
}

Server

package main

import (
	"fmt"
	"go-grpc/stream_grpc_test/proto"
	"google.golang.org/grpc"
	"io"
	"log"
	"net"
	"sync"
	"time"
)

const PORT = ":8888"

type server struct {
	proto.UnimplementedGreeterServer
}

// GetStream 服务端流模式
func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
	log.Printf("Received GetStream request: %v", req.Data)
	i := 0
	for {
		err := res.Send(&proto.StreamResData{Data: fmt.Sprintf("Current time: %v", time.Now())})
		if err != nil {
			log.Printf("Error sending stream data: %v", err)
			return err
		}
		time.Sleep(time.Second)
		i++
		if i > 10 {
			log.Println("Stream completed.")
			break
		}
	}
	return nil
}

// PutStream 客户端流模式
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
	log.Println("PutStream started")
	for {
		req, err := cliStr.Recv()
		if err == io.EOF {
			log.Println("Client finished sending stream data.")
			return cliStr.SendAndClose(&proto.StreamResData{Data: "Received all client stream data."})
		}
		if err != nil {
			log.Printf("Error receiving client stream data: %v", err)
			return err
		}
		log.Printf("Received from client: %v", req.Data)
	}
}

// AllStream 双向流
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
	log.Println("AllStream started")
	var wg sync.WaitGroup
	wg.Add(2)

	// 接受客户端的流
	go func() {
		defer wg.Done()
		for {
			data, err := allStr.Recv()
			if err == io.EOF {
				log.Println("Client finished sending in AllStream.")
				break
			}
			if err != nil {
				log.Printf("Error receiving from client: %v", err)
				break
			}
			log.Printf("Received from client: %v", data.Data)
		}
	}()

	// 发送数据给客户端
	go func() {
		defer wg.Done()
		for i := 0; i <= 10; i++ {
			data := &proto.StreamResData{
				Data: fmt.Sprintf("Server data %d", i),
			}
			err := allStr.Send(data)
			if err != nil {
				log.Printf("Error sending to client: %v", err)
				break
			}
			time.Sleep(time.Second)
		}
		log.Println("Server finished sending in AllStream.")
	}()

	wg.Wait()
	return nil
}

func main() {
	// 启动TCP监听
	lis, err := net.Listen("tcp", PORT)
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}
	defer lis.Close()

	// 创建gRPC服务器
	grpcServer := grpc.NewServer()

	// 注册服务
	proto.RegisterGreeterServer(grpcServer, &server{})

	// 启动gRPC服务
	log.Printf("gRPC server listening on %s", PORT)
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

Client

package main

import (
	"context"
	"fmt"
	"go-grpc/stream_grpc_test/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"io"
	"log"
	"sync"
	"time"
)

// 创建 gRPC 连接,减少重复代码
func createConnection() (*grpc.ClientConn, error) {
	conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	return conn, nil
}

// 服务端流模式
func stream_server() {
	conn, err := createConnection()
	if err != nil {
		log.Fatalf("Failed to create connection: %v", err)
	}
	defer conn.Close()

	c := proto.NewGreeterClient(conn)
	res, err := c.GetStream(context.Background(), &proto.StreamReqData{
		Data: "DJ",
	})
	if err != nil {
		log.Fatalf("Error calling GetStream: %v", err)
	}

	for {
		resp, err := res.Recv()
		if err == io.EOF {
			log.Println("Stream ended")
			break
		}
		if err != nil {
			log.Fatalf("Error receiving stream: %v", err)
		}
		fmt.Println(resp)
	}
}

// 客户端流模式
func stream_client() {
	conn, err := createConnection()
	if err != nil {
		log.Fatalf("Failed to create connection: %v", err)
	}
	defer conn.Close()

	c := proto.NewGreeterClient(conn)
	puts, err := c.PutStream(context.Background())
	if err != nil {
		log.Fatalf("Error starting PutStream: %v", err)
	}

	for i := 0; i <= 10; i++ {
		data := &proto.StreamReqData{
			Data: fmt.Sprintf("Client time: %v", time.Now()),
		}
		if err := puts.Send(data); err != nil {
			log.Fatalf("Error sending to stream: %v", err)
		}
		time.Sleep(time.Second)
	}

	// 结束流并关闭连接
	if _, err := puts.CloseAndRecv(); err != nil {
		log.Fatalf("Error closing client stream: %v", err)
	}
}

// 双向流模式
func stream_all() {
	conn, err := createConnection()
	if err != nil {
		log.Fatalf("Failed to create connection: %v", err)
	}
	defer conn.Close()

	c := proto.NewGreeterClient(conn)
	allStr, err := c.AllStream(context.Background())
	if err != nil {
		log.Fatalf("Error starting AllStream: %v", err)
	}

	wg := sync.WaitGroup{}
	wg.Add(2)

	// 发送数据
	go func() {
		defer wg.Done()
		for i := 0; i <= 10; i++ {
			err := allStr.Send(&proto.StreamReqData{Data: fmt.Sprintf("Client send %v", i)})
			if err != nil {
				log.Printf("Error sending stream data: %v", err)
				break
			}
			time.Sleep(time.Second)
		}
		// 在发送结束后,调用 CloseSend 通知服务端发送完毕
		allStr.CloseSend()
	}()

	// 接收数据
	go func() {
		defer wg.Done()
		for {
			data, err := allStr.Recv()
			if err == io.EOF { // 结束的标志
				log.Println("Stream ended")
				break
			}
			if err != nil {
				log.Printf("Error receiving stream data: %v", err)
				break
			}
			fmt.Println("Received from server:", data.Data)
		}
	}()

	wg.Wait()
}

func main() {
	// 测试双向流模式
	stream_all()
}
github