Getting Started with Spark, Step 1: WordCount in Java and Scala
This is the first step in the Spark getting-started series: writing a WordCount program.
We implement it in both Java and Scala, so we can compare how much code each language requires.
Data file: save the content below as ./data/words.txt. The program reads this file and counts how many times each word appears.
java scala python android
spark storm spout bolt
kafka MQ
elasticsearch logstash kibana
hive hbase mysql oracle sqoop
hadoop hdfs map reduce
java scala python android
spark storm spout bolt
kafka MQ
java scala python android
spark storm spout bolt
kafka MQ
elasticsearch logstash kibana
hive hbase mysql oracle sqoop
hive hbase mysql oracle sqoop
hadoop hdfs map reduce
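To run any of the versions below, the Spark core library has to be on the classpath. The post itself does not specify a build setup, so the following build.sbt is only a minimal sketch; the Spark and Scala versions shown are assumptions and should be adjusted to your environment.
// Minimal build.sbt sketch (versions are assumptions, not taken from the original post)
name := "spark-wordcount"
scalaVersion := "2.11.12"
// spark-core provides the RDD API (SparkContext / JavaSparkContext) used by all the examples
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.3"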
Code implementation
•Written in Java
package top.wintp.java_spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
import java.util.Iterator;
import scala.Tuple2;
/**
* @author: pyfysf
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/10/26
*/
public class SparkWordCount {
public static void main(String[] args) {
// Verbose version, using anonymous inner classes
// Create the SparkConf
SparkConf conf = new SparkConf();
conf.setAppName("spark_demo_java");
conf.setMaster("local");
// Create the JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// Read the file
JavaRDD<String> lines = sc.textFile("./data/words.txt");
// Split each line into words
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split("\\s+")).iterator();
}
});
// Map each word to a (word, 1) pair
JavaPairRDD<String, Integer> pairWord = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
// Aggregate the counts by key
JavaPairRDD<String, Integer> result = pairWord.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i, Integer i2) throws Exception {
return i + i2;
}
});
// Print the results
result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
System.out.println(stringIntegerTuple2);
}
});
sc.stop();
}
}
•Simplifying the Java code with lambda expressions
package top.wintp.java_spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import scala.Tuple2;
/**
* @author: pyfysf
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/10/26
*/
public class SparkWordCount {
public static void main(String[] args) {
// Lambda version
SparkConf conf = new SparkConf();
conf.setAppName("spark_demo_java");
conf.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("./data/words.txt");
JavaRDD<String> words = lines.flatMap((String line) -> Arrays.asList(line.split("\\s+")).iterator());
JavaPairRDD<String, Integer> pairWords = words.mapToPair((String s) -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> result = pairWords.reduceByKey(Integer::sum);
result.foreach((Tuple2<String, Integer> res) -> System.out.println(res));
sc.stop();
}
}
•Written in Scala
package top.wintp.scala_spark
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author: pyfysf
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/10/26
*/
object SparkWordCount {
def main(args: Array[String]): Unit = {
// Full version
// Create the configuration object
val conf = new SparkConf()
// Set the run mode
conf.setMaster("local")
// Set the application name
conf.setAppName("sparkTest")
// Create the SparkContext
val sc = new SparkContext(conf)
// Read the file
val lines = sc.textFile("./data/words.txt")
// Split each line into words
val words = lines.flatMap((line: String) => {
line.split("\\s+")
})
// Map each word to a (word, 1) pair
val pairWords = words.map((tmp: String) => {
(tmp, 1)
})
// Aggregate the counts by key
val result = pairWords.reduceByKey((v1: Int, v2: Int) => {
v1 + v2
})
// Print the results
result.foreach(println)
// Release resources
sc.stop()
}
}
•Simplifying the code with Scala's features
package top.wintp.scala_spark
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author: pyfysf
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/10/26
*/
object SparkWordCount {
def main(args: Array[String]): Unit = {
// Concise version
val conf = new SparkConf().setAppName("sparkDemo").setMaster("local")
val sc = new SparkContext(conf)
val result = sc.textFile("./data/words.txt")
.flatMap(_.split("\\s+"))
.map((_, 1))
.reduceByKey(_ + _)
result.foreach(println)
sc.stop()
}
}
I recommend getting comfortable with both the Java and Scala versions, and especially with the concise Scala one-liner.
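For reference: run against the sample file above, every version should print one (word, count) tuple per distinct word, for example (java,3), (spark,3), (elasticsearch,2) and (hadoop,2). The order of the lines may vary, since reduceByKey does not sort its output.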