LittleQ

爱好:写代码

最近重新把博客捡起来了,由于好久都没更新,导致在新的机器上各种跑不起来,于是索性把依赖都更新成最新的了。另外好久不用,hexo跟next主题更新了不少新功能,也顺便都加上了,加了几个插件,感觉还不错,尤其是搜索插件,很多年以前用Hexo些博客最头疼的就是没有检索功能,只能依靠标签跟归档文件夹去找。在添加搜索插件的时候,一开始并不生效,后面解决了,在此记录录一下开启搜索功能的过程。

准备工作

本次是以NexT主题为例,所以确保你也是使用的这个主题,其他的请自行Google搜索,由于配置用的是最新的,所以也贴一下版本号

1
2
3
4
5
➜  blog git:(master) ✗ hexo -v
INFO Validating config
hexo: 6.3.0
hexo-cli: 4.3.1
os: darwin 22.5.0 13.4

NexT版本是

1
2
"name": "hexo-theme-next",
"version": "8.18.1",

安装搜索插件

在博客根目录执行命令

1
npm install hexo-generator-searchdb

然后重新生成

1
hexo g

都操作完之后,可以在public目录下看到多了一个文件search.xml,到这里我以为已经可以了,但是启动本地服务之后,反复试了几次还是不行,导航栏没有任何变化,根本没有搜索入口。
然后去看了下官方文档,发现好像要自己写代码,官方文档解释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
How to use this plugin in my Hexo blog?
You have two choices:

you don't want to write search engine by yourself. There are many themes that take use this plugin for local searching that works out of box.

you are familiar with JavaScript and would like to write your own search engine. You can implement one by yourself according to the template code search.js. There is no documentation at present, but you can find its usage in the source code of the theme NexT. Generally there are 3 steps:

write a search view. This is the place for displaying a search form and search results;
load the search.js script via CDN, for example:
<script src="https://cdn.jsdelivr.net/npm/hexo-generator-searchdb@1.4.0/dist/search.js"></script>
A LocalSearch class is provided in the search.js which tells the browser how to grab search data and filter out contents what we're searching;

write a search script, make use of the previous LocalSearch class.

乍一看好像得自己写JS代码才能行,准备放弃了,后来仔细看了下第一段,好多主题都是用的这个插件来实现本地搜索的,于是我搜了下NexT的本地配置文件_config.yml,发现了这么一段配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# Local Search
# Dependencies: https://github.com/next-theme/hexo-generator-searchdb
local_search:
enable: false
# If auto, trigger search by changing input.
# If manual, trigger search by pressing enter key or search button.
trigger: auto
# Show top n results per article, show all results by setting to -1
top_n_per_article: 1
# Unescape html strings to the readable one.
unescape: false
# Preload the search data when the page loads.
preload: false

原来NexT的本地搜索用的就是hexo-generator-searchdb插件,只是开关默认是关闭的,于是将enable属性改成true,再次重新启动服务,本地已经出现搜索入口了,如图所示:
本地搜索功能截图

废话

一直想写一篇关于Spark Streaming的文章,但是实在是事情太多,当然主要还是我比较懒,身边的同事已经明示我博客好久都不更新来催更了,技术都快荒废了,所以先水一篇。这篇文章虽然是叫灌水篇,但实际上都是我在实际项目中学习和总结到的一些切身经验,如果你真的想在生产环境去用Spark Streaming实现一些简单的实时计算或者实时监控,看板之类的,有一些问题是你一开始就得考虑到的,并且得有非常可靠的解决方案。

算起来差不多有快两年没用过Spark Streaming了,毕竟现在的公司也是主推Flink了,从趋势来看,的确是Flink的势头更猛一些,虽说Spark Streaming后来引入Structed Streaming,据说性能提升不少,这个我也没用过,暂时就不做深入讨论了。

其实这个系列计划后面还有一篇JStorm,一篇Flink,之所以把Spark Streaming放在第一篇来讲主要是有两个原因:首先是这个框架算是在实际做项目中钻研过一段时间,对其使用中的坑也算踩了不少,最后也算圆满完成了项目;其次是之前在招人的时候面试过很多人,简历上写了做过的一些Spark Streaming项目,在考察他们项目的时候,问的深了慢慢就懵了,从来没有碰到过一个人有认真思考过那些问题,当然这里面有很多人的简历大概率是培训机构的一些模板(数仓分层 + Spark Streaming + 用户画像,基本是这个结构,技术选型一模一样,另外也吐槽下那些培训机构,你告诉别人用户画像放在HBase这没啥问题,问题是你总得好好讲下rowkey这个吧,连怎么设计,为什么要这么设计也不讲讲,一问就完蛋),本身也没真正做过Spark Streaming的项目,对框架的一些设计理念和底层的东西更加没有了解过。

正文

所以这篇文章主要会说一下一些实际项目中比较关注的问题,这些问题大概率也是面试官比较喜欢问的,做实时计算的确也绕不开那几个问题,好久没用了,想到什么写什么,截图是肯定没有了,但是如果你看到这篇文章,又或者在找解决方案,我想没有图应该也看得懂,下面就讲几个比较重要的问题。

Spark Streaming会丢数据吗

相信每个做实时计算的人都会碰到这个问题,或者说被别人追问过这个问题:为什么我从xxx能查到数据,从你实时处理之后就查不到了?又或者是这段时间的指标看着好像有问题,线上实时计算程序没漏掉数据吧?如果你非常自信的告诉别人,不可能丢的,这个Spark Streaming官网说了,xxx和xxx机制可以保证即使程序挂了,也能恢复,数据不会丢的。但是实际情况会比较复杂,很可能会被立马打脸,或者说运气好,丢了一两条也没人看的出来,但是其实有一些项目是连一条数据都不能丢的。

这里先说下结论,其实Spark Streaming是会丢数据的,要想保证数据100%不丢失,你需要根据实际情况,也就是输入输出的存储系统来做很多的处理,否则,丢数据的情况会有很多种,丢不丢数取决于你的集群和系统的稳定性。

所以每次碰到简历上写着Spark Streaming项目的人,我的第一个问题就是:你这个程序会丢数据吗?他们一开始都会很确定的告诉我不会,并且会开始讲自己是通过Direct的方式去消费Kafka的数据,手动维护offset的,只有当数据处理成功了才会提交offset,所以即使程序挂了,重启之后还是会从之前的offset开始消费,数据不会丢。当然有时候也会顺带问下他们Receiver模式和Direct的区别和为什么选后者,算是比较基本的问题,但是有的培训机构居然连这个也没讲明白。很多人都认为是不能手动维护offset,所以有丢数的风险,其实主要原因并不是这个,这里就不展开讲了。

但是我们面临的实际问题是,手动维护offset,确保每次处理成功了才提交offset就真的不会丢数据吗?从表面上来看,这个想法确实没什么问题,但是如果你对Spark Sgreaming了解的够深,就会发现这个想法是错的。其实要想知道会不会丢数,就得稍微了解一下底层的东西,或者说多看看监控和SparkUI界面的信息,了解一下Application,Job,Task,Batch这些概念,如果你把这几个东西搞清楚了,我相信你应该可以大致判断出来你写的程序是否有丢数据的风险以及在什么情况下会丢数据。

介绍

正式讲问题之前,先得看几个基本概念

