[关闭]
@nataliecai1988 2017-10-10T13:33:16.000000Z 字数 10365 阅读 792

数据收集工具的设计与最佳实践

约稿


作者:孙健波
策划&审校:Natalie

笔者之前在《七牛大数据平台的演进与大数据分析实践》中提到了已经开源的数据收集工具logkit。本文将深入介绍数据收集的设计思路以及大数据收集背后的细节,为大家提供大数据实战中第一步数据收集的最佳实践。

数据收集工具对比

目前社区已经不乏大量优秀的数据收集工具,如有名的Elastic Stack(Elasticsearch、Logstash、Kibana)中的Logstash;CNCF基金会里面有名的Fluentd;InfluxData公司TICK Stack中的Telegraf;Google 出品为Kubernetes定制的cAdvisor;Apache基金会中的顶级项目Flume。除了早期诞生的诸如Fluentd、Flume等项目,其他项目都是为特定的平台业务定制而成,然后在随后的开源中不断进化,变得更为通用。所以针对特定业务量身定制一款数据收集工具,是一个较为普遍的需求,也是出现如此多“轮子”的主要原因。

让我们先来看看这几种知名开源数据收集工具有哪些特点。

了解了以上这些开源软件的特点后,下面我们开始深入介绍构建一款数据收集工具会遇到哪些设计与挑战,以此为你的业务量身定制。

数据收集工具设计

架构设计

主流数据收集工具的主架构基本分为reader、parser,以及sender三部分,如图1所示。除了这三个日志收集常规组成部分,还应该包含可选模块,如基于解析过后的数据转换(filter/transformer)以及数据暂存管道(Channel/Buffer)。为了尽可能复用,每个组成部分都应该是插件式的,可以编写不同类型插件并且灵活地组装。Channel/Buffer部分也应该提供基于内存或者基于磁盘的选择。

数据收集架构
图1 数据收集架构设计

对于Reader、Parser、Sender等插件共同组装的业务数据收集单元,我们称之为一个运行单元(Runner),数据收集工具必须可以同时运行多个Runner,且每个Runner可以支持更新。

更新可以通过多种方式实现,最常规的是手动更新配置然后重启;更好的设计是支持热更新,不需要重启,自动识别配置文件的变化;还可以设计一个漂亮的web界面做配置的变更,以此引导用户使用并解决数据收集配置复杂、用户使用门槛高的难题。所以在整体架构之上还应该构建一个简单的API层,支持web界面的功能。

语言选择

数据收集属于轻量级的agent服务,一般选择的语言为C/C++或者近年来特别火热的Go,而Go语言已经成为这类数据收集工具编写的大众选择,如Logstash新开发的beats工具、Telegraf、cAdvisor等等,均使用Go语言开发。

社区已经有很多文章描述使用Go语言的好处,在此就不再赘述。总体而言用Go语言开发门槛较低,性能优良,支持跨多种操作系统平台,部署也极为简便。

分模块设计

数据读取模块(Reader)

顾名思义,数据读取模块负责从不同数据源中读取数据,设计Reader模块的时候,需要支持插件式数据源接入,且将接口设计得足够简单,方便大家一同贡献更多的读取数据源驱动。

自定义数据源,最基本的只需要实现如下两个方法即可。

  1. ReadLine() string
  2. SyncMeta() error

从数据来源上分类,数据读取大致可分为从文件读取、从数据存储服务端读取以及从消息队列中读取三类。

每一类Reader均在发送成功后通过SyncMeta()函数记录读取的位置,保证数据不会因为runner意外中断而丢失。

从文件读取数据最为常见,针对文件的不同rotate方式,有不同的读取模式,主要分为三类:

除此之外,还应支持包括多文件编码格式支持、读取限速等多种功能。

从数据存储服务中读取数据,可以采用时间戳策略,在诸如MongoDB、MySQL中记录的数据,包含一个时间戳字段,每次读取数据均按这个时间戳字段排序,以此获得新增的数据或者数据更新。另一方面,需要为用户设计类似定时器等策略,方便用户多次运行,不断同步收集服务器中的数据。

从消息队列中读取数据,这个最为简单,直接从消息队列中消费数据即可。注意记录读取的Offset,防止数据丢失。

解析模块(Parser)

