2. 新疆大学 信息科学与工程学院, 乌鲁木齐 830046
2. School of Information Science and Engineering, Xinjiang University, Urumqi Xinjiang 830046, China
随着云计算、物联网、移动互联和社交媒体等新技术和新服务模式的不断兴起, 全球数据量呈爆炸式增长趋势, 人类已经全面进入大数据[1]时代。大数据蕴含大信息, 大信息提炼大知识, 大知识创造高价值并从更高的层面为用户提供更优质的服务。同时, 数据价值的时效性变得越来越明显, 为了更及时地从数据中提炼有价值的信息, 必须不断提高数据分析的实时性, 大数据流式计算应运而生。与批处理模式[2]相比, 流处理模式[3]具有实时性、易失性、无序性、无限性和突发性的特征[4], 能够在数据产生后即时提炼价值, 已在对数据分析实时性要求较高的场景中得到广泛应用。为了满足这种实时性要求, 集群应具备较高的响应能力和较低的计算延迟, 同时要求计算结果的准确性和可靠性[5], 这对流式计算的技术发展提出了更高的要求。然而面对高速连续大规模的数据, 计算节点间负载倾斜的问题尤为突出, 严重影响了集群的性能。但已有的研究成果多面向批处理环境, 无法应用于流式计算平台, 负载不均衡仍是制约流式计算集群响应能力和吞吐量的瓶颈。
Apache Flink[6-9]是针对流数据处理的分布式计算平台, 支持流处理和批处理两种模式, 提供Exactly-Once的可靠性流式计算[10]和丰富的时间窗口[11]机制。凭借低延迟、高吞吐的性能优势[12], Flink在得到学术界广泛关注的同时, 也在产业界的应用中取得显著成果:阿里巴巴构建了上千节点的Blink集群参与在线搜索和实时推荐业务, Google云平台宣布对Flink的相关技术支持。但在不断深入的产业化进程中Flink也面临一些挑战, 须经过不断改进和完善以满足应用需求。本文提出基于Flink的数据流动态负载均衡策略, 通过负载感知算法获取节点的计算延迟时间作为负载均衡策略的依据。实验结果表明,本文算法能通过优化负载分配缩短任务的执行时间, 平均优化比达到6.51%。
1 相关工作在大数据流式计算中, 数据由源点(Source)发出, 依次经过不同算子(Operator)的处理, 最终计算结果在汇点(Sink)被持久化。在不考虑迭代计算的前提下, 计算节点的拓扑是一个AOV-网(Activity On Vertex Network), 其中节点代表处理数据的算子, 弧代表数据流动的方向。而在分布式数据流计算中, 同一个算子往往被映射到多个不同的物理节点上, 这样每个计算节点都可能有多个前驱和多个后继。在传统的流式计算平台中, 数据元组大多以〈key, value〉的形式在节点间被计算和传输, 节点根据元组key的Hash值决定每个元组的路由。但这种方式本身具有一定的随机性和盲目性, 用户无法干预数据元组的路由选择, 节点也不考虑其后继的计算负载而是随机地完成交付。不可避免地, 会出现在某一时间段内大多数元组被发往相同的目标节点, 而其他节点没有得到负载分配, 从而导致节点间负载倾斜。在Flink中, 节点对数据元组的路由策略主要有广播、轮询和随机分配三种方式[7], 均不考虑节点的负载情况, 会造成与上述同样的问题。
为了更好地解决数据流负载均衡问题, 已有学者提出了基于各种资源评估模型和感知策略的负载均衡算法, 但极少应用于Flink平台:文献[13]提出一种用灵活算子迁移算法解决内存不足造成的背压(Backpressure)问题, 以理论计算的节点剩余内存作为性能评估指标, 即最大可持续吞吐量(Maximum Sustainable Throughput, MST)。文献[14]根据关键路径上的性能感知和非关键路径上的能耗感知数据制定任务调度计划, 达到响应时间与能耗的最低值; 但未考虑计算节点的内存、网络等其他性能指标的影响。文献[15]针对Storm平台分别提出在线和离线的自适应任务调度策略, 有效减小了任务调度中的通信开销, 但算法本身复杂度很高。文献[16]提出的SkewReduce策略建立了用户定义的代价模型, 根据任务执行中收集的元数据在邻近代价阈值时启动分区映射过程, 实现负载的均匀分配。文献[17]提出针对用户定义数据流上的延迟评估模型, 通过计算资源和任务并行度的弹性变化, 在计算资源最小化的同时提供低延迟保障; 但资源弹性变化和任务调度过程开销较大。
针对上述存在的问题, 本文的主要工作有:
1) 针对单一性能评估指标存在的局限性, 提出依据节点处理元组的延迟时间评估节点性能的思想。
2) 提出一种基于有向无环图(Directed Acyclic Graph, DAG)的深度优先搜索的负载感知(load-aware)算法, 用于检测节点的计算延迟时间, 将流式计算模型的AOV-网转化为AOE-网(Activity On Edge Network), 每个计算节点都能获取其后继的负载信息, 为路由决策提供支持。
3) 提出一种负载均衡策略, 节点可根据负载感知的检测结果重新分配其后继的计算负载, 该策略可同时满足全局和局部的负载均衡需求。
4) 提出一种流式数据分块管理的思想, 通过小顶堆结构管理数据块, 提出有状态(stateful)流式计算的分块负载迁移技术。
2 负载感知技术在分布式计算环境下, 负载均衡策略的依据是计算节点的资源利用率, 通过将资源利用率过高节点的计算负载迁移到资源利用率较低的节点中实现节点间负载的均衡。一个节点的可用资源包括CPU、内存、磁盘I/O、网络传输等, 而现有研究成果多将CPU或内存利用率等单一指标作为性能评估依据, 这在实际应用中是存在局限性的。
事实上, 节点中任何一种资源的匮乏都会成为节点响应能力的瓶颈, 导致数据元组在内存中被滞留而计算延迟加长。因此, 任何单一的性能评估指标在实际应用中都存在局限性, 而计算延迟是反映节点负载和响应能力的综合体现, 延迟越长说明节点负载越高、响应能力越弱; 反之说明节点有充足的剩余计算资源。负载感知技术通过延迟检测算法获取节点的计算延迟数据, 并将其作为制定负载均衡策略的依据。
2.1 延迟检测算法为了获取数据元组经过每个节点的延迟时间, 通过有向无环图的深度优先搜索算法遍历整个节点拓扑, 为每个节点不同的访问状态标记不同的颜色, 并记录每个节点状态改变的时间戳:初始状态下所有节点都标记为白色, 表示节点未被访问过。当节点被首次发现后标记为灰色, 并记录当前时间为节点的发现时间(Operator discover time, O.d)。当节点的所有后继都被访问完成且数据到达汇点后, 该节点被标记为黑色, 表示已完成对该节点对应所有路由的检测, 同时记录当前时间为节点的完成时间(Operator finish time, O.f)。
由上述可知, 发现时间是节点收到数据元组的时间, 完成时间是由该节点发出的数据元组完成所有计算并到达汇点的时间, 是一条路径的完成时间。特别地, 数据源点的发现时间是其发出数据元组的时间, 完成时间是集群中每个节点都完成一次计算的时间; 而汇点不对数据进行处理且没有后继, 其发现时间与完成时间相等, 都是其收到数据元组的时间, 即source.f=sink.f=sink.d。
集群在算法开始执行前根据其结构拓扑图执行相关初始化操作:将所有节点的访问状态标记为白色, 节点的发现和完成时间都记为空(NIL), 通过DFS-Visit(G, G.source)调用节点访问算法, 从源点开始对整个拓扑进行深度优先搜索。
算法1 DAG深度优先节点访问算法。
输入 节点拓扑DAG:G={V, E}; 当前待访问的节点O。
输出 当前节点发现时间O.d; 当前节点完成时间O.f; 当前节点及其后继的延迟检测表latency。
1) init(latency);
/*初始化二维表latency用于记录延迟时间*/
2) O.d← getTimeStamp();
/*获取当前系统时间戳作为节点O的发现时间*/
3) O.color ← GRAY;
/*将当前节点标记为灰色, 表示该节点已被发现*/
4) O.bsl← 0;
/*bsl为节点O黑色后继的延迟时间和, 初始值为0*/
5) foreach S∈G.adj[O]
/*依次遍历节点O的所有后继*/
6) if S.color=WHITE then
/*如果后继节点为白色, 即尚未被访问过*/
7) DFS-Visit(G, G.S);
/*对该后继节点进行深度优先搜索*/
8) else
/*由定理1可知, 所有非白色的后继节点均为黑色*/
9) S.d ← S.getDiscoverTime();
/*获取上一次搜索得到的发现时间*/
10) S.d ← S.getFinishTime();
/*获取上一次搜索得到的完成时间*/
11) O.bsl ← O.bsl+(S.f -S.d);
/*记录所有黑色后继的延迟时间和*/
12) latency.add(S, S.d, S.f);
/*记录后继节点的发现时间和完成时间*/
13) end if
14) end foreach
15) O.f ← getTimeStamp()+O.bsl;
/*当前系统时间与黑色后继延迟时间相加作为当前节点的完成时间*/
16) O.color ← BLACK;
/*将当前节点标记为黑色, 表示该节点已完成搜索*/
17) latency.add(O, O.d, O.f);
/*记录当前节点的发现时间和完成时间*/
18) load_ aware (O, latency);
/*利用2.2节所述策略, 对当前节点执行局部的负载感知*/
19) return latency;
/*返回当前节点及其后继的延迟时间表*/
在DFS-Visit(G, O)第6)~12) 行中, 算法依次检查当前待访问节点的所有后继, 并从所有白色节点向后深度优先搜索。算法认为所有非白色后继节点均为黑色, 可直接获取其上一次搜索中保存的发现和完成时间, 并延长当前节点的完成时间(第11) 行)。定理1保证了推断的正确性, 即保证算法的正确性以及每个节点有且仅有一组发现和完成时间。
定理1 黑白定理。在有向无环图的深度优先搜索算法中, 对于任意节点On及其任意一个后继节点On+k, 有On.d>On+k.f(On被发现时其后继节点的访问状态为黑色)或On.d < On+k.d < On+k.f≤On.f(On被发现时其后继节点的访问状态为白色)其中之一必然成立。
证明 当On.d < On+k.d时, 即节点On+k在On被标记为灰色之后才被发现, 且On+k是On的后继, 根据深度优先搜索的规则, 只有当On+k被处理完成之后算法才返回On, On+k被标记为黑色的时间应在On之前, 即On+k.f < On.f。当On.d>On+k.d时, 即On+k在On被发现之前先被发现, 这说明On+k在对其另一个前驱的深度优先搜索过程中已经被访问过, 因此在On被发现之前On+k已经被标记为黑色, 即On.d>On+k.f。
定理1说明在On第一次被发现的时刻, 其所有后继节点只可能为黑色或白色, 不可能为灰色, 所以算法的推断是正确的。这样保证每个节点仅被访问一次, 有且仅有一组发现和完成时间, 保证负载感知一致性并减少递归调用的次数, 降低算法的时间复杂度, 提高效率。
当深度优先搜索算法完成后, 整个集群完成对节点计算延迟的检测并获得检测结果, 每个节点On都得到如表 1所示的计算延迟检测表, 其中On+k(1≤k≤m)为节点On的所有后继, 节点根据延迟检测的结果进行负载感知并在需要时启动负载均衡算法。
通过深度优先搜索的延迟检测算法, 获取了每条路径的计算延迟, On.f-On.d是节点On所对应路径的延迟时间, 即On的计算延迟。在流式计算节点拓扑图中, 如果将On+k的延迟作为弧On→On+k的权值, 则可以将流式计算拓扑的AOV-网转化为对应的AOE-网。
定义1 数据流AOE-网。如图 1所示, 在数据流AOE-网中, 每个节点代表对应算子的物理映射, 弧代表对应的数据流向, 弧的权值代表对应弧头节点的计算延迟, 记为:
$ {{w}}({O_{{n}}}, {O_{{{n + k}}}}) = {O_{n + {{k}}}}.f-{O_{n + {{k}}}}.d $ | (1) |
其中:On和On+k分别是该弧的弧尾和弧头; On+k.d和On+k.f分别为节点On+k的发现和完成时间。对于指向同一个节点的两条弧, 由于在深度优先搜索算法中对其弧头节点访问了两次, 且第二次访问时该节点为黑色, 根据定理1及DFS-Visit算法第9)~12) 行的执行结果, 这两条弧应具有相同的权值, 即对应弧头节点在第一次访问时的计算延迟。
定义2 权值统计量。设某一节点On的后继节点集为Sn={On+1, On+2, …, On+m}, 共包含m个节点, 且每个节点的计算延迟分别为Wn={w(On, On+1), w(On, On+2), …, w(On, On+m)}, 则后继节点的平均延迟wn和最大延迟maxn分别为:
$ {\overline {{w}} _{{n}}} = \frac{1}{{{m}}}\sum\limits_{i = n + 1}^{n + {{m}}} {w({O_n}, {O_i})} $ | (2) |
$ ma{x_n} = {\rm{MAX}}(w\left( {{O_n}, {O_n}_{ + 1}} \right), w\left( {{O_n}, {O_n}_{ + 2}} \right), \cdots, w\left( {{O_n}, {O_n}_{ + m}} \right)) $ | (3) |
其中:w(On, Oi)是节点Oi的计算延迟, 也是弧On→Oi的权值。计算延迟的方差为:
$ \sigma _{{n}}^2 = \frac{1}{{{m}}}{\sum\limits_{i = n + 1}^{n + {{m}}} {\left[{w({O_n}, {O_i})-{{\overline w }_{{n}}}} \right]} ^2} $ | (4) |
其中: wn是节点On所有后继节点的平均计算延迟; w(On, Oi)是节点Oi的计算延迟, 也是弧On→Oi的权值。σn2越大说明节点间的负载倾斜越严重, 需要进行相应的负载调整。因此设定参数极大延迟θ和极大方差ε作为判定节点负载情况的阈值:
当maxn≤θ且σn2≤ε时, On后继节点间的负载分配是均衡的, 且负载压力是正常的, 不需要启动负载均衡策略。
当maxn>θ且σn2≤ε时, On的所有后继计算节点的负载分配是均衡的, 但负载压力过大。这说明On所有后继的计算负载都较大, 此时应减小节点On自身的计算负载。
当maxn>θ且σn2>ε时, On后继节点的负载分配不均衡, 且这种不均衡导致部分节点的负载压力过大, 计算延迟过长。应调整On后继的计算负载, 由On执行负载均衡策略。
3 负载均衡策略通过基于DAG的深度优先搜索的负载感知算法, 获取了每个节点的计算延迟。负载均衡策略的目标是通过将计算延迟过长节点的负载迁移到计算延迟较短的节点中去, 即把负载过高节点的计算负载迁移到负载较低的节点中去, 从而实现节点间的负载均衡。为了能够控制和改变数据元组的路由, 将节点的输出数据分为不同的“块”, 并使用堆结构管理这些数据块, 实现有状态流式计算的负载迁移技术, 同时兼容Flink的状态管理机制。
3.1 数据分块与管理机制流式数据的分块机制是由计算节点对每个待输出的数据元组执行两次映射来确定对应的目标节点:节点先通过元组key的Hash值确定对应的数据块, 再通过数据块的记录找到对应的目标节点并输出元组。
定义3 流式数据块。设计算节点On的后继节点集合为Sn={On+1, On+2, …, On+m}, 共包含m个节点且为同一算子的不同物理映射, 该节点对应数据块的集合为Bn={block1, block2, …, blockk}, k≫m, 且每个block是一个三元组:
$ bloc{k_i} = \left\langle {{O_n}, dest, size} \right\rangle $ | (5) |
其中:i是该数据块的编号; On是该数据块所属的节点; dest是该数据块对应的目标节点, 即blocki.dest=Oj, 且n < j≤n+m; size是该数据块已经处理的元组数目, 表示该数据块的大小。k≫m保证每个后继节点对应多个数据块。其中数据元组到block的映射采用传统哈希映射法:
$ i = {\rm{ }}\left[{{\rm{Hash}}\left( {key} \right){\rm{ mod}}\;k} \right]{\rm{ }} + {\rm{ }}1 $ | (6) |
则该元组对应的数据块为blocki。当节点需要输出一个元组时, 先根据key的Hash值找到对应的数据块, 再通过blocki.dest记录的内容找到对应的目标节点并输出该元组, 同时执行一次blocki.size + +更新数据块的大小。
一般认为, 在有状态的流式计算中数据块保存的状态数据大小与处理过的数据量正相关, 且迁移大数据块产生的开销较大, 因此节点总是希望迁移数据量最小的块以降低负载迁移的代价。如图 2所示, 节点用堆管理数据块, 即所有数据块按照block.size的大小构成小顶堆, 堆顶元素为所有数据块中数据量最小的。当堆顶元素被迁出或有新元素迁入时, 由节点自动调整堆结构, 保证堆顶元素始终是剩余元素中数据量最小的。这样每次发生负载迁移时都迁出堆顶的数据块, 以降低负载迁移的开销同时避免发生迁移抖动。
将延迟检测得到的数据与设定阈值进行比较, 所有被判定为负载不均衡的节点都要制定对应的负载均衡策略:设以节点On为弧尾的弧的权值集合为Wn, 当maxn>θ且σn2>ε时节点On执行负载均衡算法, 其过程为:
算法2 负载均衡算法。
输入 后继节点的计算延迟集合(含m个元素)Wn; 后继节点的最大计算延迟maxn; 计算延迟方差σn2。
1) while(maxn>θ && σn2>ε)
/*当参数超过设定的阈值时, 反复执行负载均衡算法*/
2) quicksort(Wn, desc);
/*对Wn中的元素降序排序*/
3) for i←1 to m/2 do
/*从延迟最长的后继节点向下依次执行负载迁移*/
4) t ← | Wn[i]-(Wn[i]+Wn[m-i])/2|;
/*计算待迁移数据块的数目t*/
5) load-migrate(Wn[i], Wn[m-i], t);
/*从Wn[i]向Wn[m-i]迁移t个数据块*/
6) end for
7) Wn ← DFS-Visit(G, On);
/*完成负载迁移后, 对当前节点执行一次局部负载感知*/
8) maxn← max(Wn);
/*根据式(3) 计算Wn中计算延迟最长的元素*/
9) σn2← Var(Wn);
/*根据式(4) 计算Wn中所有元素的方差*/
10) end while
由算法2可知, 这是一种启发式局部反馈的负载均衡策略, 针对需要负载均衡的节点不断检测其后继的负载情况并在需要时迁移负载, 直到均衡为止。特别地, 当On为数据源点时, 是对整个集群进行负载均衡。
在算法2第5) 行中, 当需要迁移负载时通过调用迁移函数实现节点间的负载迁移, 这里分别涉及到迁出节点On+i, 迁入节点On+m-i及其前驱节点On。当需要从On+i向On+m-i迁移t个数据块时, 负载迁移的执行过程如下:
1) 对节点On执行数据流静默:暂停向迁出节点On+i发送数据元组, 并将需要发送的元组保存在On的缓存中。
2) 从迁出节点On+i的堆顶取出一个数据块并调整堆结构, 将该数据块对应的状态数据发送至迁入节点On+m-i, 并调整其堆结构。
3) 修改节点On记录的对应数据块路由信息, 即block.dest←On+m-i, 将该数据块的元组路由到新的目标节点。
4) 返回执行第2) 步, 将节点On+i中数据量最小的t个数据块全部迁移至On+m-i后, 执行第5) 步。
5) 恢复On被静默的发送数据流, 将其缓存中的数据发往新的目标节点。
数据流计算中节点间的高效负载迁移任务通过以上步骤完成。在Flink中, 对于有状态的流式计算, 每个TaskManager的计算状态存储在自身的RocksDB数据库中, 并能够定期将状态的快照(Snapshot)信息固化在文件系统或Hadoop分布式文件系统(Hadoop Distributed File System, HDFS)中。这能够很好地兼容负载均衡和迁移策略:如果将节点的状态数据以块为单位进行组织和存储, 并使用堆管理这些数据块, 就可用负载迁移算法实现节点间有状态流式数据的负载迁移。
3.3 参数影响与代价评估实验结果表明, 部分参数的取值对算法的执行效果有影响: θ和ε是由用户设定的两个参数值,也是启动负载迁移算法的阈值,它们共同决定了负载感知算法的敏感程度, 决定了负载感知算法的敏感程度:阈值过小会导致算法过于敏感, 负载迁移过于频繁甚至出现迁移抖动的现象; 阈值过大会导致算法过于迟钝, 造成数据元组被阻塞, 使下一次负载迁移产生较大的时空代价。k是每个节点数据分块的数目, 它决定了负载迁移的粒度, k的值应远大于节点后继的数目, 这样保证每个节点都对应多个block。参数值过小会导致每个数据块元组数量过多, 负载迁移开销过高, 产生过度迁移和迁移抖动的问题; 参数值过大会影响算法的执行效率, 需要多次负载感知和迁移才能实现负载均衡。因此参数的取值对算法的执行效果至关重要。
在时间上, DFS算法的时间复杂度为T(n)=O(|V|+|E|), 其中:V是节点的集合, E是节点间弧的集合。在每个节点中, 计算σn2和maxn的时间复杂度为T(n)=O(m)。在负载迁移策略中, 快速排序的时间复杂度为T(n)=O(n log n), 堆操作的时间复杂度为T(n)=O(log n), 修改路由信息表等其他简单操作的复杂度均为T(n)=O(1), 因此总时间复杂度为T(n)=O(n log n)+O(log n)+O(1)=O(n log n)。同时, 算法收敛的速度还与节点间实际负载倾斜的程度有关, 而负载迁移的时间开销主要与被迁移数据块的大小以及网络传输速率有关。目前Flink集群在实际应用中一般不超过1500个节点, 且每次负载迁移都选择最小的数据块, 实验结果表明算法在时间复杂度上是可行的。
在空间上, 计算延迟检测表中的数据包括节点名称和对应的两个时间戳。在Java中, 时间戳占用8 Byte, 节点名称可用一个整型变量(4 Byte)记录, 而一个节点的后继不超过500个节点, 因此延迟检测表占用内存空间应不超过O(n)=(2×8 Byte+4 Byte)×500=10000 Byte≈10 KB。在负载迁移策略中, 每个节点存储的路由信息表的空间复杂度为O(1), 小顶堆结构占用内存应不超过O(n)=4 Byte×3×500=6000 Byte≈6 KB。这对于目前的硬件存储能力和千兆以太网的传输速率而言是不值一提的。实验结果表明算法在空间复杂度上是可行的。
4 实验与分析作为一款开源免费的新兴分布式数据流计算框架, Apache Flink已经得到比较广泛的应用。实验以Flink为平台, 分别执行WordCount和TeraSort两个标准Benchmark, 分别在相同环境下对动态负载均衡算法和现有负载均衡策略进行对比, 从而检验动态负载均衡及负载迁移策略的性能。
4.1 实验环境实验搭建的Flink集群运行在10台普通物理PC上, 每个节点的软硬件环境配置参数如表 2所示, 其中包括1个JobManager节点, 8个TaskManager节点和1个HDFS节点作为数据的源点和汇点。同时, 作为Flink平台的奠基性成果, 文献[6]分别采用WordCount和TeraSort两个标准的Benchmark进行验证。为了验证动态负载均衡算法对Flink平台的优化效果, 实验也采用这两个Benchmark作为基准测试。通过多次预实验进行反馈调节, 最终确定实验相关参数设定:θ=0.13 ms, ε=0.01 ms2, parallelism.default=8, k=100, taskmanager.numberOfTaskSlots=1, 这样每个节点启动一个线程, 占用单核的CPU资源, 从而形成负载倾斜且节点资源不足造成计算延迟过长的实验现象, 以验证算法的优化效果。其中, WordCount实验测试数据为有100万个英文单词的文本数据, TeraSort实验的测试数据为1 GB待排序的数值型数据。
如表 3所示, 在WordCount实验中共设立了两个实验组和三个对照组分别在对应环境下执行作业, 并在作业执行过程的重要时间点记录时间戳, 跟踪记录任务执行的相关参数作为对比和分析的依据。由于动态负载均衡算法是对现有Flink平台的优化, 因此三个对照组分别代表了Flink现有负载均衡策略在不同场景下的执行效率, 而两个实验组是在相同环境下分别启用了动态负载均衡算法与原系统的负载均衡策略形成对比实验。
启动负载均衡算法的条件之一是maxn>θ, 因此主要关注每组实验中负载最重、计算时间最长的TaskManager节点(称主工作节点), 并记录以下三个运行参数:任务执行时间、主工作节点处理的数据量和平均每处理1万单词使用的时间。为了避免个别任务存在的偶然性误差, 实验将每个对照组的任务分别执行10次并计算相关参数的平均值, 得到如图 3所示的实验结果。从图 3(a)中可以看出:在相同环境下:实验组1通过实施动态负载均衡算法, 将对照组1中原本由单节点处理的100万条数据分配给8个节点, 有效缩短了任务执行时间, 优化比为66.80%。实验组2通过实施负载均衡算法, 优化了对照组3中节点间的负载分配, 减少主工作节点的计算负载, 优化比为6.51%。这说明动态负载均衡算法在负载倾斜比较严重的情况下优化效果更好。对照组2是一种理想情况, 其任务执行时间最短说明负载均衡算法还有继续优化和提升的空间。在图 5(b)中, 实验组数据的平均处理时间与数据量的比值有明显的上升, 其中实验组1的上升最明显, 这是因为其负载倾斜严重导致较大的迁移代价。
为了进一步对比分析负载均衡算法对任务执行过程的影响, 在任务执行过程中数据汇点每收到10万条数据记录一个时间戳Ti, 通过两个时间戳之间的差值得到处理这些数据的时间延迟, 即Pi=Ti-Ti-1。选取每个实验组中各项参数距离均值最近的一次任务作为该实验组的代表, 分析其任务执行过程中集群响应能力的变化情况。
如图 4所示, 对照组1和实验组1分别是在相同环境下原系统和动态负载均衡策略的任务执行情况, 实验组1中集群分别在P2、P4和P6时间段内发生了3次负载迁移, 分别将相同数据量的计算时间缩短了484.9 ms、96.8 ms和21.4 ms, 其中第一次负载迁移的优化效果最明显, 因为原本单节点的计算负载被分配给8个节点, 显著提高任务执行效率。同时, 在P2、P3和P6时间段内的计算时间较上一时间段有上升, 这是因为负载迁移过程产生了一定的时间开销, 其中第一次迁移的数据量最大, 因此P2的上升最明显, 但整个任务的执行时间减少, 因此这种开销是值得的。
如图 5所示, 实验组2和对照组3分别是相同环境下的任务执行情况, 实验组2中集群通过在P2和P5时间段内发生的两次负载迁移, 分别将相同数据量的计算时间缩短了20.6 ms和5.8 ms, 且抑制了对照组3中计算时间明显波动的情况。该组对照实验的负载倾斜不严重, 因此算法的优化效果并不明显, 且负载迁移产生的开销较小。
实验结果表明:在数据倾斜度较大且待迁移数据量较小的情况下, 负载均衡算法的优化效果比较显著。在节点间负载比较均衡的情况下, 算法可抑制计算延迟的剧烈波动, 但对性能的优化效果没有前者显著。负载迁移策略的执行会导致少量的计算延迟, 但延迟在合理可接受的范围内, 且能有效提高整个任务的执行效率。
为了进一步分析相关参数对算法的影响, 实验通过控制变量ε=0.01 ms2, k=100不变, 分别设定不同的θ值, 重复执行实验组1, 得到如图 6所示的实验结果。
如图 6所示, θ取值过小导致算法过于敏感, 频繁的负载迁移产生过多的开销。θ取值过大导致算法过于迟钝, 其中当θ=0.2 ms或θ=0.3 ms时执行的2次负载迁移未实现最优的负载分配, 但由于阈值过高而没有出触发新的负载迁移。因此, 实验最终确定参数θ=0.13 ms, 集群会根据负载倾斜程度的不同执行2次或3次负载迁移, 且不会发生迁移抖动, 取得比较理想的负载均衡效果。
TeraSort是分布式计算平台中用于对数据排序的Benchmark, 在不同平台下对1 GB数据排序效率是衡量分布式系统出处理能力的公认标准。为验证算法对较复杂计算的优化效果, 实验采用Hadoop的TeraGen作业生成1 GB数据, 执行开源项目incubator-Flink[18]中提供的TeraSort作业, 分别使用平台原有的负载均衡策略和动态负载均衡策略进行测试, 得到如图 7所示的实验结果:在相同的执行环节中, 负载均衡算法分别在P2和P4时间段内启动了两次负载迁移, 通过优化节点间的负载分配降低了主工作节点的计算负载, 提高了任务的执行效率, 减少了任务执行的总时间;但由于需要迁移的数据量较大, 迁移过程中产生较大的时间开销, 且节点间的负载倾斜本身不是很明显, 因此没有其对WordCount作业的优化效果显著。
在大数据流式计算系统中, 节点间负载不均衡是造成集群性能下降的主要原因, 而资源评估不全面是制约负载均衡技术发展的瓶颈。本文提出基于计算延迟时间的数据流负载感知技术, 以及动态负载均衡和负载迁移策略, 通过优化节点间的负载分配提高了任务的执行效率。但本文算法也存在一定的缺陷:负载迁移技术会造成一定的时间开销, 导致一段时间内的计算延迟加长;另外目前只能通过反馈调节机制确定算法的相关参数。
下一步研究将针对数据流速本身波动造成资源分配不均的问题, 研究数据流弹性资源[17]计算中的负载均衡算法。在计算资源弹性变化的场景中根据负载感知的结果进行负载调优, 其中节点加入和离线时如何迁移负载将是下一步研究的重点。
[1] | 孟小峰, 慈祥. 大数据管理:概念、技术与挑战[J]. 计算机研究与发展, 2013, 50(1): 146-169. (MENG X F, CI X. Big data management: concepts, techniques and challenges[J]. Journal of Computer Research and Development, 2013, 50(1): 146-169. DOI:10.7544/issn1000-1239.2013.20121130) |
[2] | DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large clusters[J]. Communications of the ACM, 2008, 51(1): 107-113. DOI:10.1145/1327452 |
[3] | 陈付梅, 韩德志, 毕坤, 等. 大数据环境下的分布式数据流处理关键技术探析[J]. 计算机应用, 2017, 37(3): 620-627. (CHEN F M, HAN D Z, BI K, et al. Key technologies of distributed data stream processing based on big data[J]. Journal of Computer Applications, 2017, 37(3): 620-627. DOI:10.11772/j.issn.1001-9081.2017.03.620) |
[4] | 孙大为, 张广艳, 郑纬民. 大数据流式计算:关键技术及系统实例[J]. 软件学报, 2014, 25(4): 839-862. (SUN D W, ZHANG G Y, ZHENG W M. Big data stream computing: technologies and instances[J]. Journal of Software, 2014, 25(4): 839-862.) |
[5] | QIAN Z, HE Y, SU C, et al. TimeStream: reliable stream computation in the cloud[C]//Proceedings of the 8th ACM European Conference on Computer Systems. New York: ACM, 2013: 1-14. |
[6] | ALEXANDROV A, BERGMANN R, EWEN S, et al. The stratosphere platform for big data analytics[J]. The VLDB Journal, 2014, 23(6): 939-964. DOI:10.1007/s00778-014-0357-y |
[7] | CARBONE P, KATSIFODIMOS A, EWEN S, et al. Apache Flink: stream and batch processing in a single engine[J]. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 2015, 36(4): 28-38. |
[8] | KOSTAS T, ELLEN F. Introduction to Apache Flink[M]. Sebastopol: O'Reilly Media, 2016: 54. |
[9] | TANMAY D. Learning Apache Flink[M]. Birmingham: Packt Publishing, 2017: 63. |
[10] | CARBONE P, FÓRA G, EWEN S, et al. Lightweight asynchronous snapshots for distributed data flows[EB/OL]. [2017-01-10]. https://arxiv.org/pdf/1506.08603. |
[11] | CARBONE P, TRAUB J, KATSIFODIMOS A, et al. Cutty: aggregate sharing for user-defined windows[C]//Proceedings of the 25th ACM International on Conference on Information and Knowledge Management. New York: ACM, 2016: 1201-1210. |
[12] | CHINTAPALLI S, DAGIT D, EVANS B, et al. Benchmarking streaming computation engines: Storm, Flink and Spark streaming[C]//Proceedings of the 2016 IEEE International Parallel and Distributed Processing Symposium Workshops. Piscataway, NJ: IEEE, 2016: 1789-1792. |
[13] | COLLINS R L, CARLONI L P. Flexible filters: load balancing through backpressure for stream programs[C]//Proceedings of the Seventh ACM International Conference on Embedded Software. New York: ACM, 2009: 205-214. |
[14] | SUN D, ZHANG G, YANG S, et al. Re-Stream: real-time and energy-efficient resource scheduling in big data stream computing environments[J]. Information Sciences, 2015, 319: 92-112. DOI:10.1016/j.ins.2015.03.027 |
[15] | ANIELLO L, BALDONI R, QUERZONI L. Adaptive online scheduling in Storm [C]//DEBS 2013: Proceedings of the 7th ACM International Conference on Distributed Event-Based Systems. New York: ACM, 2013: 207-218. |
[16] | KWON Y C, BALAZINSKA M, HOWE B, et al. Skew-resistant parallel processing of feature-extracting scientific user-defined functions[C]//Proceedings of the 1st ACM Symposium on Cloud Computing. New York: ACM, 2010: 75-86. |
[17] | LOHRMANN B, JANACIK P, KAO O. Elastic stream processing with latency guarantees[C]//Proceedings of the 2015 IEEE 35th International Conference on Distributed Computing Systems. Piscataway, NJ: IEEE, 2015: 399-410. |
[18] | Fabian Hueske. Incubator-Flink[EB/OL]. [2017-03-26]. https://github.com/physikerwelt/incubator-flink. |