DOI: 10.11817/j.issn.1672-7207.2016.07.022
基于PUSH机制的任务调度方法
张霄宏1, 2, 3,孙江峰1, 3,赵文涛1, 3
(1. 河南理工大学 计算机科学与技术学院,河南 焦作,454003;
2. 中国科学院 深圳先进技术研究院,广东 深圳,518055;
3. 河南省高等学校矿山信息化重点学科开放实验室,河南 焦作,454003)
摘要:为降低Hadoop MapReduce环境中任务的数据访问延时进而提高系统性能,提出一种基于PUSH机制的任务调度方法。该方法根据输入数据分布,主动将任务推送到存储其输入数据的节点。当任务在这些节点执行时,可以直接从本地磁盘读取数据,从而避免远程数据访问延时。该方法已在hadoop-0.20.2中实现,并在真实集群中进行验证。研究结果表明:与原有调度方式相比,该方法可将作业执行时间平均降低8%,在最好情况下可降低14.3%。
关键词:数据局部性;性能优化;任务调度;MapReduce
中图分类号:TP315 文献标志码:A 文章编号:1672-7207(2016)07-2334-07
A scheduling method based on task pushing in MapReduce
ZHANG Xiaohong1, 2, 3, SUN Jiangfeng1, 3, ZHAO Wentao1, 3
(1. School of Computer Science and Technology, Henan Polytechnic University, Jiaozuo 454003, China;
2. Shenzhen Institutes of Advanced Technology, Chinese Academy of Sciences, Shenzhen 518055, China;
3. Provical Open Laboratory of Mine Informatization Key Discipline, Jiaozuo 454003, China)
Abstract: To reduce remote data access latency and improve the system performance in Hadoop MapReduce, a new task scheduling method was proposed. According to the method, tasks were pushed to the nodes of storing their input data. When executing on those nodes, those tasks can access the relative input data from local disks, and hence avoiding remote data access latency. The new method was implemented in Hadoop-0.20.2, and evaluated in a real cluster. The results show that the method can decrease the execution time of jobs by 14.3% in the best case, and 8% on average.
Key words: data locality; performance optimization; task scheduling; MapReduce
为解决海量数据处理难题,Google公司率先提 出了MapReduce模型[1]。其开源实现Hadoop MapReduce[2],已成为海量数据处理领域的主流模型之一,并获得了广泛应用[3-7]。然而,在Hadoop MapReduce环境中,当执行任务的节点与存储其输入数据的节点不是同一节点时,任务在执行过程中就不得不通过远程I/O操作来访问输入数据,从而引起不确定的远程数据访问延时,降低系统性能。且数据访问延时越大,系统性能越差。为减少数据访问延时,Hadoop 缺省的调度方法总是把任务分配到离输入数据最近的节点执行。该方法采用以节点为中心的PULL调度机制,即只有当节点请求执行任务时才进行调度。受输入数据分布和资源竞争等因素的影响,该方法无法保证把所有任务都调度到存储其输入数据的节点执行,因此,不能解决由远程数据访问延时引起的系统性能问题。ZAHARIA等[8]提出利用延时调度来优化数据局部性,但该方法调度的是MapReduce作业,而非作业中包含的任务,因此,不能从根本上解决本文提出的问题。ZHANG等[9]的方法优先把任务保留给存储其输入数据的节点执行,但该方法需要对各节点未来请求任务的情况进行预测。WANG等[10]的方法虽然兼顾了任务的数据局部性,但由于采用短作业优先策略,对长作业并不公平。文献[7, 11-14]等介绍的调度方法虽然适用于MapReduce环境,但是并不以减少数据访问延时为目的。此外,SUN等[15]利用预取技术来隐藏部分数据访问延时,但是需要对任务执行节点进行预测。为避免任务在执行过程中引起远程数据访问延时进而影响系统性能,本文作者提出基于PUSH机制的任务调度方法。该方法主动把任务推送到输入数据所在节点,使任务在执行过程中可从本地读取数据,从而避免了远程数据访问延时。由于该方法仅以输入数据分布为依据进行任务推送,即使在资源竞争激烈的环境中仍可把任务调度到输入数据所在节点执行。
1 基于PULL机制的任务调度方法
在Hadoop MapReduce环境中,作业被划分成若干个map任务和reduce任务。map任务直接处理作业的原始输入数据,产生以形式表示的中间结果。reduce任务以这些中间结果为输入,产生最终的输出。由文献[16]可知,大部分的MapReduce作业都是map任务密集型作业,且其中很多作业都只有map任务,故本文仅研究降低map任务数据访问延时的方法。
为减少数据访问延时,Hadoop现有的调度方法总是把任务分配到离输入数据最近的节点执行。由于采用PULL机制,该方法只有在收到节点的请求时才给它分配任务,且优先分配数据存储在请求节点上的任务。只有在无此类任务的情况下,才选择输入数据离请求节点最近的任务。如果所选任务的输入数据存储在下一个请求节点上,那么该任务便错过了在输入数据所在节点执行的机会。
为便于描述,记ni表示第i个节点,mi表示第i个任务,di表示mi的输入数据,表示di存储在ni上,表示把mi调度到ni执行。假设mi,mj和mk的输入数据分别存储在nx,ny和nz上,且分别距节点na,nx和ny最近。当各节点按照(na,nx,ny…)的顺序请求任务时,基于PULL机制的方法调度任务的结果如下:,和,调度过程如图1所示。由于mi,mj和mk的输入数据都没有存储在执行节点,这些任务在执行过程中都会引起远程数据访问延时,影响系统性能。
图1 基于Pull机制的任务调度过程
Fig. 1 Scheduling tasks based on the pull mechanism
在实践中发现,这种情况通常出现在作业执行末尾。当作业规模较小时,这一情况尤为严重。根据文献[7]中的统计结果,在一个应用于实际生产的数据中心内部,作业平均包含的map任务数也只有42个,即大部分作业的规模都较小。如果节点只执行输入数据存储在本地的任务,在执行完当前作业中此类任务的情况下,继续执行下个作业中的此类任务,那么即不会产生远程数据访问延时,又不会浪费计算资源。
2 基于PUSH机制的任务调度方法
为克服PULL机制存在的不足,本文提出基于PUSH机制的任务调度方法。与基于PULL机制的方法不同,该方法在调度任务时不考虑节点当前是否有空闲资源,而只根据输入数据分布推送任务。当节点资源空闲时,便可开始执行推送给自己的任务。由于只将任务推送到存储其输入数据的节点,任务在执行过程中可以从本地磁盘访问数据,避免了远程数据访问延时。当数据有多个副本且分别存储在多个节点时,须同时将任务推送到这些节点。但是为保证效率,最终只允许任务在效率最高的节点执行。
2.1 任务推送
基于任务推送进行调度是本文方法的核心,是保证把任务调度到输入数据所在节点执行的关键。在进行推送时,首先根据任务的输入数据分布,计算任务与节点之间的推送关系;然后,依据这一关系依次将作业中各个任务推送的相应节点。本文假设每个数据都有3个副本,若存在,即的3个副本分别存储在节点nt,nw和nx,则根据基于Push机制的调度方法有,和;此处,表示推送,即应推送mi到节点nt,nw和nx。
假设某作业包含的任务数为Sm,这些任务的输入数据存储在节点集合N中,记N={n1, n2, …, }。如果以任务为单位进行推送,则需要(3Sm)次才能将所有任务推送到输入数据所在节点。,假设ni存储了个任务的输入数据,在以任务为单位推送的前提下,需要次才能将这些任务推送到ni。此处,任务数满足式(1)给出的约束条件。
1≤≤ (1)
通过分解式(1),可知的取值应在式(2)和式(3)定义的范围之内,同时还应满足式(4)定义的约束条件。
≤ (2)
≤ (3)
(4)
记Mi为输入数据存储在节点ni上的任务集合,且;记Mi中各任务与ni间的推送关系构成的集合为Ri,则有。Ri中各关系式可进行如下化简:
故Ri亦可表示为。由Ri可知,如果以节点为单位进行推送,可通过一次操作将mi1,mi2,…,推送到节点ni。输入数据存储在此节点的任务越多,该推送方式效率越高;也即越大,该推送方式效率越高。为提高效率,本文采取以节点为单位的推送方式,即每次只向1个节点推送,且1次推送当前作业中输入数据存储在此节点上的全部任务。
推送到同一个节点的任务彼此竞争计算资源。为便于管理,节点根据系统采用的调度策略,建立任务队列,根据队列和队列中任务的优先级进行本地调度。以ni为例,假设有q个优先级,分别为0,,…,,这些优先级对应的队列分别记为 ,,…,。记的优先级为P(),则推送到节点后,应入队列。当ni有空闲资源时,优先从选择任务执行。只有在为空时,才依次从其他队列选择任务。
2.2 请求执行
为确保在部分节点失效的情况下数据仍然可用,Hadoop MapReduce为每个数据都创建了多份副本,分别存储在多个不同的节点。在这一前提下,任一任务都会被推送到多个不同的节点。为保证任务只在1个节点上执行,特规定当计算节点具备执行某个任务的条件时,须先向管理节点发送执行请求。当多个节点请求执行同一个任务时,只允许效率较高的节点执行此任务。此处认为请求较早到达的节点,执行效率较高。
以任务推送关系,和为例,依据该关系被推送到节点,和。若具备执行任务的条件,则向管理节点发送执行的请求,记请求为。若还未执行,则以允许执行,并以响应的请求。此处,“A”表示允许执行。为避免和继续请求执行,须向这2个节点发送撤销任务的指令,分别记为和,“C”表示撤销任务。若正在执行或者已经完成,则拒绝的请求,即以来响应。
基于PUSH机制的方法响应任务请求的核心算法如下:
算法1 HandleREQ(n,m,R)算法
Algorithm 1 HandleREQ(n,m,R) algorithm
2.3 错误恢复
由于硬件、软件等多种原因,任务在执行过程中难免失败。如果仍将失败任务推送到输入数据所在节点执行,由于节点可能暂时没有可用资源,失败任务要等待较长时间才能获得执行机会,从而影响整个作业的执行进度。为避免这种情况发生,应尽早执行失败任务。在任务失败后,最先请求执行任务的节点是最先有可用资源的节点。将失败任务调度到此节点,会比调度到其他节点更早获得执行机会。
仍以任务请求为例,管理节点收到该请求后,首先检查是否存在失败任务。若存在失败任务f,则先驳回的执行请求,然后通知立即执行f,即管理节点将以和响应节点的请求。此处“F”表示禁止执行,“E”表示立即执行。当再次有可用资源时,可重新向管理节点请求执行。
引入失败任务处理机制后,基于PUSH机制的方法响应任务请求的核心算法描述如下:
算法2 HandleReqWithFailure(n,m,R)算法
Algorithm 2 HandleReqWithFailure(n,m,R) algorithm
2.4 算法分析
文中方法利用网络带宽资源将任务推送到存储其输入数据副本的各个节点。在任务数量一定的前提下,数据副本越多,推送的任务越多,消耗的网络资源也越多。记表示传送单个任务的输入数据所消耗的网络带宽资源,表示推送单个任务到单个节点消耗的网络带宽,l和分别表示采用基于PULL和基于PUSH机制调度任务时在输入数据所在节点执行的任务数,r表示数据副本个数,表示因采用文中方法产生的网络带宽收益,则可根据下式计算:
(5)
当时,表示新方法带来了网络带宽收益,且值越大,产生的收益越多。由式(5)可知,在>l且副本个数为3的条件下,只要满足,采用文中方法就会带来网络带宽收益。
3 实验分析
文中方法已在Hadoop-0.20.2中实现。为验证方法的有效性,将Handoop-0.20.2的原始版本和实现本文方法的版本部署在同一个集群上,通过对比作业在这2个Hadoop环境中的执行情况,来评价本文方法的有效性。实验中用到的集群包括4个节点,其中1个作为管理节点,另外3个作为计算和存储节点。表1所示为各节点的配置,节点类型1为管理节点的配置,节点类型2为计算和存储节点的配置。
表1 集群配置信息
Table 1 Cluster configuration
在Hadoop集群中,节点拥有的map/reduce slot数表示该节点可同时执行的最大map/reduce任务数。由于各节点的硬件配置相同,故可同时执行的最大map/reduce任务数也相同,即各个节点最多可同时执行16个map任务、最多能执行1个reduce任务。Hadoop分布式文件系统负责存储作业的输入数据。它将作业的输入数据划分成文件块,分别存储在不同的节点上。在本次实验中,设定文件块的大小为64 MB,各个数据块具有的副本数为3。
文中算法将map任务推送到输入数据副本所在的各个节点,使其在执行过程中可以从本地磁盘读取数据,从而避免了远程数据访问延时。数据副本越多,适合map任务执行的节点越多,选择高效节点时余地更大,但是任务推送消耗的网络带宽也越大。此外,副本越多,占用的磁盘空间越大,可用空间越少。综合考虑,在本次实验中,采用了Hadoop文件系统推荐的设置,即为每个数据块设置了3个副本。
在本次实验中跟踪测试了6个不同的作业,作业信息如表2所示。为了更接近真实情况,选择不同规模的作业进行测试。在这些作业中,既有map任务数大于系统中map slot总数的作业,也有小于map slot总数的作业,且作业包含的map任务平均数接近文献[7]在生产集群中的统计结果。在实验过程中,分别在2个Hadoop环境中多次运行这些作业,并且记录了每次的运行信息。通过对比各个作业在不同环境中的数据局部性、执行时间等指标来验证本文方法的有效性。
表2 测试作业信息
Table 2 Details of tested jobs
本文方法拟通过改善任务的数据局部性来避免远程数据访问延时,进而提高Hadoop的性能。当任务在输入数据所在节点执行时,具有最佳数据局部性,在执行过程中不会引起远程数据访问延时。图2所示为各作业在不同Hadoop环境中执行时具有最佳数据局部性的任务数。由图2可知:在实现文中方法的环境(新环境)中执行时,作业2,4,5及6包含的所有任务都具有最佳数据局部性,达到了理想状况。
图3所示为作业中具有最佳数据局部性的任务数占总任务数的比例。由图3可知:当作业在新环境中执行时,若不存在失败任务,则这一比例可达到100%。即使存在失败,这一比例最低也在93%以上。而当作业在原环境中执行时,在最好情况下,这一比例仅达到91%,在最差情况下低至81%。由此不难看出,采用文中方法调度任务可以提高任务的数据局部性。在最好情况下,数据局部性可以提高19%左右;在最差情况下也可提高9%。
图2 作业中具有最佳数据局部性的任务数量
Fig. 2 Total numbers of tasks with the best data locality
图3 具有最佳数据局部性的任务比例图
Fig. 3 Ratio of the tasks with the best data locality
图4所示为作业在2个Hadoop 环境中执行的平均时间。与原始方法的环境相比,当作业1,2,4,5和6在新环境中运行时,执行时间显著降低。其中,作业5的执行时间降低的幅度最大,达到了14.3%。作业1的执行时间降低的幅度最小,但也超过了10%。作业3中有多个任务执行失败,为保证作业成功完成,系统不得不对这些任务进行重新调度,消耗了过多的时间,导致该作业在实现文中方法的新环境中执行时所用时间比原环境更长。
图4 作业的执行时间
Fig. 4 Execution time of jobs
文中引用的其他调度算法虽然为实现不同的调度目标而采用了不同的调度策略,并各有长处,但在调度作业中包含的具体任务时用的都是Hadoop提供的缺省方法,即基于PULL机制的任务调度方法。故在本次实验中,仅对比了本文提出的基于PUSH机制的调度方法和Hadoop提供的基于PULL机制的方法。
文中方法通过将任务推送到输入数据所在节点执行来避免远程数据访问延时。当任务在此类节点执行时,可以直接从本地磁盘访问数据,避免了跨网络的数据传输,节省了网络资源。由于要推送任务到不同节点,该方法也会消耗网络资源,且输入数据副本越多,消耗的网络带宽资源也越多。在本次实验中,根据式(5)计算文中方法所带来的网络带宽收益。式(5)中各参数的取值和计算结果如表3所示。
表3中wtask通过如下方式获取:计算一组任务创建前后jvm中可用内存容量之差,记该差值与该组任务总数的比值为wtask。在所有作业中,作业3由于失败任务过多,失去了比较意义,故未计算其对应的Wbenefit。除此之外,表3中其他作业对应的Wbenefit远远大于1。由表3可知:文中方法通过将任务推送到输入数据节点执行,不仅可以提高系统性能,还可以节省网络带宽。
表3 新方法产生的网络带宽收益
Table 3 Network width benefit from new method
4 结论
1) 分析了Hadoop环境中的任务调度方法,其采用的基于PULL的调度机制无法保证把任务调度到输入数据所在节点执行,从而在作业执行过程中引入不确定的远程数据访问延时,影响系统性能。
2) 提出了一种基于PUSH机制的任务调度方法,根据输入数据分布情况,主动将任务推送到数据所在节点执行,确保任务在执行过程中可以从本地读取数据,从而避免了远程数据访问延时。
3) 在没有任务执行失败的情况下,该方法在最好情况下可将作业执行时间降低14%左右。
参考文献:
[1] DEAN J, GHEMAWAT S. Mapreduce: simplified data proces -sing on large clusters[J]. Communications of the ACM, 2008, 51(1): 107-113.
[2] The Apache Software Foundation. MapReduce tutorial[EB/OL]. [2013-08-04]. https://hadoop.apache.org/docs/r1.2.1/mapred_ tutorial.html
[3] 涂金金, 杨明, 郭丽娜. 基于MapReduce的基因读段定位算法[J]. 模式识别与人工智能, 2014, 27(3): 206-212.
TU Jinjin, YANG Ming, GUO Lina. Gene read mapping algorithms based on MapReduce[J]. Pattern Recognition and Artificial Intelligence, 2014, 27(3): 206-212.
[4] 唐颖峰, 陈世平. 一种基于后缀项表的并行闭频繁项集挖掘算法[J]. 计算机应用研究, 2014, 31(2): 373-377.
TANG Yingfeng, CHEN Shiping. Parallel closed frequent itemset mining algorithm with post fix-table[J]. Application Research of Computers, 2014, 31(2): 373-377.
[5] 王晓佳, 杨善林, 陈志强. 大数据时代下的情报分析与挖掘技术研究: 电信客户流失情况分析[J]. 情报学报, 2013, 32(6): 564-574.
WANG Xiaojia, YANG Shanlin, CHEN Zhiqiang. Research on information analysis and data mining in the age of big data: analysis of customer loss in telecom[J]. Journal of the China Society for Scientific and Technical Information, 2013, 32(6): 564-574.
[6] 付天新, 刘正军, 闫浩文. 基于MapReduce模型的生物量遥感并行反演方法研究[J]. 干旱区资源与环境, 2013, 27(1): 130-136.
FU Tianxin, LIU Zhengjun, YAN Haowen. Remote sensing retrieval method for biomass based on MapReduce parallel model[J]. Journal of Arid Land Resource and Environment, 2013, 27(1): 130-136.
[7] REN Zujie, WAN Jian, SHI Weisong. Workload analysis, implications and optimization on a production Hadoop cluster: a case study on taobao[J]. IEEE Transactions on Services Computing, 2014, 7(2): 307-321.
[8] ZAHARIA M, BORTHAKUR D, SARMA S J, et al. Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling[C]// Proc of the 5th European Conference on Computer Systems. New York: ACM, 2010: 265-273.
[9] ZHANG Xiaohong, ZHONG Zhiyong, FENG Shengzhong, et al. Improving data locality of mapreduce by scheduling in homogeneous computing environments[C]// IEEE 9th International Symposium on Parallel and Distributed Processing with Applications, Washington DC: IEEE, 2011: 120-126.
[10] WANG Weina, ZHU Kai, YING Lei, et al. A throughput optimal algorithm for map task scheduling in MapReduce with data locality[J]. ACM SIGMETRICS Performance Evaluation Review, 2013, 40(4): 33-42.
[11] TANG Zhuo, ZHOU Junqing, LI Kenli, et al. A map-reduce task scheduling algorithm for deadline constraints[J]. Cluster Computing, 2013, 16(4): 651-662.
[12] TAN Jian, MENG Xiaoqiao, ZHANG Li. Coupling task progress for MapReduce resource-aware scheduling[C]// Proc of IEEE INFOCOM, Washington DC: IEEE, 2013: 1618-1626.
[13] LU Peng, LEE Youngchoon, WANG Chen, et al. Workload characteristic oriented scheduler for MapReduce[C]// Proc of the 2012 IEEE 18th International Conference on Parallel and Distributed Systems, Washington DC: IEEE, 2012: 156-163.
[14] MASHAYEKHY L, NEJAD M N, GROSU D, et al. Energy-aware scheduling of MapReduce jobs for big data application[J]. IEEE Transaction on Parallel and Distributed Systems, 2015, 26(10): 2720-2733.
[15] SUN Mingming, ZHUANG Hang, ZHOU Xuehai, et al. HPSO: perfecting based scheduling to improve data locality for MapReduce clusters[C]// Proc of 14th International Conference on Algorithms and Architectures for Parallel Processing, Cham: Springer International Publishing, 2014: 82-95.
[16] KAVULYA S, TAN J, GANDHI R. An analysis of traces from a production MapReduce cluster[C]// Proc of IEEE/ACM International Conference on Cluster, Cloud and Grid Computing. Washington DC: IEEE, 2010: 94-103.
(编辑 罗金花)
收稿日期:2015-07-23;修回日期:2015-09-23
基金项目(Foundation item):国家自然科学基金面上资助项目(51274088);河南省教育厅项目(ITE12103);河南理工大学矿山信息化省级重点实验室项目(KY2012-05);河南理工大学博士基金资助项目(B2012-099);河南省科技攻关项目(142102210435) (Project(51274088) supported by the National Natural Science Foundation of China; Project(ITE12103) supported by the Foundation of Henan Educational Committee; Project(KY2012-05) supported by the Foundation of Provincial Open Laboratory of Mine Informatization Key Discipline; Project(B2012-099) supported by the PhD Foundation of Henan Polytechnic University; Project(142102210435) supported by the Programs for Science and Technology Development of Henan Province)
通信作者:赵文涛,教授,硕士生导师,从事分布式计算技术、大数据技术研究;E-mail: zwt@hpu.edu.cn