客户端编写一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端写完消息,它就等待服务器读取消息并返回响应
gRPC再次保证了单个RPC调用中的消息排序
。
假设希望在订单管理服务中添加新的 updateOrders 方法,从而更新一个订单集合,
如图所示。在这里,我们想以消息流的形式发送订单列表到服务器端,服务器端会处理这个流并发送一条带有已更新订单状态的消息给客户端。
// 服务端 推送流
rpc GetStream (StreamReqData) returns (stream StreamResData){}
目录结构
在proto文件夹新建clientstream.proto文件
syntax = "proto3";
option go_package= "./;clientstream";
package clientstream;
service Greeter {
// 客户端 推送流
rpc PutStream (stream StreamReqData) returns (StreamResData){}
}
// request
message StreamReqData {
string data = 1;
}
// response
message StreamResData {
string data = 1;
}
编译clientstream.proto
在上级目录新建clientstream文件夹 然后编译
protoc.exe --go_out=../clientstream --go-grpc_out=../clientstream clientstream.proto
会 在clientstream文件夹下生成: clientstream_grpc.pb.go clientstream.pb.go
实现GRPC服务端
在clientstream文件夹下创建clientstreamservice.go 在这个文件中我们要实现GreeterServer接口的如下方法
这代表GRPC服务对象
// GreeterServer is the server API for Greeter service.
// All implementations must embed UnimplementedGreeterServer
// for forward compatibility
type GreeterServer interface {
// 客户端 推送流
PutStream(Greeter_PutStreamServer) error
mustEmbedUnimplementedGreeterServer()
}
第二个方法不重要可以什么都不写
主要是实现第一个PutStream方法 : 参数是个Greeter_PutStreamServer接口 它有两个公开方法 以及它也继承了 grpc.ServerStream的接口
这里注意
RegisterGreeterServer(注册服务)的工厂方法 以及 var Greeter_ServiceDesc = grpc.ServiceDesc的服务结构
这两个就是把服务相关的结构关联了起来
func _Greeter_PutStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(GreeterServer).PutStream(&greeterPutStreamServer{stream})
}
type Greeter_PutStreamServer interface {
SendAndClose(*StreamResData) error
Recv() (*StreamReqData, error)
grpc.ServerStream
}
type greeterPutStreamServer struct {
grpc.ServerStream
}
func (x *greeterPutStreamServer) SendAndClose(m *StreamResData) error {
return x.ServerStream.SendMsg(m)
}
func (x *greeterPutStreamServer) Recv() (*StreamReqData, error) {
m := new(StreamReqData)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) {
s.RegisterService(&Greeter_ServiceDesc, srv)
}
// Greeter_ServiceDesc is the grpc.ServiceDesc for Greeter service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Greeter_ServiceDesc = grpc.ServiceDesc{
ServiceName: "clientstream.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "PutStream",
Handler: _Greeter_PutStream_Handler,
ClientStreams: true,
},
},
Metadata: "clientstream.proto",
}
clientstreamservice.go内容如下
package clientstream
import "log"
type ClientstreamServer struct {
}
func (s *ClientstreamServer) mustEmbedUnimplementedGreeterServer() {}
// 客户端 单向流-- 服务器端接收客户端发送过来的流数据
func (s *ClientstreamServer) PutStream(cliStr Greeter_PutStreamServer) error {
for {
if tem, err := cliStr.Recv(); err == nil {
log.Println(tem)
} else {
log.Println("break, err :", err)
break
}
}
return nil
}
在braceapi/grpc/test下创建clientstream文件夹
在braceapi/grpc/test/clientstream下面创建clientstreamService.go文件 用于启动服务端
clientstreamService.go内容如下
//go:build ignore
package main
import (
"net"
"braceapi.hgbaoxian.cn/braceapi/grpc/grpcserver/clientstream"
"google.golang.org/grpc"
)
func main() {
//创建一个grpc服务器
s := grpc.NewServer()
//注册GRPC服务
clientstream.RegisterGreeterServer(s, &clientstream.ClientstreamServer{})
//监听
lis, err := net.Listen("tcp", ":50051")
if err != nil {
return
}
//启动服务
s.Serve(lis)
}
实现GRPC客户端
重要的接口或结构如下
type GreeterClient interface {
// 客户端 推送流
PutStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_PutStreamClient, error)
}
type greeterClient struct {
cc grpc.ClientConnInterface
}
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}
func (c *greeterClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_PutStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &Greeter_ServiceDesc.Streams[0], Greeter_PutStream_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &greeterPutStreamClient{stream}
return x, nil
}
type Greeter_PutStreamClient interface {
Send(*StreamReqData) error
CloseAndRecv() (*StreamResData, error)
grpc.ClientStream
}
type greeterPutStreamClient struct {
grpc.ClientStream
}
func (x *greeterPutStreamClient) Send(m *StreamReqData) error {
return x.ClientStream.SendMsg(m)
}
func (x *greeterPutStreamClient) CloseAndRecv() (*StreamResData, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(StreamResData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
因为greeterClient这个结构已经实现了GreeterClient的接口
而这个greeterClient结构是未公开的 所以为了方便,这里有个NewGreeterClient工厂函数返回了这个结构的引用
所以客户端主要工作是:
通过NewGreeterClient方法实例化一个GRPC客户端对象greeterClient 然后调用greeterClient结构的PutStream方法
返回一个grpc.ClientStream客户端数据流对象 然后调用Send方法 向服务端推送数据
把Send方法放入for死循环中 可以不断地读取向服务端发送流数据
在braceapi/grpc/test/clientstream下面创建clientstreamClient.go文件 用于启动服务端
clientstreamClient.go内容如下
//go:build ignore
package main
import (
"context"
"time"
"braceapi.hgbaoxian.cn/braceapi/grpc/grpcserver/clientstream"
"google.golang.org/grpc"
)
func main() {
// 通过grpc库,建立一个连接
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
return
}
defer conn.Close()
//通过建立的连接生成一个client对象
c := clientstream.NewGreeterClient(conn)
//客户端持续推送数据流到服务端
putRes, _ := c.PutStream(context.Background())
i := 1
for {
i++
putRes.Send(&clientstream.StreamReqData{Data: "Psych"})
time.Sleep(time.Second)
if i > 10 {
break
}
}
}
可以看出 客户端流和服务端流相反 , 客户端发送流数据 服务端接收流数据
服务端流则是 服务端发送流数据 客户端接收流数据
启动测试:首先启动服务端 然后启动客户端