[关闭]
@xishuixixia 2016-01-15T13:05:31.000000Z 字数 5583 阅读 2582

酷狗大数据平台重构过程中遇到的坑

未分类


讲师介绍

王劲,目前就职于酷狗音乐,大数据架构师,负责酷狗大数据技术规划、建设、应用。 11年的IT从业经验,2年分布式应用开发,3年大数据技术实践经验,主要研究方向流式计算、大数据存储计算、分布式存储系统、NoSQL、搜索引擎等。

背景

酷狗音乐大数据平台重构整整经历了一年时间,在这过程中挖过坑,填过坑,在此总结下这一年的工作经验,避免重复踩坑。

大数据平台建设的过程详见作者发布在InfoQ公众号上的《经典大数据架构案例:酷狗音乐的大数据平台重构》一文。

三种“坑”

我们在大数据平台重构过程中踩过的坑,大致可以分为操作系统、架构设计、开源组件三类,下面主要列举些比较典型的,花时间比较长的问题。

1. 操作系统级的坑

Hadoop的I/O性能很大程度上依赖于Linux本地文件系统的读写性能。Linux中有多种文件系统可供选择,比如ext3和ext4,不同的文件系统性能有一定的差别。我们主要想利用ext4文件系统的特性,由于之前的操作系统都是CentOS5.9不支持ext4文件格式,所以考虑操作系统升级为CentOS6.3版本。部署Hadoop集群后,作业一启动,就出现CPU的sys态使用过高,高峰达90%,用户态使用比较低,导致系统负载过高。如下图:

经过很长时间的测试验证,发现CentOS6优化了内存申请的效率,引入了THP的特性,而Hadoop是高密集型内存运算系统,这个改动给hadoop带来了副作用。通过以下内核参数优化关闭系统THP特性,CPU内核使用率马上下降:
echo never > /sys/kernel/mm/redhat_transparent_hugepage/enabled
echo never > /sys/kernel/mm/redhat_transparent_hugepage/defrag

修改参数后,结果如下图:

2. 架构设计的坑

最初的数据流架构是数据采集网关把数据上报给Kafka,再由数据实时清洗平台(ETL)做预处理后直接实时写入HDFS,如下图:

pic_3

此架构,需要维持HDFS Client的长连接,由于网络等各种原因导致Storm实时写入HDFS经常不稳定,隔三差五的出现数据异常,使后面的计算结果异常不断,当时尝试过很多种手段去优化,如:保证长连接、连接断后重试机制、调整HDFS服务端参数等,都解决得不彻底。

每天异常不断,旧异常没解决,新异常又来了,在压力山大的情况下,考虑从架构角度调整,不能只从具体的技术点去优化了。在做架构调整时,考虑到我们架构重构的初衷,提高数据的实时性,尽量让计算任务实时化,但重构过程中要考虑现有业务的过渡,所以架构必须支持实时与离线的需求,结合这些需求,在数据实时清洗平台(ETL)后加了一层数据缓存重用层(kafka),也就是经过数据实时清洗平台后的数据还是写入kafka集群,由于kafka支持重复消费,所以同一份数据可以既满足实时计算也满足离线计算,从上面的整体技术架构也可以看出,如下图:

pic_4

KG-Camus组件也是基于架构调整后,重新实现了一套离线消费Kafka集群数据的组件,此组件是参考LinkedIn的Camus实现的。此方式,使数据消费模式由原来的推方式改为拉模式了,不用维持HDFS Client的长连接等功能了,直接由作业调度系统每隔一段时间去拉一次数据,不同的业务可以设置不同的时间间隔,从此架构调整上线后,基本没有类似的异常出现了。

这个坑,是我自己给自己挖的,导致我们的重构计划延期2个月,主要原因是由最初技术预研究测试不充分。

3. 开源组件的坑

由于整个数据平台涉及到的开源组件很多,踩过的坑也是十个手指数不过来。

3.1. 在Hadoop集群中,大规模往hdfs同时写多批文件,hdfsClient和datanode出现timeOut,或者hdfsClient出现"All dataNode are bad...."错误,最终导致数据写入hdfs失败。

