Kinesis 是 AWS 上实时流数据处理平台, 可以实时收集, 处理和分析流中的数据.
问题:
kinesis 的相关原理 可以对标 kafka, 但是比 kafka 相对简单, 下面聊一聊分片带来的问题.
Kinesis 的每一个分片接收数据的大小和记录数都有相关限制: 单个分片可以提取多达每秒 1 MiB 的数据 (包括分区键) 或每秒写入 1000 个记录.
当遇到流量的高峰和低谷的时候, 分片的大小不是固定的, 当然你可以一直选择一个高峰时分片数, 但这个在每天流量低谷的时候, 分片就显得比较冗余, 也浪费钱.
所以需要根据实时监控流量调整分片数量显得非常必要了, AWS 只是提供了能修改分片的 API, 如何调整, 就需要用户自己来调整策略了.
如果流量超过分片的限制, 则会直接抛出以下异常: botocore.errorfactory.ProvisionedThroughputExceededException: An error occurred (ProvisionedThroughputExceededException) when calling the PutRecord operation: Rate exceeded for shard shardId-000000000008 in stream
接下来聊一聊如何根据流量弹性调整分片策略.
指标:
先来认识几个指标, Kinesis 会向 CloudWatch 每一分钟发送分片和流的监控指标:
指标 | 定义 |
---|---|
GetRecords.Bytes | 在指定时段内从 Kinesis 流检索的字节数。 |
GetRecords.Records | 在指定时段内测量的从分片中检索的记录数。 |
IncomingBytes | 在指定时段内成功放置到 Kinesis 流的字节数。该指标包含来自 PutRecord 和 PutRecords 的字节数。 |
IncomingRecords | 在指定时段内成功放置到 Kinesis 流的记录数。 |
主要是这四个指标, 其中 IncomingBytes 可以获取 Stream 和 ShardId 两个维度的数据, 即整个 DataStream 流的接收的字节数, 以及每一个分片所接收的字节数. 分片的指标需要额外单独在 Kinesis 界面开放, 默认是不开放的.
同时每一个指标的统计数据维度有: Minimum,Maximum,Average,Sum.
可以简单理解为 IncomingRecords 和 GetRecords.Records 就是选择的间隔时间内流入和流出的记录数. 而 GetRecords.Bytes 和 IncomingBytes 分别表示流出和流入的字节数.
下面代码实现如何获取相关指标:
- def get_metric(self,
- metric_name,
- minutes_period=5,
- statistics_model="Sum",
- shard_id=None):
- start_time = datetime.datetime.utcnow() - datetime.timedelta(
- minutes=minutes_period)
- end_time = datetime.datetime.utcnow()
- dimensions = []
- dimensions.append(
- dict(Name="StreamName", Value=api_config.KINESIS_STREAM_NAME))
- if shard_id:
- dimensions.append(dict(Name="ShardId", Value=shard_id))
- result = self.cloudwatch_client.get_metric_statistics(
- Namespace='AWS/Kinesis',
- MetricName=metric_name,
- Dimensions=dimensions,
- StartTime=start_time,
- EndTime=end_time,
- Period=60,
- Statistics=[
- statistics_model
- ])
- max_value = 0
- for item in result.get("Datapoints", []):
- if item[statistics_model]> max_value:
- max_value = item[statistics_model]
- return max_value
上面函数的目标就是获取指定指标 最近 5 分钟内 每一分钟的最大值. 比如想获取 IncomingRecords 指标最近 5 分钟内间隔时间为 1 分钟的最大记录数. get_metric("IncomingRecords") 即可. result 会返回 5 条记录, 拿出最大值即可.
如何获取当前分片数:
流量监控指标获取到之后, 接下来如何获取当前分片数了, Kinesis 的 python 包这一点非常尴尬, 居然没有可以直接获取当前打开的分片数, 为什么是需要打开的分片数了, 因为关闭的分片数信息在 Kinesis 包的获取分片的 API 里也会返回, 比如最近一段时间内, 如果有操作分片数量, 则 AWS 会关掉或者打开一些分片, 关掉的分片不会再接收数据, 但里面有的数据依然可以被消费者消费, 从而保证数据在修改分片的操作上不会丢失, 这样就会导致获取分片列表的时候, 关闭的分片依然会返回, 返回的数据中也没有状态标注哪个分片是打开或者关闭.
上面聊到的 IncomingBytes 关于分片维度的指标就起作用了, 可以通过循环枚举所有分片找出可以接收数据的分片, 则说明均是打开的分片.
代码部分如下:
- def get_shards_count(self, shards_data=None):
- if not shards_data:
- result = self.kinesis_client.list_shards(
- StreamName=api_config.KINESIS_STREAM_NAME)
- shards_data = result.get("Shards", [])
- shards_count = 0
- for item in shards_data:
- shard_id = item["ShardId"]
- shard_incoming_bytes = self.get_metric(
- "IncomingBytes", shard_id=shard_id)
- if shard_incoming_bytes> 0:
- shards_count += 1
- return shards_count
相关 API 介绍:
AWS 提供 3 个 API, 可以重新分片:
update_shard_count 直接更新分片数量.
split_shard 拆分分片
merge_shards 合并分片
update_shard_count 最简单, 直接调取更新分片数量即可, 但此 API 有一些限制: 查看官网资料:
- This operation has the following default limits. By default, you cannot do the following:
- 1.Scale more than twice per rolling 24-hour period per stream
- 2.Scale up to more than double your current shard count for a stream
- 3.Scale down below half your current shard count for a stream
主要是这三点, 第一点最蛋疼, 24 小时内只能操作 2 次 API, 但这个可以提写工单向 AWS 客服申请多次. 第二点和第三点是说伸缩的分片的时候, 不能超过当前分片的 2 倍或不能低于当前分片的一半.
split_shard 和 merge_shards 属于 Kinesis 的高级主题, 这一篇文章暂时不涉及详细讲解, 简单提一下场景, 主要用于单个分片的流量超过阈值, 可以对此分片进行一分为二, 能够分当然也能够合, 里面也有一些限制, 以后再聊这个话题.
弹性伸缩策略:
重点来了, 先来了解一下分片的计算方式:
写入流的数据记录的平均大小 (以 KB 为单位, 向上取整为 1 KB), 数据大小 (average_data_size_in_KiB).
每秒写入流和从流读取的数据记录数 (records_per_second).
并发且独立使用流中数据的 Kinesis Data Streams 应用程序的数量, 即使用者数量 (number_of_consumers).
以 KB 为单位的传入写入带宽 (incoming_write_bandwidth_in_KiB), 等于 average_data_size_in_KiB 乘以 records_per_second.
以 KB 为单位的传出读取带宽 (outgoing_read_bandwidth_in_KiB), 等于 incoming_write_bandwidth_in_KiB 乘以 number_of_consumers.
可使用以下公式中的输入值来计算流所需的分片的数量 (number_of_shards):
number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)
一一来获取上面提到的指标参数:
average_data_size_in_KiB 就是写入流数据记录的平均大小, 即通过 IncomingBytes , 维度是 Average 即可获取.
records_per_second 通过 GetRecords.Records 和 IncomingRecords 一分钟记录之和 除以 60s 即可.
number_of_consumers 就是消费者, 这个可以通过函数获取, 这个我们默认为 1.
根据公式 number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048) , 因为我们消费者数目是 1 , 那么其实 outgoing_read_bandwidth_in_KiB / 2048 其实就是 ncoming_write_bandwidth_in_KiB / 2048. 显然:
number_of_shards = incoming_write_bandwidth_in_KiB/1024 .
这是第一种获取当前需要的分片数的方案.
第二种方案, 也可以简单粗暴的根据 当前流中的 总字节数 除以 每一个分片所能容纳的 1M 即可.
公式:
(SUM(IncomingBytes) + SUM(GetRecords.Bytes)) / 1024 / 1024 / Period 其中 Period 这里就是 60s.
代码实现部分:
- def scaling(self):
- average_incoming_bytes = self.get_metric(
- "IncomingBytes", statistics_model="Average")
- outgoing_records = self.get_metric("GetRecords.Records")
- incoming_records = self.get_metric("IncomingRecords")
- records_per_secord = int(
- (outgoing_records + incoming_records) / self.METRIC_PERIOD)
- number_of_shards_1 = round(
- average_incoming_bytes * records_per_secord / 1024 / 1000, 4)
- sum_incoming_bytes = self.get_metric("IncomingBytes")
- sum_outgoing_bytes = self.get_metric("GetRecords.Bytes")
- sum_bytes = sum_incoming_bytes + sum_outgoing_bytes
- number_of_shards_2 = round(
- sum_bytes / 1024 / 1024 / 60, 4)
- return max(number_of_shards_1, number_of_shards_2)
number_of_shards_1 和 number_of_shards_2 就是方案一和方案二的实现.
拿到了当前所需要的分片数, 接下来就是根据实际分片数来调整了, 代码如下:
- def execute(self):
- cur_shards_count = self.get_shards_count()
- number_of_shards = self.scaling()
- need_shards = int(number_of_shards) + 1
- shard_ratio = round(number_of_shards * 100 % 100, 2)
- if shard_ratio> int(api_config.TRIGGER_SCALING_UP_SHARD_RATIO):
- need_shards += 1
- elif shard_ratio <int(api_config.TRIGGER_SCALING_DOWN_SHARD_RATIO):
- need_scaling_down = True
- if need_scaling_down and cur_shards_count> need_shards:
- need_shards = cur_shards_count - 1
need_shards 就是我们需要更新的分片数, 调取 API update_shard_count 即可.
上面代码描述叫简单, 提供 2 个环境变量, TRIGGER_SCALING_UP_SHARD_RATIO 触发增加分片的比率, TRIGGER_SCALING_DOWN_SHARD_RATIO 降低分片的比率. 比如 cur_shards_count = 3, number_of_shards = 2.7, 那么 shard_ratio = 70% 如果设置 TRIGGER_SCALING_UP_SHARD_RATIO = 60%, 那么 need_shards = 4, 则需要扩展分片为 4 个, 缩小的方案也是如此.
上面描述简单, 实际过程中需要控制伸缩的频率, 而且 AWS 也有频率限制, 如何在合适的时机伸缩, 需要根据业务的实际过程进行分析. 同时还要上报更新的日志, 存储上一次更新的记录数据等等.
Kinesis 根据流量弹性伸缩分片就分享到这里, 欢迎大家在评论区一起讨论更优方案.
来源: https://juejin.im/post/5c828f9b6fb9a049ec6be057