名称 说明
Application 应用程序,也就是你每次提交一个Spark Streaming任务的时候,运行在集群上的一个计算任务
Batch Spark Streaming核心理念就是micro batch也就是算子和程序并不是时时刻刻都在计算和处理数据的,而是每隔一段时间成成一个批次
Job 一个计算任务可能会包含多个Job,大部分应该就是一个
Task 任务最终执行计算最小单位,对于sourcekafka的情况,总的数量一般和需要消费的分区数相等,最大同时执行task个数可以通过一些参数来设置

所以其实是每次开始调度执行一个Batch,会生成job,然后job会分stage,最细粒度就是到task执行具体的逻辑。因为我写Spark Streaming程序中也没见过有两个job的,所以暂且以只有一个job的程序为例,讨论下什么场景下会丢数据以及深层次的原因,下面主要介绍下TaskBatch

  1. Task
    作为最终消费和处理数据的执行单元,也就是你程序里面真正处理数据的算子和最终的action操作,对于手动维护offset来说,一般就是在所有数据处理完,就调用Kafka的api来commit offset,这样就确保了如果程序出现异常情况挂掉,offset不会更新,当程序再次重启,会从zk上读取消费的信息,从上一次最后提交的offset后开始消费数据。这也就是大家普遍认为不会丢数的依据;
  2. Batch
    一般会在程序中设置批次的间隔,也就是多长时间生成一个批次,这个取决于实际场景,假设是5min一个批次,所以每隔5分钟程序会自动生成新的批次,然后根据资源情况和上一个批次的执行情况来决定是否开始调度此批次。需要说明下,大部分情况,也就是默认情况下,同时执行的批次只能是一个,也就是如果数据太多导致上一个批次没有执行完,后面生成的批次就会被pending住。并且默认的调度算法是FIFO,所以基本上就是按数据的消费顺序来处理数据;
  3. 异常
    可以看到其实整个Spark Streaming程序从表格上来看,从上到下是一个任务的逐渐细化过程,所以问题来了,计算任务失败是如何定义的,Task失败了Job会失败吗?Batch会失败吗?Application会失败吗?
  • Task会有默认的重试次数,好像是4次,可能不同的平台会额外设置,这个参数是可以设置的。Task执行失败了会自动重试,如果超过了最大重试次数还是执行失败,那这个Task所在的Job就失败了,所以当前批次的状态就是Failed
  • Job失败了就失败了,因为Task已经重试过了仍然失败,最终这个Batch失败了, 但是Application并不会失败,一般而言,Task重试了很多次,会耗费很多时间,所以会有pending状态的Batch,这个时候其实会直接调用下一个pending的批次继续执行。
案例分析

所以到这里你就应该明白了为什么说,即使你使用直连方式消费kafka,手动维护offset,仍然会丢数据。这里还需要再额外讲一下,每个批次在生成之前,就会计算出这个批次当前需要消费的数据范围,也就是[offset1,offset2]这种,对应到kafka的每个分区,哪怕你上一个batch还没执行完,下一个批次他会算出他此时需要消费的数据。至于怎么算的,有时间后面展开说,总之不是每次开始执行才去zk里面读取最新offset开始消费,这里记住就行了,因为和丢数有关。

上面讲到了每个批次要处理的数据其实是根据以往任务执行的情况来估算出来的,假设第一个批次在执行过程中由于在不停的重试,时间超过了一个批次,后面又生成了几个批次,并且由于同时最多只能有一个batch在执行,其他的都在pending状态。这个时候当Task的失败重试次数超过了设置的最大失败重试次数,即最终还是失败了,于是第一个批次就挂了。并且由于程序设置的是执行成功之后才会commit offset,导致偏移量也没有提交,到此为止还算正常毕竟虽然失败了,但是offset也没有更新,如果重启的话,还是会从上一次成功的地方接着消费。

但事实是紧接着由于第一个批次失败了,资源空闲出来了,后面pending的第一个批次就开始调度了,然后呢比较顺利,马上就执行成功了,这个时候程序触发了commit offset,将最新的消费情况更新到了zk。依次类推,后面的批次按照之前算好的消息范围继续消费,成功后commit offset。于是在执行成功了几个批次之后,突然发现,第一个批次失败了,offset被第二个和后面的批次更新覆盖了,然而实际上消息并没有被消费处理,因为第二个批次处理的消息是提前算好的,这就是数据丢失的真实场景。

解决方案

要怎么做才可以保证数据不丢失呢?其实通过上面的分析,会导致丢数据的根本原因其实是因为Batch失败了,但是Application并没有失败,后续执行的Batch成功了把offset给覆盖了。所以解决问题的方法其实有两种:

  1. 不让Batch失败,这样就不会存在前面的批次失败了,后面的批次成功了这种情况。但是实际情况下总会存在程序出异常的情况,所以可以在程序出问题的时候将程序hang住,比如将Task的最大失败重试次数设置成int最大值。不过这样也有问题,一是如果就是程序的确就有问题,重试也不会成功,白白重试浪费资源;另外就是如果程序计算很快,那么重试也会有试完的时候,然后开始调度下一个Batch,所以并不推荐这种方式;
  2. Batch失败了让Application也失败,根本原因其实是Batch失败了Application没有失败,继续调度后续Batch导致offset被覆盖。这里需要借助因为第二个批次处理的消息是提前算好的,

这里需要借助下StreamingListener这个类,你需要继承这个接口,来监听Job的执行状态,从而来控制当Job失败了,程序直接重启,不要直接调度下一个Batch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Batch extends Serializable {
var failedCnt: Long = 0
var successCnt: Long = 0
}