解析模块负责将数据源中读取的数据解析到对应的字段及类型,目前常见的解析器包括:

  1. csv parser: 按照分隔符解析成对应字段和类型,分隔符可以自定义,如常见的制表符(\t)、空格()、逗号(,)等等。
  2. json parser: 解析json格式的数据,json是一种自带字段名称及类型的序列化协议,解析json格式仅需反序列化即可。
  3. 基于正则表达式(grok) parser: Logstash grok解析非常强大,但是它并不指定类型,而Telegraf做了一个增强版的grok 解析器,除了基本的正则表达式和字段名称,还能标志数据类型,基本上能标志数据类型的grok解析器已经是一个完备的数据解析器了,可以解析几乎所有数据。当然,类型解析是相对复杂的功能,可能涉及到具体业务,如时间类型等。
  4. raw parser: 将读取到的一行数据作为一个字段返回,简单实用。
  5. nginx/apache parser: 读取nginx/apache等常见配置文件,自动生成解析的正则表达式,解析nginx/apache日志。

除了以上几种内置的解析器,同Reader一样,你也需要实现自定义解析器的插件功能,而Parser极为简单,只需要实现最基本的Parse方法即可。

  1. Parse(lines []string) (datas []sender.Data, err error)

每一种Parser都是插件式结构,可以复用并任意选择。在不考虑解析性能的情况下,上述几种解析器基本可以满足所有数据解析的需求,将一行行数据解析为带有Schema(具备字段名称及类型)的数据。但是当你希望对某个字段做操作时,纯粹的解析器可能不够用。于是作为补充,数据收集工具还需要提供Transformer/Filter的功能。

Transformer

Transformer是Parser的补充,针对字段进行数据变化。

举例来说,如果你有个字段想做字符串替换,比如将所有字段名称为"name"的数据中,值为"Tom"的数据改为"Tim",那么可以添加一个字符串替换的Transformer,针对"name"这个字段做替换。

又比如说,你的字段中有个"IP",你希望将这个IP解析成运营商、城市等信息,那么你就可以添加一个Transformer做这个IP信息的转换。

当然,Transformer应该可以多个连接到一起连动合作。

设计Transformer模块是一件有趣而富有挑战的事情,这涉及到Tranformer功能多样性带来的3个问题:

  1. 多样的功能必然涉及到多样的配置,如何将不同的配置以优雅而统一的方式传达到插件中?

  2. 多样的功能也涉及到不同功能的描述,如何将功能描述以统一的形式表达给用户,让用户选择相应的配置?

  3. 如何将上述两个问题尽可能简单地解决,让用户编写Transformer插件时关注尽可能少的问题?

这里我们留个悬念,感兴趣的朋友可以阅读logkit Transformer相关的源码寻求答案,笔者后续也会在logkit的wiki页面中描述。

Channel

经过解析和变换后的数据可以认为已经处理好了,此时数据会进入待发送队列,即Channel部分。Channel的好坏决定了一个数据收集发送工具的性能及可靠程度,是数据收集工具中最具技术含量的一环。

数据收集工具,顾名思义,就是将数据收集起来,再发送到指定位置,而为了将性能最优化,我们必须把收集和发送解耦,中间提供一个缓冲带,而Channel就是负责数据暂存的地方。有了Channel,读取和发送就解耦了,可以利用多核优势,多线程发送数据,提高数据吞吐量。

ft_sender
图2 ft sender

一种设计思路是把整个Channel,包括多线程发送做成一个框架,封装成一个特殊的sender,我们称这个特殊的sender为"ft sender"。其架构如图2所示,ft sender与其他sender一样也提供了Send()方法,解析完毕后的数据调用Send方法实际上就是将数据传入到Channel中,然后再由ft sender处理多线程发送逻辑,将从队列中取出的数据交由实际的sender多线程发送。

同时需要为用户提供磁盘和内存两大队列方式选择。

如果追求最高的可靠性,就使用磁盘队列,数据会暂存到本地磁盘中,发送后自动删除,即使突然宕机也不怕丢失数据。

如果追求更高的性能,可以使用内存队列,其实现方式就是Go语言的Channel机制,稳定而简单,在关停过程中也需要将Channel中的数据落地到磁盘,在随后重新启动时载入,正常使用过程中也没有数据丢失的风险。得益于Go语言的同步Channel机制,甚至可以把内存队列的大小设置为0,以此实现多线程发送,这样使用内存队列即使宕机,也没有了数据丢失的风险。

除了正常地作为待发送数据的等待队列以外,Channel还可以具有如下一些非常有趣而实用的功能:

并不是所有解析完毕的数据发送到服务端就一定是正确的,有时服务端指定的数据格式和解析完毕的格式存在出入,或者数据中含有一些非法字符等情况,则数据不能发送成功。此时,如果一批数据中只有一条这样错误的数据,就很容易导致这一整批都失败。

错误数据筛选的作用就在于,把这一整批数据中对的数据筛选出来,留下错误的数据,将正确的发送出去。

