Spark Streaming整合Kafka(二)

云计算 waitig 602℃ 百度已收录 0评论

Direct 方法(没有Receiver)

一、概述

本方法是从Spark1.3版本引进的,提供更加强大的端到端的数据保障,改善数据丢失的情况,并且不再采用receiver的方式去接收数据,这方法周期性地从每一个topic分区里面查询kafka最近的偏移量(offsets),再周期性定义每个批次(batch)的offset的范围,相当于拿到每一个批次的偏移量后,再直接通过kafka的api,去kafka里面,把每个批次的offset的范围给读取出来直接操作,当启动任务去处理这个数据,使用kafka的simple
consumer API去从kafka里读取偏移量的范围,这跟读文件系统非常类似,这特性在Spark1.3里面支持Java和Scala,在Spark1.4版本里支持Python。

二 、优点

相比于receiver方式,本方式的优点有下面几点:

1.简化了并行度。我们不需要创建多个Input Stream然后把它们联合起来,而是直接使用direct stream进行处理;

2. 高性能。能达到0数据丢失。在第一种方式里面,我们需要把数据写到WAL里面,以副本的方式寸尺数据才能保证无数据丢失,这不是一种高效的方法,而采用第二种方式,不用receiver读取数据,不需要写WAL;

3. 满足只执行一次的语义。第一种方式是使用kafka的高级别的API在zookeeper里面存储offset,这是传统的方式从kafka里面消费数据,这会有小的机率造成数据丢失,这是由于SparkStream所接收的数据跟zookeeper所追踪的数据可能会出现不一致。而第二种方式,采用简单的kafka API,不使用zookeeper。Offset将被Spark Streaming
的checkpoints所追踪,这样就能消除Spark Streaming与 zookeeper/kafka的不一致的问题,因此,每条记录都将被Spark Streaming进行有效准确地一次性接收(尽管有时也会失败)。

注意:为了使你的结果输出满足只执行一次的语义,你的输出数据到其他数据存储设备的操作必须是幂等的,即是重复执行多少次结果都是一样的,或者采用事务的方式保存输出结果和offset,保持其原子性。

三、缺点

无法更新偏移量到Zookeeper里面去,使得基于zookeeper的kafka监控工具无法使用,需要自己周期性地把每个批次消费的offsets更新到zookeeper里去。

四、整合步骤

1. 导入依赖

groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.2.0

2.编码

引入KafkaUtils建DirectStream(注意与第一种方式的差别)

 import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

Demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import kafka.serializer.StringDecoder
/**
  * Spark Streaming对接Kafka的方式二
  */
object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf() //.setAppName("KafkaReceiverWordCount")
      //.setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers)

    // TODO... Spark Streaming如何对接Kafka
    val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
    ssc,kafkaParams,topicsSet
    )

    // TODO... 自己去测试为什么要取第二个
    messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3.部署

步骤同方式一,spark-submit命令如下:

spark-submit \
--class com.imooc.spark.KafkaDirectWordCount \
--master local[2] \
--name KafkaDirectWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar  hadoop000:9092 kafka_streaming_topic


官网地址:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html


本文由【waitig】发表在等英博客
本文固定链接:Spark Streaming整合Kafka(二)
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)