class BasedataSparkBatchListener(ssc: StreamingContext) extends StreamingListener {
val batch = new Batch()
/** Called when a batch of jobs has been submitted for processing. */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

/** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

/** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val batchInfo = batchCompleted.batchInfo
val outputOperations = batchInfo.outputOperationInfos
val numFailedOutputOp = outputOperations.values.count(_.failureReason.nonEmpty)

if (numFailedOutputOp != 0) {
batch.failedCnt += 1
} else {
batch.failedCnt += 1
}
}
}

可以看到有很多方法,这里其实只需要实现onBatchCompleted方法获取失败的Batch数量就行,然后主程序需要注册下这个listener:

1
2
val listener = new BasedataSparkBatchListener(ssc)
ssc.addStreamingListener(listener)

然后在提交offset的地方需要做一个判断,如果是listener.batch.failedCnt > 0,执行ssc.stop()将程序杀掉,如果listener.batch.failedCnt = 0,则执行commit offset操作。

限速和背压设置

Exactly once真的必要吗

Speculative机制有什么问题

买的VPS三年了,一不小心忘了续费过期了,数据环境全给清空了,又得重新配置环境。因为有ipv6地址,所以想把上学那会儿的PT站用起来,重新装下transmission,这个装好了可以直接从网页上添加任务挂种,比较方便,安装过程如下

安装Transmission

1
2
3
4
5
6
7
8
# 安装源
yum install epel-release
# 直接安装相关的包
yum install transmission-*
# 启动服务创建配置文件
service transmission-daemon start
# 停止文件修改配置文件
service transmission-daemon stop

修改配置文件/var/lib/transmission/.config/transmission-daemon/settings.json,需要改动的几个地方如下:

1
2
3
4
5
6
7
8
9
"download-dir": "/root/pt",
"incomplete-dir": "/root/pt",
"rpc-authentication-required": true,
"rpc-port": 12345,
"rpc-enabled": true,
"rpc-password": "the fuck password",
"rpc-username": "zhangsan",
"rpc-whitelist": "0.0.0.0",
"rpc-whitelist-enabled": false,

NOTE: 下载的目录文件夹你得手动创建一下,我这里直接就在pt文件夹下面了,端口最好也改下,不要用默认的。

然后重启服务

1
systemctl start transmission-daemon.service

最后网页验证一下,访问http://123.4.5.6:12345/transmission/web/,如果可以访问那就说明可以了。

百度云盘

其实用PT下载挺快的,一般可以达到30Mb/s这种速度,但是怎么把文件搞回来是个问题,直接在服务器上起一个python的httpServer也可以,但是直连非常的慢,还不稳定,所以我采取了一种比较简单的方案:把下载下来的文件同步到百度云盘,然后再通过百度云下载。

背景

最近在构思做一个通用化的字典工具,其中有一个功能就是自动扫描枚举类,将枚举类序列化成一张表,对比更新到数据库中。但是在实际中使用发现,如果不做任何限制,直接用fastjsonJSON.toJSONString(obj) 方法,得到的只是枚举的名字,并没有得到一个全字段的json串。即SUCCESS(0, "成功")得到的将是SUCCESS这个字符串

fastjson版本:1.2.56

解决方案

  1. 重写覆盖枚举类的toString() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Getter
@AllArgsConstructor
public enum EnumTest {
SUCCESS(0, "成功"),
FAIL(-1, "失败");

private int code;
private String msg;

@Override
public String toString() {
return "{\n" +
" \"code\": " + getCode() + ",\n" +
" \"msg\": " + getMsg() + "\n" +
"}";
}
}

这样的话,如果直接用EnumTest.SUCCESS.toString()就可以得到想要的结果,但是如果用JSON.toJSONString(EnumTest.SUCCESS)得到的仍然是SUCCESS。这个比较直白简单,但是不支持fastjson的方法,就是手动控制了枚举的toString输出内容,但是也有一个不好的问题就是,新增字段或者修改字段,还得改toString方法,万一忘了改,那可能会发生一些莫名其妙的Bug,而且还不易察觉,所以不推荐。
**NOTE:**注解是用了lombok包里面的一些方法

  1. 自定义SerializeConfig

其实仔细看JSON.toJSONString()方法,有一些其他的重载方法提供了一些其他的参数,其中

1
2
3
public static String toJSONString(Object object, SerializeConfig config, SerializerFeature... features) {
return toJSONString(object, config, (SerializeFilter) null, features);
}

这个里面有一个自定义序列化的配置参
使用如下:

1
2
3
SerializeConfig config = new SerializeConfig();
config.configEnumAsJavaBean(EnumTest.class);
System.out.println(JSON.toJSONString(EnumTest.SUCCESS, config));

我们对比下几个的输出结果:

1
2
3
4
5
6
System.out.println("1:" + EnumTest.SUCCESS.toString());
System.out.println("2:" + JSON.toJSONString(EnumTest.SUCCESS));

SerializeConfig config = new SerializeConfig();
config.configEnumAsJavaBean(EnumTest.class);
System.out.println("3:" + JSON.toJSONString(EnumTest.SUCCESS, config));

结果如下:

1
2
3
4
5
6
1:{
"code": 0,
"msg": 成功
}
2:"SUCCESS"
3:{"code":0,"msg":"成功"}

通过这种方式,可以比较灵活自由的达到我们想要的序列化效果,而没有破坏掉其他的一些引用到枚举类的地方,因为如果直接重载了枚举本身的toString()方法,会产生一些不可预知的错误。

枚举嵌套

对于简单的枚举,这种应该没有啥问题,如果枚举出现了嵌套呢?我们写个例子测试一下,再申明一个EnumTest2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 枚举定义
@Getter
@AllArgsConstructor
public enum EnumTest2 {
SUCCESS(0, "成功", EnumTest.SUCCESS),
FAIL(-1, "失败", EnumTest.FAIL);

private int code;
private String msg;
private EnumTest enumTest;

}

// 自定义序列化配置
SerializeConfig config = new SerializeConfig();
config.configEnumAsJavaBean(EnumTest2.class);
System.out.println(JSON.toJSONString(EnumTest2.SUCCESS, config));

// 输出
{"code":0,"enumTest":"SUCCESS","msg":"成功"}

可以看到EnumTest2本身序列化没问题,但是他的enumTest属性没有按照我们想要的方式来,需要改一些:

1
2
3
4
// 这个地方改一下
config.configEnumAsJavaBean(EnumTest.class, EnumTest2.class);
// 输出结果
{"code":0,"enumTest":{"code":0,"msg":"成功"},"msg":"成功"}

枚举转字典

其实上面的基本是为了搞清楚枚举的序列化问题,主要目的其实还是为了我们的字典如何同步。因为并不是所有的枚举都需要入库,所以我们需要实现一个注解,当有这个注解的枚举,那么我们会把他同步到字典中。当然还有一个问题就是上面探讨的,如果一个枚举他嵌套了其他的枚举,我们还需要把他所引用的枚举都配置到自定义序列化的配置里,所以实现如下:

同步注解

背景

最近有个Flink实时作业写HBase的任务发现丢数据了,Flink平台和HBase运维也无法定位到具体的问题,也没有任何异常日志。没办法只能通过把HBase数据导出到离线Hadoop集群来分析。
一开始怀疑MQ没有采集到日志,后来通过把Kafka日志拉取到HDFS查询发现数据是有的,那问题就只可能是在计算过程中丢失了。万幸实时采集的数据都有落HDFS,所以想离线分析一波,首先让运维
给HBase打了一个快照,然后给了个MR代码让我自己解析数据结构。

问题

其实代码很简单,就是解析Cell把值解析出来然后写到HDFS路径上,需要用到的包也不多,pom.xml文件如下:
``结构。

代码

其实代码很简单,就是解析Cell把值解析出来然后写到HDFS路径上,需要用到的包也不多,pom.xml文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.54</version>
</dependency>

具体的版本根据你的集群而定,然后就是解析程序了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class HBase2HDFSApp {
static Logger LOG = LoggerFactory.getLogger(HBase2HDFSApp.class);
/**
* * 需要传入的参数:快照名字,解析之后输出路径,快照输入路径,临时路径
* * @param args
* * @throws Exception
*
*/
public static void main(String[] args) throws Exception {
if (args == null) {
System.err.println("Parameter Errors ! Usage : <snapshot_name> <output_path> <input_path> <tmp_output_path>");
System.exit(-1);
}
String snapShotName = args[0];
Path outputPath = new Path(args[1]);
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.rootdir", "hdfs://" + args[2]);
configuration.set("mapreduce.job.queuename", "xxx");
configuration.set("hadoop.tmp.dir", "xxxx");
String jobName = HBase2HDFSApp.class.getSimpleName();
Job job = Job.getInstance(configuration, jobName);
job.setJarByClass(HBase2HDFSApp.class);
LOG.info("start to init");
TableMapReduceUtil.initTableSnapshotMapperJob(snapShotName,
new Scan(),
HBase2HDFSMapper.class,
Text.class,
NullWritable.class,
job, true, new Path(args[3]));
LOG.info("init success");
outputPath.getFileSystem(configuration).delete(outputPath, true);
FileOutputFormat.setOutputPath(job, outputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);//没有reduce
job.waitForCompletion(true);

}
public static class HBase2HDFSMapper extends TableMapper<Text, NullWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result rs, Context context) throws IOException, InterruptedException {
byte[] keyBytes = key.get();
JSONObject value = new JSONObject();
String rk = new String(keyBytes);
List<Cell> list = rs.listCells();
for (int i = 0; i < list.size(); i++) {
Cell cell = list.get(i);
value.put(new String(CellUtil.cloneQualifier(cell)), new String(CellUtil.cloneValue(cell)));
}
context.write(new Text(rk + "\t" + value.toJSONString()), NullWritable.get());
}
}
}

