2. 国家计算机网络应急技术处理协调中心, 北京 100029
2. National Computer Network Emergency Response Technical Team /Coordinate Center of China, Beijing 100029, China
物联网、云计算、社交网络和移动通信等信息技术的飞速发展[1],标志着大数据时代的到来。在大数据时代,数据的产生速度快、体量大、种类多,但是其价值密度低。如何对大数据进行分析处理,挖掘出数据中蕴含的宝贵价值成为了一个新的挑战。聚类是一种重要的数据处理方法,广泛应用于文本挖掘、社交网络分析、生物资讯分析、市场调研等领域[2]。而且,由于聚类是一种无监督的学习方法,因此,在大规模数据的处理中,受到了越来越多的关注。
在众多聚类算法中,K-means和概率混合模型由于简单快速而应用最为广泛[3]。但是,K-means对初始中心点和噪声点比较敏感,概率混合模型又存在局部最优和收敛速度慢的问题。为了解决不同情形下的聚类问题,研究人员提出了不同的聚类算法:Lin等[4]于2008年提出了基于网格的偏转算法,但是其可扩展性较差;Menendez等[5]于2015年提出了基于谱图理论的Gany算法,但是由于要计算大规模矩阵的逆,计算复杂度太大;Liu等[6]于2016年提出了基于PageRank的K网页排名扫描算法,但是由于要输入PageRank的阻尼系数和预定义的阈值,参数不易调节。
与上述聚类算法相比,Lin等[7]于2010年提出的幂迭代聚类是一种简单、快速、准确并且可扩展的聚类算法。自提出后,该算法受到了广泛的关注,许多人对幂迭代聚类算法的并行实现进行了研究。Yan等[2]基于MPI并行实现了幂迭代聚类,但存在节点失效的问题;Darji等[8]基于Hadoop MapReduce研究了幂迭代聚类,但由于MapReduce是基于磁盘读写的分布式计算框架,每次shuffle都要耗费大量的时间,对于需要进行多次迭代的算法存在性能瓶颈。
Spark是一个基于内存的分布式通用计算框架[9],非常适合于迭代计算。GraphX是一个结合了图并行和数据并行的分布式图计算框架[10],方便用户在Spark上实现图算法。
为了解决MPI实现过于繁琐、MapReduce框架效率低下的问题,本文基于Spark的GraphX组件,利用幂迭代聚类算法具有的图的性质,研究并实现了基于GraphX的分布式幂迭代聚类。
最后,通过比较加速比指标发现基于GraphX的幂迭代聚类算法具有近似线性的加速比,说明算法具有良好的可扩展性。同时,通过与基于Hadoop的幂迭代聚类[8]进行比较,发现总的运行时间降低了61%。
1 幂迭代聚类 1.1 幂迭代幂迭代是一种快速计算矩阵的主特征值和主特征向量的方法。设A是一个n维矩阵,其特征值为λ1,λ2,…,λn,对应的特征向量为v1,v2,…,vn,假设特征值满足条件|λ1|>|λ2|≥|λ3|≥…≥|λn|,把绝对值最大的特征值λ1及其对应的特征向量v1称为矩阵A的主特征值和主特征向量。任给一个初始向量x0,满足
${{\mathbf{x}}_{0}}=\sum\limits_{i=1}^{n}{{{c}_{i}}{{\mathbf{v}}_{i}}({{c}_{1}}\ne 0)}$ | (1) |
利用矩阵向量乘法进行迭代:
${{x}_{k+1}}=cA{{x}_{k}}(k=0,1,2,\cdots )$ | (2) |
当k→∞时,式(2)左边收敛于矩阵A的主特征向量。式(2)中的c是为了防止xk变得过大而引入的归一化系数,一般可取1/‖Axk‖1或1/‖Axk‖2。
若v是矩阵A的特征向量,则cv(c是常数)仍然是矩阵A的特征向量。所以,若忽略式(2)中的归一化系数,则有:
${{\mathbf{x}}_{k}}=\sum\limits_{i=1}^{n}{{{c}_{i}}{{\mathbf{A}}^{k}}{{\mathbf{v}}_{i}}=}\sum\limits_{i=1}^{n}{{{c}_{i}}{{\lambda }_{i}}^{k}{{\mathbf{v}}_{i}}}$ | (3) |
即:
$\frac{{{\mathbf{x}}_{k}}}{{{\lambda }_{1}}^{k}}={{c}_{1}}{{\mathbf{v}}_{1}}+{{\sum\limits_{i=2}^{n}{{{c}_{i}}\left( \frac{{{\lambda }_{i}}}{{{\lambda }_{1}}} \right)}}^{k}}{{\mathbf{v}}_{i}}$ | (4) |
由假设λ1>λi(italic>1)知,式(4)右边收敛于c1v1,若λ2≠0,则式(4)右边的收敛速率为λ2/λ1。
1.2 基于快结束的幂迭代聚类给定数据集X={x1,x2,…,xn},相似性函数sim满足:
$sim({{x}_{i}},{{x}_{j}})=\left\{ \begin{align} & sim({{x}_{j}},{{x}_{i}})\ne 0,i\ne j \\ & 0,i=j \\ \end{align} \right.$ | (5) |
亲和矩阵A∈Rn×n定义为Aij=sim(xi,xj)。为了对A进行行归一化,定义度矩阵D,D是一个对角矩阵,满足Dii=
W既可以被看作一个矩阵,也可以被看作一个图,图的顶点为X={x1,x2,…,xn}。给定任意向量v0≠0,利用式(6)进行迭代:
${{\mathbf{v}}^{t+1}}=\frac{\mathbf{W}{{\mathbf{v}}^{t}}}{{{\left\| \mathbf{W}{{\mathbf{v}}^{t}} \right\|}_{1}}};t=0,1,\cdots $ | (6) |
在时刻t,图的顶点和边各维持一个属性,顶点xi的属性值为vt(i),从顶点xi到顶点xj的属性值为Wij。在迭代的过程中,边的属性值不变,每次迭代只更新顶点的属性值。
由于矩阵W进行了行归一化,所以它的主特征值是1,主特征向量是c(1,1,…,1)t(其中c为常数,且c≠0)。 向量vt在收敛到主特征向量的过程中,有一个非常有用的特点:
假设数据集{x1,x2,…,xn}可以划分为k个类簇,第i个类簇为{xi1,xi2,…,xini}(i=1,2,…,k),且满足约束
本文定义收敛速度为:
${{\delta }^{t+1}}={{\left\| {{\mathbf{v}}^{t+1}}-{{\mathbf{v}}^{t}} \right\|}_{1}};t=0,1,\cdots $ | (7) |
定义收敛加速度为:
${{\varepsilon }^{t+1}}=\left| {{\delta }^{t+1}}-{{\delta }^{t}} \right|;t=1,2,\cdots $ | (8) |
Lin等[7]证明了收敛的第一阶段收敛加速度的值较大,而收敛的第二阶段收敛加速度趋近于0。
基于这一特性,一种想法就是以收敛加速度为指标,当收敛加速度小于某个阈值时,就停止迭代,再用K-means算法对向量vt进行聚类。由于K-means对初始中心的选取比较敏感,在实现时本文选择了K-means++算法,防止其陷入局部极小。
因此,基于快结束的幂迭代聚类的流程如下。
输入 行归一化后的亲和矩阵W和聚类个数k。
随机选取非零初始向量v0
Repeat
设
t=t+1
Until δt+1-δt≈0
利用
输出 k个类簇C1,C2,…,Ck。
2 Spark和GraphX 2.1 Spark为了克服MapReduce计算框架所采用的无环数据流模型的缺点,加州大学伯克利分校的AMPLab提出了处理大规模数据的通用计算引擎Spark。Spark是一个基于内存的分布式计算框架,与MapReduce相比,Spark在多次并行操作中能够复用数据,而MapReduce的每个job都要向磁盘读写数据,缺少数据复用的机制。数据表明,在迭代型作业中,Spark的时间效率比MapReduce高两个数量级[11]。
另外,Spark还支持交互式分析,提供了对Scala、Java、Python、R这4种语言的支持。Spark拥有完整的数据分析栈BDAS(Berkeley Data Analytics Stack),包括Spark Streaming、Spark SQL、MLlib、GraphX、Bagel和SparkR。同时,Spark与MapReduce一样,具有良好的可扩展性和容错性。
一个Spark应用程序在集群中就是一组独立的进程集合,这组进程集合由驱动程序中的SparkContext对象进行协调。首先,SparkContext与集群管理器建立连接。连接成功后,Spark应用程序就可以从集群的物理节点获取executor。一个executor是一个既能执行计算任务,又能存储数据的进程。接着,Spark将应用程序代码分发给每个executor。最后,SparkContext给每个executor分配任务,executor开始执行任务。Spark工作原理如图 1所示。
在GraphX出现之前,对图结构数据进行处理的系统可以分为两类:图并行系统和数据并行系统[10]。图并行系统能够充分利用图的结构信息,所以它的计算效率高,但其表达力弱,对于很多图算法无法描述,限制了其应用范围。典型的图并行系统有Pregel、PowerGraph等。数据并行系统的优点是表达力强,但计算效率低。典型的数据并行系统有MapReduce、Spark等。直接在数据并行系统上实现图算法不仅难度大,而且如果不能充分利用图的结构信息对其进行切割和分布式表示,还会导致计算的复杂化,引起集群中大量的数据移动,增加不必要的网络开销。
GraphX将图并行系统和数据并行系统进行优势互补。基于数据并行引擎Spark,GraphX在一个系统中对图并行计算和数据并行计算进行了结合,使得其在计算速度上能够和专业的图计算系统相媲美,同时还保留了数据并行系统的表达力。
2.2.1 分布式图的表示在GraphX中,图的顶点和边都带有属性,可以描述为G(P)=(V,E,P),V表示顶点,E表示边,P=(PV,PE)描述顶点和边的属性。在Spark层上,一个图可以由两个弹性分布式数据集RDD来表示,(i,PV(i))和((i,j),PE(i,j)),即VertexRDD和EdgeRDD。
2.2.2 属性图的分割与存储在实际应用中,图的边的个数远远大于顶点的个数,所以GraphX中采用了顶点分割策略[10]。一方面可以减小存储开销,另一方面可以降低网络开销。
图的存储采取hash策略。对于边,以源点i和终点j作为键进行映射。对于顶点,以顶点id作为键进行映射。映射后的值为数据的存储节点的编号,则EdgeRDD和VertexRDD就可以分布式地存储在集群中了。
图 2展示了属性图的分割与存储原理。图 2以顶点1进行分割;边被分割为3个partition,分别存储在不同节点上;顶点则以VertexId作为键,映射存储在不同的物理机器上。
图的操作有很多种,包括构建图的操作、更改图的顶点和边的属性值的操作和更改图的结构信息的操作等。这里只针对幂迭代聚类算法,给出几种重要的操作的定义。
定义1 fromExistingRDDs。根据分布式图的表示可知,一个图可以表示成一个EdgeRDD和一个VertexRDD。 fromExistingRDDs的作用是:对于给定的EdgeRDD和VertexRDD,构建出相应的图,并返回该图。
定义2 aggregateMessages。该方法接收三个参数:sendMsg、mergeMsg和tripletFields。sendMsg和mergeMsg是两个用户自定义的函数,sendMsg作用于每一条边,表示如何向其源点或终点发送消息;mergeMsg作用于每一个顶点,表示如何对接收到的消息进行聚集。tripletFields是一个优化参数,可以用于减少网络开销。幂迭代聚类中需要用该方法归一化亲和矩阵以及实现矩阵和向量的乘法。
定义3 joinVertices。在很多情况下,现有图的VertexRDD需要利用外部VertexRDD的信息,以改变顶点的属性值。 joinVertices先对两个VertexRDD作等值连接,再对其做一个map操作,最后返回新的VertexRDD与原EdgeRDD构成的图。其工作原理如图 3所示。
幂迭代聚类的执行流程如图 4所示,其核心计算是矩阵与向量的乘积。在GraphX中,aggregateMessages可以在降低网络开销的条件下有效实现这一计算。整个聚类过程的伪代码如下。
输入 亲和矩阵所表示的图graph,聚类个数k,最大迭代次数maxIters。
输出 RDD[(id:Long,cluster:Int)]。
val w=normalize(graph)//行归一化
val w0=initDegreeVector(w)//初始化度向量
val tol=max{1e-5/w0.vertices.count(),1e-8} //收敛阈值
var prevDelta=Double.MaxValue
var diffDelta=Double.MaxValue
var curG=w0
for(int iter=1; iter <=maxIters && diffDelta >tol; iter++){
//计算第iter次迭代后的向量
val v=curG.aggregateMessages[Double](
sendMsg=ctx=>ctx.sendToSrc(ctx.attr*ctx.dstAttr),
mergeMsg=_+_,
TripletFields.Dst).cache()
//对向量进行归一化
val v1=v.mapValues(_/v.values.sum())
//计算收敛速度
val delta=curG.joinVertices(v1){
case (_,x,y)=>math.abs(x-y)
}.vertices.values.sum()
//更新收敛加速度
diffDelta=math.abs(delta-prevDelta)
//更新收敛速度和图的顶点的属性
prevDelta=delta
curG=Graph.fromExistingRDDs(v1,curG.edges)
}
//利用K-means++对最终的向量聚类,并返回结果
return kMeansPlusPlus(curG.vertices,k)
上述伪代码描述是Scala风格的,由于Spark是基于Scala的,所以上述代码可以很容易转化为能够实际运行的代码。在伪代码中,收敛阈值规定了下限值,这是为了避免聚类算法过度收敛,从而所有的数据都被聚为同一类。
实验环境是一个小型的Spark集群。该集群包括一个主节点和3个工作节点。每个节点的CPU型号是AMD Athlon(tm) X4 750 Quad Core Processor,主频为3.40 GHz,内存为2 GB,操作系统为Ubuntu 14.04版本。实验中数据的存储用到HDFS,安装的Hadoop版本为2.6.2 ,JDK版本为1.8,Scala版本为2.10.4,Spark版本为1.5.0。由于集群规模较小,采用的集群管理模式是Spark的内置模式。每台物理机划分为两个executor,平分计算资源和内存资源。executor与driver之间的通信基于akka,物理网络带宽为10 Mb/s,任务的提交基于client部署模式。
数据集采用UCI的Bag of Words数据集里的纽约时代新闻集NYTimes news articles。NYTimes数据集共有299 752篇新闻文档,101 636个不同的单词,共99 542 125个单词。
4.2 并行性能实验从数据集中随机选取10 000、20 000、40 000篇文档作为实验数据,并通过调整spark.cores.max参数[12],指定集群中最多参与计算的executor的数量,得到与运行时间的关系如图 5所示。
对于同样规模的数据集,随着executor的个数的增加,由于程序的计算资源和内存资源也相应地增加,所以程序的运行时间也越来越少。从图 5中可以看到,当新闻文档数目一定时,程序运行时间随着executor的个数的增加而减少。说明算法具有良好的并行效果。
一个executor封装了两个核的计算资源和1 GB的内存资源,为了更好地验证算法的可扩展性,实验采用了加速比来衡量并行算法的性能。
在有p个executor下的加速比定义为:
${{S}_{p}}={{T}_{1}}/{{T}_{p}};p=1,2,\cdots $ | (9) |
其中:T1是在一个executor下程序的运行时间;Tp是在p个executor下程序的运行时间。
理想情况下,Sp= p,这时的加速比直线与y=x重合。从图 6可看出,该算法并未达到理想加速比,这是因为Spark集群还有其他的额外开销,包括调度器延迟、任务反序列化、结果序列化等。但是,当executor个数增加、数据量变大时,加速比近似呈线性增加关系,说明算法的可扩展性很好,可以适用于海量数据下的数据处理。
为了说明基于GraphX的幂迭代聚类算法(GraphX-PIC)在性能方面的提升,在设置spark.cores.max为6的情况下,与Darji等[8]于2014年提出的基于Hadoop MapReduce的幂迭代聚类算法(Hadoop-PIC)作了对比实验。在数据量大小不同的情况下,总的运行时间的对比情况如图 7所示。
从图 7可看出,在新闻数量为40 000时,Hadoop-PIC的总的运行时间大约是GraphX-PIC的2.55倍;相对于Hadoop-PIC,GraphX-PIC的运行时间降低了61%。这是因为GraphX将每次迭代的结果缓存在内存中,可以作为下一次迭代的输入;而Hadoop将每次迭代的结果写到磁盘上,下一次迭代要从磁盘上读取上次迭代的结果作为输入,频繁的磁盘读写耗费了大量的时间。由于幂迭代的收敛速度很快,所以GraphX-PIC的性能与Hadoop-PIC的性能相差还不是很大,而对于像逻辑回归等需要多次迭代的算法,基于内存的框架Spark与基于磁盘的框架Hadoop之间的差异会更大。
图 8说明了基于内存的GraphX与基于磁盘的Hadoop在幂迭代聚类算法的不同迭代时期的运行时间差异。
由图 8可看出,在首次迭代中,GraphX只比Hadoop快15%~25%,这是由于Hadoop在master和slaves之间的心跳协议所致[12]。但在后续迭代中,GraphX受益于Spark基于内存的优势,后续迭代的运行时间大约只有首次迭代运行时间的30%;而Hadoop由于每次迭代都需要从磁盘读取数据,后续迭代与首次迭代的运行时间大致相同。所以,在多次迭代中,GraphX-PIC的性能要优于Hadoop-PIC。
5 结语本文基于分布式图计算框架GraphX,实现了幂迭代聚类算法,解决了幂迭代聚类的可扩展性问题,使得在大数据环境下进行聚类成为可能。实验结果表明,在计算资源、内存资源和数据量变大时,分布式幂迭代聚类算法取得了较好的加速比,可以胜任海量数据的处理任务。并且,受益于Spark基于内存的特点,GraphX-PIC的时间性能是传统的Hadoop-PIC的2.55倍左右。但该算法在亲和矩阵稀疏的短文本聚类中的优势不明显,接下来的工作将研究幂迭代在稀疏数据中的聚类,以扩大其应用范围。
[1] | 王虹旭, 吴斌, 刘旸. 基于Spark的并行图数据分析系统[J]. 计算机科学与探索, 2015, 9 (9) : 1066-1074. ( WANG H X, WU B, LIU Y. Parallel graph data analysis system based on Spark[J]. Journal of Frontiers of Computer Science and Technology, 2015, 9 (9) : 1066-1074. ) (0) |
[2] | YAN W, BRAHMAKSHATRIYA U, XUE Y, et al. p-PIC: parallel power iteration clustering for big data[J]. Journal of Parallel & Distributed Computing, 2013, 73 (3) : 352-359. (0) |
[3] | LIU L, CHEN X, LIU M, et al. An influence power-based clustering approach with PageRank-like model[J]. Applied Soft Computing, 2015, 40 (11) : 17-32. (0) |
[4] | LIN N P, CHANG C I, CHUEH H E, et al. A deflected grid-based algorithm for clustering analysis[J]. Wseas Transactions on Computers, 2008, 7 (4) : 125-132. (0) |
[5] | MENENDEZ H D, CAMACHO D. GANY: a genetic spectral-based clustering algorithm for large data analysis[C]//Proceedings of the 2015 IEEE Congress on Evolutionary Computation. Piscataway, NJ: IEEE, 2015:640-647. (0) |
[6] | LIU L, SUN L, CHEN S, et al. K-PRSCAN: a clustering method based on PageRank[J]. Neurocomputing, 2016, 175 (11) : 65-80. (0) |
[7] | LIN F, COHEN W W. Power iteration clustering[C]//Proceedings of the 27th International Conference on Machine Learning. Haifa: [s.n.], 2010: 655-662. (0) |
[8] | DARJI A, WAGHELA D. Parallel power iteration clustering for big data using MapReduce in Hadoop[J]. International Journal of Advanced Research in Computer Science and Software Engineering, 2014, 4 (6) : 1357-1363. (0) |
[9] | ZAHARIA M, CHOWDHURY M, FRANKLIN M J, et al. Spark: cluster computing with working sets[C]//HotCloud 2010: Proceedings of the 2nd USENIX Conference on Hot Topics in Cloud Computing. Berkeley: USENIX, 2010:1765-1773. (0) |
[10] | GONZALEZ J E, XIN R S, DAVE A, et al. GraphX: graph processing in a distributed dataflow framework[C]//OSDI 2014: Proceedings of the 11th USENIX conference on Operating Systems Design and Implementation. Berkeley: USENIX, 2014:599-613. (0) |
[11] | ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing[C]//NSDI 2012: Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation. Berkeley: USENIX, 2012:2. (0) |
[12] | 陈侨安, 李峰, 曹越, 等. 基于运行数据分析的Spark任务参数优化[J]. 计算机工程与科学, 2016, 38 (1) : 11-19. ( CHEN Q A, LI F, CAO Y, et al. Parameter optimization for Spark jobs based on runtime data analysis[J]. Computer Engineering and Science, 2016, 38 (1) : 11-19. ) (0) |