strom 简介
官方网址: http://storm.apache.org/
是一个免费, 开源的分布式实时计算系统, 使用它可以轻松实现数据流的实时处理, Strom 很简单, 可以用任何编程语言
storm 用例: 实时在线分析 机器学习, 连续计算, 分布式 RPC,ETL 等.
Strom 的特点: 快速: 基准时钟在超过一百万元组 (可以理解为数据包) 每秒处理的每个节点
简单的设置: 有可扩展性, 容错性, 保证了数据的处理能力, 并且易于设置和操作
storm 实时流式计算系统
storm 集群与 hadoop 集群 (MapReduce) 对比
MapReduce 是批处理流程 //hadoop 处理海量历史任务, 不能做到实时
storm 没有缓冲区原数据源源不断的进入处理系统, 这是流处理 // 实时流计算, 一直运行直到停止.
Topology(拓扑)与 Mapreduce
一个关键的区别是: 一个 MapReduce job 最终会结束, 而一个 topology 永远会运行(除非你手动 kill 掉)
Nimbus(作业控制和资源管理 master 进程) 与 ResourManager
在 Storm 的集群里面有两种节点: 控制节点 (master node) 和工作节点(worker node). 控制节点上面运行一个叫 Nimbus 后台程序, 它的作用类似 Hadoop 里面的 JobTracker //JobTracker 是整个集群中唯一的全局管理者, 涉及的功能包括作业控制和资源管理.
Nimbus 负责在集群里面分发代码, 分配计算任务给机器, 并且监控状态.
Supervisor (worker 进程)与 NodeManager(YarnChild)
每一个工作节点上面运行一个叫做 Supervisor 的节点. Supervisor 会监听分配给它那台机器的工作, 根据需要启动 / 关闭工作进程. 每一个工作进程执行一个 topology 的一个子集; 一个运行的 topology 由运行在很多机器上的很多工作进程组成.
storm 实时流式计算的工作机制
2 个角色
nimbus 是集群的主节点: 负责在集群里面分发代码, 分配计算任务给机器, 并且监控状态.// 作业控制和资源管理
supervisor 是集群的从节点: 每一个工作节点上面运行一个叫做 Supervisor 的节点, 每一个 supervisor 里面会有 worker 进程在服务器上运行着, 这些 worker 是真正干活的.
nimbus 和 supervisor 直接并没有直接的联系, 而是需要第三方工具 zookeeper 实现的
第一个 supervisor 里面的 worker 会调用我们写的一个类比如叫 (采集水这个类), 处理好了之后, 会再次在这个 worker 里面封装成一定的数据包的格式发出去, 发给下一个 worker, 下一个 worker 会去处理上一个 worker 传给他的结果, 去调用我们写的另一个逻辑(调用我们写的类 过滤逻辑) 然后, 就是在第二个 worker 里面去处理, 然后再封装成一定的数据包的格式发出去, 发给下一个 worker.
下一个 worker 也是不知道自己怎么办, 而是调用我们程序自己写的逻辑(比如调用沉淀这个类), 处理完的数据再次的封装成一个数据包传给下一个 worker.
最后一个处理步骤, 会把处理的结果我们源源不断的放在一个内存数据库中,(处理结果的使用者)谁要用就可以直接的去使用数据.
小结:
整个处理流程的组织协调不用用户去关系, 用户只需要去定义每一个步骤中的具体的业务处理逻辑
具体执行任务的角色是 worker,worker 执行任务时具体的行为则由我们定义的业务逻辑决定.
storm 处理数据流程小结
1, 客户端 client 把任务 (topology) 提交给 nimbus
2,nimbus 会把任务分配的一些信息放在 zookeeper 上面;
3,supervisor 会通过 zookeeper 领取到任务
4,supervisor 再分配给 worker 去运行我们的任务
官方解释如下:// 这里的基本概念不懂的见下文
也可以叫做 Topology 运行机制
(1)Storm 提交后, 会把代码首先存放到 Nimbus 节点的 inbox 目录下, 之后, 会把当前 Storm 运行的配置生成一个 stormconf.ser 文件放到 Nimbus 节点的 stormdist 目录中, 在此目录中同时还有序列化之后的 Topology 代码文件;
(2)在设定 Topology 所关联的 Spouts 和 Bolts 时, 可以同时设置当前 Spout 和 Bolt 的 executor 数目和 task 数目, 默认情况下, 一个 Topology 的 task 的总和是和 executor 的总和一致的. 之后, 系统根据 worker 的数目, 尽量平均的分配这些 task 的执行. worker 在哪个 supervisor 节点上运行是由 storm 本身决定的;
(3)任务分配好之后, Nimbes 节点会将任务的信息提交到 zookeeper 集群, 同时在 zookeeper 集群中会有 workerbeats 节点, 这里存储了当前 Topology 的所有 worker 进程的心跳信息;
(4)Supervisor 节点会不断的轮询 zookeeper 集群, 在 zookeeper 的 assignments 节点中保存了所有 Topology 的任务分配信息, 代码存储目录, 任务之间的关联关系等, Supervisor 通过轮询此节点的内容, 来领取自己的任务, 启动 worker 进程运行;
(5)一个 Topology 运行之后, 就会不断的通过 Spouts 来发送 Stream 流, 通过 Bolts 来不断的处理接收到的 Stream 流, Stream 流是 *** 的.
最后一步会不间断的执行, 除非手动结束 Topology.
有几点需要说明的地方:
(1)每个组件 (Spout 或者 Bolt) 的构造方法和 declareOutputFields 方法都只被调用一次.
(2)open 方法, prepare 方法的调用是多次的. 入口函数中设定的 setSpout 或者 setBolt 里的并行度参数指的是 executor 的数目, 是负责运行组件中的 task 的线程的数目, 此数目是多少, 上述的两个方法就会被调用多少次, 在每个 executor 运行的时候调用一次. 相当于一个线程的构造方法.
(3)nextTuple 方法, execute 方法是一直被运行的, nextTuple 方法不断的发射 Tuple,Bolt 的 execute 不断的接收 Tuple 进行处理. 只有这样不断地运行, 才会产生 *** 的 Tuple 流, 体现实时性. 相当于线程的 run 方法.
(4)在提交了一个 topology 之后, Storm 就会创建 spout/bolt 实例并进行序列化. 之后, 将序列化的 component 发送给所有的任务所在的机器(即 Supervisor 节点), 在每一个任务上反序列化 component.
(5)Spout 和 Bolt 之间, Bolt 和 Bolt 之间的通信, 是通过 zeroMQ 的消息队列实现的.
(6)上图没有列出 ack 方法和 fail 方法, 在一个 Tuple 被成功处理之后, 需要调用 ack 方法来标记成功, 否则调用 fail 方法标记失败, 重新处理这个 Tuple.
终止 Topology
通过在 Nimbus 节点利用如下命令来终止一个 Topology 的运行:
bin/storm kill topologyName
kill 之后, 可以通过 UI 界面查看 topology 状态, 会首先变成 KILLED 状态, 在清理完本地目录和 zookeeper 集群中的和当前 Topology 相关的信息之后, 此 Topology 就会彻底消失.
小结 zookeeper 在 storm 中的作用
1,nimbus 会把任务分配的一些信息放在 zookeeper 上面;
2,supervisor 会通过 zookeeper 领取到任务
3,numbus 需要通过 zookeeper 去感知 supervisor 的健康状态
Topology 的概念类似于 MapReduce 中提交的一个任务 job
每台 supervisor 上会有多个 worker 进程
每个 worker 进程中运行着若干个 executor 线程
每个 executor 中运行着若干个相同的 task
部署 storm 集群
strom 里面处了 Nimbus Supervisor 还需要依赖 zookeeper, 所以在安装 Strom 的时候确保 zookeeper 安装了
storm 的配置与部署
下载 storm, 然后上传到 Linux 中
我们解压后到 conf 目录下修改配置
cd conf/
vi storm.YAML
告诉 zookeeper 在那几台机器上部署了
- storm.zookeeper.servers:
- "hadoop-server-00"
- "hadoop-server-01"
- "hadoop-server-02"
告诉 Strom nimbus 在那台主机上
nimbus.host: "hadoop-server-00"
保存退出
supervisor 是不需要去指定的, 他的数量是可以动态的去增减
然后把他分发到每台机器上去
- scp -r apache-storm-0.9.2-incubating/ hadoop-server-01:/usr/local/apps/
- scp -r apache-storm-0.9.2-incubating/ hadoop-server-02:/usr/local/apps/
启动 storm 要先启动 zookeeper
进入 zookeeper 的 bin 目录下去启动 zookeeper
- ./zkCli.sh start
- ./zkCli.sh status(查看他的状态)
启动 Strom
bin 目录上
./storm nimbus(那台机器上配置了 nimbus 就在那台机器上启动 nimbus)
在另外的两台机器上去启动 Supervisor
01 机器上的 bin 目录上
./storm Supervisor
02 机器上的 bin 目录上
./storm Supervisor
可以通过 jps 来看进程数
storm 也是可以通过网页来看的, 但是必须要启动打开网页的外部服务的进程命令, 也必须在启动 nimbus 的这台机器上去启动这个进程
启动外部服务的进程命令是 cd App/(strom 安装包)/bin/storm ui // 直接执行这个命令
jps 查看进程 ./strom ui
ui 的进程叫 core
这样我们就可以通过网页来看 Strom 的状态
HTTP://hadoop-server-00:8080
小结: // 这里是后台启动
在 nimbus 主机上
- // 启动协调管理 nimbus
- ./storm nimbus 1>/dev/null 2>&1 &
- // 启动 web 管理界面启动后可以通过 nimbus 主机名: 8080 端口进行仿问
- ./storm ui 1>/dev/null 2>&1 &
在 supervisor 主机上
./storm supervisor 1>/dev/null 2>&1 &
slots 代表: 槽位, 也就是 work 进程, supervisor 内启动的进程, 默认启动 4 个. 当你的机器的内核非常好的时候, 可以修改配置来增加槽位数
可以知道那个 worker 的数量 如果不指点默认为 4 个
(在配置之前需要把进程都停掉, 按 ctrl+c 就可以停掉进程了)
在配置项 vi storm.YAML 里面最后增加(要顶格写)
- supervisor.slots.ports:
- -6701
- -6702
- -6703
- -6704
- -6705
- -6706
- // 这些数字表示 worker 显示的端口
保存退出
完后, 我们要把这个配置文件分发到另外两台机器上去
- scp storm.YAML hadoop-server-01:/usr/local/apps/strom(安装包)/conf/
- scp storm.YAML hadoop-server-02:/usr/local/apps/strom(安装包)/conf/
这样每个最大的 worker 数量为 6
启动 Strom 为后台进程
在 00 机器上
bin/storm nimbus 1>/dev/null 2>&1 & (就是启动 nimbus 1 到 dev 下的 null 目录中[标准输出从定性到这个文件中] 把 2 也从定性到 1 所去的地方, 最后 & 表示为启动一个后台进程)
在 00 机器上
bin/storm supervisor 1>/dev/null 2>&1 &
注意: 如果有错误退出, 我们可以看看日志文件
- cd logs/
- ll
- Less supervisor.log
在 00 机器上
- bin/storm ui 1>/dev/null 2>&1 &
- (为了在网页中可以观察, 我们必须在启动 nimbus 的这台机器上去启动 ui)
我们切换到 zookeeper 下去打开 zookeeper 的客户端
- cd /apps/zookeeper(安装包)/bin
- ./zkCli.sh
就会发现一个 storm 的节点
ls /strom
就会看到 Strom 下的节点
ls /strom/supervisor
就会看到 supervisor 下的节点, 每个 supervisor 就会有一个相应的 id 和网页上的 id 是一一对应的
配置小结:
Storm 相关配置项
在 storm.YAML 中常用的几个选项
storm.zookeeper.root
Storm 在 zookeeper 集群中的根目录, 默认是 "/"
topology.workers
每个 Topology 运行时的 worker 的默认数目, 若在代码中设置, 则此选项值被覆盖
storm.zookeeper.servers
zookeeper 集群的节点列表
storm.local.dir
Storm 用于存储 jar 包和临时文件的本地存储目录
ui.port
Storm 集群的 UI 地址端口号, 默认是 8080
nimbus.host:
Nimbus 节点的 host
supervisor.slots.ports
Supervisor 节点的 worker 占位槽, 集群中的所有 Topology 公用这些槽位数, 即使提交时设置了较大数值的槽位数, 系统也会按照当前集群中实际剩余的槽位数来进行分配, 当所有的槽位数都分配完时, 新提交的 Topology 只能等待, 系统会一直监测是否有空余的槽位空出来, 如果有, 就再次给新提交的 Topology 分配.
storm 的编程基本概念
topology: 拓扑也叫一个任务, 只不过一旦启动起来就永不停歇, 和 mapreduce 里的 job 类似只不过 job 处理完一个任务后就自动停止了
topology 内部还分为 spouts 和 bolts
spouts: 拓扑的消息源, 类似于 mapreduce 中的 map, 为后续的处理流程读取数据源(拿数据)
bolts: 拓扑的处理逻辑单元(在 spouts 之后的组件叫 bolts),bolts 可以有很多级, 分别处理不同的功能, 类似于 mapreduce 的 reduce 只不过 bolts 组件可以有任意多级(处理数据)
tuple: 消息元组 // 是作为 spouts 往 bolts 之间传递数据, 封装数据之后叫做 tuple,tuple 框架来实现 spouts 往 bolts 之间的数据传递
tuple 里面可以传递多个 filed, 每个 filed 可以定义一个名称.
//spouts,bolts 组件之间传递数据必须封装在 tuple 中, tuple 可以哦实现定义 schema, 规定有哪些字段.
组件与组件之间数据传递的路线 // 叫做 streaming
stream: 流 // 数据的流向
stream grouping: 流里面的分组策略也可以叫做数据流向的策略, 可以理解为 MapReduce 中的 shuffle 阶段, 指的是在 stream 中两头的运行实例之间数据的分发规则,
类比 mapreduce 中的 maptask-->reduce task 之间的 partition(划分)策略(有很多策略)
tasks: 任务处理单元
executor: 工作进程(是在 workers 的线程)
workers: 工作进程(是一个多线程的程序)
tasks 在 executor 里面 executor 在 workers 里面
configuration:topology 的配置
编程的时候要导入 storm 的 jar 包 我们在用集群区工作的时候, 每一个集群机器都应该创建一个 storm 分析之后的目录
3 台机器, 就应该在 3 台机器上去创建
我们编好 java 程序后打成 jar 包, 长传到 Linux 机器上面去,
其实 storm 和 mapreduce 程序的编写差不多
执行 storm 的命令为
在 bin 目录下
./storm jar ~/phonetopo.jar 客户端主类 参数
~/phonetopo.jar: 表示, 用户主目录下的 phonetopo.jar
参数为: 集群提交的时候, 给他的名称
启动后我们可以通过命令来查看
bin/strom list
程序会一直运行下去, 实时在线分析
我们用命令去关闭程序
bin/storm kill phone-topo(phone-topo: 客户端给的名字)
来源: http://www.bubuko.com/infodetail-2963279.html