基于时间倾斜的数据流聚集压缩算法

时间:2022-10-26 11:45:18

基于时间倾斜的数据流聚集压缩算法

摘 要: 对于许多应用领域不断产生的数据流,面向数据流聚集查询的应用最为广泛。本文在构造压缩桶的基础上,提出了基于时间维度压缩数据流的算法,来动态地形成压缩数据流,并进一步给出了使用压缩桶获得数据流聚集查询的数学方法。

关键词: 数据流; 压缩桶; 聚集查询; 时间维度

中图分类号:TP393 文献标识码:A 文章编号:1006-8228(2012)04-29-03

Aggregate compression algorithm for data stream

Wang Baojun, Zhan Ying

(Zhejiang Institute of Communications, Hangzhou, Zhejiang 311112, China)

Abstract: In many fields, data stream continues to grow in terms of generation speed. Aggregate query for data stream was most widely used. By constructing compression buckets, the authors provides in this paper a compression algorithm for data stream based on time dimension, in order to dynamically form compression data stream, and give mathematical method of aggregate query for data stream, by use of compression buckets.

Key words: data stream; compression buckets; aggregate query; time dimension

0 引言

数据流是随着网络的广泛应用而出现的一种新的数据形式。数据流聚集查询是数据流管理与知识发现系统中一种重要的数据知识发现模型,但快速流动的流数据与有限的处理能力之间的矛盾使得流数据的聚集查询分析比关系数据库的聚集分析更困难。

目前国内外已经对数据流聚集查询模式展开了研究。Dobra A等人研究利用随机草图技术,提取数据流的轮廓,减少数据的处理量来加快数据处理速度,并提出了一种草图分割技术来提高算法的性能[1]。Gilbert A C等人研究采用小波技术对数据流进行压缩,实现了近似聚集查询[2]。Madden研究了传感器网络中的聚集查询问题,重点是如何动态地建立路由树,实现流水线聚集操作[3,4]。Ahnad Y提出了数据流查询的分布式操作[5]。张冬冬等人提出了一种新的数据流传输方式,有效地减少网络中分布式数据流的传输量[9]。傅鹂等人建立了基于数据流驱动的数据流连续查询模型,设计并使用查询算子在查询链中的有序组合来构造出各种复杂的连续查询语句[7]。李建中等人提出利用多元线性回归方法来预测具有线性关系的数据流的未来聚集值,但如果数据不具有线性关系,该模型误差就会增大[10]。

以上的数据流聚集查询相关算法采用近似聚集、压缩数据流等技术来提高查询速度。由于数据流的“流”性和随机性,使得流量的变化具有突发性,然而,商业活动中,普遍要求能够实时地检索面向数据流的聚集查询结果,并获得更高的准确率。

1 数据流压缩

1.1 相关问题描述

数据流是一个以数据到达时间为戳的数据序列。流数据的聚集查询分为预定义查询(Predefined Query)和即席查询(Real-time Query)两类。预定义查询主要针对数据流后续到来的数据计算查询结果;而即席查询是针对数据流中流过的所有数据。数据流源源不断地流入系统,因此无法将所有数据流保存起来,为了获得更为准确的即时查询结果,在聚集查询中,需要对数据流进行压缩。由于数据流动态振荡流动,面向数据流的数据流聚集查询系统无法存储所有流数据,而用户有查询分析过去与未来流数据的需求,因此需要不断地压缩数据流,来满足用户需求。压缩后的数据流结构应该是简单的,方便为用户提供各类流数据聚集查询,并能够最大程度地反映原始流数据。压缩后的数据流结构是对压缩后的数据流的静态特征的描述,它描述数据的内容和流数据之间的相互关系。

由于数据流连续无限地流动,数据流具有时间特征,因此可以在时间维度上压缩数据流。本文采用基于对数尺度的时间倾斜框架模型[8]来压缩数据流。面向数据流压缩算法以增量的方式对压缩数据流进行更新,从而提高数据流的压缩速度,满足数据流聚集查询的实时性要求。用户会根据需求向系统提出多种聚集查询,这要求压缩数据流尽可能地反映原数据流的信息。随着时间的流逝,流过的流数据被不断地压缩,历史流数据被不断地抛弃。

1.2 相关定义

