第8章 流计算
流计算概述
静态数据和流数据
1 | 很多企业为了支持决策分析而构建的数据仓库系统,其中存放的大量历史数据就是静态数据。技术人员可以利用数据挖掘和OLAP(OnLine Analytical Processing)分析工具从静态数据中找到对企业有价值的信息 |

1 | • 近年来,在Web应用、网络监控、传感监测等领域,兴起了一种新的数据密集型应用——流数据,即数据以大量、快速、时变的流形式持续到达 |
批量计算和实时计算
1 | • 对静态数据和流数据的处理,对应着两种截然不同的计算模式:批量计算和实时计算 |

1 | • 批量计算以“静态数据”为对象,可在充裕的时间内对海量数据进行批量处理,计算得到有价值的信息。Hadoop是典型的批处理模型,由HDFS和HBase存放大量的静态数据,由MapReduce负责对海量数据执行批量计算 |
流计算概念
1 | • 流计算:实时获取来自不同数据源的海量数据,经过实时分析处理,获得有价值的信息 |

1 | • 流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行批量处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎 |
流计算与Hadoop
1 | • Hadoop设计的初衷是面向大规模数据的批量处理,每台机器并行运行MapReduce任务,最后对结果进行汇总输出 |
流计算框架
1 | • 当前业界诞生了许多专门的流数据实时计算系统来满足各自需求 |
流计算处理流程
概述
1 | • 传统的数据处理流程,需要先采集数据并存储在关系数据库等数据管理系统中,之后由用户通过查询操作和数据管理系统进行交互 |

1 | • 传统的数据处理流程隐含了两个前提: |
1 | • 流计算的处理流程一般包含三个阶段:数据实时采集、数据实时计算、实时查询服务 |

数据实时采集
1 | • 数据实时采集阶段通常采集多个数据源的海量数据,需要保证实时性、低延迟与稳定可靠 |
1 | • 数据采集系统的基本架构一般有以下三个部分: |

数据实时计算
1 | • 数据实时计算阶段对采集的数据进行实时的分析和计算,并反馈实时结果 |
实时查询服务
1 | • 实时查询服务:经由流计算框架得出的结果可供用户进行实时查询、展示或储存 |
1 | • 实时查询服务:经由流计算框架得出的结果可供用户进行实时查询、展示或储存 |
1 | • 可见,流处理系统与传统的数据处理系统有如下不同: |
流计算的应用
1 | • 流计算是针对流数据的实时计算,可以应用在多种场景中,如Web服务、机器翻译、广告投放、自然语言处理、气候模拟预测等 |
应用场景1: 实时分析
1 | • 传统的业务分析一般采用分布式离线计算的方式,即将数据全部保存起来,然后每隔一定的时间进行离线分析来得到结果。但这样会导致一定的延时,难以保证结果的实时性 |
1 | • 针对流数据,“量子恒道”开发了海量数据实时流计算框架Super Mario。通过该框架,量子恒道可处理每天TB级的实时流数据,并且从用户发出请求到数据展示,整个延时控制在2-3秒内,达到了实时性的要求 |
应用场景2: 实时交通
1 | • 流计算不仅为互联网带来改变,也能改变我们的生活 |
1 | • IBM的流计算平台InfoSphere Streams,广泛应用于制造、零售、交通运输、金融证券以及监管各行各业的解决方案之中,使得实时快速做出决策的理念得以实现 |
流计算开源框架 – Storm
Storm简介
1 | • Twitter Storm是一个免费、开源的分布式实时计算系统,Storm对于实时计算的意义类似于Hadoop对于批处理的意义,Storm可以简单、高效、可靠地处理流数据,并支持多种编程语言 |
1 | • Twitter采用了由实时系统和批处理系统组成的分层数据处理架构,一方面由Hadoop和ElephantDB组成批处理系统,另一方面由Storm和Cassandra组成实时系统 |
Storm的特点
1 | • Storm可用于许多领域中,如实时分析、在线机器学习、持续计算、远程RPC、数据提取加载转换等 |
Storm设计思想
Streams
1 | • 要了解Storm,首先需要了解Storm的设计思想。Storm对一些设计思想进行了抽象化,其主要术语包括Streams、Spouts、Bolts、Topology和Stream Groupings |
Spouts
1 | • Spouts:Storm认为每个Stream都有一个源头,并把这个源头抽象为Spouts。Spouts会从外部读取流数据并持续发出Tuple |
Bolts
1 | • Bolts:Storm将Streams的状态转换过程抽象为Bolts。Bolts即可以处理Tuple,也可以将处理后的Tuple作为新的Streams发送给其他Bolts。对Tuple的处理逻辑都被封装在Bolts中,可执行过滤、聚合、查询等操作 |
Topology
1 | • Topology:Storm将Spouts和Bolts组成的网络抽象成Topology,它可以被提交到Storm集群执行。Topology可视为流转换图,图中节点是一个Spout或Bolt,边则表示Bolt订阅了哪个Stream。当Spout或者Bolt发送元组时,它会把元组发送到每个订阅了该Stream的Bolt上进行处理 |
Stream Groupings
1 | • Stream Groupings:Storm中的Stream Groupings用于告知Topology如何在两个组件间(如Spout和Bolt之间,或者不同的Bolt之间)进行Tuple的传送。每一个Spout和Bolt都可以有多个分布式任务,一个任务在什么时候、以什么方式发送Tuple就是由Stream Groupings来决定的 |

