请登录 | 注册 | 我的订单
您购物车暂商品,赶紧选购吧!
Spark Streaming 流式计算实战
bigsaas云应用市场 / 2019-06-19
这次分享会比较实战些。具体业务场景描述:

我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成

userName/year/month/day/hh/normal 

userName/year/month/day/hh/delay

路径,存储到HDFS中。如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 normal 目录。

Spark Streaming 与 Storm 适用场景分析

为什么这里不使用 Storm呢? 我们初期确实想过使用 Storm 去实现,然而使用 Storm 写数据到HDFS比较麻烦:

* Storm 需要持有大量的 HDFS 文件句柄。需要落到同一个文件里的记录是不确定什么时候会来的,你不能写一条就关掉,所以需要一直持有。

* 需要使用HDFS 的写文件的 append 模式,不断追加记录。

大量持有文件句柄以及在什么时候释放这些文件句柄都是一件很困难的事情。另外使用 HDFS 的追加内容模式也会有些问题。

后续我们就调研 Spark Streaming 。 Spark Streaming 有个好处,我可以攒个一分钟处理一次即可。这就意味着,我们可以隔一分钟(你当然也可以设置成五分钟,十分钟)批量写一次集群,HDFS 对这种形态的文件存储还是非常友好的。这样就很轻易的解决了 Storm 遇到的两个问题。

实时性也能得到保证,譬如我的 batch interval 设置为 一分钟 那么我们就能保证一分钟左右的延迟 ,事实上我们的业务场景是可以容忍半小时左右的。

当然,Spark 处理完数据后,如何落到集群是比较麻烦的一件事情,不同的记录是要写到不同的文件里面去的,没办法简单的 saveAsTextFile 就搞定。这个我们通过自定义 Partitioner 来解决,第三个环节会告诉大家具体怎么做。 

上面大家其实可以看到 Spark Streaming 和 Storm 都作为流式处理的一个解决方案,但是在不同的场景下,其实有各自适合的时候。 

Spark Streaming 与 Kafka 集成方案选型

我们的数据来源是Kafka ,我们之前也有应用来源于 HDFS文件系统监控的,不过建议都尽量对接 Kafka 。

Spark Streaming 对接Kafka 做数据接受的方案有两种:

*Receiver-based Approach 

*Direct Approach (No Receivers)

两个方案具体优劣我专门写了文章分析,大家晚点可以看看这个链接和 Spark Streaming 相关的文章。

我的技术博文

我这里简单描述下:

*Receiver-based Approach 内存问题比较严重,因为她接受数据和处理数据是分开的。如果处理慢了,它还是不断的接受数据。容易把负责接受的节点给搞挂了。

* Direct Approach 是直接把 Kafka 的 partition 映射成 RDD 里的 partition 。 所以数据还是在 kafka 。只有在算的时候才会从 Kafka 里拿,不存在内存问题,速度也快。

所以建议都是用 Direct Approach 。具体调用方式是这样:

0193f0ce3517aa1f39a091dd0929cb67bcdd552f

还是很简单的,之后就可以像正常的 RDD 一样做处理了。

自定义 Partitioner 实现日志文件快速存储到 HDFS

d716dafc2b51f7ee896209c64579a2ce41322625

经过处理完成后 ,我们拿到了logs 对象 。

到这一步位置,日志的每条记录其实是一个 tuple(path,line)  也就是每一条记录都会被标记上一个路径。那么现在要根据路径,把每条记录都写到对应的目录去该怎么做呢?

一开始想到的做法是这样:

676553464115e605a087ec2693c4b1841d4a7e62

首先收集到所有的路径。接着 for 循环 paths ,然后过滤再进行存储,类似这样:

8d33cde81d974e45c6ce9b404f999f3bf11a69d0

这里我们一般会把 rdd 给 cache 住,这样每次都直接到内存中过滤就行了。但如果 path 成百上千个呢? 而且数据量一分钟至少几百万,save 到磁盘也是需要时间的。所以这种方案肯定是不可行的。