**NOTE:**需要注意的是<output_path>和<tmp_output_path>根目录要一致,然后就是不能使<input_path>的子目录.
编译打包之后提交运行,注意打包需要用到assembly插件,对应的pom.xml配置为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>`配置为:

然后打包之后运行:

1
2
3
4
5
hadoop jar ./target/xxx-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.xxx.HBase2HDFSApp \
<snapshot_name> \
<output_path> \
<input_path> \
<tmp-output_path>

然后就出现了一个经典的错误:

1
2
3
4
5
Caused by: java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
xxxx

问题原因

google了一下,找到这个问题的解决方法,详细见链接: http://www.voidcn.com/article/p-hhmhpejc-bh.html. 大概就是引入了一个优化措施导致的,
这个问题的发生是由于优化了HBASE-9867引起的,无意间引进了一个依赖类加载器。它影响使用-libjars参数和使用 fat jar两种模式的job.
fat jar模式Hadoop的一个特殊功能:可以读取操作目录中/lib目录下包含的所有库的JAR文件,把运行job依赖的jar放在jar中的lib目录下。

解决方式也比较简单:

  1. 把缺的这个包拷贝到hadoop lib目录
  2. 环境变量中导入这个缺失的包

由于我是临时跑一次,而且hadoop环境是公用的,直接破坏了不好,就采用的临时方案.首先定位到com.google.protobuf.HBaseZeroCopyByteString位于hive-server包中,具体对应的jar包是

hbase-protocol-0.98.21-hadoop2-xxxx.jar

具体的版本看你们公司集群编译之后对应的包版本即可,然后调整运行命令为:

1
2
3
4
5
6
export HADOOP_CLASSPATH=xxxx/hbase-protocol-0.98.21-hadoop2-xxx.jar
hadoop jar ./target/xxx-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.xxx.HBase2HDFSApp \
<snapshot_name> \
<output_path> \
<input_path> \
<tmp-output_path>

然后运行就行了,大工告成。

七牛图床

最近发现博客里面用的七牛的免费图床全部过期了,之前也收到了七牛发的测试域名回收通知,当时以为域名过期了,再申请续一下就行,结果发现是直接都回收,啥都没有了,心中真是一万头草泥马。
之前工作比较忙也就忘了,最近登录博客一看,图片全部GG了, 博客搭建最近发现博客里面用的七牛的免费图床全部过期了,所以周末在家花时间就折腾了下迁移方案,记录一下.

方案调研

其实最初写博客的初衷也只是为了记录写自己日常工作中学到的一些东西,方便日后查阅。另外一个其实也有赠人玫瑰的想法,记录下工作中碰到的一些棘手的问题,方便同行交流。所以其实访问量不大,对图片的需求也不大,但是有时候博客里有必须得放一两张图。
所以最初在网上调研的时候,基本是看有哪些免费好用的图用的图床,当时看七牛的评价挺好的,一个月有10G的免费流量,一般人根本用不到那么多,而且方案也比较成熟,各种工具啥的都有,就用七牛了。所以现在不让用了,就调研了下其他的方案,网上有推荐其他免费图床的,反正我是真不敢用了。还有一些推荐阿里OSS的,不过据说收费比较复杂,用之前先好好研究下收费公式,因为我只有腾讯的vps,所以就只关注了腾讯的方案,腾讯和阿里OSS对应的服务叫COS,并且收费方式也很良心,大家可以去这里看下:定价对象存储 COS
COS计费规则截图
所以看这个图,基本上你可以不用花钱,流量肯定够你用了.

图片备份

这里有个很坑的地方就是,如果你的测试域名过期,你上传到七牛云的文件你是没办法直接访问的,你会发现点击预览和下载都是没有反,你会发现点击预览和下载都是没有反应的,这是因为你上传生成的域名链接已经被回收了,是无法通过网页URL来访问的,只能通过其他接口来操作,主要有下面几个步骤:

新建存储空间

之前那个存储空间里面上传的文件已经没有办法访问了,但是可以创建一个新的存储空间,通过其他接口把文件都转移到新的存储空间,这样就可以访问那些失效的文件了,比如可以建一个新的存储空间叫backup

下载开发工具

命令行工具(qshell),这个工具提供了很多接口,下载下来解压就能直接用,根据的操作系统选择对应的就行,详细的可以看下载界面的链接,如果只用一次,也不用去设置什么环境变量了,直接开始搞

1
2
3
4
5
6
7
8
9
10
11
# 我的是mac,所以用的是下面这个,具体的取决于你的系统
sudo chmod +x qshell-darwin-x64
ln -s qshell-darwin-x64 qshell
# AK/SK 需要去 个人中心->密钥管理 看下你自己的
./qshell account <AK> <SK>
# 把过期存储空间所有文件列表保存到文件
./qshell listbucket <old存储空间> list.txt
# 切割出文件名
cat list.txt | awk -F '\t' '{print $1}' > list_final.txt
# 把过期的文件列表搬迁到新的存储空间,我这里会出现让输入一个确认字符串,照着输入就行
./qshell batchcopy <old存储空间> backup list_final.txt

然后就可以在网页上的新的存储空间看到之前那些无法查看的文件了.

批量下载到本地

qshell提供了qdownload可以批量下载文件,不过官网给出的api文档特别标注了,这个接口默认是要收费的:配置【该功能默认需要计费,如果希望享受10G的免费流量,请自行设置cdn_domain参数,如不设置,需支付源站流量费用,无法减免!!!】,先看下用法:

1
qshell qdownload [<ThreadCount>] <LocalDownloadConfig>

第一个下载线程数参数是个可选参数,可以不用管,主要是需要写个配置文件,并且记住,得配置下cdn_domain这个参数,新建一个配置文件batch_download.conf:

1
2
3
4
5
6
7
8
9
10
11
12
{
"dest_dir" : "/xxx/xxx/Downloads/qiniu",
"bucket" : "backup",
"prefix" : "",
"suffixes" : "",
"cdn_domain" : "http://pgiolcvny.bkt.clouddn.com",
"referer" : "",
"log_file" : "download.log",
"log_level" : "info",
"log_rotate" : 1,
"log_stdout" : false
}

备注:cdn_domain这个就是你的backup这个存储空间的对外访问域名,每个参数的具体含义及使用事项在这里可以看到qdownload参数解释,配置好之后就可以执行:

1
./qshell qdownload batch_download.conf

终端中就可以看到日志,然后在dest_dir中就可以看到你要下载的文件了。

上传到腾讯COS

我们把所有的文件下载下来之后,然后还需要把文件上传到COS,这样图片才可以作为资源被外部访问,如果你之前没有使用过对象存储服务,还需要先创建一个存储桶,记住权限要设置成对外可读(不然别人也访问不了),然后把这些文件上传到这个存储桶里边,这个在网页上就可以直接操作,可以批量把刚才下载的都上传了。

批量替换

然后就只剩一步了,我们现在可以通过腾讯的COS来作为我们的图床服务,所以如果你写的新的博客,可以直接用新的地址,但是你之前写的那些博客,都是七牛的域名,所以需要把博客的原始文件里面的图片链接全部替换成腾讯COS的域名,老的域名可以看你的博客文件,我的是:http://7xn9y9.com1.z0.glb.clouddn.com,然后新的域名可以直接在腾讯云控制台,点开一张你上传过的图片查看,我的是:https://blog-1254094716.cos.ap-chengdu.myqcloud.com.具体的文件名因为都是一样的编码方式,所以只用替换域名就行,这里可以用sed命令来批量操作:

1
2
3
4
5
6
7
8
cd source/_post

# Linux用户
sed -i 's#(http://7xn9y9.com1.z0.glb.clouddn.com#(https://blog-1254094716.cos.ap-chengdu.myqcloud.com#g' *.md

# Mac用户
sed -i -e 's#(http://7xn9y9.com1.z0.glb.clouddn.com#(https://blog-1254094716.cos.ap-chengdu.myqcloud.com#g' *.md
rm *.md-e

**NOTE:**之所以替换的链接带上(是为了防止误伤,比如这边文章里就有七牛的域名链接地址,但是图片链接在MarkDown写法里都是放在括号里的,所以记得这么替换就行。

然后你可以去访问下你的博客,找一篇有图的,应该是可以访问的。

引言

最近打算写一下关于Kafka系列的文章,在整个数据体系中,Kafka扮演着一个非常重要的角色-数据总线.作为一个数据开发工程师,在数据的采集/存储/流计算/ETL/数据仓库/数据应用这几个方面,Kafka起到的作用是非常大的,甚至会影响到其他组建或者环节的技术选型.
在很久以前,在Kafka还没那么成熟的时候,很多的数据基础组件在设计之初并没有考虑到数据接收/数据输出解藕,以及数据容灾式持久化,往往都需要配合第三方或者额外开发其他的组件去保证数据的吞吐量以及可靠性,例如sqoop,canal.不过这个都不是本文的重点,本文的重点在于:为何现在很多公司都把Kafka作为整个数据链路的数据总线,这里很关键的一点是–吞吐量.

常用MQ介绍

这里我不会对其他MQ做过多的介绍,但是市面上的主流MQ也必须有个大概的了解,之所以流行开来也是有其独特的优势,Kafka也不例外,先放一张阿里云栖社区做的MQ对比图:
业界主流MQ对比
主要看下吞吐量这个地方,除了和RocketMQ领先的不多,基本上是碾压其他的MQ.一般性能好的MQ吞度量能达到几十万这个量级就非常厉害了,但是可以看到用机械磁盘的Kafka单机TPS差不多可以到200w了,那么对于一个集群而言,几百万的TPS完全不在话下.

这就引出了本文要讨论的一个重要问题:Kafka为什么这么快,吞吐量为何这么惊人?

Kafka吞吐量之谜

这个问题要想回答的好或者说回到的全面,其实并不简单.一个系统设计的这么好,往往是多方面综合考虑的结果,当然在剖析Kafka性能之前,先大概说一下实际使用情况下Kafka性能是否真的如网上说的那么优秀.因为目前BU内部的Kafka是由自己维护,所以规模不是很大,但也支撑了整个BU所有的日常数据业务.

线上规模

集群| 版本 | CPU | 内存 | 磁盘 | 网卡 | brokers数量
—–|—–|——|——|——–|——–
1| 0.8.2.1 | 32核| 64G |2T | 10Gbps | 5
2| 1.0.0 | 32核| 64G |2T | 10Gbps | 5
3| 0.10.2.1 | 32核| 128G |2T | 1Gbps | 3

目前主要数据在0.8.2.1,这个集群使用了大概有3年了,一直很稳定,也承载了几乎所有的数据,可以注意到两个上面的机器其实对配置要求不是很高,但是对磁盘(机械磁盘,T级别)和网卡(万兆)要求会稍微高一些.从这里可以看出,Kafka的性能瓶颈一般在磁盘和网卡.对CPU和内存的要求其实不是很高.实际使用也确实是,最开始本来也是千兆网卡,后来发现brokers节点容易出现网卡被打满,性能上不去的情况.还有就是磁盘有时候会不够用.

下面来说一一介绍一下,为啥Kafka吞吐量能做到这么高.

存储设计

单机写入能到百万级别,并且还是廉价的磁盘.要知道读/写机械磁盘,寻址操作是一个很耗时的IO操作,这也就是为什么现在的DB或者像ES这样的存储系统都慢慢换成SSD了.
Kafka是怎么做的呢,它在设计之初的一个目标就是:

以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能.

所以Kafka一开始便被设计成一个日志系统,消息只能append,这使得Kafka非常适合用来作为数据总线,所有的数据根据到来的顺序被顺序序列化到文件末尾,然后消费者也是按顺序消费.
实际测试使用中,顺序读写机械磁盘有时候比随机读写内存的吞吐量还要好.当然这个还归功于CPU的工作方式,在加载数据的时候,CPU会预测,连带读取一整块数据,下次读取如果命中,就直接从内存中读,也不用再去加载.

Producer

消息的写入主要由producer完成,首先简单说下Kafka的消息结构,每一个主题topic的消息有多个partition组成,每个partition都会有leader,follower.这里强调一下,不管是producer写数据还是consumer读数据,都是跟leader打交到,follower只负责从对应的leader同步数据.followerleader一起构成了这个partitionISR(同步复制队列),如果follower复制没有跟上,会被从ISR中剔除.所以只有当leader节点挂掉的时候,ISR中的follower节点才有可能备胎转正,数据的读写有新的leader节点负责.
所以说到写数据,就必须要说到KafkaAck机制.有时候性能和可靠性本身就是矛盾的,Kafka发送数据光快还不行,还得保证可靠性.
Ack机制

  1. 0:表示producer无需等待leader的确认,
  2. 1:代表需要leader确认写入它的本地log并立即确认,
  3. -1:代表所有的备份都完成后确认

问题产生

之前有一段时间beta jstorm集群打日志老是把磁盘都占满了,一开始懒得管,每次都是把dump堆栈文件直接删掉了.但是只是缓解了问题,后面磁盘在一段时间之后还是会别打满,后来有空排查了下问题,简单记录一下.
首先关于如何排查线上Jstorm作业问题这个,其实Jstorm UI也提供了线上的界面,可以查看作业执行日志以及dump堆内存,但是想看实时日志的话还是得到对应机器上查看,另外有些时候还得查看Worker日志才能比较好定位问题,这里用一个线上的实际问题来看下一般怎么排查问题.

问题描述

问题是这样的,Jstorm集群的机器最近经常磁盘报警,线上查看发现是Worker生成了很多*.hprof文件.这里简单说下,Jstorm集群的一个具体的Worker进程在发生OOM的时候会生成dump文件,也就是*.hprof文件,具体参数就是我们常说的-XX:+HeapDumpOnOutOfMemoryError.所以如果程序写的有问题某些参数设置的不对,或者数据量太大导致OOM的话,会不停的dump内存生成文件,直到磁盘被耗光.

问题排查

首先既然是磁盘被耗光了,那先看下磁盘上哪个Worker日志文件占用的磁盘空间最多,定位到具体的Worker之后,我们可以看下jvm运行信息:
Jstorm Worker Jvm运行统计
可以看到Perm区已经100%了,其他几个区使用率很低,FullGC一直在增加,这个时候基本上就可以判断是Perm区的问题了,我们再确认下Worker的启动参数,直接ps -axu | grep 32502即可:
Work启参数
注意下红色标出来的部分-XX:PermSize=33554432 -XX:MaxPermSize=33554432,算一下基本就是32M这么大.方法区一般存的就是类的信息,这说明加载了太多的Class,可以用jmap -histo:live 32502看下,会发现确实有好多xxxClass的实例,说明确实是.最后我还看了对应的代码文件,发现也的确是用到了很多的的包,并且还是写在static代码块做初始化的.所以最后我改了下启动参数,具体为修改storm.yaml文件:

nimbus.childopts: “ -Xms1g -Xmx1g -Xmn500m -XX:PermSize=50m -XX:SurvivorRatio=4 -XX:MaxTenuringThreshold=15 -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:CMSFullGCsBeforeCompaction=5 -XX:+HeapDumpOnOutOfMemoryError -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=5000 “
**PS:**当然不是说你改了就可以生效了,你需要先把对应的Worker进程干掉,我一般是直接kill <pid>,因为只要不是强制杀掉,Jstorm会在其他节点重启这个Worker,所以不会丢数据啥的,就直接杀掉了,然后是重启Supervisor进程,然后那些新提交的启动的Worker就会用新的参数启动,所有的就这么简单了,完事儿收工睡觉.

总结

其实在使用了这么久的Jstorm后,也想总结一下个人对Jstorm的看法,或者说大一点,对实时计算框架的一点看法,因为我本人也没有用过原生的Storm,所以这两者的框架优缺点我就不展开了.
首先说下Jstorm,也是目前我们采用的流处理框架,目前业务日志实时处理都是用的Jstorm,处理之后的日志会存到ES/Hbase/Redis这几个地方,上层会有应用去实时使用这些数据.因为日志是不可变更数据,所以Jstorm的异步+compent(并发度) 可以很好的提高吞度量而不用去考虑数据的处理顺序而去而外考虑同步态.并且日志数据的要求会低一些,就是允许丢失部分数据,所以在写ES的时候,也是bulk+异步的方式写,也不至于存储过程卡住.
然后要说的是Spark Streaming,这个也是流处理框架,不过之前更多的是被称作Micro Batch框架,现在好像也支持真正意义上的流式处理了,细节就不多说了,没有实际用过.Spark Streaming在部门使用场景主要是为了处理db binlog数据.前面也说了Jstorm是异步的,意思就是不同的bolt之间异步,同一bolt不同task之间也是异步,但是有一些数据处理场景需要考虑到处理顺序以及同步,如果都是异步处理,那么数据的最终顺序可能会和读取的顺序不一致,所以这个时候就需要采用Async+Sync结合的方式处理.对应Spark Streaming来说就是batchbatch之前同步,batch内部job之间,task之间异步.
这样的好处也比较明显,batch之间类似于有一层屏障来控制顺序,但是batch内部的task并发处理数据,吞吐量也不会受同步影响太高.缺点也比较哦明显,需要等待前面的batch完成,所以latency必定比不上真正的流式处理框架Jstorm.
最后要说的一个是Flink,最初开始了解这个东西是在2016年,所以Flink诞生的时间最晚,因此在设计上也更加的先进(如果不是那也就没有必要重复设计).这个东西目前部门内部没有用,但是公司是主推新的作业尽量用Flink来开发.后面也打算把计算全部迁移到Flink,不过工程量可能有些大.就我调研和使用场景来说.这个东西确实要比Jstorm好,说几个我个人的感觉:

  1. Flink支持Scala开发,目前Jstorm还是用Java开发,做过数据开发的人就应该知道,Scala在数据处理方面确实要比Java写起来开发效率高很多,Java其优势还是在Web后端这块.
  2. 调试.这个开发过你就知道了,可能当初Jstorm的设计人员并没有考虑到这个问题,虽然Jstorm也可以在本地调试,但是需要你写不一样的代码;而Flink以及Spark Streaming这两者代码基本不需要怎么改就可以在本地调试,开发上更人性化.
  3. 设计理念,毕竟是后出来的,肯定设计的初衷也是为了解决当前框架无法解决,或者说无法优雅解决的问题.所以会吸收精华部分,摒弃糟粕,设计理念也会更加先进.尤其是一些关键点:吞吐,延迟,反压这些问题.

另外还有个人建议不使用Jstorm的理由,从去年发布Jstorm 2.2/2.4之后,差不多有一年都没有更新了,社区活跃度不高,Issue基本上没人管了.不过有些东西比较有价值的,flue-core,这个东西其实是Storm的一个插件,当让也可以在Jstorm里面用,简单来说就是可以写一个yaml配置文件去定义一个作业,这样就不用再编译提交jar包来启动作业了.那么我们可以定义或者提前编写一些通用的公共bolt组件,做一个平台来开发Jstorm作业,可以做到页面化开发而不用手写Java代码.

最后说一下自己在使用这些大数据的开源组件的一些见解,其实用过很多组件之后会发现,他们之间会有一些共同的设计理念,举个flume的例子:
flume架构
这里有三个比较重要的组件:source,channel,sink.我不知道是不是flume首创的这个,但是从组件的开源时间上来看应该是.很巧的是flink里面也是这么个结构,所以确实可以说这个结构真的是一个优秀的设计思想.尤其是channel这个思想,其实很多的开源组件在设计的时候并没有引入这种设计,比如Canal,所以只能自己去实现具体的数据存储功能,其实这个是不利于推广的,不能开箱即用.正因为Channel的存在,所以source/sink可以实现复用及自由组合,比较灵活,扩展性很强,但是需要有一个Channel支撑,Kafka就是这么个存在,所以Kafka的那几个哥们后来出来创业了,围绕Kafka创建了Confluent,这个里面围绕Kafka创建了各种不同的source/sink,基本涵盖了所有的数据源以及存储源,这种通过一个Channel来缓冲以及解藕不同的逻辑单元,在数据处理领域来说应该是一种非常值得借鉴的思想.在建设基础数据体系或者一个系统的时候,可以多考虑这种结构.

作为一个后端开发,尤其是数据开发,我们很多的服务都是以进程的方式运行在后台.例如Jstorm/Kafka/ElasticSearch等等,线上报警处理也是一个必备技能了,更多的可能是一些磁盘,内存,CPU指标之类的,有些命令不常用可能会忘记,做个记录方便查找.

磁盘

这个应该是最常见的,磁盘报警是家常便饭了.

df -lh

查看系统磁盘占用情况,一般磁盘报警了可以先用这个命令看下大概是哪个盘出问题了,找到占用比较大的磁盘有时候需要配合其他命令:

1
2
3
4
5
# 到具体的目录下执行
# 1.快速方法
du -sh
# 2.推荐方法(-x 可以过滤掉和一开始文件系统不一样的文件/文件系统)
du -h --max-depth=1

这里不推荐第一种方式是因为效率问题,如果碰上是根/目录满了,基本上统计不出来,如果为了快,想配合排序定位问题,还可以配合sort函数使用

1
2
# 倒序排(一般占用大基本上就是看G,很少会到T,毕竟磁盘没那么大,也有例外)
du -h --max-depth=1 | grep [TG] |sort -nr | head

find

这个命令紧跟在上一个命令后面,就是因为很多时候我们需要批量删除满足某些条件的文件,但这些文件可能并不是简单的在一个文件夹下面,类型一样.可能分散在某个目录下面的多级目录,并且类型也很多:

1
2
3
4
5
6
7
8
# 根据文件类型来删,比如日志文件 *.log
find . -name '*.xxx' -delete
# or
find . -name '*.xxx' -exec sudo rm -f {} \;
# 根据时间 -mtime:内容时间 -ctime:状态修改
find . -mtime +10 -a -ctime +10 -delete
# 根据文件大小 >100k <500M
find . -size +100k +size -500M -delete

**NOTE:**find命令-a:与,-or:或,not:否.另外使用删除请慎重,可以先用-print打印一下看看是否满足要求,不然删了不该删的后果可能很严重.

swap

有时候磁盘满了可能是swap设置的太大,占用的swap又无法释放(内存就算有很大空闲,swap已经使用的可能也不会释放),导致磁盘一直报警,处理这个要稍微麻烦一点儿.首先看下磁盘占用比较大的几个程序,或者说看下有没有比较重要的服务占用着缓存,有的话重启下,先把重要的程序占用的swap释放掉,然后关闭缓存,开启缓存:
看一次实际线上问题:

1
2
3
4
$ free -m
total used free shared buff/cache available
Mem: 11855 2366 9146 157 341 9144
Swap: 4095 2600 1495

可以看到内存有很多空闲,但是swap占用了2.6G无法释放,然后统计下是哪些进程在占用着swap:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ for i in $(sudo ls /proc | grep "^[0-9]" | awk '$0>100'); do sudo awk '/Swap:/{a=a+$2}END{print '"$i"',a/1024"M"}' /proc/$i/smaps;done| sort -k2nr | head
awk: fatal: cannot open file `/proc/10612/smaps' for reading (No such file or directory)
awk: fatal: cannot open file `/proc/10613/smaps' for reading (No such file or directory)
awk: fatal: cannot open file `/proc/10614/smaps' for reading (No such file or directory)
awk: fatal: cannot open file `/proc/10615/smaps' for reading (No such file or directory)
awk: fatal: cannot open file `/proc/10616/smaps' for reading (No such file or directory)
20211 466.664M
23994 215.438M
9339 208.672M
9334 167.766M
9340 152.559M
9338 132.375M
20293 88.9883M
9342 86.918M
9335 84.4492M
4323 76.8984M

