gRPC的四种模式
Go代码演示如下
proto文件
syntax = "proto3";
option go_package=".;proto";
service Greeter {
// 使用服务端地流模式
rpc GetStream(StreamReqData) returns (stream StreamResData); // 服务端流模式
rpc PutStream(stream StreamReqData) returns (StreamResData); // 客户端流模式
rpc AllStream(stream StreamReqData) returns (stream StreamResData); // 双向流模式
}
message StreamReqData {
string data = 1;
}
message StreamResData {
string data = 1;
}
Server
package main
import (
"fmt"
"go-grpc/stream_grpc_test/proto"
"google.golang.org/grpc"
"io"
"log"
"net"
"sync"
"time"
)
const PORT = ":8888"
type server struct {
proto.UnimplementedGreeterServer
}
// GetStream 服务端流模式
func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
log.Printf("Received GetStream request: %v", req.Data)
i := 0
for {
err := res.Send(&proto.StreamResData{Data: fmt.Sprintf("Current time: %v", time.Now())})
if err != nil {
log.Printf("Error sending stream data: %v", err)
return err
}
time.Sleep(time.Second)
i++
if i > 10 {
log.Println("Stream completed.")
break
}
}
return nil
}
// PutStream 客户端流模式
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
log.Println("PutStream started")
for {
req, err := cliStr.Recv()
if err == io.EOF {
log.Println("Client finished sending stream data.")
return cliStr.SendAndClose(&proto.StreamResData{Data: "Received all client stream data."})
}
if err != nil {
log.Printf("Error receiving client stream data: %v", err)
return err
}
log.Printf("Received from client: %v", req.Data)
}
}
// AllStream 双向流
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
log.Println("AllStream started")
var wg sync.WaitGroup
wg.Add(2)
// 接受客户端的流
go func() {
defer wg.Done()
for {
data, err := allStr.Recv()
if err == io.EOF {
log.Println("Client finished sending in AllStream.")
break
}
if err != nil {
log.Printf("Error receiving from client: %v", err)
break
}
log.Printf("Received from client: %v", data.Data)
}
}()
// 发送数据给客户端
go func() {
defer wg.Done()
for i := 0; i <= 10; i++ {
data := &proto.StreamResData{
Data: fmt.Sprintf("Server data %d", i),
}
err := allStr.Send(data)
if err != nil {
log.Printf("Error sending to client: %v", err)
break
}
time.Sleep(time.Second)
}
log.Println("Server finished sending in AllStream.")
}()
wg.Wait()
return nil
}
func main() {
// 启动TCP监听
lis, err := net.Listen("tcp", PORT)
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
defer lis.Close()
// 创建gRPC服务器
grpcServer := grpc.NewServer()
// 注册服务
proto.RegisterGreeterServer(grpcServer, &server{})
// 启动gRPC服务
log.Printf("gRPC server listening on %s", PORT)
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}
Client
package main
import (
"context"
"fmt"
"go-grpc/stream_grpc_test/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"log"
"sync"
"time"
)
// 创建 gRPC 连接,减少重复代码
func createConnection() (*grpc.ClientConn, error) {
conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
return conn, nil
}
// 服务端流模式
func stream_server() {
conn, err := createConnection()
if err != nil {
log.Fatalf("Failed to create connection: %v", err)
}
defer conn.Close()
c := proto.NewGreeterClient(conn)
res, err := c.GetStream(context.Background(), &proto.StreamReqData{
Data: "DJ",
})
if err != nil {
log.Fatalf("Error calling GetStream: %v", err)
}
for {
resp, err := res.Recv()
if err == io.EOF {
log.Println("Stream ended")
break
}
if err != nil {
log.Fatalf("Error receiving stream: %v", err)
}
fmt.Println(resp)
}
}
// 客户端流模式
func stream_client() {
conn, err := createConnection()
if err != nil {
log.Fatalf("Failed to create connection: %v", err)
}
defer conn.Close()
c := proto.NewGreeterClient(conn)
puts, err := c.PutStream(context.Background())
if err != nil {
log.Fatalf("Error starting PutStream: %v", err)
}
for i := 0; i <= 10; i++ {
data := &proto.StreamReqData{
Data: fmt.Sprintf("Client time: %v", time.Now()),
}
if err := puts.Send(data); err != nil {
log.Fatalf("Error sending to stream: %v", err)
}
time.Sleep(time.Second)
}
// 结束流并关闭连接
if _, err := puts.CloseAndRecv(); err != nil {
log.Fatalf("Error closing client stream: %v", err)
}
}
// 双向流模式
func stream_all() {
conn, err := createConnection()
if err != nil {
log.Fatalf("Failed to create connection: %v", err)
}
defer conn.Close()
c := proto.NewGreeterClient(conn)
allStr, err := c.AllStream(context.Background())
if err != nil {
log.Fatalf("Error starting AllStream: %v", err)
}
wg := sync.WaitGroup{}
wg.Add(2)
// 发送数据
go func() {
defer wg.Done()
for i := 0; i <= 10; i++ {
err := allStr.Send(&proto.StreamReqData{Data: fmt.Sprintf("Client send %v", i)})
if err != nil {
log.Printf("Error sending stream data: %v", err)
break
}
time.Sleep(time.Second)
}
// 在发送结束后,调用 CloseSend 通知服务端发送完毕
allStr.CloseSend()
}()
// 接收数据
go func() {
defer wg.Done()
for {
data, err := allStr.Recv()
if err == io.EOF { // 结束的标志
log.Println("Stream ended")
break
}
if err != nil {
log.Printf("Error receiving stream data: %v", err)
break
}
fmt.Println("Received from server:", data.Data)
}
}()
wg.Wait()
}
func main() {
// 测试双向流模式
stream_all()
}