近年来,利用内存的低延迟特性改进并行计算框架性能成为新的研究方向。内存计算框架避免了频繁访问磁盘的I/O性能瓶颈,解放了大内存+多核处理器硬件架构的潜在高性能,成为学术界一致认可的高性能并行计算系统[1-2]。虽然内存计算框架的性能表现优异,但与大数据时代的即时应用需求相比,还存在不小的差距,因此,从计算模型的角度研究内存计算框架的性能优化方法具有一定的现实意义。本文选取开源内存计算框架Spark[3-4]为研究对象,Spark以HDFS(Hadoop Distributed File System)为底层文件系统,采用弹性分布式数据集(Resilient Distributed Datasets,RDD)[5]作为数据结构,通过数据集血统(lineage)[5-6]和检查点机制(checkpoint)[7-8]实现系统容错,编程模式则借鉴了函数式编程语言的设计思想,简化了多阶段作业的流程跟踪、任务重新执行和周期性检查点机制的实现。
在Spark作业的宽依赖Stage执行过程中,Mapper将数据按key划分并填入不同的Bucket,Bucket与Reducer为一一对应关系。由于原始数据分布的倾斜性,这样的单一轮次分区映射过程使各Reducer计算数据量有较大差异,任务执行时间长短不一,从而增加了宽依赖Stage的计算延时,降低了作业执行效率。虽然系统支持用户设定自定义分区函数,但由于真实的数据分布难以预知,无法确保自定义分区函数的合理性和准确性,因此数据分配的倾斜问题不可规避。为解决这一问题,本文主要做了以下工作:
1) 首先对内存计算框架的作业执行机制进行分析,建立作业效率模型,给出了RDD计算代价和作业执行时间的定义。
2) 通过分析宽依赖RDD的计算过程,建立了分区映射模型,给出了源数据分布、分区映射、分配倾斜度的定义,并证明这些定义与作业执行效率的因果关系。
3) 通过模型的相关定义求解,设计了扩展式数据分区算法和迭代式分区映射算法,并对算法执行的细节问题进行详细的分析和说明。
1 相关工作在提出MapReduce的文献[9]中,Dean等采用Hash函数对数据进行一次简单的划分,由于这种方法实现简单且通用性高,成为开源的Hadoop系统默认的分区方案。Spark作为类MapReduce系统,在实现中也自然承接了MapReduce的分区方法,但实际应用表明,在不了解数据分布的情况下,一次Hash划分的方法很难实现数据的合理分配。
一些研究成果致力于通过优化原生的分区映射策略解决数据分配的均衡性问题,文献[10]研究Map和Reduce两个阶段的任务执行过程,通过分析数据不均衡分配的原因,归纳出数据倾斜的5个类别。文献[11]提出SkewReduce策略,该策略建立用户定义的代价模型,在作业执行过程逐步收集元数据,邻近代价阈值时启动分区映射过程,以实现计算数据量的均匀分配。文献[12]提出MapReduce的增量式分区策略,将原始数据划分为细粒度的微分区,通过数据分布的逐步感知和已分配数据量的统计,采用Max-Min算法进行数据增量分配,达到数据分配逐渐均衡的目标。文献[13]提出SkewTune,与上述的研究成果不同,SkewTune建立Reducer的任务剩余代价评估模型,通过对Reducer执行进度进行统计,决定是否将数据向其他Reducer迁移。由于数据的二次迁移将延迟Reducer计算任务,因此相比设计分区策略保证数据均衡分配的方法,SkewTune具有较大的额外开销。文献[14]为实现分区数据的实时统计,在系统中增加额外的数据构Sketch-based,通过设计的分包算法进行Reducer计算数据量的动态调配,达到数据均衡分配的目标。
另外一些研究成果期望通过数据分布的逐步感知建立合理的分区映射方案。文献[15]通过在Mapper增加采样进程感知原始数据分布,已生成的分区容量达到阈值后进行重组或拆分,保障分配数据的均衡性。文献[16-17]提出精细分区和动态拆分两种算法,精细分区算法采样获得近似数据分布,动态拆分函数在Map任务完成一定比例后触发,进行分区容量的二次调整,达到数据合理分配的目标。文献[18-19]提出基于〈block,entity〉数据块的分区方法,通过评估函数对超出阈值的数据块进行调整,但没有精确定义分区调整的时机问题。文献[20]提出提前采样的策略,在Map任务执行前先对输入数据进行25%的随机采样,通过采样结果获得数据分布并制定分区函数。文献[21]提出LEEN策略,通过对输入数据的预扫描获取数据分布,在Map任务执行过程中逐 步统计key的频率,然后综合数据分布和key频率设定合理的分区函数。
本文与上述研究成果的不同之处在于从宽依赖Stage数据分配的基本原理入手,以提高作业的整体执行效率为目的,设计了迭代填充分区映射算法,解决同构集群环境下数据分配的均衡性问题。通过分析作业的执行过程,建立了作业效率模型,提出了RDD计算代价和作业执行时间的定义。建立分区映射模型,提出了源数据分布、分配倾斜度的定义,并证明了两个定义与作业执行时间的因果关系。根据模型和定义求解,设计了扩展式数据分区算法和迭代式分区映射算法。通过扩展式分区预留部分原始数据,并设计扩展区的延迟映射机制,为迭代式分区映射奠定基础。通过扩展区迭代式的多轮数据分配,对原生区的数据倾斜进行逐步修正,减少各Reducer分配数据量差异,从而从整体上提高宽依赖Stage的计算速度,提高作业执行效率。相比已有的研究工作,迭代填充分区映射算法更适宜于内存计算框架的性能优化,并具有较高的普适性和易用性。
2 问题的建模与分析本章首先分析Spark作业的执行机制,建立作业效率模型和分区映射模型,然后提出迭代填充分区映射的优化目标,为第3章的算法设计提供理论基础。
2.1 作业执行机制Spark将操作分为Transformation和Action两类,调度策略采用延时调度机制,即当Action操作执行时,作业才会分发到集群执行。基于延时调度的原理,Spark会首先根据RDD的血统生成作业的有向无环图(Directed Acyclic Graph,DAG),如图 1所示。其中虚线框代表Stage,圆角矩形代表RDD,填充方框表示RDD分区。Stage的划分以宽依赖为边界,各Stage顺序执行,直至计算出最终结果。集群任务分配则以数据本地性作为依据,即任务总是调度给具有最佳数据本地性的工作节点,以减少网络通信延时,提高作业执行效率。
根据2.1节的描述,Spark作业在执行时划分为多个Stage同步执行,每个Stage由一个或多个RDD构成,每个RDD由多个分区并行计算生成,因此,记一个作业的Stage集合为stages={stg1,stg2,…,stgi},每个Stage包含的RDD表示为集合stgi={RDDi1,RDDi2,…,RDDij},其中RDDij表示第i个Stage中第j个RDD,对于每个RDD,其分区集合记为RDDij={Pij1, Pij2 ,…,Pijk },这里Pijk表示RDDij中的第k个分区。
定义1 RDD计算代价。Spark任务中,分区是最基本的计算单位,分区计算首先要读取输入,再根据闭包运算符和操作符进行运算。设Parentsijk为分区Pijk的父分区集合,用于表示分区计算的输入数据,那么分区Pijk的计算代价为数据读取代价与数据处理代价之和,本文以分区计算时间作为衡量计算代价的唯一指标,即:
${{T}_{{{P}_{ijk}}}}=read\left( Parent{{s}_{ijk}} \right)+proc\left( Parent{{s}_{ijk}} \right)$ | (1) |
每个RDD的分区分配到不同的工作节点并行计算生成,因此RDD计算代价为所有分区计算代价的最大值,即:
${{T}_{RD{{D}_{ij}}}}=\text{max}\left( {{T}_{{{P}_{ij1}}}},{{T}_{{{P}_{ij2}}}},\cdots {{T}_{{{P}_{ijk}}}} \right)$ | (2) |
定义2 作业执行时间。如图 1所示,Spark将Stage分为窄依赖和宽依赖两类。对于窄依赖Stage,每个Stage包括多条流水线(每条流水线包括多个RDD的不同分区)。设窄依赖stagei共有h个RDD,所有RDD划分为x条流水线,单条流水线的分区集合为pipeix={Pi1x,Pi2x,…,Pijx},那么单条流水线的执行时间可表示为:
${{T}_{pip{{e}_{ix}}}}\text{=}\sum\limits_{j=1}^{k}{{{T}_{{{P}_{ijx}}}}}$ | (3) |
对于stagei,记其流水线集合为Pipesi={pipei1,pipei2,…,pipeix},那么stagei的执行时间应为各流水线执行时间最大值,即:
${{T}_{stagei}}=\text{max}\left( {{T}_{pip{{e}_{i1}}}},{{T}_{pip{{e}_{i2}}}},\cdots {{T}_{pip{{e}_{ix}}}} \right)$ | (4) |
设stagei+1为宽依赖,则其中仅包含一个RDD的计算任务,记为RDD(i+1) j,那么stagei的执行时间与RDD(i+1) j的计算代价相同,即:
${{T}_{stag{{e}_{i+1}}}}={{T}_{RD{{D}_{(i+1)\text{ }j}}}}$ | (5) |
若Spark作业共有n个Stage(其中包括若干个窄依赖和宽依赖Stage),则各Stage顺序执行,因此作业执行总时长为:
${{T}_{job}}=\sum\limits_{i=1}^{n}{{{T}_{stag{{e}_{i}}}}}$ | (6) |
作业的宽依赖Stage分Map和Reduce两个阶段执行,其中Map阶段将前一Stage的生成结果转化为〈key,value〉元组,放入不同的Bucket中,每个Bucket对应一个Reduce任务,所有Map任务执行结束后,由Reducer到各个工作节点拉取对应Bucket的数据,完成后续计算。由于工作节点内存空间有限,为防止频繁内存回收,Spark将Bucket数据写入磁盘,以保证Reducer输入数据的可用性。
定义3 源数据分布。用于描述输入数据在Mapper端的分布情况。记源数据的key集合为keys={key1,key2,…,keyl},即源数据有l个不同的key,记作业的Mapper集合为mps={1,2,…,m},那么对于编号为m的任意Mapper,其数据分布可表示为:
${{A}_{m}}={{\left( {{A}_{m1}},{{A}_{m2}},\cdots ,{{A}_{ml}} \right)}^{\text{T}}}$ | (7) |
其中Aml表示第l个key在第m个Mapper上的数据量。将所有Mapper的数据分布向量进行归并,那么源数据的整体分布可表示为m×l矩阵:
$A=\left[ \begin{array}{*{35}{l}} {{A}_{11}} & {{A}_{21}} & \cdots & {{A}_{m1}} \\ {{A}_{12}} & {{A}_{22}} & \cdots & {{A}_{m2}} \\ \vdots & \vdots & {} & \vdots \\ {{A}_{1l}} & {{A}_{2l}} & \cdots & {{A}_{ml}} \\ \end{array} \right]$ | (8) |
矩阵中同行元素表示相同key在不同Mapper上的数据分布,映射过程同行元素也由相同的Reducer完成计算,因此将数据按key进行数据量统计,任意key的数据总量可表示为:
${{c}_{l}}=\sum\limits_{m\in mp}{{{A}_{ml}}};l\in keys$ | (9) |
那么将源数据按key进行划分,可表示为如下集合:
$S=\left\{ {{c}_{1}},{{c}_{2}},{{c}_{l}} \right\};l\in keys$ | (10) |
定义4 分区映射。用于描述Mapper数据分布中key与Reducer之间的映射关系,分区映射也表示与Reducer对应Bucket的填充规则。Spark系统延用MapReduce的一次分区机制,默认对key进行哈希值转换,再与Reducer的数量取模,以此决定数据所对应的Bucket,因此原生的分区函数可表示为:
$f(Bucket)=hash(key)od (n)$ | (11) |
通过上述的分区函数可以看出,分区函数保证相同key值数据存放在同一个Bucket。由于所有Mapper采用同一分区函数划分数据,因此源数据中所有相同key数据都映射到同一Reducer。
记作业的Reducer集合为rds={rd1,rd2,…,rdn},那么任意Reducer的分区映射关系可表示为:
$inpu{{t}_{r{{d}_{i}}}}|\to \left\{ {{c}_{i}},{{c}_{n+i}},\cdots ,{{c}_{j\times n+i}} \right\};j\in \left[ 0,l/n \right]$ | (12) |
定义5 分配倾斜度。用于描述Reducer分配数据与均值的差异程度。由定义3可知,Reducer集合要处理的数据总量为S,那么在同构集群环境下,各Reducer分配数据量的均值应表示为:
$E=\left( \sum\limits_{i=1}^{l}{{{c}_{i}}} \right)/n$ | (13) |
根据定义4的描述,Spark依旧延用MapReduce的一次分区技术,将key的哈希值与Reducer数量取模,以判定该key数据与Reducer的对应关系,但由于key的哈希值与其在数据分布的出现频率无关,即与相同key的元组数无关,因此在多数情况下,各Reducer的数据分配量与均值不匹配,根据式(11) 、(12) ,将任意Reducer的分配倾斜度定义为:
${{Q}_{i}}=inpu{{t}_{r{{d}_{i}}}}/E$ | (14) |
定理1 在同构集群环境下,对于所有执行宽依赖Stage的Reducer,其分配倾斜度越小,作业的执行效率越高。
证明 设作业当前执行宽依赖Stagei,基于定义3,宽依赖Stage仅包含一个RDD的计算工作,因此Stagei的执行时间等于RDDij的计算代价。记任意的Reducer计算时间为Tfinishn,由于在任务分配中,每个Reducer负责计算RDDij的一个分区,因此Stagei的执行时间也可表示为:
${{T}_{stag{{e}_{i}}}}={{T}_{RD{{D}_{ij}}}}=\max ({{T}_{finis{{h}_{1}}}},{{T}_{finis{{h}_{2}}}},\cdots ,{{T}_{finis{{h}_{n}}}})$ | (15) |
同构集群环境下,各Reducer的计算能力基本一致,因此输入数据量成为决定计算时长的唯一因素。根据定义5的描述,分配倾斜度表示Reducer数据分配与均值的差异,均值代表完全均匀的数据分配,因此对于所有执行宽依赖Stage的Reducer,其分配倾斜度越小,作业的执行效率越高。
3 迭代填充分区映射算法本章基于模型相关的定义和证明,提出扩展式数据分区算法和迭代式分区映射算法,并对算法的执行细节进行分析和说明。
3.1 算法的总体描述根据2.3节定义4,传统Spark的分区方法延用MapReduce的一次划分方法,数据分配与key的个数有关而与分配数据量无关,导致数据发生倾斜影响作业执行效率,因此,迭代填充分区映射算法的目标是提高数据分配策略与数据量的关联度,以此增加分配策略的合理性,但由于在所有Map任务完成之前难以预知真实的数据分布,因此考虑改进既有的一次分区策略,通过多轮的分区映射过程达到数据适应性分配的目标。
迭代填充分区映射算法的主要思想是:1) 将Mapper与Reducer之间的数据缓冲区划分为原生区和扩展区两部分,每个区域包含的Bucket数量与Reducer的个数相同。2) 在原生分区策略的基础上加以改进,保证大部分数据写入原生区,而小部分key的数据写入扩展区,并能够对应到扩展区中不同的Bucket编号。3) 原生区中的Bucket与Reducer之间为固定对应关系,当某个Mapper计算完毕后,所有Reducer即可开始进行原生区的数据拉取。4) 初始状态下,扩展区中的Bucket与Reducer无对应关系,达到特定时机则启动后续轮次分配,将扩展区的数据逐步映射到Reducer。
3.2 扩展式数据分区算法扩展式数据分区算法的主要步骤如下:
1) 确定扩展参数x,原生区和扩展区生成Bucket,原生区的Bucket数量与Reducer的个数n相同,扩展区的Bucket数量为n×x。
2) Mapper计算hash(key)mod(n+x)获得写入数据的Bucket编号,若编号小于n,则写入数据,本次过程结束;若编号大于等于n,则表示数据应放入扩展区,继续执行步骤3) 。
3) 对于hash(key)mod(n+x)≥n的情况,继续计算(hash(key)/(n+x))mod(n×x),确定该数据在扩展区中的Bucket编号并写入数据,本次过程结束。
扩展式数据分区算法的伪代码如下:
算法1 扩展式数据分区算法。
输入:原生区native;扩展区extension;Reducer个数n;源数据键值key;扩展参数x。
初始化: bukNo←-1;
1) native.creatBucket(n);
2) extension. creatBucket(n*x);
3) bukNo= hash(key) mod (n+x);
4) if(bukNo<n) then
5) write(key,native[bukNo]);
6) else
7) bukNo= (hash(key) /(n+x)) mod (n*x);
8) write(key,extension[bukNo]);
9) end if
由算法描述可以看出,扩展参数决定了原生区与扩展区的划分比例,而扩展区则为后续的分区映射算法服务,通过多轮分配渐进填充,提高数据分配的合理性。
3.3 迭代式分区映射算法根据3.1节的描述,原生区的Bucket数量与Reducer个数相同,两者之间为一一对应关系,由于原生区的生成方式与MapReduce的一次分区策略相同,难以保证数据的均匀分配,因而扩展区的后续轮次分配的合理性成为算法目标实现的关键问题。为达到精准分配,本文方法在原生Spark系统中增加了1个计数器counter和1个数据构RelationSchema,counter用于统计扩展区内各Bucket的数据量,RelationSchema用于表示Bucket与Reducer的映射关系。原生区映射过程与传统Spark相同,不再赘述,下面重点讨论扩展区的映射过程,其主要步骤如下:
1) 将扩展区中的Bucket倒序排列,并选取前n个Bucket生成待分配列表。
2) 对所有Reducer的RelationSchema进行映射数据量统计,挑选出映射数据量最小的Reducer。
3) 将分配列表容量最大的Bucket与映射数据量最小的Reducer建立一一对应的映射关系,更新RelationSchema。
4) 重复步骤2) ,直到n个Bucket都映射完毕。
5) 启动数据拉取进程,等待下一轮映射过程。
算法2 迭代式分区映射算法。
输入:扩展区extension;Reducer集合rds;
初始化:candis←new List〈Bucket〉;//待分配列表
1) extension.orderDesc();
2) candis=extension.getTop(n);
3) for i=0 to n-1 do
4) rds.RelationSchema.statistics();
5) minload=min(rds);//负载最小Reducer
6) minload.mapping(candis[i]);//建立映射
7) minload. RelationSchema.update();
8) end for
9) start pull;//启动数据拉取进程
10) waitfor nextround;//等待下一轮分配
接下来讨论分区映射算法执行的时机问题,原生区的映射过程依旧采用传统Spark的处理方式,即当第1个Mapper计算完成后,所有Reducer即可从该Mapper拉取数据。而对于扩展区的映射过程,由于仅当所有Mapper都计算完成才能获得精确的扩展区数据分布,因此若算法过早执行,计数器counter的统计结果不够精确,影响分区映射的合理性,而过晚执行则会使Reducer处于饥饿状态,影响了作业的执行效率,因此分区映射算法的执行时机应设定为不影响作业执行效率的最晚时间,即当任意1个Reducer完成原生区的拉取工作,即启动第1次分区映射算法,而后续轮次分配的执行时机均为上一轮拉取工作结束时间,以此类推,完成整个扩展区的映射过程。因此,每一轮分区映射过程都是对上一轮因统计结果不精确而产生的分配误差进行修正,从而经过多轮迭代求得数据分配的近似最优解。
4 实验与评价本章通过实验比较和评价,验证迭代填充分区映射算法的有效性。
4.1 实验环境实验环境搭建采用1台服务器和8个工作节点组成的集群,其中服务器作为Hadoop的NameNode和Spark的Master,主要配置为16颗4核心处理器阵列、256 GB内存和4个千兆网卡。8个工作节点作为DataNode和Slave,配置如表 1所示。参数配置方面,HDFS的默认备份数为3,Block大小为64 MB,Spark的并行参数值(spark.default.parallelism)设置为16。作业执行时间的监测通过Spark控制台,各种资源的使用状况数据来源于nmon。
实验数据选取Zipf数据集和有向图两种类型,其中Zipf数据集主要包括9个子数据集,总量为7.3 GB,用于执行WordCount作业。每个子数据集满足指数为γ的标准Zipf分布,γ取值范围为0.2~1.0的小数,增量为0.1,γ的取值越大,表示数据分布越倾斜。有向图主要包括SNAP(Stanford Network Analysis Project)[22]提供的标准数据集,用于执行PageRank作业,如表 2所示。
迭代填充分区映射算法通过引入扩展参数,确定原生区与扩展区的划分比例,同时扩展参数也决定了数据分配的轮数,因此实验首先验证扩展参数对作业执行效率的影响。实验选取Zipf数据集中γ取值为0.3、0.6和0.9的3个子数据集执行WordCount作业,实验结果如图 2所示。
由图 2可以看出,对于Zipf-0.3数据集,由于数据分布的倾斜度较低,其作业执行效率随扩展参数值的增大,优化效果并不明显。而对于Zipf-0.6和Zipf-0.9,在前4个监测点,随着扩展参数值的增大,作业执行时间急剧下降,这是因为在数据分布倾斜度较大的情况下,原生区中各Bucket数据量差异较大,通过扩展参数的介入,能够附加额外的数据分配,修正原生区数据分配产生的误差,因此扩展参数值越大,修正效果越明显。当扩展参数值为4时,作业执行效率的优化效果趋于稳定,在后2个监测点,作业执行时间又出现小幅提高,这是由于扩展系数具有最优上限,在此基础上继续增加分配轮数也无法提高作业执行效率;达到最优上限后,算法的额外开销开始显现,额外开销导致了作业执行效率的轻微下降。
4.3 WordCount对比实验实验选取5个不同分布的Zipf数据集执行WordCount作业,对比迭代填充分区映射算法与传统Spark的性能差异。其中扩展参数值统一设置为4,Spark启动的Reducer数量为16。实验首先监测最大负载节点和最小负载节点的分配数据量变化,实验结果如图 3所示。
由图 3可以看出,与传统Spark环境相对比,迭代填充分区映射算法降低了最大负载工作节点的计算数据量,提高了最小负载节点的数据量。这是因为传统Spark一次分区策略对原始数据的分布不敏感,也缺乏有效的应对策略,因此数据分布的倾斜性导致了最大、最小负载节点之间的数据分配差。而对于迭代填充分区算法,扩展区的分配是对原生区数据倾斜分配有效弥补,从而产生相对均衡的数据分配。综合来看,随着Zipf分布指数的增大,传统Spark的数据分配量差异越来越明显,而迭代填充分区映射算法始终保持较为稳定的均匀状态。这是由于在传统Spark环境下,数据倾斜度越大,工作节点数据量差异也越大,分配效果也越差。而迭代填充分区映射算法的映射过程是通过多轮次分配完成,每一轮分配都是对上一轮分配误差的修正,有效降低了数据分布对分配效果的影响,因此,从数据分配合理性的角度来看,迭代填充分区映射算法具有良好的优化效果。
图 4显示了不同分布数据集作业执行时间的对比,从实验结果来看,对于不同分布的Zipf数据集,迭代填充分区映射算法的作业执行时间均小于传统Spark的执行时间。根据最大、最小节点的数据分配量可知,迭代填充分区映射算法保障了同构集群环境上数据分配的均衡性,具有相同计算能力的Reducer分配计算量差异较小,各任务完成时间也较为接近,因此宽依赖Stage的执行时间较短,作业的执行效率更高。而对于传统Spark,由于其对数据倾斜分布无任何有效应对策略,往往导致相同计算能力Reducer所分配的计算数据量有较大差异,各任务完成时间长短不一,因此宽依赖Stage的执行时间较长,降低了作业的整体执行效率。从优化效果来看,数据分布倾斜度越大,迭代填充分区映射算法的优化效果越明显,但优化效果并未随分布指数据的增大呈线性增加趋势,这是因为无法预知精确的数据分布,迭代填充分区映射算法仅是在现有条件下提供较为均衡的数据分配方案,而且扩展区的预留也采用固定的公式计算,不能根据不同数据分布进行灵活的适应性调整,因此对于不同的数据集其优化效果无明显规律。
上一节通过WordCount作业验证了算法的有效性,但由于WordCount仅包含一个依赖操作,Reducer也仅是作简单的加法运算,不能完全体现迭代填充分区映射算法的优化效果,因此本节选择宽依赖个数更多、操作复杂度更高的PageRank作业对算法作进一步评估。实验选择了2个不同大小的数据集进行,扩展参数值为4,Reducer个数为16,实验结果如图 5所示。
由图 5可以看出,对于每一个数据集,传统Spark和迭代填充分区映射算法的作业执行时间都随迭代次数的增加而上升;在每一个监测点,传统Spark的作业执行时间均大于迭代填充分区映射算法,从而证明了本文算法对Spark框架的性能优化具有良好的效果,也验证了理论模型及算法设计的正确性。从不同迭代次数的效率差异来看,作业的迭代次数越多,执行时间的优化度越高,作业执行时间的缩减比基本随迭代次数据增加呈线性增长趋势,这是由于在PageRank作业中,每轮迭代的数据分布都相同,因此迭代填充分区映射算法每轮迭代的优化效果也基本相同。从作业执行的整体趋势来看,随着迭代次数的增加,传统Spark的作业执行时间上升幅度较大,而迭代填充分区映射算法由于其多轮分配均衡了不同Reducer间的计算数据量,加速了宽依赖Stage的执行,因此作业执行时间上升趋势较为缓和。由此可以看出,传统Spark的作业执行效率受宽依赖的影响较大,而迭代填充分区映射算法对宽依赖的敏感度较低,宽依赖Stage越多,越能够体现算法的优化效果,作业执行的加速效应也越明显。
5 结语本文针对Spark宽依赖Stage数据分区的倾斜问题,首先分析Spark作业执行机制,建立了作业效率模型,给出了RDD计算代价和作业执行时间的定义。通过对Spark框架原始的分区策略进行研究和分析,建立了分区映射模型,给出了源数据分布、分区映射和分配倾斜度的定义,并证明了这些定义对作业执行效率影响,为算法设计提供依据。其次,在相关定义和证明的基础上,提出扩展式数据分区算法和迭代式分区映射算法,并对算法的执行细节进行分析和说明。最后,通过不同的实验验证算法的有效性,实验结果表明,迭代填充分区映射算法提高了数据分配的合理性,优化了宽依赖Stage的作业执行效率。下一步的研究方向是探索异构集群下适应工作节点计算能力的分区映射策略。
[1] | STRANDE S M, CICOTTI P, SINKOVITS R S, et al. Gordon:design, performance, and experiences deploying and supporting a data intensive supercomputer[C]//Proceedings of the 1st Conference of the Extreme Science and Engineering Discovery Environment:Bridging from the Extreme to the Campus and Beyond. New York:ACM, 2012:Article No. 3. |
[2] | BRONEVETSKY G, MOODY A. Scalable I/O systems via node-local storage:approaching 1 TB/sec file I/O, LLNL-TR-415791[R]. Livermore, CA:Lawrence Livermore National Laboratory, 2009:1-6. |
[3] | ZAHARIA M, CHOWDHURY M, DAS T, et al. Fast and interactive analytics over Hadoop data with Spark[J]. Login, 2012, 37 (4) : 45-51. |
[4] | Apache Spark. Spark overview[EB/OL].[2015-03-18]. http://spark.apache.org. |
[5] | ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets:a fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation. Berkeley, CA:USENIX Association, 2012:2. |
[6] | LIN X, WANG P, WU B. Log analysis in cloud computing environment with Hadoop and Spark[C]//Proceedings of the 5th IEEE International Conference on Broadband Network and Multimedia Technology. Piscataway, NJ:IEEE, 2013:273-276. |
[7] | DONG X, XIE Y, MURALIMANOHAR N, et al. Hybrid checkpointing using emerging nonvolatile memories for future exascale systems[J]. ACM Transactions on Architecture and Code Optimization, 2011, 8 (2) : 510-521. |
[8] | 慈轶为, 张展, 左德承, 等. 可扩展的多周期检查点设置[J]. 软件学报, 2010, 21 (2) : 218-230. ( CI Y W, ZHANG Z, ZUO D C, et al. Scalable time-based multi-cycle checkpointing[J]. Journal of Software, 2010, 21 (2) : 218-230. doi: 10.3724/SP.J.1001.2010.03787 ) |
[9] | DEAN J, GHEMAWAT S. MapReduce:simplified data processing on large clusters[C]//Proceedings of the 6th Conference on Symposium on Opearting Systems Design and Implementation. Berkeley, CA:USENIX Association, 2004,6:10. |
[10] | KWON Y, BALAZINSKA M, HOWE B, et al. A study of skew in MapReduce application[EB/OL].[2016-03-18]. https://www.researchgate.net/publication/228941278_A_Study_of_Skew_in_MapReduce_Applications. |
[11] | KWON Y, BALAZINSKA M, HOWE B, et al. Skew-resistant parallel processing of feature-extracting scientific user-defined functions[C]//Proceedings of the 1st ACM Symposium on Cloud Computing. New York:ACM, 2010:75-86. |
[12] | 王卓, 陈群, 李战怀, 等. 基于增量式分区策略的MapReduce数据均衡方法[J]. 计算机学报, 2016, 39 (1) : 19-35. ( WANG Z, CHEN Q, LI Z H, et al. An incremental partitioning strategy for data balance on MapReduce[J]. Chinese Journal of Computers, 2016, 39 (1) : 19-35. ) |
[13] | KWON Y, BALAZINSKA M, HOWE B, et al. SkewTune:mitigating skew in MapReduce applications[C]//Proceedings of the 2012 ACM SIGMOD International Conference on Management of Data. New York:ACM, 2012:25-36. |
[14] | YAN W, XUE Y, MALIN B. Scalable and robust key group size estimation for reducer load balancing in MapReduce[C]//Proceedings of the 2013 IEEE International Conference on Big Data. Piscataway, NJ:IEEE, 2013:156-162. |
[15] | RAMAKRISHNAN S R, SWART G, URMANOV A, et al. Balancing reducer skew in MapReduce workloads using progressive sampling[C]//Proceedings of the 3rd ACM Symposium on Cloud Computing. New York:ACM, 2012:Article No. 16. |
[16] | GUFLER B, AUGSTEN N, REISER A, et al. Handing data skew in MapReduce[C]//Proceedings of the 1st International Conference on Cloud Computing and Services Science. Berlin:Springer, 2011:574-583. |
[17] | GUFLER B, AUGSTEN N, REISER A, et al. Load balancing in MapReduce based on scalable cardinality estimates[C]//Proceedings of the 2012 IEEE 28th International Conference on Data Engineering. Washington, DC:IEEE Computer Society, 2012:522-533. |
[18] | KOLB L, THOR A, RAHM E. Load balancing for MapReduce-based entity resolution[C]//Proceedings of the 2012 IEEE 28th International Conference on Data Engineering. Washington, DC:IEEE Computer Society, 2012:618-629. |
[19] | KOLB L, THOR A, RAHM E, et al. Block-based load balancing for entity resolution with MapReduce[C]//Proceedings of the 20th ACM International Conference on Information and Knowledge Management. New York:ACM, 2011:2397-2400. |
[20] | RACHA S C. Load balancing Map-Reduce communications for efficient executions of applications in a cloud[D]. Bangalore, India:Indian Institute of Science, 2012:12-16. |
[21] | IBRAHIM S, JIN H, LU L, et al. Handling partitioning skew in MapReduce using LEEN[J]. Peer-to-Peer Networking and Applications, 2013, 6 (4) : 409-424. doi: 10.1007/s12083-013-0213-7 |
[22] | JURE L. Stanford network analysis project[EB/OL].[2015-03-18]. http://snap.stanford.edu. |