如何自下而上设计开发一个大数据准实时同步系统

最近项目基本上进入尾声了,也有时间来整理下最近做的这个项目.因为主要就一个人在做,所以周期比较长,整个系统涉及到的开源数据框架比较多,所以感觉还是有不少价值的,当然这个里面也有很多的坑,只有做过才能体会到,后面我会慢慢展开.

项目背景

主要是两个公司合并了,哪两家就不说了,反正是行业Top1,Top2.后来打算成立新公司,所以数据需要整合.其实在一开始合并之后,数据就有陆陆续续的整合,不过这种整合方式效率比较低,整个链路很长,涉及到很多部门,圈绕的比较大.具体的流程大概说下:

  1. 首先数据在我们的数据仓库ETL跑完之后,会有一个下游作业把数据拷贝到一个公共hadoop队列里面(Hadoop权限这块比较差,由于支付的数据要求较高,在防火墙内,不可能共享Hive数据仓库所在目录的数据,所以只能采取这种方式共享数据);
  2. 然后由专门和对方公司数据部门对接的人来把我们拷贝到指定队列的数据从hadoop上下载下来,上传公有云服务器(其实就是FTP服务器);
  3. 然后对方公司的数据组有专门的人从公有云把数据下载到他们的Hadoop客户机上,然后把数据上传到他们的Hadoop集群,然后数据才能达到基本能用的状态.

**PS:**由于Hive本身作为离线数据仓库,数据延迟为T+1,然后数据再到他们那,延迟就变成了T+2,并且整个过程涉及到的部门比较多.数据不出问题还好,一旦出了问题,得找到每个环节的负责人,然后定位问题出现的环节,总之是很麻烦.

所以上面的老大也觉得这种方式使用数据非常的麻烦,成本太大,关键是数据的时效性也不满足要求,所以就想做实时数据流同步,整合两边的数据仓库.当然数据仓库只是实时数据流的一个用途之一,也会有其他数据使用场景,比如实时计算之类的.

设计方案

一般而言,一个完整的数据体系结构都比较复杂,我这里就大概说下整体架构:

数据体系结构图
如果觉得图片太小看不清可以在新标签中打开查看大图.我会从下而上简单说下这个架构里面都有些什么东西:

数据采集

数据系统的第一步,数据采集.这部分数据主要分为两大类,DB数据以及业务日志

Database

第一类就是比较主要的,业务数据库数据采集,目前基本上都是用的mysql数据库,所以采集方案比较大众化都是采用阿里的Canal,当然一般可能不能直接拿来用,需要你根据公司的技术架构做一些二次开发,因为一般公司内部会对MySQL做一些架构来给业务提供高可用的特性.数据采集之后会封装成自定义的Kafka Message,为了方便区分,topic命名方式可以有一个统一规则,例如binlog_{db实例架构类型}_{db实例名},这样能比较方便知道某个db实例的binlog数据特性以及排查问题.这里需要注意的就是往Kafka上写数据的时候,Kafka的分区策略要注意下,一个参考建议就是可以采用(Schema,Table)作为PartitionKey.这样可以保证同一个表的数据是顺序存放在同一个分区的,这样后续的一些程序处理可以保证数据的顺序性.
**NOTE:**封装的KafkaMessage可以根据业务来,如果想简单起见,可以把Canal的RowChange消息整个封装进去.

Tomcat Log

这部分数据原则上来说,并没有Database数据重要,因为我们建的大部分数据表,模型都是基于业务的ER关系表来的.但是日志数据可以做很多其他的事情,数据挖掘里面也有很大的用途,还有的主要就是业务辅助用,后面会讲一些使用场景.业务日志采集采取的方案比较多,有采用入侵式埋点的,但是现在的普遍做法应该还是非入侵式采集,因为日志采集事实上对业务而言是一个非必须的功能,但是线上业务首先要保证性能和可靠性不受其他无关影响.
采集业务日志一般采用的就是flume tail功能,对指定日志目录下面的文件执行类似Linux:tail -f命令不断滚动采集新产生的日志.这个里面也有很多的坑,目前最新的flume好像支持直接采集一个目录,这样就不用自己写tail插件去采集日志了.这部分日志采集之后也是放在Kafka集群,同样的也需要自己封装成定义的KafaMessage.建议是需要保留日志的文件名以及机器名,所以分区策略是用这两个key就可以了.
**NOTE:**一般大一点儿的公司,线上的业务系统都会有appId这种东西,所以建议如果有这个信息也封装到KafkaMessage消息里面.

数据计算

数据存储之后,就是数据的计算了,Kafka毕竟只能存一段时间,过期就会删除,所以这些消息会落地到其他的存储系统,不同类型的数据有不同的落地场景,相同的数据也可能有不同的落地场景,下面一个一个说下这些应用及落地场景:

Binlog->Hive

Database的binlog数据采集到Kafka之后,最大的一个用途就是作为数据仓库的ods层源数据.由于离线数据仓库一般延迟要求不高,基本上默认就是T+1,所以我们不用事实计算这部分数据,反而是吞吐量是个很重要的指标.所以这里可以考虑的就是Spark Streaming这个框架,微批处理,可以在延迟和吞吐量之间做一个很好的平衡.
当然如果你没有Spark集群,也可以用Flume消费Kafka消息写到本地(据说直接写HDFS性能不是很好,没有测试过),然后隔一段时间put到hdfs上.关于写的策略,因为每天的binlog是增量数据,所以你需要一个字段作为binlog的分区依据,可以在将Canal消息封装KafkaMessage的时候加上一个binlog的执行时间.
**关键点:**这个过程其实坑还是挺多的,Binlog一般并发比较大,你要保证数据可以准确去重,不会取到错误的数据.