可以看到进程号和对应的缓存占用大小(如果不想看到错误信息,可以grep -v “No such”过滤掉),看下具体的有没有比较重要的,如果想看具体的程序,后面可以用awk切割出pid,然后配合ps -p xxx看下具体的进程是不是重要的,手动重启下.然后就是比较关键的两个操作:

1
2
3
4
# 关闭所有缓存
swapoff -a
# 开启所有缓存
swapon -a

这样swap就全部被清空了.

lsof

这个命令可能一般人用的比较少,这个主要是看文件句柄的,有时候一个文件很大,我们直接删了,但是会发现磁盘空间并没有释放.这个时候一般就是还有进程占用着这个文件句柄,所以你可能在目录里面看没有什么文件,但是磁盘就是被占用了.可以这么排查:

1
2
# 查看文件为删除状态的文件
lsof | grep deleted

这个命令里面就可以看到是哪个进程持有这个文件的句柄,比如像tomcat,有时候日志打太多了,但是你直接删掉,磁盘根本不会减少,这个时候你可以重启下tomcat,就会发现文件占用的空间又回来了.

cpu/mem

这两个一般都是用top命令来看,所以放在一起说了.一般top之后可以看到进程的实时信息,top有一些常用参数命令

1
2
3
4
5
c top显示带具体进程信息
M 按mem占用排序
P 按cpu占用排序
# 具体线程信息
top -H -p {pid}