1 | • 目前,Storm中的Stream Groupings有如下几种方式: |
Storm框架设计
1 | • Storm运行任务的方式与Hadoop类似:Hadoop运行的是MapReduce作业,而Storm运行的是“Topology” |
1 | • Storm使用Zookeeper来作为分布式协调组件,负责Nimbus和多个Supervisor之间的所有协调工作。借助于Zookeeper,若Nimbus进程或Supervisor进程意外终止,重启时也能读取、恢复之前的状态并继续工作,使得Storm极其稳定 |
1 | • 基于这样的架构设计,Storm的工作流程如下图所示: |

Storm实例
1 | • 我们以单词统计的实例来加深对Storm的认识 |

1 | • Topology中仅定义了整体的计算逻辑,还需要定义具体的处理函数 |

1 | • Python脚本splitsentence.py定义了一个简单的单词分割方法,即通过空格来分割单词。分割后的单词通过emit()方法以Tuple的形式发送给订阅了该Stream的Bolt进行接收和处理 |

1 | • 单词统计的具体逻辑:首先判断单词是否统计过,若未统计过,需先将count值置为0。若单词已统计过,则每出现一次该单词,count值就加1 |

1 | • 基于Storm的单词统计在形式上与基于MapReduce的单词统计是类似的,MapReduce使用的是Map和Reduce的抽象,而Storm使用的是Soput和Bolt的抽象 |
1 | • 上述虽然是一个简单的单词统计,但对其进行扩展,便可应用到许多场景中,如微博中的实时热门话题。Twitter也正是使用了Storm框架实现了实时热门话题 |

哪些公司在使用Storm
1 | • Storm自2011年发布以来,凭借其优良的框架设计及开源特性,在流计算领域获得了广泛认可,已应用到许多大型互联网公司的实际项目中 |

Storm安装和运行实例
Storm安装教程_CentOS6.4/Storm0.9.6
安装Storm的基本过程
1 | 本实例中Storm具体运行环境如下: |
1 | 安装Storm的基本过程如下: |
第一步:安装Java环境
1 | •Storm 运行需要 Java 环境,可选择 Oracle 的 JDK,或是 OpenJDK |

第二步:安装Zookeeper
1 | 到官网下载Zookeeper,比如下载 ―zookeeper-3.4.6.tar.gz‖ |

1 | chown命令让hadoop用户拥有zookeeper目录下的所有文件的权限 |
1 | 接着执行如下命令进行zookeeper配置: |

1 | 将当中的 dataDir=/tmp/zookeeper 更改为 |

第三步:安装Storm(单机)
1 | 到官网下载Storm,比如Storm0.9.6 |

1 | 接着执行如下命令进行Storm配置: |

1 | 修改其中的 storm.zookeeper.servers 和 nimbus.host 两个配置项,即取消掉 |
1 | 然后就可以启动 Storm 了。执行如下命令启动 nimbus 后台进程: |
1 | 同样的,启动 supervisor 后,我们还需要开启另外的终端才能执行其他命令 |

第四步:关闭Storm
1 | 之前启动的 nimbus 和 supervisor 占用了两个终端窗口,切换到这两个终 |
运行Storm实例
1 | Storm中自带了一些例子,我们可以执行一下 WordCount 例子来感受一 |