定义1. 设PT为时间分区长度。构造压缩桶Buckcets(BuckcetsID=0…n),压缩桶有三个抽屉drawer(drawerID=0…2),每个抽屉存放流数据的时间长度为2 BuckcetsID×PT。压缩桶的结构如图1所示。其中每个桶的2号抽屉是临时存储单元。如果0号抽屉是空的,则同一个桶的1号抽屉也空。

设i(i=0…n)为压缩桶的编号, i号桶中的抽屉存储流数据的时间长度为2i×PT。每个压缩桶的第0号与第1号抽屉存放流数据,2号抽屉是临时存储空间,只有当这个桶中的第0号与第1号抽屉非空,此时只能将新流入的流数据临时存放到2号抽屉,系统合并此桶的第0号与第1号抽屉,并推入下一桶后,新流入到2号抽屉的流数据被转移到同一桶的0号抽屉。例如,第0号桶流入第3个PT时间长度的流数据,而第0号桶的第0号与第1号抽屉已经分别存储了第1个和第2个PT时间长度,系统压缩第0号桶的第0号与第1号抽屉,并将流数据推入第1号桶的第0号抽屉后,第3个PT时间长度的流数据才可以流入第0号桶的第0号抽屉。也就是说,桶号为i的流数据来源于桶号为i-1的桶,系统压缩第i-1号桶的第0号与第1号抽屉,并将流数据推入第i号桶的第0号或第1号抽屉。压缩桶间的数据压缩与流动示意图如图2所示。

图1 压缩桶的结构

图2 压缩桶间的数据压缩与流动示意图

引理 保存流数据的最大时间长度为LongTime,MaxBCount为保存LongTime时长的流数据所需压缩桶的数量。则

证明:设m个桶最多可以存储流数据的时间长度为MaxT(m),则

MaxT=(2×20+2×21+…+2×2m) ×PT

所以MaxT(m)=(2m+1-2)×PT

设m-1个桶最多可以存储流数据的时间长度为MaxT(m-1),则

当时间长度LongTime满足:

MaxT(m-1)<LongTime≤MaxT(m)

则存储时间长度LongTime的流数据至少需要m个桶。所以:

证毕。

1.3 数据流压缩算法

以商业零售实际业务数据流为例,本文将探索针对数据流的聚集查询与压缩方法。商业零售数据流结构如下:sale(ProductID,OrderQty),sale是超市商业零售数据流,ProductID表示产品编号,OrderQty表示订货量。用户根据需求提交各类查询,并请求实时获得各类查询结果。例如,系统根据用户提交的产品号 ProductID,选择相关产品进行压缩。

定义初始数据流结构:

Datasourse(timestamp;productID;orderqty),timestamp记录了流数

据到达的时间点。

定义压缩后数据流的数据结构:

Compresssourse(starttime; productID; maxorderqty;minorderqty;sumorderqty; countorderqty),starttime表示压缩的初

始时间; maxorderqty表示订货量的最大值;minorderqty表示订

货量的最小值;sumorderqty表示订货量的总和; countorderqty表

示订货次数。

算法1:数据流压缩算法

输入:初始数据流。

输出:经过压缩后的数据流存储在桶中,每个抽屉存储压缩后的数据流。

定义桶的数据结构:

public struct buckets

{public compresssourse drawer0;

public compresssourse drawer1;

public compresssourse drawer2;}

根据需存储的最大时间长度,计算需要的桶数MAXBcount;

定义桶DataS:

buckets[] DataS = new buckets[MAXBcount];

初始化桶中的所有抽屉;定义记录时间长度的变量feng;定义时间分区PT;

While(true)

{根据用户提交查询的产品号ProductID 获取原始数据流;

获得产生数据流的当前时间;

if(接收的是第一个流数据)

{压缩后直接推入0号桶2号抽屉,它的starttime为被推入流数据的

timestamp。接着进入下一循环等待下一个流数据;}

计算新流入的流数据的timestamp与0号桶2号抽屉的starttime相隔时间feng:

if (是同一个时间分区feng < PT)

{压缩同一时间分区内的数据到0号桶2号抽屉;

回到循环开头,继续读下一个数据;

continue;}

else

{ 记录当前桶号码;

while (DBcount < MAXBcount)

{if (桶0号抽屉有空)

{将桶2号抽屉的数据移到桶0号抽屉;

break;}

else

{if (桶1号抽屉有空)

{将桶2号抽屉的数据移到桶1号抽屉;

break; }

else

{if (不是最后一桶)

{将该桶的0号与1号抽屉合并后放入下一桶中的2号抽屉;

该桶的0号与1号抽屉变空;}

else

{丢弃该桶的0号与1号抽屉;}

合并后,该桶0号抽屉空出来,放入该桶2号抽屉的流数据; }}}

if (0号桶2号抽屉空))

{将新读入的数据放入0桶2号抽屉;

重新设置starttime;}

else {break;}}}