具体的进程问题,可能得用具体的方法,这里就不做展开了.

最近项目基本上进入尾声了,也有时间来整理下最近做的这个项目.因为主要就一个人在做,所以周期比较长,整个系统涉及到的开源数据框架比较多,所以感觉还是有不少价值的,当然这个里面也有很多的坑,只有做过才能体会到,后面我会慢慢展开.

项目背景

主要是两个公司合并了,哪两家就不说了,反正是行业Top1,Top2.后来打算成立新公司,所以数据需要整合.其实在一开始合并之后,数据就有陆陆续续的整合,不过这种整合方式效率比较低,整个链路很长,涉及到很多部门,圈绕的比较大.具体的流程大概说下:

  1. 首先数据在我们的数据仓库ETL跑完之后,会有一个下游作业把数据拷贝到一个公共hadoop队列里面(Hadoop权限这块比较差,由于支付的数据要求较高,在防火墙内,不可能共享Hive数据仓库所在目录的数据,所以只能采取这种方式共享数据);
  2. 然后由专门和对方公司数据部门对接的人来把我们拷贝到指定队列的数据从hadoop上下载下来,上传公有云服务器(其实就是FTP服务器);
  3. 然后对方公司的数据组有专门的人从公有云把数据下载到他们的Hadoop客户机上,然后把数据上传到他们的Hadoop集群,然后数据才能达到基本能用的状态.

