SparkStreaming写入HBase的实现和优化

时间:2022-05-14 10:45:27

摘 要:海量数据的实时处理不仅要求计算框架快速高效,同时要求流处理过程中产生的中间数据的存储过程同样高效,因此,可通过提高Spark Streaming对中间结果数据的处理速度来提升流处理效率。为提高Spark Streaming处理中间结果的效率,文中选择HBase作为中间数据存储系统,并通过分析Spark Streaming的架构及HBase的存储原理,给出了Spark Streaming向HBase写入数据的方法并进行优化。通过对Spark Streaming存储过程的优化,可以一定程度上提高实时数据的流处理效率。

关键词:Spark Streaming;HBase;大数据;内存计算;流处理

中图分类号:TP274.2 文献标识码:A 文章编号:2095-1302(2016)04-00-03

0 引 言

随着移动互联网、社交网络等领域的快速发展,数据量呈指数式增长,大数据时代全面来临。在这个高速发展的时代,数据的变化速度也越来越快,对数据处理和响应时间的要求也更加苛刻,数据的实时分析和流式处理变得尤为重要。例如,在移动通信领域,对海量数据进行实时的挖掘分析,可以准确识别类似于诈骗的电信请求,从而有效避免电信诈骗的发生。再比如,通过对移动人口数据的实时挖掘分析,快速预测可能的突发事件。Spark Streaming是建立在Spark上的实时计算框架,拥有基于内存的高速执行引擎,并且提供丰富的接口和API,被广泛用于实时数据流的分析处理。

对海量实时数据进行处理,必然会产生大量的中间数据,如何高效存储Spark Streaming处理过程中产生的数据也是大数据处理过程中常见的问题。HBase的LSM树型存储结构使其具有实时读写数据的功能。使用HBase作为Spark Streaming中间数据的存储数据库可大大提高数据处理的效率。

本文基于实时数据的流式处理过程,给出了Spark Streaming将数据写入HBase的具体方法,并在此基础上进行了优化。

1 Spark Streaming简介

Spark Streaming是Spark生态系统的重要组成部分,主要用于实时数据流的处理。Spark Streaming的工作原理是将流式计算分解成一系列短小的批处理作业,本质上也是数据的批量处理,只是将时间跨度控制在数十毫秒到数秒之间。这里批处理的引擎依然为Spark,Spark Streaming将输入数据按照批量大小(此处指时间跨度如1秒)分成一段一段的数据(Discretized Stream),然后每一段数据都会转换成Spark中的弹性数据集(Resilient Distributed Dataset,RDD),最后将Spark Streaming中对DStream的具体操作都转换成Spark中对RDD的操作,并将中间结果暂存在内存中。整个流式数据处理任务可以根据需求对中间数据加以利用,比如叠加,或者将结果存储到外部设备,例如文件系统HDFS,或者外部数据库Hive,HBase。

HBase C Hadoop Database是Apache Hadoop的一个子项目,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。HBase采用LSM树的存储结构,这种结构的核心在于每一次执行插入操作时数据都会先进入MemStore(内存缓冲区),当MemStore达到上限时,HBase会将内存中的数据输出为有序的StoreFile文件数据。而在HBase中数据列是由列簇来组织的,所以每一个列簇都会有对应的一个数据结构,HBase将列簇的存储数据结构抽象为Store,一个Store代表一个列簇。这样在Store中会形成很多个小的StoreFile,当这些小的File数量达到阈值时,HBase会用一个线程来把这些小File合并成一个大File。这样,HBase就把效率低下的文件中的插入、移动操作转变成了单纯的文件输出、合并操作。从而使HBase的读写数据速度非常快,能够支持实时读写。所以在对海量实时数据进行处理时通常使用HBase作为数据存储系统。

2 Spark Streaming写入数据到HBase

2.1 实现方法

Spark Streaming向HBase写入数据时需要对每一条数据执行插入操作,通常会采用输出方法foreachRDD(func),将func(此处指将数据插入HBase表格)作用于DStream的每一个RDD。

在上述代码中,countBase为待处理的DStream,首先对countBase进行foreachRDD操作,然后对每个RDD进行操作。此处依据项目需求对每个RDD进行非空判断,然后对每个RDD执行foreach操作,进而对RDD的每条数据record调用writeToHBase方法,实现数据写入HBase表格。其中zkQuorum为HBase的zookeeper服务的主机名配置信息,row为HBase表的行键,family为表的列簇,key为表的列,value为列的值。writeToHBase方法为自定义的将数据写入HBase的方法。

由上述代码可看出,在向外部HBase数据库写数据时,通常要先创建与数据库的连接,并获取HTable实例,其中HTable为操作HBase表格的接口,通过HTable对象对HBase表格中的数据进行增,删,查询等操作。对RDD的每条数据调用writeToHBase进行写入操作之前先对setTable对象进行序列化,即对每条数据都创建了连接,获取HTable实例的confTable方法具体代码如下,其中tableName为建立连接的表格名称。

2.2 优化方法