2 获得压缩桶状态的数学方法

当用户向系统提出面向数据流的查询请求时,系统首先判断流数据被压缩到哪些桶中,而压缩流数据存储了最大值、总和等聚集值,使得用户获得聚集值变得非常方便。

在压缩过程的任意时刻,用户均可能提出获得流数据的聚集值,这要求系统能够迅速判断各个桶的状态,也就是每个桶中的0号抽屉或1号抽屉是否存储了压缩数据。

假设j为最后流入桶中的时间分区流数据,求每个桶中含有数据的抽屉数。存储第j个时间分区,需要BCount个桶。则:

, ⑵

如果,BCount大于MaxBCount,则从MaxBCount+1到BCount号桶的流数据被丢弃。所以,

,j∈N+。

则,

, ⑶

其中ai的取值仅为0或1,表示第i个桶中有ai+1个抽屉有流数据。ai=0表示0号抽屉存储了压缩流数据,ai=1表示0号与1号抽屉存储了压缩流数据。

例如j=33,表示持续流入数据流的时间长度为33×PT个时间长度。根据公式⑵,此时需要的桶数为5。根据公式⑶,得到33-25+1=2。则2=0×20+1×21+0×21+0×21+0×21,由此,我们可以得到压缩桶的状态为0号桶、2号桶、3号桶、4号桶的0号抽屉存储了压缩数据,1号桶的0号与1号抽屉存储了压缩数据。

3 结束语

本文提出了在时间维度上压缩数据流的方法:不断流入压缩桶的流数据被不断地以2为底的对数尺度进行压缩。实验表明,压缩桶结构在满足了压缩数据的存储需求的同时,大大减少了存储空间,桶中的压缩数据能够随着时间不断地更新,基于时间倾斜的数据流压缩算法能够提高数据流的压缩速度。能够满足数据流聚集查询的实时性要求,也能够提高数据流动态聚集查询的效率及灵活性。

参考文献:

[1] Dobra A,Garofalakis M,Gehrke J,et a1.Processing Complex Aggregate Queries over Data Streams[C].Proceedings of the 2002 ACM SIGMOD International Conference on Management of Data,M acIison.W isconsin.2002.

[2] Gilbert A C,Kotidis M uthukrishnan S M ,et a1.Surfing Wavelets on Streams: One―pass Summaries for Approximate Aggregate Queries[C] .Proceedings of the 27th International conference on Very Large Data Bases.2001

[3] Madden S R,Franklin M J,Hellerstein J M ,et a1.TAG :A Tiny Aggregation Service for Ad―hoc Sensor Networks[C] .Proc.of the 5th Symp.on Operating Systems Design and Implementation,Boston,USA 2002.

[4] Madden S R.Szewczyk R.Franklin M J.et a1.Supporting Aggregate Queries Over Ad―hoc Wireless Sensor Networks[C].Proceedings of the Workshop on Mobile Computing and Systems Applications.Los Alamitos:IEEE Computer Press.2002.

[5] Ahnad Y,Berg B,Cetintemel U,et a1.Distributed operation in the borealis stream processing engine[C].Proc of ACM SIGMOD Conference.Baltimore:[s.n.],2005:882~884

[6] 詹英,吴春明,王宝军.一种与缓冲区紧耦合的环形循环滑动窗口的数据流抽取算法[J].电子学报,2011.39(4):2262~2267

[7] 傅鹂,鲁先志,蔡斌.一种基于数据流驱动的数据流连续查询模型[J].重庆工学院学报(自然科学),2008.22(10)

[8] Jiawei Han,Micheline Kamber.Data Mining Concepts and Techniques[M].China Machine Press.

[9] 张冬冬,李建中,王伟平,等.分布式复式数据流的处理[J].计算机研究与发展,2004.41(10):1780~1785

[10] Li Jian -zhong,Guo Long-jiang,Zhang Dong-dong,et a1.Processing algorithms or predictive aggregate queries over data streams[J].Journal of Software,2005.16(7):1251~1261

上一篇:DELL XPS 14Z 下一篇:计算机软件专业人才培养模式的创新与实践