**PS:**由于Hive本身作为离线数据仓库,数据延迟为T+1,然后数据再到他们那,延迟就变成了T+2,并且整个过程涉及到的部门比较多.数据不出问题还好,一旦出了问题,得找到每个环节的负责人,然后定位问题出现的环节,总之是很麻烦.

所以上面的老大也觉得这种方式使用数据非常的麻烦,成本太大,关键是数据的时效性也不满足要求,所以就想做实时数据流同步,整合两边的数据仓库.当然数据仓库只是实时数据流的一个用途之一,也会有其他数据使用场景,比如实时计算之类的.

设计方案

一般而言,一个完整的数据体系结构都比较复杂,我这里就大概说下整体架构:

数据体系结构图
如果觉得图片太小看不清可以在新标签中打开查看大图.我会从下而上简单说下这个架构里面都有些什么东西:

数据采集

数据系统的第一步,数据采集.这部分数据主要分为两大类,DB数据以及业务日志

Database

第一类就是比较主要的,业务数据库数据采集,目前基本上都是用的mysql数据库,所以采集方案比较大众化都是采用阿里的Canal,当然一般可能不能直接拿来用,需要你根据公司的技术架构做一些二次开发,因为一般公司内部会对MySQL做一些架构来给业务提供高可用的特性.数据采集之后会封装成自定义的Kafka Message,为了方便区分,topic命名方式可以有一个统一规则,例如binlog_{db实例架构类型}_{db实例名},这样能比较方便知道某个db实例的binlog数据特性以及排查问题.这里需要注意的就是往Kafka上写数据的时候,Kafka的分区策略要注意下,一个参考建议就是可以采用(Schema,Table)作为PartitionKey.这样可以保证同一个表的数据是顺序存放在同一个分区的,这样后续的一些程序处理可以保证数据的顺序性.
**NOTE:**封装的KafkaMessage可以根据业务来,如果想简单起见,可以把Canal的RowChange消息整个封装进去.

Tomcat Log

这部分数据原则上来说,并没有Database数据重要,因为我们建的大部分数据表,模型都是基于业务的ER关系表来的.但是日志数据可以做很多其他的事情,数据挖掘里面也有很大的用途,还有的主要就是业务辅助用,后面会讲一些使用场景.业务日志采集采取的方案比较多,有采用入侵式埋点的,但是现在的普遍做法应该还是非入侵式采集,因为日志采集事实上对业务而言是一个非必须的功能,但是线上业务首先要保证性能和可靠性不受其他无关影响.
采集业务日志一般采用的就是flume tail功能,对指定日志目录下面的文件执行类似Linux:tail -f命令不断滚动采集新产生的日志.这个里面也有很多的坑,目前最新的flume好像支持直接采集一个目录,这样就不用自己写tail插件去采集日志了.这部分日志采集之后也是放在Kafka集群,同样的也需要自己封装成定义的KafaMessage.建议是需要保留日志的文件名以及机器名,所以分区策略是用这两个key就可以了.
**NOTE:**一般大一点儿的公司,线上的业务系统都会有appId这种东西,所以建议如果有这个信息也封装到KafkaMessage消息里面.

