Spark Streaming整合Kafka(一)

云计算 waitig 889℃ 百度已收录 0评论

基于Receiver 方式整合

一、Kafka版本选择

Spark Streaming支持Kafka0.8.2.1及以上的版本。

Kafka项目介绍了两个新的Comsumer(消费者)API,在0.8版本和0.10版本之间,根据自身需求选择版本号,另外要注意,0.8版本是兼容0.9 0.10版本的broker,但0.10版本不兼容之前的版本,接下来我粘贴下官网的一张对比图:

笔者选的是0.8版本。

二、基于Receiver 方式整合

这种方式是使用Receiver接收数据,这个receiver是使用kafka高级的消费者API操作,所谓高阶,就是它的偏移量之类的是由zookeeper来完成的,低层次的就需要手动来管理。所有的数据都是从kafka里过来通过receiver进行接收,然后存储到Spark executor里面去,接着Spark Streaming启动job处理存储中的数据。

默认的方式在处理一些故障的时候,会丢失一些数据,为了确保0数据的丢失,需要在Spark Streaming里面开启WAL机制(Write Ahead Logs),这样就能在HDFS上面同步保存所有kafka里面的数据,即先写到日志里面去,再进行处理,如果出现故障,可以从日志里找回,就能避免数据的丢失。接下来,分析如何使用这种机制在stream应用里:

1.导入依赖

 groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.2.0

2.编程

我们需要导入KafkaUrils,生成一个输入DStream

 import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

此处有几点要注意下:

1)kafka的partition和RDD里的partition不是一个概念;

2)有多个kafka DStream,我们可以采用不同的组到topic上面,采用并行的方式进行接收,这样可以提升数据的吞吐量;

3)如果需要开启WAL机制的话,底层需要有个支持副本的,类似HDFS的文件系统,数据接收后,首先会以副本的方式存储到日志里面。对于 input stream 的storage level 需要设置成 StorageLevel.MEMORY_AND_DISK_SER ,大概意思也就是在内存和磁盘都存储一下。

Demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming对接Kafka的方式一
  */
object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
    }

    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf() //.setAppName("KafkaReceiverWordCount")
      //.setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // TODO... Spark Streaming如何对接Kafka
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group,topicMap)

    // TODO... 自己去测试为什么要取第二个
    messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3.提交或部署

首先进行打包,命令为:mvn clean package -DskipTests ,使用Spark-submit,提交命令如下:

spark-submit \
--class com.imooc.spark.KafkaReceiverWordCount \
--master local[2] \
--name KafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar  hadoop000:2181 test kafka_streaming_topic 1

注意的是,Spark Streaming会一直启动receiver来接收数据,如果结束掉这个job,就无法正常接收kafka的数据。本次测试的话,可以先跑起kafka后,再在Spark上运行一个任务,即启动个receiver来接收kafka的数据。

三、Receiver整合总结

1) 启动zk
2) 启动kafka
3) 创建topic
4) 通过控制台测试本topic是否能够正常的生产和消费信息


本文由【waitig】发表在等英博客
本文固定链接:Spark Streaming整合Kafka(一)
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)