做法很简单,当发送时遇到服务端返回存在格式错误的数据时,将这一批数据平均拆分为两批(二分),再放入队列,等待下次发送。再遇到错误时则再拆分,这样不断二分,直到一个批次中只有一条数据,且发送失败,那我们就找到了这个错误数据,可以选择丢弃或记录。

借助队列,我们很容易就能将错误数据筛选出来。

包拆分的由来是服务端不可能无限制开放每个批次数据传输的大小,出于服务器性能、传输性能等原因,总有会有一些限制。

当一个批次的数据过大时,就会导致传输失败。此时的做法与错误筛选的方法相似,只要将包二分即可,如果还是太大就再次二分,以此类推。

限速的功能最容易理解,数据统统经过Channel,那么只要限制这个Channel传输介质的速度即可。例如磁盘队列,只需要限制磁盘的IO读写速度;内存队列则限制队列大小以此达到限速的目的。

常见的流量控制的算法有漏桶算法(Leaky bucket)令牌桶算法(Token bucket)两种,比较推荐采用令牌桶算法实现该功能,感兴趣的朋友可以阅读一下logkit 的 rateio 包

Sender

Sender的主要作用是将队列中的数据发送至Sender支持的各类服务,一个最基本的实现同样应该设计得尽可能简单,理论上仅需实现一个Send接口即可。

  1. Send([]Data) error

那么实现一个发送端有哪些注意事项呢?

  1. 多线程发送:多线程发送可以充分利用CPU的多核能力,提升发送效率,这一点我们在架构设计中通过设计ft sender作为框架解决了该问题。

  2. 错误处理与等待:服务端偶尔出现一些异常是很正常的事情,此时就要做好不同错误情况的处理,不会因为某个错误而导致程序出错,另外一方面,一旦发现出错应该让sender等待一定时间再发送,设定一个对后端友好的变长错误等待机制也非常重要。一般情况下,可以采用随着连续错误出现递增等待时间的方法,直到一个最顶峰(如10s),就不再增加,当服务端恢复后再取消等待。

  3. 数据压缩发送:带宽是非常珍贵的资源,通常服务端都会提供gzip压缩的数据接收接口,而sender利用这些接口,将数据压缩后发送,能节省大量带宽成本。

  4. 带宽限流:通常情况下数据收集工具只是机器上的一个附属程序,主要资源如带宽还是要预留给主服务,所以限制sender的带宽用量也是非常重要的功能,限流的方法可以采用前面Channel一节提到的令牌桶算法。

  5. 字段填充(UUID/timestamp):通常情况下收集的数据信息可能不是完备的,需要填充一些信息进去,如全局唯一的UUID、代表收集时间的timestamp等字段,提供这些字段自动填充的功能,有利于用户对其数据做唯一性、时效性等方面的判断。

  6. 字段别名:解析后的字段名称中经常会出现一些特殊字符,如"$"、"@"等符号,如果发送的服务端不支持这些特殊字符,就需要提供重命名功能,将这些字段映射到一个别的名称。

  7. 字段筛选:解析后的字段数据未必都需要发送,这时如果能提供一个字段筛选的功能,就可以方便用户选择去掉一些无用字段,并节省传输的成本。也可以在Transformer中提供类似discard transformer的功能,将某个字段去掉。

  8. 类型转换:类型转换是一个说来简单但是做起来非常繁琐的事情,不只是纯粹的整型转换成浮点型,或者字符串转成整型这么简单,还涉及发送到的服务端支持的一些特殊类型,如date时间类型等,更多的类型转换实际上相当于最佳实践,能够做好这些类型转换,就会让用户体验得到极大提升。

  9. 简单、简单、简单:除了上述这些,剩下的就是尽可能的让用户使用简单。假设我们要写一个mysql sender,mysql的数据库和表如果不存在,可能数据会发送失败,那就可以考虑提前创建;又比如数据如果有更新,那么就需要将针对这些更新的字段去更新服务的Schema等等。

Metrics

除了基本的自定义的数据收集,数据收集工具作为一个机器的agent,还可以采集机器的基本数据,例如CPU、内存、网络等信息,通过这些信息,可以全面掌控机器的状态。

具体的内容可以参考logkit文档:Runner之系统信息采集配置

至此,一个完整的数据收集工具的设计要点已经介绍完毕。

我们已经开源的logkit正是按照这样的设计实现的,logkit集合了多种开源数据收集工具的优点,聚焦易用性,致力于打造产品级别的开源软件。

数据收集工具logkit