方法一成功将DStream数据写入HBase数据库,但是资源开销较大。Sparking Streaming在向HBase写入数据时,必须给每条数据都创建一次连接,获取一个HTable实例,但是创建连接是一项非常耗时的操作,通常耗时数秒才能完成。在资源高度紧张的环境下,每秒都有几千个请求,为每条数据单独创建HTable实例是非常消耗资源的。基于此提出优化方法,减少建立连接与创建HTable实例的次数,从而降低资源消耗,提高数据写入HBase表的效率。方法二的代码如下:

依据方法二依次对countBase执行foreachRDD与foreachPartition操作,为每个分区创建一个confTable对象。对于RDD一个分区内的所有数据,这一个confTable对象是共用的。相比于给RDD中所有数据都实例化一个HTable,方法二明显减少了实例创建次数,大大提升了Spark Streaming向HBase写入数据的性能。

方法一和方法二的操作流程见图2和图3所示。

上图中方形表示一个RDD,每一个椭圆形代表RDD的一个分区Partition,分区里的每个圆形代表RDD的一条数据record。上图展示了DStream经过foreachRDD操作后对每个RDD的操作。由图2和图3可知,方法一对RDD经过foreach操作后对每条数据record都要经过创建连接然后才能写入HBase表格。方法二先对RDD进行foreachPartition操作,然后对一个分区创建一个连接,连接创建后对该分区foreach每条数据record进行写入操作。对比两图可看出优化后的方法创建连接的次数明显比原始方法少。而对于整个处理任务来说,建立连接,实例化HTable对象消耗资源过多,所以优化后的方法二性能大大提升。

HTable是HBase客户端,实现对HBase表的增加 (Create)、查询(Retrieve)、更新(Update)和删除(Delete)。但是HTable适合对单表操作,对读或写操作都不是线程安全的。对于写操作(Put或Delete),如果多线程共享一个HTable实例,写缓冲区可能会被破坏。对于读操作,一些被scan使用的字段同时被多个线程共享,如果此时有Get操作,不能保证数据的一致性。而大量数据的事实分析通常是多线程运作,为了解决线程安全问题,我们使用HTablePool类创建一个HTable的对象池,让多个HTable实例共享一个Configuration,使用时通过getTable方法获取一个HTable对象,然后可以进行各种增加(Create)、删除(Delete)和更新(Update)等操作,使用完后调用close()方法可将HTable对象归还到池中。方法三代码如下:

def confTable(zkQuorum:String,tableName:String): HTable = {

import org.apache.hadoop.fs.Path

val conf = HBaseConfiguration.create()

conf.addResource(new Path(“/etc/HBase/conf/core-site.xml”))

conf.addResource(new Path(“/etc/HBase/conf/HBase-site.xml”))

conf.set(“HBase.zookeeper.quorum”,zkQuorum)

val pool = new HTablePool(conf, SIZE);

val hTable = pool.getTable(tableName);

return hTable

}

由于HTablePool仅仅作为HTable的连接池,里面维护的HTable使用的Configuration是同一个,所以本质上所有的HTable共用同一个HConnection。而对于创建一个连接,HConnection的创建所损耗的资源远远多于创建一个HTable。既然HTable的创建是轻量级的,那么共享一个HConnection的HTablePool实际价值就不大。只要保证HConnection实例是唯一的,全局共享的,然后在每次操作HBase表时根据HConnection对象来重新创建,使用完成之后及时关闭即可。

最简单的创建HConnection实例的方式是HConnectionManager.createConnection()。HConnectionManager是一个不可实例化的类,专门用于创建HConnection。该方法创建了一个连接到集群的HConnection实例,该实例被创建的程序管理。通过这个HConnection实例,可以使用HConnection.getTable(byte[])方法取得HTableInterface implementations的实现。方法四的代码如下:

def confTable(zkQuorum:String,tableName:String): HTableInterface = {

import org.apache.hadoop.fs.Path

val conf = HBaseConfiguration.create()

conf.addResource(new Path(“/etc/HBase/conf/core-site.xml”))

conf.addResource(new Path(“/etc/HBase/conf/HBase-site.xml”))

conf.set(“HBase.zookeeper.quorum”,zkQuorum)

val connection = HConnectionManager.createConnection(conf);

val table = connection.getTable(“tablename”);

return table

}

3 结 语

本文先给出了海量数据的实时流处理过程中将数据存储到HBabe中的方法。通过Spark Streaming直接将中间数据实时写入HBase表格,在解决中间数据存储问题的同时,确保了流处理过程的高效率,并进一步优化此方法,将数据的处理速度大大提高,更大程度的提升流处理的效率。

参考文献

[1] Spark[EB/OL].http:////. 2015

[2] Apache HBase[EB/OL]. http:///. 2015

[3] 卓海艺.基于HBase的海量数据实时查询系统设计与实现[D].北京:北京邮电大学, 2013.

[4] 张榆,马友忠,孟小峰.一种基于HBase的高效空间关键字查询策略[J].小型微型计算机系统, 2012,33(10):2141-2146.

[5] 夏俊鸾,邵赛赛.Spark Streaming: 大规模流式数据处理的新贵[EB/OL]. http:///article/2014-01-28/2818282-Spark-Streaming-big-data. 2014.

上一篇:基于Android的校园订餐系统 下一篇:强化心理健康教育促进学生全面发展