• 2.10 MB
  • 2022-04-22 13:43:23 发布

一种应对云环境下在线聚集的数据倾斜方法.pdf

  • 17页
  • 当前文档由用户上传发布,收益归属用户
  1. 1、本文档共5页,可阅读全部内容。
  2. 2、本文档内容版权归属内容提供方,所产生的收益全部归内容提供方所有。如果您对本文有版权争议,可选择认领,认领后既往收益都归您。
  3. 3、本文档由用户上传,本站不保证质量和数量令人满意,可能有诸多瑕疵,付费之前,请仔细先通过免费阅读内容等途径辨别内容交易风险。如存在严重挂羊头卖狗肉之情形,可联系本站下载客服投诉处理。
  4. 文档侵权举报电话:19940600175。
'˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn一种应对云环境下在线聚集的数据倾斜方法孟小峰,任玮,杨晨,王春凯,慈祥中国人民大学信息学院,北京市 邮编100872摘要:数据倾斜不是在线聚集特有的问题,但是在线聚集对数据的处理方式带来了一些新的问题。尤其是在云环境下实现在线聚集时,存在一种比较特殊的数据倾斜。这种数据倾斜的产生是由于云环境中数据基于块的存储方式以及在线聚集基于采样的处理方式。由于云环境较为复杂,涉及到很多不同的因素。如果在对整个数据处理流程建模基础上进行统计结果的修正,将会使问题进一步复杂化。因此本文尝试从另一个角度出发,不再考虑各种复杂的因素,而是直接对数据序列通过再采样的方式恢复其随机性,从而消除其带来的结果估计的不准确性。实验表明,本文提出的方法能在很大程度上保证统计结果更为准确,且同处理前的方法相比,处理效率并未有损失。关键词:在线聚集;采样;数据倾斜中图分类号:TP391DataSkewMethodforOnlineAggregationinCloudEnvironmentMengXiao-Feng,RenWei,YangChen,WangChun-Kai,CiXiangDepartmentofInformation,UniversityofRenmin,Beijing100872Abstract:Dataskewisnotaspecificproblemofonlineaggregation.Howevertheproblemofonlineaggregationwillbringanewproblemfordataprocessing.Especiallyinthecloudenvironmenttoachieveonlineaggregation,thereisaspecialdataskew.Thiskindofdataskewisduetothedatablockbasedstorageinthecloudenvironmentandtheonlineaggregationbasedsamplingprocessing.Duetothecomplexityofthecloudenvironment,involvingmanydifferentfactors.Iftheresultsoftheentiredataprocessingmodelbasedonthestatisticalresultsbeingcorrected,itwillfurthercomplicatetheproblem.Thispaperattemptsfromanotherperspectiveandnolongerconsideravarietyofcomplexfactors.Thispaperwilldirectlyrestoredatasequencerandomnessbyre-sampling,theneliminatingtheresultestimationinaccuracy.Theexperimentalresultsshowthatthemethodproposedinthispapercanguaranteetheaccuracyofthestatisticalresultstoagreatextent.Comparedwiththepreviousmethods,thetreatmentefficiencyisnotlost.Keywords:OnlineAggregation,Sampling,DataSkew基金项目:教育部高等学校博士学科点专项科研基金(20130004130001)作者简介:孟小峰(1964-),男,教授,主要研究方向:大数据融合,存储以及隐私保护-1- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn0引言任何数据处理都无法绕过的一个问题就是数据分布,数据分布对于数据处理的效果而言常常有着至关重要的作用。很多研究问题中都假设数据分布是均匀的,当然这种假设经常是成立的。但是在更多的实际场景中,这种假设很难满足。比如我们经常说在社交网络的研究中,人的各种行为产生的数据都是符合“幂律”(PowerLaw)的。而符合“幂律”的数据就带有非常典型的长尾分布特征。在处理这种类型的数据时,我们常用的针对均匀分布的方法常常会出现一些问题。例如在数据库的查询优化中,需要经常用到数据的分布信息(可以是精确的,也可以是近似的)。如果数据分布均匀,这种信息的构建和维护相对较易。除了分布信息的构建和维护,针对均匀数据本身的优化也相对较易。而针对分布不均的数据,很多时候需要采用一些特殊的处理方法来应对。不同领域的学者可能对分布不均的数据研究的关注点不同。就数据库,或者说数据管理领域而言,通常会用数据倾斜(DataSkew)来指代不同应用场景下数据分布不均的情况。本文的贡献可以总结为以下四点:1)通过对云环境下在线聚集处理流程,特别是采样过程的仔细分析,发现云环境下在线聚集存在的一种特殊数据倾斜。这种数据倾斜不仅会导致处理时间增加,更重要地是会导致统计结果出现误差。2)从对数据序列随机性恢复的角度出发,提出一种基于接受/拒绝采样的简便方法以便在一定程度上消除本文所研究的数据倾斜。3)对于接受/拒绝采样的关键性参数进行了定义和证明。4)将本文提出的方法在原型系统COLA中予以实现,实验结果证明了方法的有效性。1相关工作数据倾斜在不同的研究领域,其含义可能会发生一些变化。就数据库领域而言,根据数据库系统百科全书(EncyclopediaofDatabaseSystems)的定义[1];“数据倾斜主要是指数据集中数据的不均匀分布。倾斜的数据可以服从一些常见的分布(比如Zipfian、Gaussian和Poisson),但是很多研究都用Zipfian分布来进行倾斜数据集的建模......数据倾斜对于数据库复杂查询并行执行的最直接影响就是由不均衡的负载导致的高响应时间”。下面会分别对关系数据库领域的数据倾斜以及云环境下数据倾斜问题的相关研究工作进行介绍。1.1关系数据库中数据倾斜问题的研究关系数据库领域很早就注意到数据倾斜的问题。考虑到数据倾斜产生的特点及其产生影响所需要的环境,关系数据库中对于数据倾斜问题的研究主要集中在并行数据库领域,且主要关注涉及连接(Join)操作时的数据倾斜问题。在文献[2]中,作者提出了在并行数据库环境下四种处理数据倾斜的方法,比如利用rangepartition方法来处理两表数据倾斜情况等。文献[3]利用直方图对数据集的倾斜程度进行评估,用CPU和I/O建立连接算法的代价模型,-2- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn在此基础上对之前提出的算法进行了扩展和改进。文献[4]对并行连接操作中的数据倾斜类型和产生原因进行了详细地分类和介绍,在此基础上提出利用PRG算法来处理数据倾斜连接。文献[5]中提出了在无共享(ShareNothing)结构下的数据倾斜连接算法优化,目标是使得系统的各个节点达到负载均衡且网络传输量最小。文献[6]在之前的基础上,对数据倾斜的类型、产生原因以及其对数据库系统操作的具体影响都进行了详细介绍。除了对连接操作的数据倾斜问题研究之外,文献[7]和[8]还对涉及到分组及聚集操作的数据倾斜提出了解决方案。1.2云环境下数据倾斜问题的研究随着大数据时代的到来,很多领域的数据处理已经无法构建于传统的关系数据库之上,越来越多的数据处理开始转向云环境。在各种云环境的处理框架中,MapReduce应用最为广泛。当前对于云环境下数据倾斜问题的研究基本都是围绕着MapReduce处理框架。虽然在并行数据库中对数据倾斜的问题已经有了较为深入的研究,但是这些方法并不都能够直接应用到MapReduce环境。根本原因还是在于关系数据库和MapReduce系统对于数据处理方式的差异上。比如在数据库中,具有相同分割属性的元组不必分配在同一组进行处理。而MapReduce中所有具有相同key的值必须分配在同一个Reduce任务上。文献[9]分析了MapReduce中分区倾斜产生的影响。文章作者通过研宄发现类似Zipf分布的输入或者中间数据的分布不均是导致数据倾斜的主要原因。MapReduce架构的提出者在文献[10]中提出可以利用备份任务的方法来解决系统执行时间过长的问题,而这种时间执行过程的本质就是在数据倾斜出现的情况下,有些任务所要执行的任务量较大造成的。文献[11]通过对实际运行的应用进行分析,将MapReduce环境的数据倾斜分为Map端的数据倾斜和Reduce端的数据倾斜。文中同时还对每一种情况给出了一些解决方法和建议。但是对于这些解决方法的效果如何并未做实际的验证。文献[12]则将MapReduce下的数据倾斜更进一步细化为RecordSizeSkew、PartitioningSkew和ComputationalSkew三种类型,同时也给出了一些简单的应对方案。文献[13]提出了名为Mantri的系统。该系统的基本着眼点在于MapReduce作业的调度系统。希望通过改进其调度系统来减轻在实际作业运行时倾斜所带来的问题。从文中可以发现Mantri系统能够较好地应对一些确定类型的倾斜,比如某些输入数据的执行时间过长所引起的倾斜,或者分区倾斜等。但是从整体上来看Mantri系统并没有提出完整解决数据倾斜的方法。文献[14]提出并实现了skewtune系统,用来解决MapReduce倾斜问题。Skewtune系统对运行的任务进行监控,根据估计运行时间的长短来判定系统当前状态下的异常任务。当系统中有任务执行完毕,也就是说有多余的系统资源可以进行计算时,就将当前异常任务的部分数据分配至空闲的计算资源来计算。文献[15]提出了基于MapReduce处理倾斜数据时负载不均衡问题算法。基本思想是在创建MapReduce任务时创建比实际Reducer更多的分区。然后在Reduce阶段使用贪婪算法决定数据分区到Reducer的对应关系。可以将多个分区对应一个Reduce任务,借此来达到使每个Reducer处理量负载均衡的目的。在文献[16]中,作者提出了一个启发式的算法LEEN改变分区策略。LEEN会不断记录每一个节点上当前接收到的数据量,以此来保证Reduce端能够实现负载均衡。文献[17]提出两种方法来解决分区倾斜。第一种是将新到来的数据直接分配到当前负载最小的机器上,这-3- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn是一种贪婪算法。第二种则利用数据采样,根据样本中数据的分布情况决定当前数据被分配到哪台机器。文献[18]解决数据倾斜的思路跟文献[19]的第二种方法类似,也是利用数据采样得到的信息来辅助数据分配。文献[20]重点解决了MapReduce数据倾斜导致大量数据不断从内存刷新到磁盘的问题,基本思路是在系统中构建一个大的共享内存池。通过这个共享内存池来存储数据倾斜导致的某些节点过大的数据量。文献[21]利用机器学习的方法对集群中不同任务的执行时间进行估计,然后利用这个估计的时间动态地调整不同任务上所承担的数据量。以上各种方法都从不同的角度在一定程度上解决了MapReduce数据倾斜的问题,但是这些方法基本都是针对单个数据集进行处理,并没有考虑多个数据集的情况。比如在利用MapReduce实现连接操作时如何解决数据倾斜问题。文献[22]针对两表链接的数据倾斜问题,提出一种基于rangepartition的方法优化两表连接方法。但是如果一个数据集中某些数据出现次数远远高于其他数据的出现次数,文中提出的的方法就不太适用。目前专门针对云环境在线聚集的数据倾斜的研究不是很多。文献[23]提出对数据进行预先的随机化以便在一定程度上避免在线聚集的数据倾斜。文献[24]在原本的在线聚集处理系统之上构建一个新的处理层,该层所要完成的任务就是对输入的数据实现在线的随机化。出于效率的考虑,该处理过程是全部在内存中进行的。文献[25]则针对在线聚集环境下的数据倾斜,提出了一种称为“keep-order”的解决方法。但是该方法非常简单,可以适用的场景有限。总的来看,无论是在关系数据库还是在云环境下,数据倾斜问题的研究一直都很受关注。相对而言,关系数据库的数据倾斜问题研究较为透彻。而对于云环境下,尤其是云环境下在线聚集的数据倾斜问题的研究还不是非常深入,仍存在不少亟待解决的问题。2问题定义与分析前面章节对各类数据倾斜问题进行了归纳和总结,对于云环境下数据倾斜问题的成因、分类以及已有的一些解决方案等也做了详细的介绍。但是云环境下的在线聚集存在一种特别的数据倾斜问题,造成这种问题的根本原因是由云环境的存储方式以及MapReduce框架处理数据的方式造成的。下面会详细介绍这种特殊数据倾斜对于云环境下在线聚集的影响,并对本节所要研究的问题进行一个正式的定义。2.1云环境下在线聚集数据倾斜的产生原因前文对各类数据倾斜的问题,对云环境下在线聚集而言,也均有存在。但相关的研究已经较多,因此本文重点关注下面介绍的一种特殊的数据倾斜。这种数据倾斜在云环境下正常的的处理方式中是不存在的。传统的在线聚集基本都是基于关系数据库,采样的单元是元组。考虑到存储效率等问题,云环境下的在线聚集基本都是基于块级别的采样。单从采样粒度上来看,块是远大于元组的,一个块内可能包含成百上千,甚至更多的元组。为了实现随机化采样,可以采用随机扫描或者顺序扫描的方式。对于原始的数据而言,通过对整个数据集的随机扫描可以得到一个随机化的样本。如果数据在采样前已经通过某种方式进行了随机化,那么顺序扫描就可以得到随机化的-4- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn样本。就在线聚集而言,通过数据的事先随机化,然后进行顺序扫描的方式更加适合。目前在线聚集领域的研究一般也是基于上述假设,采用的扫描方式是顺序扫描。图1和图2分别显示了关系数据库环境下基于元组的采样和云环境下基于块的采样过程。假设t1、t2、……、tn是原始的随机输入序列,而对序列ti处理得到结果为ti’。采样器依次从序列中读取数据并进行相关的处理。在以元组为单元的采样情况下,采样后的数据序列是t1’、t2’、……、tn’。也就是说采样结果序列和输入数据序列的对应关系是一致的。原始的输入序列是随机的,由于这种保序特性,经过采样器之后的输出序列仍是随机的,这种随机性就保证了后续结果估计等的正确性。图1:关系数据库下基于元组的采样图2:云环境下基于块的采样-5- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn云环境下基于块的采样在基本流程方面和关系数据库环境下类似。假设b1、b2、……、bn是原始的随机输入序列,对序列bi处理得到结果为bi’。但是与关系数据库下基于元组的采样不同的是,采样结果序列和输入数据序列的对应关系并不一定一致。也就是说先采样的数据不一定最先出现在结果序列中。主要的原因会在下文介绍。这种不一致会导致采样结果序列的随机性被破坏。因为原始的序列是随机的,但是跟原始序列顺序不一致的序列就不一定是随机的。这种不保序性会直接导致后续结果估计的偏差,因为这些估计都是基于一个共同假设:处理的数据序列是随机的。2.2问题定义前文已经解释了云环境下在线聚集这种特殊数据倾斜出现的原因,下面对这种数据倾斜问题进行一个详细地定义:在线聚集的实现主要依赖于数据采样,而采样的准确性则需要输入序列的随机性予以保证。一般来说,可以通过顺序读取一个随机序列中的数据来实现随机采样。由于关系数据库的采样是元组级别的,因此可以保证采样后的数据的顺序不会发生变化。但由于云环境下的数据存储一般是块级别的,一个块可以包含大量的元组。对于不同的查询而言,块中包含的有效数据也会不同,这就导致处理时间的不同。因此在原始随机序列中排序靠前的数据块可能会由于有效数据较多,而导致处理较慢,进而在处理后的数据序列中的排序位置发生变化。这种变化意味着处理后数据序列的随机性无法保证。这种由于数据块中包含有效数据量不一致,从而破坏序列随机性的数据倾斜问题就是本文所要研究的问题。3基于再采样的数据随机性恢复对于本文研究的数据倾斜而言,一般会有两种解决思路:一种思路是通过一定的方法保证采样之后的数据序列仍旧是随机的;另一种思路则是在采样后序列无法保证随机的前提下尝试恢复其随机性。对于第一种方法而言,有一些很直接的思路,比如强迫采样后的序列顺序仍和采样前的序列顺序保持一致,也就是说采样前靠前的数据在采样后的序列中仍旧靠前。但是这种方法常常会增加处理时间,导致处理效率非常低。本文主要从后一种思路出发,基于再采样的思想,来实现数据序列随机性的恢复。3.1接受/拒绝采样采样的方法有很多种。最终我们选择其中的接受/拒绝采样(Acceptance/RejectionSam-pling,A/RSampling)作为我们的再采样方法。一方面这种方法实现较易,另一方面这种方法可以在线化地进行采样,无需多个阶段的反复操作。接受-拒绝采样的基本思想是:当需要对分布f(X)进行采样但却很难直接进行时,可以通过另一个容易采样的分布g(X)进行模拟需要采样的分布,其中g(X)需要满足:形状上接近f(X),且对于8x,有成立。具体的采样步骤为:1)对g(X)采样得到一个样本x,x-g(X);-6- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn2)从均匀分布中采样得到一个样本u,u-U[0,1];3)如果u6f(x)/[Mg(x)],则认为x是有效样本,接收之,否则拒绝该样本;4)重复上述步骤,直到得到足够的样本。下文会对接受/拒绝采样的关键性问题进行解释。3.2采样序列随机性的恢复如图3所示,接受/拒绝采样的过程在reduce任务开始之前。在一般情况下,由于包含较少有效数据的块被处理时间的更短,因此在传输到reduce端的样本中,这类块出现的概率相对更高。但对于我们的估计而言,这些数据的价值偏低,且如果将这些块用于估计,会直接导致估计结果偏低。因此接受/拒绝采样的过程就被用于修正任意数据块被reduce端接受的概率,从而保证每个数据块具有相同的概率出现在reduce端,而不是有效数据更少的块出现的概率更大。具体来说,当map端产生新的输出结果时,图3中的A/Rsampler就会随机产生一个01之间的随机数u,如果该随机数u小于或等于接受概率,则允许其进入到reduce阶段的后续处理,否则直接丢弃。图3:增加接受/拒绝采样后的处理流程为了保证上述方法具有正确统计学意义的同时也能给出准确的估计结果,我们需要进一步完成如下三个方面的工作:1)证明利用接受/拒绝采样产生的块级别样本,每个块被抽中的概率相同。2)如何避免过度的数据修正。3)如何保证处理完所有数据后的最终结果和实际的结果是一致的。4系统实现在实际处理中,图3中的A/Rsampler的实现主要基于一个MapReduce作业来完成。map函数、combine函数以及reduce函数的设计分别如算法1、2和3所示。需要说明的-7- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn是combine函数除了需要统计该数据块的聚集结果(blocksum)之外,还需统计数据块中包含的有效数据数目blockcount。如图3所示,在reduce任务开始之前,shuffle阶段的A/Rsampler会对每个到达shuffle缓冲区的map输出结果进行接受/拒绝采样。当shuffle缓冲区新读取到一个map输出结果时,首先获取键值对值域部分中的blockcount,并基于此计算出该map输出结果的接受概率。然后系统随机产生一个0-1之间的伪随机数,与接受概率进行比较,如果满足接受条件,则该输出结果进入reduce函数进行后续处理,否则丢弃该map输出结果。Algorithm1算法1:Map函数Require:tuplet;Ensure:(key,value);1:iftsatisfiesthepredicatethen2:key.set(t.cols);3:value.set(expp(t),0);4:endif5:output.collect(key,value);Algorithm2算法2:Combine函数Require:key,list;Ensure:(key,value’);1:whilelist.hasNext()do2:it=list.getNext();3:blocksum+=it.get_first();4:blockcount++;5:endwhile6:value’.set(blocksum,blockcount);7:output.collect(key,value);-8- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cnAlgorithm3算法3:Reduce函数Require:key,list;Ensure:(key,value’);n:numberoftuplesprocessed;:aggregationestimate;":confidenceintervalhalf-width;1:whilelist.hasNext()do2:it=list.getNext();3:sum+=it.get_first();4:quadraticsum+=it.get_second();5:endwhile6:variance=quadraticsum/n–sum*sum/n*n;7:=sum/n;8:"=Z*sqrt(variance)/sqrt(n);9:value’.set(,");10:output.collect(key,value’);5实验结果与分析5.1实验环境和数据集实验测试平台是由9个节点构成的集群,其中一个节点作为主节点(Master),其余8个节点作为从节点(Slave)用作数据存储和查询处理,节点之间通过1Gbit以太网交换机互连。每个节点拥有2个Xeon的四核CPU(2.40GHz),内存和磁盘空间分别为32GB和8TB。我们设置HDFS的块大小为默认值64M,集群中每个节点上同时运行2个mapper和1个reducer。实验同时使用真实数据集和模拟数据集来进行测试。整个实验一共使用了如下三个不同的数据集。1)真实的Wikipedia网页访问日志。该数据集每一行为一条记录,包含网页语言(language)、网页名称(pagename)、访问次数(pageviews)和网页字节数(bytes)四个属性列。实验所采用的网页访问日志数据量大小为320G,在HDFS中占用约5170个数据块。基于上述Wikipedia数据集构造的访问日志信息和网页参考信息。基于原始的Wikipedia访问日志数据,我们构造了两个表:网页参考信息表pageinfo和访问日志表visitlog,网页相关信息包含两个属性列:网页名称(pagename)和网页语言(language)。访问日志数据包含三个属性列:网页名称(pagename)、访问日期(date)和访问次数(pageviews),其中网页名称和访问日期构成表的主键。网页参考信息表pageinfo和访问日志表visitlog的数据量大小分别为30GB和300GB。-9- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn3)基于Zipfian分布模拟的倾斜数据集。为了准确衡量本文方法对于倾斜数据的效果,构造一组满足Zipfian分布的数据集。其中Zipf参数c分别设置为0,0.2,0.4,0.6,0.8以及1.0,因此一共生成6组数据集。数据集的基本构成类似,包含网页语言、网页名称、访问次数和网页字节数四个属性列,每条记录长度为40B。其中访问次数属性列是随机产生的整数(长度为4B),取值范围为0100。通过把网页字节数属性列设置为不同的值来控制过滤谓词的选择度。数据块中包含的有效数据数目呈现Zipfian分布,即最大的数据块包含的有效元组数与1/1c成正比,第二大数据块包含的有效元组数与1/2c成正比,第三大数据块与1/3c成正比,以此类推。每个数据集的大小约为70GB。本文实验主要运行如下查询Q来检验方法的实际效果。Q=SELECTlanguage,SUM(pageviews)FROMwikilogWHEREbytes>10000GROUPBYlanguage5.2查询误差与置信区间对比为了对比分析随机采样技术以及数据倾斜处理机制的重要性,本部分实验对4种不同方法进行对比。non-randomized,不进行随机采样,直接从源表中顺序读取数据进行处理;randomized,采用随机采样技术,但不对数据倾斜问题作任何处理;keep-order,类似文献[25]中强制保序的数据倾斜处理方法;accept-reject,本文提出的接受/拒绝采样,其中调整因子=0.95。为了便于理解,简单解释下文献[YXY13*]中的keep-order方法。从上文的分析可以知道,本文研究的数据倾斜主要是由于随机性被破坏导致的。因此一种很自然的想法就是让处理后的数据序列中的数据排序和原始的数据序列保持一致。如果原先靠前的数据未出现,则继续等待,这就是keep-order方法的基本思想。针对以上四种不同方法,我们在同样的数据集上执行查询Q。图4和图5是查询执行过程中针对英语查询的结果。图4展示的是没有进行随机采样non-randomized版本的实验结果,图中显示了聚集查询估计值和相应的置信区间上下边界。图中直的虚线是实际的真实值。从图4中我们可以发现存在着严重的误差。绝大多数情况下置信区间并未包含实际的真实结果,根据我们的统计,有效置信区间比例validratio不超过25%,所以在不进行随机采样时计算出的估计结果和置信区间不具备任何统计意义。因此实际中不能采用类似non-randomized的方法。其余三种方法randomized、keep-order、accept-reject的结果由图5给出。不难发现,当进行随机采样后,估计准确度大大提高,几乎全部的置信区间都包含了聚集结果的真实值。仔细观察图5可以看出,在整个查询执行过程中,randomized版本的估计结果总体呈现出比实际值偏低的趋势,这与前文中关于数据倾斜的产生原因及影响的分析是相吻合的,由于云环境下在线聚集的处理模式,导致含有有效数据较少的数据块会先到达reduce端,这就导致最终的估计结果比实际的要低。本文提出的再采样方法显然能够有效地改善估计结果。观察图5中keep-order版本和accept-reject版本的曲线,在查询刚开始执行时,估计值开始向真实值-10- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn图4:non-randomized版本的估计结果和置信区间图5:不同版本的估计结果-11- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn靠近。它们分别在处理进度达到38%和32%以后,估计值已经非常接近真实值,且在其后的查询中一直保持在这个水平。就keep-order版本而言,估计结果的平均偏离程度为0.82%,而accept-reject版本则仅为0.53%,分别优于randomized版本44%和64%左右。直观地来看,可以发现accept-reject的输出结果精度是最稳定的。从图5还可以看出,accept-reject版本在处理进度达到76%以后不再更新,这是因为accept-reject版本拒绝了部分样本数据,但根据前文的介绍,接受/拒绝采样过程中被舍弃的数据会放入一个临时数据集,借助此机制,查询执行完成后仍会返回真实结果。图6进一步展示了这三个实现版本的置信区间收敛情况。就图图6:不同版本的置信区间6的置信区间收敛情况而言,三个版本都表现出很快的收敛速度,大约处理了12%的数据之后就能使得置信区间相对宽度下降到5%以下。keep-order版本和accept-reject版本的置信区间相对于randomized版本稍微更窄些,而且可以观察到,它们在查询初期的置信区间收敛得更快。这对于在线聚集的应用场景而言非常重要,因为这能够保证在查询最开始阶段即可以一种平稳的速度给用户较高准确度的估计结果。5.3查询结果间隔对比本节实验设HDFS数据块大小为32MB,快照返回频率设为0.01(即处理1%的数据就返回一次估计结果)。用ti表示第i次更新结果消耗的时间,图7展示了查询执行过程中反馈间隔(即ti+1-ti)的变化情况。查询初始阶段,每个实现版本均表现出很长的反馈间隔(即t1-t0,t0为查询开始时间,t1为第一次结果更新的时间)。这是可接受的,因为期间涉及到任务启动和初始化的开销。随着查询往后进行,randomized版本和accept-reject版本呈现出稳定的反馈间隔,在图7中表现为与横轴几乎平行的平滑曲线。keep-order版本的结果返回曲线呈现出较强的波动性。在查询的总执行时间方面,整个过程randomized版本耗时2805.2s,keep-order版本2857.4s,而accept-reject版本仅耗时2769.8s,可见引入数据倾斜处理机制不但不会增加额外开销,还会在一定程度上减少查询处理时间。-12- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn图7:不同版本的快照反馈间隔5.4不同倾斜度数据的误差对比本实验在基于Zipfian分布模拟的倾斜数据集上运行查询Q,通过把Zipf参数c设为一组值(0,0.2,0.4,0.6,0.8,1.0),构造了6个具有不同倾斜度的数据集。基于这6个数据集,分别测试randomized、keep-order和accept-reject三个实现版本。为了保证实际的效果,对应上面6组数据集,accept-reject版本的调整因子分别设为(1,1,0.99,0.95,0.95,0.9)。由于篇幅图8:Zipf参数c=0.0有限,本节仅展示基于3个数据集(c=0,0.6,1.0)的英语的实验结果,分别如图8、图9和图10所示,图中给出整个查询处理过程中估计结果的变化曲线。对比观察图8、图9和图10可以看出,当数据集的倾斜程度较低时,估计的准确度更高,主要表现为置信区间宽度更窄。就randomized版本而言,在Zipf参数取0.0、0.6、1.0的数据集上计算出的置信区间相对宽度平均值分别为0.850%、1.42%、1.55%。而keep-order版本的对应的平均宽度则为0.853%、1.36%、1.53%,而accept-reject版本则为0.857%、1.39%、1.53%。事实上这是因为,倾斜度较低的数据集中包含更多符合选择谓词的有效元组。例如考虑c=1.0的情况:在包含有效数据最多的数据块中,有效数据比例与1/1成正比,第二大数-13- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn据块中有效数据比例正比于1/2,第三大数据块正比于1/3,以此类推。本实验中把有效数据比例设置为100%、50%、33%等等。所以,在c=0.0的数据集中,所有数据块包含的数据均为有效数据;而在c=1.0的数据集中,某些块中100%的数据均为有效数据,某些块仅含50%的有效数据,以此类推。因此当数据集中的倾斜度较低时,满足选择谓词的有效元组更多,置信区间宽度自然也就更窄了。此外,该实验进一步验证了本文提出的处理机制在面临严重的数据倾斜时仍可以保证估计算法的无偏性。从图10可以看出,如果不进行数据倾斜处理,randomized版本明显会受到数据倾斜的影响,表现出显著的估计偏低现象,导致真实结果很容易落在置信区间之外。当倾斜程度较为缓和时,这种影响可能不那么明显。而我们的数据倾斜处理机制可以成功地克服数据倾斜带来的估低影响。在查询处理后期估计值始终保持在与真实值相当接近的水平。上述实验充分验证了本文基于再采样的数据随机性恢复方法的鲁棒性,面对不同倾斜度的数据仍可以保证算法的无偏性,并提供高质量的估计结果。图9:Zipf参数c=0.6图10:Zipf参数c=1.0本文的所有实验表明,我们提出的基于再采样的数据倾斜处理机制能够有效地消除数据倾-14- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn斜带来的偏置影响,保证采样算法和估计算法的统计意义,提高估计结果的准确度,同时不会引入额外开销。在同其他方法的比较中,本文的方法最为稳定,处理时间也最少。6小结数据倾斜是任何数据处理过程都很难完全避免的问题,本文更指出云环境下在线聚集存在一种特殊的数据倾斜。这种数据倾斜会直接影响最终估计结果的准确性。由于无法预知用户的查询,因此这种数据倾斜很难完全避免。如果采用简单等待的方式来保证处理前后序列数据顺序则有很大可能导致处理时间的增加。本文提出一种基于再采样的方式来对随机性被破坏的序列进行随机性恢复,这种方法实现简单,且对最终的处理时间几乎不会造成影响。相关实验也验证了该法的有效性。参考文献(References)[1]LiuL,ZsuMT.Encyclopediaofdatabasesystems[M].SpringerPublishingCompany,Incorporated,2009.[2]DeWittDJ,NaughtonJF,SchneiderDA,etal.Practicalskewhandlinginparalleljoins[M].UniversityofWisconsin-Madison,ComputerSciencesDepartment,1992.[3]PoosalaV,IoannidisYE.Estimationofquery-resultdistributionanditsapplicationinparallel-joinloadbalancing[C]//VLDB.1996:448-459.[4]WaltonCB,DaleAG,JeneveinRM.ATaxonomyandPerformanceModelofDataSkewEffectsinParallelJoins[C]//VLDB.1991,91:537-548.[5]XuY,KostamaaP,ZhouX,etal.Handlingdataskewinparalleljoinsinshared-nothingsystems[C]//Proceedingsofthe2008ACMSIGMODinternationalconferenceonManage-mentofdata.ACM,2008:1043-1052.[6]MrtensH.AClassificationofSkewEffectsinParallelDatabaseSystems[M]//Euro-Par2001ParallelProcessing.SpringerBerlinHeidelberg,2001:291-300.[7]StamosJW,YoungHC.Asymmetricfragmentandreplicatealgorithmfordistributedjoins[J].ParallelandDistributedSystems,IEEETransactionson,1993,4(12):1345-1354.[8]YanWP,LarsonPB.Eageraggregationandlazyaggregation[C]//VLDB.1995,95:345-357.[9]LohrS.Sampling:designandanalysis[M].CengageLearning,2009.[10]DeanJ,GhemawatS.MapReduce:simplifieddataprocessingonlargeclusters[J].Com-municationsoftheACM,2008,51(1):107-113.-15- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn[11]KwonYC,BalazinskaM,HoweB,etal.Astudyofskewinmapreduceapplications[J].OpenCirrusSummit,2011.[12]Shi,Y.,Meng,X.,Wang,F.,Gan,Y.:YoucanstopearlywithCOLA:onlineprocessingofaggregatequeriesinthecloud.;InCIKM(2012)1223-1232[13]AnanthanarayananG,KandulaS,GreenbergAG,etal.ReiningintheOutliersinMap-ReduceClustersusingMantri[C]//OSDI.2010,10(1):24.[14]KwonYC,BalazinskaM,HoweB,etal.Skewtune:mitigatingskewinmapreduceap-plications[C]//Proceedingsofthe2012ACMSIGMODInternationalConferenceonMan-agementofData.ACM,2012:25-36.[15]B.Gufler,N.Augsten,A.Reiser,andA.Kemper,HandlingDataSkewinMapReduce..;inproceedingsofCLOSER.2011,574-583.[16]IbrahimS,JinH,LuL,etal.Leen:Locality/fairness-awarekeypartitioningformapre-duceinthecloud[C]//CloudComputingTechnologyandScience(CloudCom),2010IEEESecondInternationalConferenceon.IEEE,2010:17-24.[17]LeY,LiuJ,ErgunF,etal.OnlineloadbalancingforMapReducewithskeweddatainput[C]//INFOCOM,2014ProceedingsIEEE.IEEE,2014:2004-2012.[18]RasmussenA,ConleyM,PorterG,etal.Themis:anI/O-efficientMapRe-duce[C]//ProceedingsoftheThirdACMSymposiumonCloudComputing.ACM,2012:13.[19]MarianA,BrunoN,GravanoL.Evaluatingtop-kqueriesoverWeb-accessibledatabases.ACMTrans.onDatabaseSystems,2004,29(2):319-362.[20]ElmeleegyK,OlstonC,ReedB.SpongeFiles:mitigatingdataskewinmapreduceusingdistributedmemory[C]//Proceedingsofthe2014ACMSIGMODinternationalconferenceonManagementofdata.ACM,2014:551-562.[21]NikosZaheilas,VanaKalogeraki.Real-TimeSchedulingofSkewedMapReduceJobsinHeterogeneousEnvironments.11thInternationalConferenceonAutonomicComputing,I-CAC’14,USENIX,Philadelphia,PA,USA,June18-20,2014[22]AttaF,ViglasSD,NiaziS.SANDJoin—AskewhandlingjoinalgorithmforGoogle’sMapReduceframework[C]//MultitopicConference(INMIC),2011IEEE14thInternation-al.IEEE,2011:170-175.[23]Y.Wang,J.Luo,A.Song,etal.Improvingonlineaggregationperformanceforskeweddatadistribution[C]//DatabaseSystemsforAdvancedApplications.SpringerBerlinHeidelberg,2012:18-32.-16- ˖ڍመ੾᝶஠ڙጲhttp://www.paper.edu.cn[24]KalavriV,BrundzaV,VlassovV.BlockSampling:EfficientAccurateOnlineAggregationinMapReduce[C]//CloudComputingTechnologyandScience(CloudCom),2013IEEE5thInternationalConferenceon.IEEE,2013,1:250-257.[25]Y.Gan,X.Meng,Y.Shi.COLA:Acloud-basedsystemforonlineaggregation[C]//DataEngineering(ICDE),2013IEEE29thInternationalConferenceon.IEEE,2013:1368-1371.-17-'