一、pom.xml 添加spark-core依赖包
代码语言:javascript复制 org.apache.spark
spark-core_2.11
2.1.1
二、代码实现
代码语言:javascript复制package spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class SparkWordCountForJava {
public static void main(String[] args) {
// 初始化spark , local[*]:以*核心数在本地运行
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCountForJava");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD textFileRdd = jsc.textFile("C:\Users\com\Desktop\test.txt");
// 将数据按照切分规则分成一个个单词
JavaRDD flatMapRdd = textFileRdd.flatMap(new FlatMapFunction() {
public Iterator call(String s) throws Exception {
String[] splits = s.split("t");
List list = Arrays.asList(splits);
return list.iterator();
}
});
// 每个单词作为key,value为1
JavaRDD> mapRdd = flatMapRdd.map(new Function>() {
public Tuple2 call(String s) throws Exception {
return new Tuple2(s, 1);
}
});
// 分组:相同 key 分为一组
JavaPairRDD>> groupByRdd = mapRdd.groupBy(new Function, String>() {
public String call(Tuple2 s) throws Exception {
return s._1;
}
});
// Lmbda 表达式写法 和 mapRdd 、 groupByRdd 值一样
JavaRDD> mapRdd1 = flatMapRdd.map(s -> new Tuple2(s, 1));
JavaPairRDD>> groupByRdd1 = mapRdd1.groupBy(s -> s._1);
// 相同key,value值累加
JavaPairRDD mapValuesRdd = groupByRdd.mapValues(new Function>, Integer>() {
public Integer call(Iterable> v1) throws Exception {
int sum = 0;
for(Tuple2 t:v1) {
sum = t._2;
}
return sum;
}
});
// 行动算子:collect,将数据拉取到driver端
List> list = mapValuesRdd.collect();
System.out.println(list);
}
}
三、测试文件 test.txt
代码语言:javascript复制小明 小绿 小黑
小红 小红 小白
小蓝 小蓝 小蓝
小黑 小白 小黑
小红 小红 小黄
小黑 小白 小绿
小红 小蓝 小蓝
小红 小红 小黄
小绿 小蓝 小蓝
小黑 小白 小蓝
四、运行结果
[(小绿,3), (小白,4), ( ,9), (小蓝,8), (小黑,5), (小红,7), (小明,1), (小黄,2)]