具体的分析过程:
大规模往hdfs同时写多批文件,Datanode Thread Dump大量DataXceiver和发送心跳线程被Blocked掉,出现心跳异常有时候达到几十秒左右,大量DataXceiver线程被blocked掉无法向各个hdfsClient的dataStreamer(向datanode发送packet)和ResponseProcessor(接收pipeline中datanode的ack)线程提供服务和datanode的BlockReceiver线程无法正常工作,最终导致客户端出现timeOut,或者hdfsClient往hdfs写packet时整个PipeLine中的datanode无法响应客户端请求,进而系统内部启动pipeline容错,但是各个datanode都由于DataXceiver大量被Blocked掉无法提供服务,最后导致客户端报出“All dataNode are bad...”。

经过分析发现, hadoop2.6中的bug,代码中采用了非常大粒度的对象锁(FsDatasetImpl),在大规模写操作时导致锁异常。这个bug出现在2.5和2.6个版本中(我们新集群用的是2.6),目前这个bug已经在2.6.1和2.7.0这两个版本中修复。

官方具体给出的Path信息:
https://issues.apache.org/jira/browse/HDFS-7489
https://issues.apache.org/jira/browse/HDFS-7999

其实具体的修复方案就是将这个大粒度的对象锁分解为多个小粒度的锁,并且将datande向namenode发送心跳线程从相关联的锁中剥离。

采用集群不停机的平滑方法将hadoop生产集群的版本升级2.7.1,重现异常场景通过大规模数据并发写多批文件,和测试kg-camus拉kafka程序(之前2.6.0跑失败)都没有发现之前的异常信息,作业成功。并且观察升级之后的blocked线程和datanode心跳情况,一切正常。心跳间隔不会随着blocked线程数目变化而同步变化,并且保持2s以内(2.7.1已经将这2部分在对象锁中分离)。下面是测试表现:

pic_5

3.2. Yarn的资源申请粒度的任意组合都无法刚刚好将节点的内存和虚拟CPU资源用尽,kg-dn-4,kg-dn-11节点资源使用情况如下图:

pic_6

主要通过调整yarn以下参数,来提高单个作业的资源充分利用率。

  1. 调整降低Application Master资源粒度。 4G/2core降低为2G/1core,具体调整项:
    yarn.app.mapreduce.am.resource.mb
    yarn.app.mapreduce.am.resource.cpu-vcores

  2. 降低Yarn队列中的同时最大可运行应用数目,从而降非用于计算的container数目来提高资源使用率,具体的调整项kgdc-fair-scheduler.xml文件的maxRunningApps项。

3.3. 在大量作业运行时,会出现大量map饿死现象,根据集群规模调整reduce启动时机,降低map饿死几率。

调整以下参数:

  1. 提高map task完成启动reduce的比例阀值,从5%调高到60%。
    (调整项:mapreduce.job.reduce.slowstart.completedmaps)

  2. 降低在map task完成之前,最多启动reduce
    (调整项:yarn.app.mapreduce.am.job.reduce.rampup.limit)

  3. 当map task需要资源但暂时无法获取资源(比如reduce task运行过程中,部分map task因结果丢失需重算)时,为了保证至少一个map task可以得到资源,最多可以抢占reduce task比例,调 整回默认比例50%。
    (调整项:yarn.app.mapreduce.am.job.reduce.preemption.limit)

3.4. 在架构设计的坑中提到,我们新开发了kg-camus组件,kg-camus组件就是通过作业调度系统定时批量从kafka中拉取业务数据到HDFS中(kg-camus说穿了,就是kafka的消费者),在开发中也走了些弯路。

由于Kafka partition的消息只能顺序读取,每条消息有一个offset标识位置,kafka consumer提供两种API(高级与低级API),我们为了更灵活的控制消息的读取方式,选择采用低级API,这也就意味着需要consumer方自己记录消费的offset位置。记录的方式有两种可以选择:一种是文件;一种上报给zookeeper,由zookeeper记录。我们首先选择的是文件的方式,之所以选择文件存储是因为想camus尽量简单,除了与hadoop有关系,不想与其他组件有关系。但是实际上并行的Task写同一个HDFS文件其实是非常不稳定的,若想稳定需要每个Task写一个文件,写完之后做merge,也就意味着一个很简单的功能复杂化了,所以放弃了该种方式。而选用zookeeper的方式记录,采用Zookeeper需要考虑各种容错机制,例如,由于网络的原因,并非每次写都能成功。我们加了一些容错机制后,目前没有出现什么问题。低级消费kafka消息还需要考虑partition leader切换、offset过期等异常情况的处理。

