代码生成是一种常用的生产效率技术. 广义上看, 编译器通过高级语言生产出低级语言或者机器码, 也可以理解为一种代码生成. 这种技术在现代的工程实践里往往比较常见: IDE 通常自带了一些常见的单元测试生成工具; 根据特定的 snippet 可以生成比较常用的代码片段; 在 go 语言中, 由于目前缺乏对范型对支持, 为了节约重复代码, 通常实现了类似技术也是使用代码生成.
在 protobuf 生态中, 代码生成更为常见, 一般来说通过一个 proto 文件, protoc 工具可以生成各个语言的代码, 用于搭建一个基于 protobuf 或者 grpc 的工具. protoc 同时支持以插件等方式, 对 proto 文件进行拓展, 生成丰富的代码格式.
代码生成通常第一步是分析生成模板或者 DSL 文件的语法结构, 第二步采用字符串拼凑或者模板替换的方式生成代码.
golang/protobuf
问题:
1. 代码生成怎么做?
2. 如何实现一个类似的 parser
golang/protobuf 是 golang 对 protobuf 对支持对官方实现, 用于从 proto 文件生成对应对 go 版本代码文件.
入口在 protoc-gen-go/main.go, 本质是显示 protoc 对一个插件, 而 protoc 对于插件对实现比较直接, protoc 会按照 protobuf 的相关定义解析 protoc 文件, 然后把解析的结果传入插件的 stdin, 然后从插件的 stdout 获取生成文件的所有信息, 写文件, 完成生成.
代码结构
两个部分:
实现 protoc 的 protocol compiler plugin, 作用是从 proto 文件生成 go 文件, 通过这些 go 文件, 可以读取, 操作 proto buffer 里的内容
A Library: 实现了 encoding (marshaling), decoding (unmarshaling), and accessing protocol buffers.
├── descriptor:
├── jsonpb:
├── proto: library 部分
├── protoc-gen-go: 插件部分
├── ptypes: protobuf 里面的各种类型
proto: library 部分
重要的结构和函数
- // Message is implemented by generated protocol buffer messages.
- // Message 代表生成的数据结构,
- // lib.go
- type Message interface {
- Reset() // 重置为 0 值结构体
- String() string // string 格式的 pb 内容, 做了 unmarshal
- ProtoMessage() // 没有实现
- }
- // 内部使用了一个 Buffer 做数据存储, 以 marshal\unmarshal
- // lib.go
- type Buffer struct {
- buf []byte // encode/decode byte stream
- index int // read point
- deterministic bool // deterministic 模式 (尽力)same message =》 same bytes
- }
- // varint-encoded integer => int32, int64, uint32, uint64, bool, and enum protocol buffer types.
- // 类似的函数还有 DecodeFixed64, DecodeFixed32, DecodeZigzag64, DecodeRawBytes, DecodeStringBytes...
- // 和数据格式设计有关: 参考 https://blog.csdn.net/erlib/article/details/46345111
- // 里面比较关键的设计有:
- // 1. 基于 128bits 的数值存储方式 (Base 128 Varints): 每块数据由接连的若干个字节表示 (小的数据用 1 个字节就可以表示), 每个字节最高位标识本块数据是否结束 (1: 未结束, 0: 结束), 低 7 位表示数据内容.
- // 2. 基于序号的协议字段映射 (类似 key-value 结构) 序列号是 key, 很关键
- // 3. 基于无符号数的带符号数表示 (ZigZag 编码)
- // 4. 协议数据结构 data1_head + data1 ... data 表示一个数据如 int 0yyyyxxx x: 数据类型 y: 字段序号
- // decode.go
- func (p *Buffer) DecodeVarint() (x uint64, err error)
- // tag 指的是 字段序号
- type tagMap struct {
- fastTags []int // 优化, 默认为 - 1, 如果对应 index 值 >=0, 那么值就是一个字段序号
- slowTags map[int]int
- }
- // 代表结构体 struct
- // StructProperties represents properties for all the fields of a struct.
- // decoderTags and decoderOrigNames should only be used by the decoder.
- type StructProperties struct {
- Prop []*Properties // properties for each field
- reqCount int // required count
- decoderTags tagMap // map from proto tag to struct field number
- decoderOrigNames map[string]int // map from original name to struct field number
- order []int // list of struct field numbers in tag order
- // OneofTypes contains information about the oneof fields in this message.
- // It is keyed by the original name of a field.
- OneofTypes map[string]*OneofProperties
- }
- // 代表结构体的一个 field
- // Properties represents the protocol-specific behavior of a single struct field.
- // 例如一个字段转化成 go 加上了 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"` 这里面的就是 Properties
- // 有 Parse 函数能解析, String 函数生成, 定义 proto => 生成 => go 结构体 => 解析 => 数据到二进制
- type Properties struct {
- Name string // name of the field, for error messages
- OrigName string // original name before protocol compiler (always set)
- JSONName string // name to use for JSON; determined by protoc
- Wire string
- WireType int
- Tag int
- Required bool
- Optional bool
- Repeated bool
- Packed bool // relevant for repeated primitives only
- Enum string // set for enum types only
- proto3 bool // whether this is known to be a proto3 field
- oneof bool // whether this is a oneof field
- Default string // default value
- HasDefault bool // whether an explicit default was provided
- stype reflect.Type // set for struct types only
- sprop *StructProperties // set for struct types only
- mtype reflect.Type // set for map types only
- MapKeyProp *Properties // set for map types only
- MapValProp *Properties // set for map types only
- }
- // 生成的代码里面使用这个结构做如 Unmarshal/Marshal/Merge 等等操作
- // 一个例子: xxx_messageInfo_ServiceConfig 就是 InternalMessageInfo 类型
- // func (m *S) XXX_Unmarshal(b []byte) error {
- // return xxx_messageInfo_S.Unmarshal(m, b)
- // }
- // InternalMessageInfo is a type used internally by generated .pb.go files.
- // This type is not intended to be used by non-generated code.
- // This type is not subject to any compatibility guarantee.
- type InternalMessageInfo struct {
- marshal *marshalInfo
- unmarshal *unmarshalInfo
- merge *mergeInfo
- discard *discardInfo
- }
- // table_unmarshal.go
- // Unmarshal is the entry point from the generated .pb.go files.
- // This function is not intended to be used by non-generated code.
- // This function is not subject to any compatibility guarantee.
- // msg contains a pointer to a protocol buffer struct.
- // b is the data to be unmarshaled into the protocol buffer.
- // a is a pointer to a place to store cached unmarshal information.
- func (a *InternalMessageInfo) Unmarshal(msg Message, b []byte) error {
- // Load the unmarshal information for this message type.
- // The atomic load ensures memory consistency.
- u := atomicLoadUnmarshalInfo(&a.unmarshal)
- if u == nil {
- // Slow path: find unmarshal info for msg, update a with it.
- u = getUnmarshalInfo(reflect.TypeOf(msg).Elem())
- atomicStoreUnmarshalInfo(&a.unmarshal, u)
- }
- // Then do the unmarshaling.
- err := u.unmarshal(toPointer(&msg), b)
- return err
- }
- // 一个 go 结构 UnMarshal => InternalMessageInfo.UnMarshal => unmarshalInfo.UnMarshal (类型 specific, 有 cache)
- // => 根据 field,tag, 对不同类型有 unmarshalFieldInfo.unmarshaler 用对应的 unmarshaler 做 unmarshal
- // => decodeVarint (主要就用这个函数, 因为各种数字都是这个函数处理, string 类型直接读 buf)
- // 这里的 decode 相关函数和 deocode.go 里面的不一样. 这里会写到 go 结构体的 field 里面去 (pointer)
- // decode.go 里面的函数可以作为 lib 用来做 debug
- // text_parser.go
- // text.go 是解析文本格式的 protobuf 的方法, 并不是 parse .proto 文件
proto-gen-go 插件部分
- grpc
- // grpc/grpc.go
- // proto-gen-go 的 plugin, 生成 go 结构体的同时, 生成 grpc 的代码, server/client, 以及需要用户实现的 interface
- // 生成的方式是文本拼接, 可读性很差, 没有 template
- // 比如下面的例子
- func (g *grpc) generateClientMethod(servName, fullServName, serviceDescVar string, method *pb.MethodDescriptorProto, descExpr string) {
- // ....
- g.P("func (c *", unexport(servName), "Client)", g.generateClientSignature(servName, method), "{")
- if !method.GetServerStreaming() && !method.GetClientStreaming() {
- g.P("out := new(", outType, ")")
- // TODO: Pass descExpr to Invoke.
- g.P(`err := c.cc.Invoke(ctx, "`, sname, `", in, out, opts...)`)
- g.P("if err != nil { return nil, err }")
- g.P("return out, nil")
- g.P("}")
- g.P()
- return
- }
- // ....
- }
- plugin
protoc 所有插件拿到的就是一个 CodeGeneratorRequest 的 protobuf 二进制结构, 插件返回 CodeGeneratorResponse 结构
- // The version number of protocol compiler.
- message Version {
- optional int32 major = 1;
- optional int32 minor = 2;
- optional int32 patch = 3;
- // A suffix for alpha, beta or rc release, e.g., "alpha-1", "rc2". It should
- // be empty for mainline stable releases.
- optional string suffix = 4;
- }
- // An encoded CodeGeneratorRequest is written to the plugin's stdin.
- message CodeGeneratorRequest {
- // 所有的 proto 文件
- repeated string file_to_generate = 1;
- // 插件运行参数
- optional string parameter = 2;
- // proto 文件解析后的描述结构, 带了所有 proto 文件的信息, 如有多少结构, 函数定义, 结构有多少字段, 每个字段定义等
- repeated FileDescriptorProto proto_file = 15;
- // The version number of protocol compiler.
- optional Version compiler_version = 3;
- }
- // The plugin writes an encoded CodeGeneratorResponse to stdout.
- message CodeGeneratorResponse {
- // Error message. If non-empty, code generation failed. T
- optional string error = 1;
- // Represents a single generated file.
- message File {
- // 生成的文件名
- optional string name = 1;
- // 表示文件已经存在, 内容插入到文件的一个位置去
- optional string insertion_point = 2;
- // 文件内容
- optional string content = 15;
- }
- repeated File file = 15;
- }
FileDescriptorProto 文件由 descriptor.proto 定义, 这个是各个语言生成工具的基础.
- // Describes a complete .proto file.
- message FileDescriptorProto {
- optional string name = 1; // file name, relative to root of source tree
- optional string package = 2; // e.g. "foo", "foo.bar", etc.
- repeated string dependency = 3;
- repeated int32 public_dependency = 10;.
- repeated int32 weak_dependency = 11;
- // All top-level definitions in this file.
- repeated DescriptorProto message_type = 4;
- repeated EnumDescriptorProto enum_type = 5;
- repeated ServiceDescriptorProto service = 6;
- repeated FieldDescriptorProto extension = 7;
- optional FileOptions options = 8;
- optional SourceCodeInfo source_code_info = 9;
- optional string syntax = 12;
- }
- ...
- ...
生成的主要函数在 protoc-gen-go/generator/generator.go, 生成方式采用字符串拼凑的方式, 字符串拼凑的方式较为灵活, 但是可读性比较差.
- // Fill the response protocol buffer with the generated output for all the files we're
- // supposed to generate.
- func (g *Generator) generate(file *FileDescriptor) {
- //...
- g.P("// This is a compile-time assertion to ensure that this generated file")
- g.P("// is compatible with the proto package it is being compiled against.")
- g.P("// A compilation error at this line likely means your copy of the")
- g.P("// proto package needs to be updated.")
- g.P("const _ =", g.Pkg["proto"], ".ProtoPackageIsVersion", generatedCodeVersion, "// please upgrade the proto package")
- g.P()
- for _, td := range g.file.imp {
- g.generateImported(td)
- }
- for _, enum := range g.file.enum {
- g.generateEnum(enum)
- }
- for _, desc := range g.file.desc {
- // Don't generate virtual messages for maps.
- if desc.GetOptions().GetMapEntry() {
- continue
- }
- g.generateMessage(desc)
- }
- for _, ext := range g.file.ext {
- g.generateExtension(ext)
- }
- g.generateInitFunction()
- g.generateFileDescriptor(file)
- // Run the plugins before the imports so we know which imports are necessary.
- g.runPlugins(file)
- // Generate header and imports last, though they appear first in the output.
- rem := g.Buffer
- remAnno := g.annotations
- g.Buffer = new(bytes.Buffer)
- g.annotations = nil
- g.generateHeader()
- g.generateImports()
- // ...
- g.Write(rem.Bytes())
- // Reformat generated code and patch annotation locations.
- fset := token.NewFileSet()
- // ...
- fileAST, err := parser.ParseFile(fset, "", original, parser.ParseComments)
- // ...
- ast.SortImports(fset, fileAST)
- // ...
- if g.annotateCode {
- ...
- }
- }
generateMessage 用于生成类型, 已经对应的函数等内容
- // Generate the type, methods and default constant definitions for this Descriptor.
- func (g *Generator) generateMessage(message *Descriptor) {
- }
- // 对于一个
- message Book {
- string title = 1;
- bytes raw_data = 2;
- }
- // 会生成
- type Book struct {
- Title string `protobuf:"bytes,1,opt,name=title,proto3" json:"title,omitempty"`
- RawData []byte `protobuf:"bytes,2,opt,name=raw_data,json=rawData,proto3" json:"raw_data,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- func (m *Book) Reset() { *m = Book{} }
- func (m *Book) String() string { return proto.CompactTextString(m) }
- func (*Book) ProtoMessage() {}
- func (*Book) Descriptor() ([]byte, []int) {
- return fileDescriptor_ab04eb4084a521db, []int{1}
- }
- func (m *Book) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_Book.Unmarshal(m, b)
- }
- func (m *Book) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_Book.Marshal(b, m, deterministic)
- }
- func (m *Book) XXX_Merge(src proto.Message) {
- xxx_messageInfo_Book.Merge(m, src)
- }
- func (m *Book) XXX_Size() int {
- return xxx_messageInfo_Book.Size(m)
- }
- func (m *Book) XXX_DiscardUnknown() {
- xxx_messageInfo_Book.DiscardUnknown(m)
- }
- var xxx_messageInfo_Book proto.InternalMessageInfo
- func (m *Book) GetTitle() string {
- if m != nil {
- return m.Title
- }
- return ""
- }
- func (m *Book) GetRawData() []byte {
- if m != nil {
- return m.RawData
- }
- return nil
- }
对于一个 proto.go 文件, 不足以完成所有 Unmarshal, Marshal 功能, 生成的 proto.go 文件里面会引用 GitHub.com/golang/protobuf/proto, 作为一个 go 语言的 proto 功能库, 配合完成如 Unmarshal, Marshal 功的功能, 对于 grpc 的支持更是如此
proto.plugin -> go 文件 引用 -> proto/lib
grpc-ecosystem/grpc-gateway
grpc-gateway 是 protoc 的另一个插件, 同时他对 protobuf 对描述也做了拓展, 用于生成 REST 风格对 http 函数和 server, 代码中建 http 请求打包转发给 grpc server, 再将返回解包成 JSON 格式, 完成对 http 对支持. 通过这个插件, 可以只需要添加很少量对代码, 给一个 gprc 服务添加 http 支持.
和 golang/protobuf 类似, 代码主要分成两个部分, plugin 部分, 用于生成代码; library 部分, 用于生成代码使用, 完成较为复杂的功能
plugin 部分: protoc-gen-grpc-gateway
和 golang/protobuf 不同, protoc-gen-grpc-gateway 使用了模版来生成代码, 这样的好处是可读性, 可修改性会高很多, 通过一种或者多种模版, 对应解析出来的语法结构定义中的变量, 渲染出生成代码.
protoc-gen-grpc-gateway/gengateway
比较重要的逻辑在 generator.go
- // 这是核心的生成输入文件的语法结构定义, 可以看出和 golang/protobuf 是类似的, 无非是 protoc 解析完成的 proto 语法结构
- // File wraps descriptor.FileDescriptorProto for richer features.
- type File struct {
- *descriptor.FileDescriptorProto
- // GoPkg is the go package of the go file generated from this file..
- GoPkg GoPackage
- // Messages is the list of messages defined in this file.
- Messages []*Message
- // Enums is the list of enums defined in this file.
- Enums []*Enum
- // Services is the list of services defined in this file.
- Services []*Service
- }
- // 核心处理函数, 可以看出主要由三个 template 完成 headerTemplate; handlerTemplate; trailerTemplate
- func applyTemplate(p param, reg *descriptor.Registry) (string, error) {
- w := bytes.NewBuffer(nil)
- if err := headerTemplate.Execute(w, p); err != nil {
- return "", err
- }
- //...
- for _, svc := range p.Services {
- for _, meth := range svc.Methods {
- for _, b := range meth.Bindings {
- methodWithBindingsSeen = true
- if err := handlerTemplate.Execute(w, binding{
- //...
- }
- }
- ...
- if err := trailerTemplate.Execute(w, tp); err != nil {
- return "", err
- }
- return w.String(), nil
- }
- // headerTemplate 和 handlerTemplate 的部分内容
- headerTemplate = template.Must(template.New("header").Parse(`
- // Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
- // source: {{.GetName}}
- /*
- Package {{.GoPkg.Name}} is a reverse proxy.
- It translates gRPC into RESTful JSON APIs.
- */
- package {{.GoPkg.Name}}
- import (
- {{range $i := .Imports}}{{if $i.Standard}}{{$i | printf "%s\n"}}{{end}}{{end}}
- {{range $i := .Imports}}{{if not $i.Standard}}{{$i | printf "%s\n"}}{{end}}{{end}}
- )
- var _ codes.Code
- var _ io.Reader
- var _ status.Status
- var _ = runtime.String
- var _ = utilities.NewDoubleArray
- `))
- handlerTemplate = template.Must(template.New("handler").Parse(`
- {{if and .Method.GetClientStreaming .Method.GetServerStreaming}}
- {{template "bidi-streaming-request-func" .}}
- {{else if .Method.GetClientStreaming}}
- {{template "client-streaming-request-func" .}}
- {{else}}
- {{template "client-rpc-request-func" .}}
- {{end}}
- `))
实践
实战代码: https://github.com/u2takey/mq-gateway
假设现在有一个类似 go-grpc-gateway 的需求, 不同的是, 输入是一个消息队列, 要求生成的代码完成这样的功能: 从消息队列取数据 -> 根据不同的 topic/service 发送给不同的 grpc server -> 处理完成之后返回给消息队列
为了简单, 沿用 go-grpc-gateway 对 http handler 的定义, 如'post /hello' 只是不把他作为一个 http path, 而是当成一个 mq topic, 比如 post /hello 对于的 topic 即 post_/hello, 返回数据的 topic 为 post_/hello_out
library 部分: runtime
和 grpc-gateway 不同, runtime 的定义变成处理 mq 相关的逻辑, 比如定义, 注册 handler 处理不同的 topic, 转发给对应的 grpc client
- func NewServeMux(amqpURI, exchangeName, exchangeType string) *ServeMux {
- serveMux := &ServeMux{
- exchangeType: exchangeType,
- exchangeName: exchangeName,
- handlers: make(map[string]handler),
- mqClient: newMqclient(amqpURI, exchangeName, exchangeType),
- }
- return serveMux
- }
- // handler 注册函数, 注册一个 h, 接收一个 queue 的 mq 消息, 经过 hanlterfunc 处理后 publish 给 queue
- func (s *ServeMux) Handle(queueName string, h HandlerFunc) {
- handle := handler{queue: queueName, h: h}
- s.handlers[queueName] = handle
- log.Printf("declaring Queue %q", queueName)
- queue, err := s.mqClient.channel.QueueDeclare(
- //...
- log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange",
- queue.Name, queue.Messages, queue.Consumers)
- queue_out, err := s.mqClient.channel.QueueDeclare(
- //...
- if err = s.mqClient.channel.QueueBind(
- //..
- if err = s.mqClient.channel.QueueBind(
- //...
- deliveries, err := s.mqClient.channel.Consume(
- //...
- )
- if err != nil {
- log.Panic(err)
- }
- go s.consume(deliveries, handle)
- return
- }
- func (s *ServeMux) consume(deliveries <-chan amqp.Delivery, handle handler) {
- for d := range deliveries {
- out := handle.h(d.Body)
- s.publish(handle.queue, out)
- _ = d.Ack(false)
- }
- log.Printf("handle: deliveries channel closed")
- }
plugin 部分: protoc-gen-mq-gateway
可以使用大部分 protoc-gen-grpc-gateway 的的生成逻辑, 只要修改生成的 template 即可, 修改调其中耦合 http 的逻辑, 输入输出都使用 []byte, 为了简单, 我这里省略的大量元数据, 以及错误处理的逻辑.
使用
proto 定义, 定义了一个 echo service, 希望实现的逻辑是根据 echo 请求, 返回一个结果, 输入返回都通过 mq 完成
- message EchoRequest {
- // common
- string Hello = 1;
- repeated string Names = 2;
- }
- message EchoResponse {
- string Hello = 1;
- }
- service EchoService {
- rpc echo(EchoRequest) returns (EchoResponse) {
- option (google.API.http) = {
- post : "/v1/echo"
- body : "*"
- };
- }
- }
- protoc -I/usr/local/include -I. \
- -I$(GOPATH)/src \
- -I$(GOPATH)/src/GitHub.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
- --mq-gateway_out=logtostderr=true:. \
- ./simple.proto%
生成代码:
- package main
- import (
- "bytes"
- "context"
- "io"
- "github.com/golang/protobuf/proto"
- "github.com/u2takey/mq-gateway/runtime"
- "github.com/u2takey/mq-gateway/utilities"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/status"
- )
- var _ codes.Code
- var _ io.Reader
- var _ status.Status
- var _ = runtime.String
- var _ = utilities.NewDoubleArray
- func request_EchoService_Echo_0(ctx context.Context, marshaler runtime.Marshaler, client EchoServiceClient, req []byte, pathParams map[string]string) (proto.Message, error) {
- var protoReq EchoRequest
- if err := marshaler.NewDecoder(bytes.NewReader(req)).Decode(&protoReq); err != nil && err != io.EOF {
- return nil, status.Errorf(codes.InvalidArgument, "%v", err)
- }
- msg, err := client.Echo(ctx, &protoReq)
- return msg, err
- }
- func RegisterEchoServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
- conn, err := grpc.Dial(endpoint, opts...)
- if err != nil {
- return err
- }
- defer func() {
- if err != nil {
- if cerr := conn.Close(); cerr != nil {
- grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
- }
- return
- }
- go func() {
- <-ctx.Done()
- if cerr := conn.Close(); cerr != nil {
- grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
- }
- }()
- }()
- return RegisterEchoServiceHandler(ctx, mux, conn)
- }
- func RegisterEchoServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
- return RegisterEchoServiceHandlerClient(ctx, mux, NewEchoServiceClient(conn))
- }
- func RegisterEchoServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux, client EchoServiceClient) error {
- mux.Handle("POST"+"_/v1/echo", func(r []byte) (out []byte) {
- inboundMarshaler, outboundMarshaler := &runtime.JSONPb{OrigName: true}, &runtime.JSONPb{OrigName: true}
- resp, _ := request_EchoService_Echo_0(ctx, inboundMarshaler, client, r, nil)
- out, _ = outboundMarshaler.Marshal(resp)
- return
- })
- return nil
- }
组合成一个 server
- package main
- import (
- "context"
- "flag"
- "log"
- "net"
- "github.com/u2takey/mq-gateway/runtime"
- "google.golang.org/grpc"
- )
- var (
- uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
- exchangeName = flag.String("exchange", "test-exchange", "Durable AMQP exchange name")
- exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
- )
- func main() {
- flag.Parse()
- rpcAddress := "localhost:9008"
- // Establish gateways for incoming HTTP requests.
- mux := runtime.NewServeMux(*uri, *exchangeName, *exchangeType)
- ctx := context.Background()
- dialOpts := []grpc.DialOption{grpc.WithInsecure()}
- err := RegisterEchoServiceHandlerFromEndpoint(ctx, mux, rpcAddress, dialOpts)
- if err != nil {
- log.Panic(err)
- }
- go mux.Start()
- runRPCServer(rpcAddress)
- }
- // RunRPCServer ...
- func runRPCServer(rpcAddress string) {
- listen, err := net.Listen("tcp", rpcAddress)
- if err != nil {
- log.Fatalf("failed to listen: %v", err)
- }
- rpcs := grpc.NewServer()
- RegisterEchoServiceServer(rpcs, NewEchoService())
- err = rpcs.Serve(listen)
- if err != nil {
- log.Fatalf("failed to serve: %v", err)
- }
- }
- // EchoService ...
- type EchoService struct {
- }
- // NewEchoService ...
- func NewEchoService() *EchoService {
- return &EchoService{}
- }
- // DescribeEvent ...
- func (s *EchoService) Echo(ctx context.Context, req *EchoRequest) (res *EchoResponse, err error) {
- return &EchoResponse{
- Hello: req.GetHello() + "" + strings.Join(req.GetNames(),","),
- }, nil
- }
mq-http-gateway, 为了便于测试, 创建一个 http server 这个 server 比较检查, 接收请求, 把请求放入 mq, 同时从 mq 中取结果, 作为 http 请求结果返回
mq-http-gateway 代码
- #!/usr/bin/python
- import pika
- from flask import Flask, request
- import _thread
- import queue
- import uuid
- App = Flask(__name__)
- class Client:
- def __init__(self):
- // ...
- self.channel.basic_consume(
- queue=self.callback_queue,
- on_message_callback=self.on_response,
- auto_ack=True)
- def on_response(self, ch, method, props, body):
- self.response = body
- def call(self, body):
- self.response = None
- self.channel.basic_publish(...)
- while self.response is None:
- self.connection.process_data_events()
- self.connection.close()
- return self.response
- @App.route('/v1/echo', methods=['POST'])
- def echo():
- c = Client()
- return c.call(request.data)
- if __name__ == '__main__':
- credentials = pika.PlainCredentials('root', 'root')
- //...
- channel.queue_declare(queue='POST_/v1/echo', durable=True)
- print("start app")
- App.run(debug=True)
测试
- # 启动 server
- simple Git:(master) ./simple -uri amqp://root:root@xxxx
- 2019/07/28 16:46:48 dialing "amqp://root:root@xxx
- 2019/07/28 16:46:48 got Connection, getting Channel
- 2019/07/28 16:46:48 got Channel, declaring Exchange ("test-exchange")
- 2019/07/28 16:46:48 declaring Queue "POST_/v1/echo"
- 2019/07/28 16:46:48 declared Queue ("POST_/v1/echo" 0 messages, 0 consumers), binding to Exchange
- 2019/07/28 16:46:48 declared Queue ("POST_/v1/echo_out" 0 messages, 0 consumers), binding to Exchange
- 2019/07/28 16:46:48 Starting Consume
- # 在另一个 console 启动 mq-gateway.py
- simple Git:(master) python mq-gateway.py
- start App
- # 在另一个 console 测试, 一切正常!
- simple Git:(master) curl -H "Content-Type: application/json" localhost:5000/v1/echo -d '{"Hello":"x","Names":["a","b"] }'
- {"Hello":"x a,b"}%
来源: https://www.qcloud.com/developer/article/1474505