Spark的WordCount详解

云计算 waitig 522℃ 百度已收录 0评论

WordCount代码详解

1、创建SparkConf

/**创建SparkConf
设置设置本地执行,不需要安装spark集群
比如setMaster 设置集群Master ULR,如果设置local是Spark在本地运行 

**/val conf = new SparkConf()

conf.setAppName("Wow, My first Spark APP in IDEA!")

//conf.setMaster("local")

2、创建SparkContext对象

 /** 创建SparkContext对象
SparkContext是Spark唯一的入口程序,无论是采用Scala,Java
SParkContext的核心作业:初始化Spark运用程序所需要的核心组件
同时还会负责SPark程序往Master注册程序等,SParkContext是Spark中最为至关重要的一个对象
创建SparkContext通过传递SparkConf实例来定制Spark运行的具体的参数和对象*/

val sc = new SparkContext(conf)

3、读取HDFS数据创建RDD

/**

通过sparkContext读取Hdfs数据文件创建RDD

*/

val lines = sc.textFile("hdfs://master:8020/tmp/data")

4、通过flatMap拆分每个单词

/**

RDD的flatMap转换操作进行判断处理,对原RDD每个输入元素进行切分返回多个元素的新的RDD,

flatMap使用场景:

有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫做flatMap()。和map()类似,

我们提供给flatMap()的函数被分别应用到了输入的RDD的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器

**/

val words = lines.flatMap { line => line.split(" ") }

5、在每个单词计算1的基础上根据出现次数进行累加

/**

RDD的单个元素转换为对应元素和1的元组Tuble<K,V>的新的RDD,其中的K为原RDD的每个word值,V为1

*/

val pairs = words.map { word => (word, 1) }

6、单词统计

/**

reduceByKey函数输入两个元素返回一个,也就是每个元组Tuble<K,V>中每个K相同的V进行处理,_+_代表直接进行V进行累加,

类似与MapReduce的Map处理

*/

val wordCounts = pairs.reduceByKey(_ + _)

7、打印统计结果

/**

collect:读取RDD数据加载到本地缓存,返回一个包含RDD中所以元素的数组

foreach:循环每个元组Tuble<K,V>,打印每个元组的K和V

*/

wordCounts.collect.foreach(wordNumberPar => println(wordCounts._1 + " : " + wordCounts._2))

8、关闭SparkContext

sc.stop

至此Spark WordCount全部代码详解完


本文由【waitig】发表在等英博客
本文固定链接:Spark的WordCount详解
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)