我当时还把 paths 循环给并行化了,然而当前情况是 CPU 处理慢了,所以有改善,但是仍然达不到要求。

这个时候你可能会想,要是我能把每个路径的数据都事先收集起来,得到几个大的集合,然后把这些集合并行的写入到 HDFS 上就好了。事实上,后面我实施的方案也确实是这样的。所谓集合的概念,其实就是 Partition 的概念。而且这在Spark 中也是易于实现的,而实现的方式就是利用自定义 Partioner 。具体的方式如下:

cb19375e7da7296b6fcbdd64dcb8930870b35c8f

通过上面的代码,我们就得到了路径和 partiton id 的对应关系。接着遍历 partition 就行了。对应的 a 是分区号,b 则是分区的数据迭代器。接着做对应的写入操作就行。这些分区写入都是在各个 Executor 上执行的,并不是在 Driver 端,所以足够快。

我简单解释下代码 ,首先我把收集到的路径 zipWithIndex 这样就把路径和数字一一对应了 ;接着我新建了一个匿名类 实现了 Partitioner 。numPartitions 显然就是我们的路径集合的大小,遇到一个 key (其实就是个路径)时,则调用路径和数字的映射关系 ,然后就把所有的数据根据路径 hash 到不同的 partition 了 。接着遍历 partition 就行了,对应的 a 是分区号,b 则是分区的数据迭代器。接着做对应的写入操作就行。这些分区写入都是在各个 Executor 上执行的,并不是在 Driver 端,所以足够快。我们在测试集群上五分钟大概 1000-2000w 数据,90颗核,180G 内存,平均处理时间大概是2分钟左右。内存可以再降降  我估计 100G 足够了  。

在演示场景中,Spark Streaming 如何保证数据的完整性,不丢,不重

虽然 Spark Streaming 是作为一个24 * 7 不间断运行的程序来设计的,但是程序都会 crash ,那如果 crash 了,会不会导致数据丢失?会不会启动后重复消费?

关于这个,我也有专门的文章阐述(https://yq.aliyun.com/articles/60120),

我这里直接给出结论:

* 使用 Direct Approach 模式

* 启用 checkPoint 机制

做到上面两步,就可以保证数据至少被消费一次。

那如何保证不重复消费呢?

这个需要业务自己来保证。简单来说,业务有两种:

* 幂等的

* 自己保证事务

所谓幂等操作就是重复执行不会产生问题,如果是这种场景下,你不需要额外做任何工作。但如果你的应用场景是不允许数据被重复执行的,那只能通过业务自身的逻辑代码来解决了。

以当前场景为例,就是典型的幂等 ,因为可以做写覆盖 ,

8a95de8d680c824a1e41be21bb0f6feaf06331fc

具体代码如上 ,那如何保证写覆盖呢? 

文件名我采用了 job batch time 和 partition 的 id 作为名称。这样,如果假设系统从上一次失败的 job 重新跑的时候,相同的内容会被覆盖写,所以就不会存在重复的问题。

回顾

我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成  

userName/year/month/day/hh/normal 

userName/year/month/day/hh/delay 

路径,存储到HDFS中。如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 normal 目录。

我们作了四个方面的分析: 

Spark Streaming 与 Storm 适用场景分析 ;

Spark Streaming 与 Kafka 集成方案选型,我们推荐Direct Approach 方案 ;

自定义 Partitioner 实现日志文件快速存储到HDFS ;

Spark Streaming 如何保证数据的完整性,不丢,不重 。

阿里云产品价格优惠,提供专业的技术服务,咨询热线:400-1565-661
 
 

[ 相关下载 ]
用户评论(共0条评论)
  • 暂时还没有任何用户评论
,共 1 页。 上一页 下一页
我要评论
用户名: 匿名用户
E-mail:
评价等级:
评论内容:
验证码:
captcha