3.5. 在集群中,离线批处理作业经常出现计算结果延时出(这里的延时是,我们根据业务规定每种对应的业务在规定的时间点必须出数据),行为数据的字段内容比较多,之前hive一直采用textfile格式,大家都知道,textfile格式存储,磁盘开销大,数据解析开销大,在重构过程中果断采用列式存储方式进行处理,在大数据中,列式存储性能比较好的有ORCFile、Parquet,由于目前集群中大部分作业还在hive上执行,考虑兼容性,临时过滤方案选择ORCFile。

拿18.34G数据,以5维度为例做性能测试,如下:

pic_7

结论:存储资源节省15%,计算资源节省67%。
测试结果非常满意,因为,集群中的作业延时,就是因为计算资源不足引起的,所以我们在行为数据模型中把存储格式全替换为ORCFile格式后,大部分作业执行结果很满意,有的作业由之前的执行时间3个小时缩短到20分钟就能搞定。说明,在列式存储中,执行性能的好坏取决于我们的作业的需要数据列数多少而定。

ORCFile只是临时过滤方案, Parquet是最终存储格式,针对整个存储计算这块,已经制定了一套全新的方案,现在正在改造,改造完后,整个集群资源最少可以节省30%。

3.6. 当我们的行为数据全量接入到Kafka集群(几百亿/天),数据采集网卡出现大量连接超时现象,但万兆网卡进出流量使用率并不是很高,只有几百Mbit/s,经过大量的测试排查后,调整以下参数,就顺利解决了此问题。

  1. num.network.threads(网络处理线程数)值应该比cpu数略大
  2. num.io.threads(接收网络线程请求并处理线程数)值提高为cpu数两倍

调整参数后网卡流量如下图:

pic_8

3.7. 在hive0.14 版本中,利用函数ROW_NUMBER() OVER对数据进行数据处理后,导致大量的作业出现延时很大的现象,经异常排查后,发现在数据记录数没变的情况,数据的存储容量扩大到原来的5倍左右,导致MapReduce执行很慢造成的。改为自己实现类似的函数后,解决了容量扩大原来几倍的现象。

说到这里,关于hive版本问题,这几天又遇到一个问题。在做去重后的数据写入ORCFile表中时,由于测试环境用的是hive1.x版本,生产环境用的是hive0.14版本,导致用测试环境编译的包放到生产环境,可以插入数据,但部分sql语法不可用,如:limit。经多次测试后发现是版本问题导致的。

3.8. 在业务实时监控系统中,用OpenTSDB与实时计算系统(storm)结合,用于聚合并存储实时metric数据。在这种实现中,通常需要在实时计算部分使用一个时间窗口(window),用于聚合实时数据,然后将聚合结果写入tsdb。但是,由于在实际情况中,实时数据在采集、上报阶段可能会存在延时,而导致tsdb写入的数据不准确。针对这个问题,我们做了一个改进,在原有tsdb写入api的基础上,增加了一个原子加的api。这样,延迟到来的数据会被叠加到之前写入的数据之上,实时的准确性由于不可避免的原因(采集、上报阶段)产生了延迟,到最终的准确性也可以得到保证。另外,添加了这个改进之后,实时计算端的时间窗口就不需要因为考虑延迟问题设置得比较大,这样既节省了内存的消耗,也提高了实时性。

3.9. 然而我们在做实时监控系统中,遇到的瓶颈不是在实时计算上,而是在结果存储方面。在存储方面,花了大量的时间去调优测试,其中也参考了携程对OpenTSDB的一些建议(携程的OpenTSDB使用得很好,好像还申请了专利)。存储这块,我们主要对它做了以下改进和优化,跟我们的需求进行定制修改源代码。
OpenTSDB的改进和优化:

  1. 去除聚合时的分组插值,直接聚合。
  2. 修改了startkey和endkey中时间戳,只查询需要的行,对查询结果时间戳添加时区的偏移。
  3. 添加降采样表,供采样粒度较大的查询使用,减少了rowkey数,提升了查询性能。
  4. 添加协处理器支持,减少io和序列化/反序列化开销。

第4点的改动比较大,把OpenTSDB的查询处理功能由客户端搬迁到服务端(协处理器),大大减少了io和序列化/反序列化开销,性能提升明显。另外,我们通过降维,添加分区标识来提高性能,这点主要用HBase的分区特性。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注