近年来,大数据已然成为科技领域和企业领域关注的热点,其中蕴含的巨大价值成为存储和处理大数据的巨大动力[1]。在大数据发展的过程中,Hadoop[2]和Spark[3]计算引擎逐渐被大多数企业和机构认可,其中Spark因其基于内存的计算框架,运算速度更快,被愈发重视。
Shuffle性能问题是很多分布式系统所共有的问题,究其原因是Shuffle过程会对集群的CPU、内存、磁盘和网络造成巨大负担,其中的任何一个因素都有可能成为集群运算的瓶颈,因此改进集群的Shuffle性能成为很多研究人员研究的目标。目前的研究主要分为底层优化、Map和Reduce端优化和对MapReduce模型的改进。
1) 底层优化:Zaharia等[4]提出一种延迟调度的算法,该算法能有效提升数据的本地化程度,从而提高集群的工作效率。
2) Map和Reduce端的优化:Guo等[5]提出一种独立工作的Shuffle服务——iShuffle。它可以估计Map阶段输出partition的大小, 并将Map输出的partition自动均衡放置到各个Reduce节点中;还可以减少任务的排定,使得reduce tasks的调度更加灵活。熊倩等[6]提出通过对Map节点上同一个作业的多个Map任务所产生的大量临时结果数据作合并的机制,减少了Map节点的输出结果数据量,从而减少了整个MapReduce作业执行时间,进而提高了性能。彭辅权等[7]分别从Map端数据压缩、重构远程数据拷贝传输协议等方面优化和重构Shuffle过程,使Shuffle阶段的运行效率得到提高。Davidson等[8]针对Spark平台经常会遇到的Shuffle性能瓶颈提出了几种应对的替代方案,以减轻处理这些瓶颈相关的系统开销。
3) MapReduce模型的改进:李玉林等[9]提出一种改进型的MapReduce模型——MBR(Map-Balance-Reduce)模型。通过增加Balance任务,对Map任务处理完成的中间数据进行均衡操作,从而保证Reduce任务的完成时间基本一致。
目前有很多针对Hadoop Shuffle阶段的改进,Spark作为基于内存的运算框架,在其内部实现上与Hadoop有很多不同。随着Spark版本的不断演变,对于Spark Shuffle数据持久化的问题已经得到了很多的优化,但是并没有从Task内存分配的角度做工作。因此本文通过研究源码,从Shuffle过程中内存分配的角度,针对Task的内存分配进行改进,使Shuffle效率得到提高。
1 Spark内存概述Spark内存分配如图 1所示。Spark默认的JVM(Java Virtual Machine)堆大小为512 MB,可以通过spark.executor.memory参数进行调整。这里的内存基本上是由Executor内部的所有任务所共享。为了避免内存溢出,因此只使用90%,可通过spark.storage.safetyFraction调整。Spark将要处理的数据存储在Storage部分,这个部分占Safe的60%,可以通过spark.storage.memoryFraction控制。Shuffle可用的内存大小占Safe的20%,由spark.shuffle.memoryFraction控制。由于Shuffle数据的大小是估算出来的,一是为了降低开销,二是为了降低估算误差,因此使用spark.shuffle.safetyFraction作为一个保险系数,降低实际Shuffle使用的内存阈值,起到一定的缓冲作用,降低实际内存占用超过用户配置值的概率。默认情况下spark.shuffle.safetyFraction的值为0.8,由此可以得出,真正用于Shuffle的内存大小为spark.executor.memory * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction。Unroll memory用作数据的序列化和反序列化,由spark.storage.unrollFraction控制,默认为0.2[10]。本文在此内存分配框架下,研究图 1中用于Shuffle操作的部分。
Spark Shuffle线程内存管理器ShuffleMemoryManager负责管理Shuffle线程占有内存的分配与释放。Task通过ExternalAppendOnlyMap和ExternalSorter向ShuffleMemoryManager申请内存,在运行结束后由ShuffleMemoryManager回收。为了使每一个Task都能比较公平地获取内存,它采取的内存分配策略是对于正在运行的N个Task,每个Task至少可以申请总内存的1/(2N),至多申请1/N。在运行过程中,N会随着内存池中正在运行Task的数目不断变化。
2 改进的Spark Shuffle内存分配算法现有Spark Shuffle内存分配算法采取对内存池中Task公平分配资源的策略,一定程度上保证了每个Task在被调度运行时都能较为公平地获取资源并运行。但是它并未考虑每个Task所需的资源量,仅从Task的数量上考虑公平性,这将导致内存的使用率较低,并且内存需求较多的Task可能会频繁溢出,甚至出现OutOfMemory错误。
并且executor中能同时运行的Task总数由CPU core数决定,当内存需求较大的Task被调度时,其申请的内存会因为内存不足而出现溢出,需要等待其他Task运行完毕空出内存后才能继续运行,这将大大影响整个作业的运行效率。
此外,由于Spark Shuffle的Fetch过程是一边Fetch数据一边处理的,所以当一个分区的全部数据在处理结束前是无法统计其数据大小的。
基于以上问题,首先将资源需求较少的Task作“分割化”处理,即将内存资源需求较小的Task分成若干块,这样可使某一时刻内存需求较少的Task所占有内存量变少,因此内存需求较大的Task被调度运行时可以获得更多内存避免溢出。这样做的代价是内存需求较少Task的内存申请次数会增加并有可能出现溢出,但其溢出量很少,处理速度很快,并且不会出现频繁溢出的情况,相比于大规模的磁盘IO操作,仍然提升了Application的整体效率,尤其在数据集有数据倾斜的时候。然后本文认为一个Task溢出次数越多且溢出后等待时间越长,那么它所需计算的原始数据量也就越大。基于此,本文提出一种小内存需求型Task作“分割化”处理,大内存需求型Task基于Task溢出次数和Task溢出后等待时间的Shuffle内存分配算法来改进源码的公平分配算法。
2.1 改进的Spark Shuffle内存分配算法描述由于要将内存需求较小的Task作“分割化”处理,所以首先在Task申请内存时需要一个指标来区分Task是大内存需求型Task还是小内存需求型Task。定义该指标为Memavg,如式(1)~(2)所示。其中:L为所有运行结束时未发生溢出Task的集合;K为已被调入过内存池的Task的集合。Memavg代表的是所有运行结束时未发生溢出Task所占用内存的平均值,当一个新被调用Task的内存请求比Memavg大时,认为该Task为大内存需求型的Task;反之,认为该Task为小内存需求型的Task。
$Me{m_{{\rm{avg}}}} = \sum\limits_{i = 1}^l {Me{m_{{\rm{taskNotSpill}}}}(i)} /\sum\limits_{i = 1}^k {f(i)}, {\rm{ }}l \in L, k \in K$ | (1) |
$f(x) = \left\{ {\begin{array}{*{20}{c}} {0, }&{spillCount > 0}\\ {1, }&{spillCount = 0} \end{array}} \right.$ | (2) |
式(3)表明了小内存需求型Task实际申请到的内存,算法直接给予其申请内存的一半与空闲内存的最小值。
$toGran{t^j} = {\rm{min}}(Mem_{{\rm{apply}}}^j/2, Me{m_{{\rm{free}}}}), {\rm{ }}Mem_{{\rm{apply}}}^j \le Me{m_{{\rm{avg}}}}$ | (3) |
在源码使用的公平分配算法中,拥有每个Task可使用内存的最小保证。对于大内存需求型Task,也给予它最小的内存保证Memlow,保证其执行效率并避免不必要的溢出操作,如式(4)所示。其中maxMemory为可使用的最大内存,numActiveTasks是指当前executor中正在运行的Task数目。
$Me{m_{{\rm{low}}}} = \frac{{maxMemory}}{{2*numActiveTasks}}, {\rm{ }}Me{m_{{\rm{apply}}}} > Me{m_{{\rm{avg}}}}$ | (4) |
公平分配算法中,有一个对Task可获取最大内存的限制,值为maxMemory/numActiveTasks。它不考虑Task内存需求的已有表现,因而不能根据不同Task的数据特征对内存分配作出合理分配。而且它还有一个很严重的问题,当一个executor中的Task满载运行,即numActiveTask为其最大值时,如果有一个Task的内存需求量大于maxMemory/numActiveTasks,那么它将不得不溢出,并等待大多数Task运行完毕,即numActiveTasks变小时才能被再度运行,这将严重拖延整个Application的运行效率。基于此本文根据Task的溢出历史和溢出后等待时间的表现来计算出它应得的内存,进而自适应的调整内存分配。式(5)为空闲内存的计算方法,其中taskMemory管理着内存池中所有未执行完成Task的id和已占有的内存。
$Me{m_{{\rm{free}}}} = maxMemory - \sum\limits_{i = 1}^m {taskMemory\left( i \right)}, m \in M$ | (5) |
其中M为taskMemory中包含的元素集合。对空闲内存的分配权重weight如式(6)所示,一个Task能从空闲内存中得到的份额取决于其溢出次数占未运行完成Task溢出总数的比值,和溢出后等待调用的时间与未运行完成Task溢出后等待调用时间的比值,其中这两部分的权重由a1和a2确定(a1+a2=1)。由于溢出的次数是影响Task运行效率的主要因素,而溢出后等待时间取决于内存池的整体状态,加上多次实验的经验,在实验中设a1=0.7、a2=0.3。
$\begin{array}{l} weigh{t^j} = {a_1}*\frac{{spillCount(j)}}{{\sum\limits_{i = 1}^m {spillCount(i)} }} + \\ \;\;\;\;\;\;\;\;\;\;\;\;\;\;{a_2}*\frac{{waitingTime(j)}}{{\sum\limits_{i = 1}^m {waitingTime(i)} }} \end{array}$ | (6) |
由式(5)~(6)可以得到Task可获得的最大内存阈值Memhigh,即式(7)。它由两部分组成,至少可以获得内存池分给所有活动状态下Task的均值,此外它可额外获得一部分空闲内存,提高内存的利用率,这部分的大小是通过溢出历史和溢出后等待时间的历史一起决定的,这使得溢出次数较多,等待时间较长的Task有机会获得更大内存,以利于加快其执行进度。
$Mem_{{\rm{high}}}^j = \frac{{maxMemory}}{{numActiveTasks}} + Me{m_{{\rm{free}}}}*weigh{t^j}$ | (7) |
此外,在公平分配算法中,若Task溢出到磁盘,那么它所占有的全部内存将被回收,若该Task是大内存需求型Task,那么它下次被调度时很难获得比溢出前更大的内存资源,因此在本文中设定一个Memrelease,见式(8),它使得Task在溢出时不释放所占有的全部内存,而是根据其溢出历史释放一部分内存,这有利于该Task下次被调用时避免再次溢出,其释放的内存也有利于内存池中其他Task的运行。
$\begin{array}{l} Mem_{{\rm{release}}}^j = (1 - spillCount(j)/\sum\limits_{i = 1}^k {spillCount(i)} )*\\ \;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;taskMemory(j) \end{array}$ | (8) |
式(9)代表当前Task所能获得的最大内存:
$\begin{array}{l} maxToGran{t^j} = {\rm{min}}(Mem_{{\rm{apply}}}^j, {\rm{ max}}(0, {\rm{ }}Mem_{{\rm{high}}}^j - \\ \;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;taskMemory(j))) \end{array}$ | (9) |
式(10)~(11)表明了大内存需求型Task实际申请到的内存。首先需要判断该Task已占有的内存是否已满足Memlow,若已满足则为Task分配min(maxToGrant,Memfree)的内存,如式(10);若不满足,则需要判断空闲内存能否满足最小内存保证Memlow,若不能满足则需要等待内存池中有足够内存时再调度运行,若能满足则为Task分配min(maxToGrant,Memfree)的内存,如式(11)。
$toGran{t^j} = {\rm{min}}(maxToGran{t^j}, Me{m_{{\rm{free}}}})$ | (10) |
$\begin{array}{l} toGran{t^j} = \\ \left\{ \begin{array}{l} {\rm{min}}(maxToGran{t^j}, Me{m_{{\rm{free}}}}), \;\;\;\;\;Me{m_{{\rm{free}}}} \ge \\ {\rm{min}}(maxToGran{t^j}, \frac{{maxMemory}}{{2*numActiveTasks}} - taskMemory(j))\\ wait(), \;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;Me{m_{{\rm{free}}}} < \\ {\rm{min}}(maxToGran{t^j}, \frac{{maxMemory}}{{2*numActiveTasks}} - taskMemory(j)) \end{array} \right. \end{array}$ | (11) |
本文提出的算法中,使用HashMap和HashSet存放Task的各项历史数据:taskMemory存放未运行完成Task的唯一编号和已占用的内存容量;taskNotSpill记录未发生溢出Task的编号;waitingTime存放已溢出Task的编号及等待调用所耗费的时间;spillCount记录已溢出Task的编号及溢出次数。
如图 2为算法的运行流程。当Task需要内存资源来保证其运行时,Task就会向ShuffleMemoryManager发出申请。ShuffleMemoryManager在接收到请求后首先查看TaskMemory是否有该Task的编号,若没有,则表示该Task为尚未运行过,那么接下来将为它初始化各个变量。然后比较其Memapply和Memavg的大小来判断该Task是大内存需求型还是小内存需求型,若是小内存需求型,则直接赋予它min(Memapply/2,Memfree)的内存,若是大内存需求型,则首先需要判断它目前已占用的内存是否大于内存池大小的1/(2N),若满足则直接赋予它min(maxToGrant,Memfree)的内存,若不能满足则继续判断当前Memfree与min(maxToGrant,maxMemory/(2*numActiveTasks)-Memused)的大小,若能满足则赋予它min(maxToGrant,Memfree)的内存,若不能满足,由于其最小内存保证,需要等待其他Task运行完成空出内存后继续申请。
改进的Spark Shuffle内存分配算法的伪代码如下。
输入 Task申请的内存值numBytes;
输出 算法实际为Task分配的内存值toGrant。
1) if!taskMemory.contains(id) then
2) 初始化taskMemory(id), spillCount(id), waitingTime(id)为0, taskNotSpill.add(id)
3) notifyAll
4) end if
5) while True do
6) 按式(1)、(5)~(7)更新numActiveTasks, memAvg, freeMemory, weight和memHigh的值
7) if numBytes < memAvg then
8) taskMemory(id)←taskMemory(id)+min(numBytes/2, freeMemory)
9) return min(numByte/2, freeMemory)
10) end if /*7)~10)为小内存需求型Task分配内存*/
11) maxToGrant←min(numBytes, max(0, memHigh-curMem)) /*curMem为Task当前已占用的内存*/
12) if curMem < memLow then
13) if freeMemory≥min(maxToGrant, memLow-curMem) then
14) toGrant←min(maxToGrant, freeMemory)
15) taskMemory(id)←taskMemory(id)+toGrant
16) return toGrant
17) else wait
18) end if
19) else toGrant←min(maxToGrant, freeMemory)
20) taskMemory(id) ←taskMemory(id)+toGrant
21) return toGrant
22) end if /*11)~22)为大内存需求型Task分配内存*/
23) end
算法的时间复杂度为O(n),其中n为一个Executor中所要运行的Task数。
3 实验结果及分析 3.1 重新编译源码过程Spark的编译包括make-distribution.sh编译、SBT编译和Maven编译三种方式,本文选择make-distribution.sh的编译方式。Spark源码的编译过程较为繁琐,需要很多依赖才能完成,但是实践后会对Spark的整体系统架构有更深入理解。
3.2 实验环境为了验证本文提出的基于溢出次数和Spill Task等待时间改进算法的正确性,本研究使用四台服务器搭建Spark集群,其中:表 1显示各节点在集群中的角色,表 2显示了各节点的性能,表 3显示了各软件版本信息。
本文选取“groupByKey”操作作为验证算法性能的实验案例。实验程序为自己编写的Wordcount程序,实验数据集为安然邮件数据集和亚马逊评论数据集。根据实验需要,生成了均匀的数据“mail”和不均匀的数据“mailSkew”,如表 4所示。实验中还使用Shuffle阶段可利用的内存量“ShuffleMem”(由公式spark.executor.memroy*spark.shuffle.safetyFraction*spark.shuffle.memoryFraction计算所得)来比较两个算法在不同内存环境下的性能。
本文使用以下参数评估算法性能:totalTime为完成Application所需时间;Max为耗时最长的Task所用时间;Shuffle Spill为Application中所有Task的溢出量。
3.3 均匀数据集实验结果分析从图 3可以看出,两种算法在均匀数据集上完成Application所需时间、Shuffle Task运行最长时间、和溢出量上相差不大,改进算法略优于源码的公平算法。这主要是由于在均匀数据集下,Task需要处理的数据量比较平均,对于单个Task而言内存的需求量较小,Task可以在未溢出的情况下运行完成。图 3(c)、(d)最开始两算法都有较大规模的溢出,这是因为对整体内存的配置过低,每个Task可以分配的内存过小,导致溢出。
totalTime是完成整个Application所需要的时间,是衡量算法性能最直观的指标。从图 4(a)可以看出:改进算法在所设置的内存量上全面优于源码的公平算法,可以减少运行时间的10%~12.5%。Max是Shuffle阶段运行时间最长的Task所消耗的时间,该Task一般为处理倾斜数据的Task,因此Max值的大小将直接影响整个Application的运行时间。从图 4(b)可以看出:改进算法的Max值要明显小于源码的公平算法,尤其是在可分配内存较小的情况下。这主要是因为,源码的公平算法规定每一个Task最多可以获得内存池中内存的1/N,然而这部分内存并不能满足处理倾斜数据Task的需求,因此该Task会频繁地溢出直到对内存需求小的Task运行完毕,空出足够的内存才能成功运行,这其中频繁的溢出和等待调度都相当耗时。而改进算法将对内存需求较小的Task进行“分割化”处理,每次只从内存池中获取较少的内存,因此当内存需求较大的Task向内存池申请内存时,内存池就可以分配足够多的内存,从而避免了一些溢出,减少了Task的整体运行时间。
Shuffle spill是Application中所有Task的总溢出量。由图 4(c)~(d)可以看出改进算法的溢出量显著少于源码的公平算法。因为公平算法不能及时满足需求量较大的Task的内存需求,因此会造成Task的频繁溢出,导致Shuffle Spill的量很大。改进算法通过Task的溢出次数和溢出后的等待时间准确捕捉到需要内存较多的Task,并在其下次调度时从空闲内存中多分配一部分资源,使其可以尽快运行完成,从而减少了很多溢出量。
3.5 实验结论本文使用了两种数据集,分别是均匀的数据集和不均匀的数据集,其中不均匀的数据集中倾斜的数据约占19%。实验在不同数据集上分别用源码公平分配算法和改进算法来执行WordCount程序,并使用totalTime、Max、Shuffle Spill三个指标来衡量两个算法的性能。从3.3节和3.4节的实验来看,改进算法在均匀数据集上的性能略好于公平分配算法,在不均匀的数据集上可以看出经过算法的自适应调节,Task的溢出量较公平分配算法有大幅度的减少,使集群整体性能提升10%~12.5%。
综上所述,改进算法能够在有数据倾斜时更加高效地利用内存,减少对内存资源需求较大Task的溢出量,从而提升Application的整体运行效率。
4 结语本文对Spark Shuffle内存分配算法进行了研究,提出一种小内存需求型Task作“分割化”处理、大内存需求型Task基于Task溢出次数和Task溢出后等待时间的Shuffle内存分配算法来改进源码的公平分配算法,对整个内存池的使用率和各Task的运行效率作出改进,并对源码进行重新编译和部署。实验结果表明,改进算法能有效提升Spark在处理倾斜数据时的运行效率。Shuffle环节是大多数Spark作业的瓶颈,因此如何提升Shuffle阶段的性能也成为众多研究人员思考的问题。现实情况下,影响Shuffle性能的原因还有很多,如:代码开发、资源参数和数据倾斜等。在本文研究的基础之上,下一步将考虑其他对Shuffle性能影响的因素,与实际问题相结合,使Spark的Shuffle性能得到更大提升。
[1] | 程学旗, 靳小龙, 王元卓, 等. 大数据系统和分析技术综述[J]. 软件学报, 2014, 25(9): 1889-1908. (CHENG X Q, JIN X L, WANG Y Z, et al. Survey on big data system and analytic technology[J]. Journal of Software, 2014, 25(9): 1889-1908.) |
[2] | Apache. Apache Hadoop[2017-04-20]. http://apache.hadoop.org. |
[3] | ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets:a fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 20129th USENIX Conference on Networked Systems Design and Implementation. Berkeley, CA:USENIX Association, 2012:2-2. |
[4] | ZAHARIA M, BORTHAKUR D, SARMA J S, et al. Delay scheduling:a simple technique for achieving locality and fairness in cluster scheduling[C]//Proceedings of the 20105th European Conference on Computer Systems. New York:ACM, 2010:265-278. |
[5] | GUO Y F, RAO J, CHENG D Z, et al. iShuffle:improving Hadoop performance with shuffle-on-write[J]. IEEE Transactions on Parallel & Distributed Systems, 2017, 28(6): 1649-1662. |
[6] | 熊倩, 张龑, 郭明, 等. MapReduce Shuffle性能改进[J]. 计算机应用, 2017, 37(S1): 58-62, 67. (XIONG Q, ZHANG Y, GUO M, et al. Performance improvement of MapReduce Shuffle[J]. Journal of Computer Applications, 2017, 37(S1): 58-62, 67.) |
[7] | 彭辅权, 金苍宏, 吴明晖, 等. MapReduce中shuffle优化与重构[J]. 中国科技论文, 2012, 7(4): 241-245. (PENG F Q, JIN C H, WU M H, et al. Optimization and reconstruction shuffle in MapReduce[J]. China Sciencepaper, 2012, 7(4): 241-245.) |
[8] | DAVIDSON A, OR A. Optimizing shuffle performance in Spark[EB/OL].[2017-04-12]. https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf. |
[9] | 李玉林, 董晶. 基于Hadoop的MapReduce模型的研究与改进[J]. 计算机工程与设计, 2012, 33(8): 3110-3116. (LI Y L, DONG J. Study and improvement of MapReduce based on Hadoop[J]. Computer Engineering and Design, 2012, 33(8): 3110-3116.) |
[10] | GRISHCHENKO A. Distributed systems architecture[EB/OL].[2017-04-12]. https://0x0fff.com/spark-architecture. |
[11] | WANG J H, QIU M K, GUO B, et al. Phase-reconfigurable Shuffle optimization for Hadoop MapReduce[J]. IEEE Transactions on Cloud Computing, 2015, PP(99): 1-1. |
[12] | LI J G, LIN X L, CUI X L, et al. Improving the Shuffle of Hadoop MapReduce[C]//Proceedings of the 2013 IEEE 5th International Conference on Cloud Computing Technology and Science. Piscataway, NJ:IEEE, 2013:266-273 |