数据计算

数据存储之后,就是数据的计算了,Kafka毕竟只能存一段时间,过期就会删除,所以这些消息会落地到其他的存储系统,不同类型的数据有不同的落地场景,相同的数据也可能有不同的落地场景,下面一个一个说下这些应用及落地场景:

Binlog->Hive

Database的binlog数据采集到Kafka之后,最大的一个用途就是作为数据仓库的ods层源数据.由于离线数据仓库一般延迟要求不高,基本上默认就是T+1,所以我们不用事实计算这部分数据,反而是吞吐量是个很重要的指标.所以这里可以考虑的就是Spark Streaming这个框架,微批处理,可以在延迟和吞吐量之间做一个很好的平衡.
当然如果你没有Spark集群,也可以用Flume消费Kafka消息写到本地(据说直接写HDFS性能不是很好,没有测试过),然后隔一段时间put到hdfs上.关于写的策略,因为每天的binlog是增量数据,所以你需要一个字段作为binlog的分区依据,可以在将Canal消息封装KafkaMessage的时候加上一个binlog的执行时间.
**关键点:**这个过程其实坑还是挺多的,Binlog一般并发比较大,你要保证数据可以准确去重,不会取到错误的数据.

Binlog->ElasticSearch

这个需求和Hive那个很像,但是又有一些区别.Hive不支持单行记录更新(据说貌似新版本的支持了,没去详细了解过),ElasticSearch简单来说就是作为Database里面的表的一个镜像.因为MySQL的单表数据量在超过5000w的时候性能会变得非常差,所以业务上为了性能考虑,都会对表按月或者其他规则做分表.所以一般像运营,开发,测试在查询数据的时候非常的不方便.而ElasticSearch本身就是作为索引系统而存在的,非常适合有大量的查询场景.
这个因为并不是同步到离线数据仓库里面,所以对数据的延迟要求会高一些,这个时候Spark Streaming就不太合适,那么可不可以使用Jstorm这种流式处理框架呢?答案是不行,原因也很简单,JStorm这种流处理框架是at least once语义,也就是能保证数据至少被处理一次(当然好像后面的版本是准备实现还是已经实现了exactly only once,反正我们自己用的并没有内置支持),并且还有一个很重要的原因就是:JStorm无法保证数据的顺序,一个拓扑里面的每个bolt会有多个task,数据如果是按随机分发的话,不同的task处理速度是没办法预估的,这就可能导致先产生的binlog变更记录有可能后写入到ElasticSearch,就容易出现数据不一致的问题.所以这种情况下异步流处理框架是没有办法处理这种问题的,所以在技术选型上我们可以选像Spark Streaming这种顺序批处理(但是也有条件限制,就是每次只能有一个batch在执行,队列设置成FIFO).
这里简单说下我们的技术方案:首先从Kafka消费Binlog,清洗出每个表的Primary Key,封装成自定义的Kafka Message写回到Kafka,然后再从Kafka去消费这种消息,拿到Primary Key,然后查询对应的Database,这样每次都是拿到的最新的记录,然后更新ElasticSearch,这样就不用担心数据变更记录顺序不一致的问题,因为每次拿到的都是最新的.至于历史数据问题也比较简单,直接把当前表里面的记录清洗出Key写到Kafka,后面作业不用改,自然就会消费到这些消息,整体架构如下:

Binlog-ElasticSearch

**优化点:**其实这个项目后期还是有一些优化点的,目前为了简单起见是采取的中转了一次,并且反查Database的方案,这样可以不用考虑数据的顺序,正确性问题,因为反查拿到的数据永远是当前时间点的正确数据.但是也会有一个问题,就是服务部署的机器无法做到动态扩容,需要有指定Database的连接权限,公司的Database都是采用的ip白名单方式,需要提前申请权限.其实如果Kafka保证Binlog的数据正确性,那么下游完全不用再反查Database,直接按变更记录查询就行了,唯一需要担心的就是数据初始化和已有数据同步作业同时跑的一个顺序问题.

Tomcat Log->Redis/Hbase/ElasticSearch/Hadoop

这一步相信会有很多的应用场景,基本就涵盖了几乎所有的日志使用场景了,分别大致说一下几个场景吧.

Trace系统

这个应该算是最终要,最有意义的一个东西:分布式式追踪系统.基本上一个稍微大一点儿的公司都会有自己的Trace系统,一方面是大公司的系统比较复杂,一件事情往往需要很多个系统互相配合才能完成;另外一个就是现在微服务比较火,都在提倡系统解藕,所以调用链会比较长.业界有名的就是Google 的Dapper,具体的实现有阿里的鹰眼(不开源),Twitter的Zipkin.当然这不是我们这里的主要内容,我们只关心如何应用,也就是在有traceId的情况下,这些日志能有哪些应用场景.有大概这么几个场景:

  1. 服务异常排查: 这个就是用到的traceId,我们把这些日志通过流计算作业提取traceId,把日志按照qrtaceId存放,当某个服务出问题了,可以通过查询服务的调用链,分析具体的出问题在哪个环节.
  2. 用户行为/订单流程分析: 这个的原理其实和上面的差不多,只不过现在不是看traceId,而是看userId/orderNo.这个应用场景也很大,主要是发现异常数据,定位排查细节,一般我们会选取某个有问题的用户,查到他某个时间段的所有行为日志.上面这两个就比较适合存放在HBase中,至于为什么,了解HBase的特性的人应该就明白了.
  3. 运营分析: 一般对于有些系统,运营会关注系统的异常数据,在系统出现问题的时候,有时候运营需要查询到异常数据,然后批量做人工处理.我们也通过流计算提取所有的日志中的Execption Log,按应用名,系统,机器名,异常类型存放到ElasticSearch中.之所以放在ES中主要是因为可以安装ES-SQL插件,运营可以写SQL查询统计这些信息.当然也会有定时作业去汇总这些信息发邮件,按照系统给相关负责人发邮件.

Hive数据仓库

虽然数据仓库主要是业务的DB数据,但是日志里面也含有非常多的信息,而且DB受限于业务和性能,不会把所有的东西都存在DB里面,但是日志就自由多了,可以输出很多有用的信息,比如记录用户的位置/搜索/页面浏览记录,再比如修改密码/登录这些,往往DB里面只会记录成功的那条记录,中间很多失败的信息是只存在于日志里面的.所以这部分数据也会通过Streaming作业这个ETL过程并入到Hive数据仓库里面.PM也主要是分析用户行为日志来改进用户体验.

统一查询

这个说起来其实也不算一个单独的功能,上面的这几个其实说到底也是提供了统一查询的功能.这里要说的是一个统一入口问题,正常的开发线上排查问题,查看日志都是直接到对应的机器上查看的.但是当我们有实时日志采集系统之后,完全可以做到屏蔽机器这层概念,即通过我们的系统直接实时tail服务器上的日志,虽然会有延迟,但是在秒级别的话是可以忽略的.

未完待续

0%