安装
下载源码包
go get GitHub.com/aliyun/aliyun-tablestore-go-sdk/tunnel
安装依赖
可以在 tunnel 目录下使用 dep 安装依赖
安装 https://github.com/golang/dep#installation
dep ensure -v
也可以直接使用 go get 安装依赖包:
- go get -u go.uber.org/zap
- go get GitHub.com/cenkalti/backoff
- go get GitHub.com/golang/protobuf/proto
- go get GitHub.com/satori/go.uuid
- go get GitHub.com/stretchr/testify/assert
- go get GitHub.com/smartystreets/goconvey/convey
- go get GitHub.com/golang/mock/gomock
- go get gopkg.in/natefinch/lumberjack.v2
[]() 快速开始
初始化 Tunnel client:
- // endpoint 是表格存储实例 endpoint, 如 https://instance.cn-hangzhou.ots.aliyun.com
- // instance 为实例名称
- // accessKeyId 和 accessKeySecret 分别为访问表格存储服务的 AccessKey 的 Id 和 Secret
- tunnelClient := tunnel.NewTunnelClient(endpoint, instance,
- accessKeyId, accessKeySecret)
创建新 Tunnel:
- req := &tunnel.CreateTunnelRequest{
- TableName: "testTable",
- TunnelName: "testTunnel",
- Type: tunnel.TunnelTypeBaseStream, // 全量加增量类型 Tunnel
- }
- resp, err := tunnelClient.CreateTunnel(req)
- if err != nil {
- log.Fatal("create test tunnel failed", err)
- }
- log.Println("tunnel id is", resp.TunnelId)
获取已有 Tunnel 信息:
- req := &tunnel.DescribeTunnelRequest{
- TableName: "testTable",
- TunnelName: "testTunnel",
- }
- resp, err := tunnelClient.DescribeTunnel(req)
- if err != nil {
- log.Fatal("create test tunnel failed", err)
- }
- log.Println("tunnel id is", resp.Tunnel.TunnelId)
注册 callback, 开始数据消费:
- // 用户定义消费 callback 函数
- func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
- fmt.Println("user-defined information", ctx.CustomValue)
- for _, rec := range records {
- fmt.Println("tunnel record detail:", rec.String())
- }
- fmt.Println("a round of records consumption finished")
- return nil
- }
- // 配置 callback 到 SimpleProcessFactory, 配置消费端 TunnelWorkerConfig
- workConfig := &tunnel.TunnelWorkerConfig{
- ProcessorFactory: &tunnel.SimpleProcessFactory{
- CustomValue: "user custom interface{} value",
- ProcessFunc: exampleConsumeFunction,
- },
- }
- // 使用 TunnelDaemon 持续消费指定 tunnel
- daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
- log.Fatal(daemon.Run())
删除 Tunnel
- req := &tunnel.DeleteTunnelRequest {
- TableName: "testTable",
- TunnelName: "testTunnel",
- }
- _, err := tunnelClient.DeleteTunnel(req)
- if err != nil {
- log.Fatal("delete test tunnel failed", err)
- }
[]() 配置项
tunnel client 配置
初始化 tunnel client 时可以通过 NewTunnelClientWithConfig 接口自定义客户端配置, 使用不指定 config 初始化接口或者 config 为 nil 时会使用 DefaultTunnelConfig:
- var DefaultTunnelConfig = &TunnelConfig{
- // 最大指数退避重试时间
- MaxRetryElapsedTime: 45 * time.Second,
- //HTTP 请求超时时间
- RequestTimeout: 30 * time.Second,
- //http.DefaultTransport
- Transport: http.DefaultTransport,
- }
数据消费 worker 配置
TunnelWorkerConfig 中包含了数据消费 worker 需要的配置, 其中 ProcessorFactory 为必填项, 其余字段不填将使用默认值, 通常使用默认值即可:
- type TunnelWorkerConfig struct {
- //worker 同 Tunnel 服务的心跳超时时间, 通常使用默认值即可
- HeartbeatTimeout time.Duration
- //worker 发送心跳的频率, 通常使用默认值即可
- HeartbeatInterval time.Duration
- //tunnel 下消费连接建立接口, 通常使用默认值即可
- ChannelDialer ChannelDialer
- // 消费连接上具体处理器产生接口, 通常使用 callback 函数初始化 SimpleProcessFactory 即可
- ProcessorFactory ChannelProcessorFactory
- //zap 日志配置, 默认值为 DefaultLogConfig
- LogConfig *zap.Config
- //zap 日志轮转配置, 默认值为 DefaultSyncer
- LogWriteSyncer zapcore.WriteSyncer
- }
其中的 ProcessorFactory 为用户注册消费 callback 函数以及其他信息的接口, 建议使用 SDK 中自带 SimpleProcessorFactory 实现:
- type SimpleProcessFactory struct {
- // 用户自定义信息, 会传递到 ProcessFunc 和 ShutdownFunc 中的 ChannelContext 参数中
- CustomValue interface{}
- //Worker 记录 checkpoint 的间隔, CpInterval<=0 时会使用 DefaultCheckpointInterval
- CpInterval time.Duration
- //worker 数据处理的同步调用 callback,ProcessFunc 返回 error 时 worker 会用本批数据退避重试 ProcessFunc
- ProcessFunc func(channelCtx *ChannelContext, records []*Record) error
- //worker 退出时的同步调用 callback
- ShutdownFunc func(channelCtx *ChannelContext)
- // 日志配置, Logger 为 nil 时会使用 DefaultLogConfig 初始化 logger
- Logger *zap.Logger
- }
日志配置
默认日志配置:
- //DefaultLogConfig 是 TunnelWorkerConfig 和 SimpleProcessFactory 使用的默认日志配置
- var DefaultLogConfig = zap.Config{
- Level: zap.NewAtomicLevelAt(zap.InfoLevel),
- Development: false,
- Sampling: &zap.SamplingConfig{
- Initial: 100,
- Thereafter: 100,
- },
- Encoding: "json",
- EncoderConfig: zapcore.EncoderConfig{
- TimeKey: "ts",
- LevelKey: "level",
- NameKey: "logger",
- CallerKey: "caller",
- MessageKey: "msg",
- StacktraceKey: "stacktrace",
- LineEnding: zapcore.DefaultLineEnding,
- EncodeLevel: zapcore.LowercaseLevelEncoder,
- EncodeTime: zapcore.ISO8601TimeEncoder,
- EncodeDuration: zapcore.SecondsDurationEncoder,
- EncodeCaller: zapcore.ShortCallerEncoder,
- },
- }
日志轮转配置:
- //DefaultSyncer 是 TunnelWorkerConfig 和 SimpleProcessFactory 使用的默认日志轮转配置
- var DefaultSyncer = zapcore.AddSync(&lumberjack.Logger{
- // 日志文件路径
- Filename: "tunnelClient.log",
- // 最大日志文件大小
- MaxSize: 512, //MB
- // 压缩轮转的日志文件数
- MaxBackups: 5,
- // 轮转日志文件保留的最大天数
- MaxAge: 30, //days
- // 是否压缩轮转日志文件
- Compress: true,
- })
来源: https://yq.aliyun.com/articles/686415