Binlog->ElasticSearch

这个需求和Hive那个很像,但是又有一些区别.Hive不支持单行记录更新(据说貌似新版本的支持了,没去详细了解过),ElasticSearch简单来说就是作为Database里面的表的一个镜像.因为MySQL的单表数据量在超过5000w的时候性能会变得非常差,所以业务上为了性能考虑,都会对表按月或者其他规则做分表.所以一般像运营,开发,测试在查询数据的时候非常的不方便.而ElasticSearch本身就是作为索引系统而存在的,非常适合有大量的查询场景.
这个因为并不是同步到离线数据仓库里面,所以对数据的延迟要求会高一些,这个时候Spark Streaming就不太合适,那么可不可以使用Jstorm这种流式处理框架呢?答案是不行,原因也很简单,JStorm这种流处理框架是at least once语义,也就是能保证数据至少被处理一次(当然好像后面的版本是准备实现还是已经实现了exactly only once,反正我们自己用的并没有内置支持),并且还有一个很重要的原因就是:JStorm无法保证数据的顺序,一个拓扑里面的每个bolt会有多个task,数据如果是按随机分发的话,不同的task处理速度是没办法预估的,这就可能导致先产生的binlog变更记录有可能后写入到ElasticSearch,就容易出现数据不一致的问题.所以这种情况下异步流处理框架是没有办法处理这种问题的,所以在技术选型上我们可以选像Spark Streaming这种顺序批处理(但是也有条件限制,就是每次只能有一个batch在执行,队列设置成FIFO).
这里简单说下我们的技术方案:首先从Kafka消费Binlog,清洗出每个表的Primary Key,封装成自定义的Kafka Message写回到Kafka,然后再从Kafka去消费这种消息,拿到Primary Key,然后查询对应的Database,这样每次都是拿到的最新的记录,然后更新ElasticSearch,这样就不用担心数据变更记录顺序不一致的问题,因为每次拿到的都是最新的.至于历史数据问题也比较简单,直接把当前表里面的记录清洗出Key写到Kafka,后面作业不用改,自然就会消费到这些消息,整体架构如下:

Binlog-ElasticSearch

**优化点:**其实这个项目后期还是有一些优化点的,目前为了简单起见是采取的中转了一次,并且反查Database的方案,这样可以不用考虑数据的顺序,正确性问题,因为反查拿到的数据永远是当前时间点的正确数据.但是也会有一个问题,就是服务部署的机器无法做到动态扩容,需要有指定Database的连接权限,公司的Database都是采用的ip白名单方式,需要提前申请权限.其实如果Kafka保证Binlog的数据正确性,那么下游完全不用再反查Database,直接按变更记录查询就行了,唯一需要担心的就是数据初始化和已有数据同步作业同时跑的一个顺序问题.

Tomcat Log->Redis/Hbase/ElasticSearch/Hadoop

这一步相信会有很多的应用场景,基本就涵盖了几乎所有的日志使用场景了,分别大致说一下几个场景吧.

Trace系统

这个应该算是最终要,最有意义的一个东西:分布式式追踪系统.基本上一个稍微大一点儿的公司都会有自己的Trace系统,一方面是大公司的系统比较复杂,一件事情往往需要很多个系统互相配合才能完成;另外一个就是现在微服务比较火,都在提倡系统解藕,所以调用链会比较长.业界有名的就是Google 的Dapper,具体的实现有阿里的鹰眼(不开源),Twitter的Zipkin.当然这不是我们这里的主要内容,我们只关心如何应用,也就是在有traceId的情况下,这些日志能有哪些应用场景.有大概这么几个场景:

  1. 服务异常排查: 这个就是用到的traceId,我们把这些日志通过流计算作业提取traceId,把日志按照qrtaceId存放,当某个服务出问题了,可以通过查询服务的调用链,分析具体的出问题在哪个环节.
  2. 用户行为/订单流程分析: 这个的原理其实和上面的差不多,只不过现在不是看traceId,而是看userId/orderNo.这个应用场景也很大,主要是发现异常数据,定位排查细节,一般我们会选取某个有问题的用户,查到他某个时间段的所有行为日志.上面这两个就比较适合存放在HBase中,至于为什么,了解HBase的特性的人应该就明白了.
  3. 运营分析: 一般对于有些系统,运营会关注系统的异常数据,在系统出现问题的时候,有时候运营需要查询到异常数据,然后批量做人工处理.我们也通过流计算提取所有的日志中的Execption Log,按应用名,系统,机器名,异常类型存放到ElasticSearch中.之所以放在ES中主要是因为可以安装ES-SQL插件,运营可以写SQL查询统计这些信息.当然也会有定时作业去汇总这些信息发邮件,按照系统给相关负责人发邮件.

Hive数据仓库

虽然数据仓库主要是业务的DB数据,但是日志里面也含有非常多的信息,而且DB受限于业务和性能,不会把所有的东西都存在DB里面,但是日志就自由多了,可以输出很多有用的信息,比如记录用户的位置/搜索/页面浏览记录,再比如修改密码/登录这些,往往DB里面只会记录成功的那条记录,中间很多失败的信息是只存在于日志里面的.所以这部分数据也会通过Streaming作业这个ETL过程并入到Hive数据仓库里面.PM也主要是分析用户行为日志来改进用户体验.

统一查询

这个说起来其实也不算一个单独的功能,上面的这几个其实说到底也是提供了统一查询的功能.这里要说的是一个统一入口问题,正常的开发线上排查问题,查看日志都是直接到对应的机器上查看的.但是当我们有实时日志采集系统之后,完全可以做到屏蔽机器这层概念,即通过我们的系统直接实时tail服务器上的日志,虽然会有延迟,但是在秒级别的话是可以忽略的.

未完待续