logkit(https://github.com/qiniu/logkit)是七牛大数据团队开源的一个通用的日志收集工具,可以从多种不同的数据源中采集数据,并对数据进行一系列的解析、变换、裁减,最终发送到多种不同的数据下游,其中就包括了七牛的大数据平台Pandora。除了基本的数据收集、解析以及发送功能之外,logkit集合了多种同类开源软件的优势,涵盖了容错、并发、热加载、断点续传等高级功能,更提供了页面方便用户配置、监控以及管理自己的数据收集业务,是一款产品级别的开源软件。

目前支持的数据源包括:

  1. File Reader: 读取文件中的日志数据,如 nginx/apache 服务日志文件、业务日志等。

  2. Elasticsearch Reader: 全量导出Elasticsearch中的数据。

  3. MongoDB Reader: 同步MongoDB中的数据。

  4. MySQL Reader: 同步MySQL中的数据。

  5. MicroSoft SQL Server Reader: 同步Microsoft SQL Server中的数据。

  6. Kafka Reader: 导出Kafka中的数据。

  7. Redis Reader: 导出Redis中的数据。

目前支持发送到的服务包括Pandora、ElasticSearch、InfluxDB、MongoDB以及本地文件五种,近期还会支持发送到Kafka以及发送到某个HTTP地址。

  1. Pandora Sender:发送到Pandora(七牛大数据处理平台)服务端。

  2. Elasticsearch Sender: 发送到Elasticsearch服务端。

  3. File Sender: 发送到本地文件。

  4. InfluxDB Sender: 发送到InfluxDB服务端。

  5. MongoDB Sender: 后发送到MongoDB服务端。

而在这已经实现的有限的几个发送端中,我们是这么设计的使用场景:

而发送到七牛的Pandora服务中,不仅能涵盖上述全部场景,还提供了大量可以快速发掘数据价值的实际应用场景的使用模板,同时还可以利用七牛成本较低的云存储服务对数据进行持久备份。

目前logkit支持的收集端和发送端并不多,非常欢迎大家来贡献更多的收集/发送端。

量身定制

再回过头来聊聊量身定制的话题,本文描述了一个数据收集工具打造的完整过程,我们深知量身定制一个数据收集工具有多么重要,而量身定制也是logkit的一大特点。logkit架构中的每个组成部分(Reader、Parser、Sender、Transformer、Channel等)都是一个GO语言的package,你完全可以用logkit中的包,自己写主函数,从而定制一个专属的收集工具。我们提供了代码案例,方便你亲自实践。

优势

总体而言,logkit有如下优势:

下面让我们来实践一下,看看logkit在实战中是什么样子。

logkit实战

下载

编译完后的logkit是一个Go的二进制包,你可以在logkit的Download页面找到对应操作系统的Release版本。

也可以参照logkit源码编译指南,从代码层面定制自己的专属logkit。

启动

logkit部署非常简单,将这个binary放在系统PATH路径中就算部署完成了,推荐使用诸如supervisor等进程管理工具进行管理。

启动logkit工具,可以使用默认的配置文件,执行如下命令即可。

  1. logkit -f logkit.conf

配置文件中默认开启了3000端口,初次使用可以通过浏览器打开logkit配置页面,配置runner并调试读取、解析和发送方式,浏览器访问的地址是http://127.0.0.1:3000

配置

logkit 首页
图3 logkit首页

打开网址后看到如图3所示的logkit配置助手首页,这个页面会清晰地展示目前所有的logkit Runner运行状态,包括读取速率、发送速率、成功/失败数据条数,以及一些错误日志。还可以在这里修改和删除Runner。

点击左上角【增加Runner】按钮,可以添加新的logkit Runner。

Reader
图4 数据源Reader配置

如图4所示,新增Runner的第一步就是配置数据源,为了尽可能方便用户,logkit将绝大多数选项都预设了默认值,用户只需要根据页面提示填写黄色的必填项即可。

按页面步骤依次配置数据源、解析方式、以及发送方式。

Parser
图5 解析数据

如图5所示,在配置解析方式的页面还可以根据配置尝试解析样例数据,这个页面可以根据你的实际数据非常方便地调试解析方式。

Transformer
图6 字段变化Transformer

如图6所示,除了解析以外,还可以针对解析出来的某个字段内容做数据变换(Transform),即上一章中描述的Transformer。可以像管道一样拼接多个Transformer,做多重字段变化。

最后配置完发送方式,可以在如图7所示的页面做二次确认。

确认添加
图7 确认并添加页

在二次确认的页面中,可以直接修改表达内容也可以返回上一步修改,最终点击添加Runner即可生效。

到这里,一个复杂的数据收集工作就完成了,怎么样,就是这么简单,快来实际体验一下吧!

作者简介

孙健波,七牛大数据高级工程师,开源项目logkit的主要作者,InfoQ专栏作者。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注