Case 1


Requirements: 1. Count how many times each word appears in a text file. 2. Sort the words by occurrence count, in descending order. Analysis: the counting step produces pairs such as (hello,5), (me,10), (you,3), which then need to be ordered by their counts.
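To make the goal concrete, here is a minimal plain-Scala sketch (no Spark involved; the sample pairs are simply the ones from the analysis above) of what "sort by count, descending" should produce:

// Plain Scala illustration: sort the analysis pairs by count, descending
val counts = Seq(("hello", 5), ("me", 10), ("you", 3))
val sorted = counts.sortBy(-_._2)  // List((me,10), (hello,5), (you,3))
sorted.foreach { case (word, n) => println(word + " appears " + n + " times") }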

Java version

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Sorted word count program
 * @author zhang
 */
public class SortWordCount {

    public static void main(String[] args) {
        // Create the SparkConf and JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("SortWordCount")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create the lines RDD
        JavaRDD<String> lines = sc.textFile("C:/Users/Administrator/Desktop/spark.txt");

        // Run the word count we implemented before
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });

        JavaPairRDD<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String t) throws Exception {
                        return new Tuple2<String, Integer>(t, 1);
                    }
                });

        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });

        // At this point we have the count for each word.
        // But the new requirement is to sort by each word's count, in descending order.
        // What do the elements of the wordCounts RDD look like? Something like: (hello, 3) (you, 2)
        // We need to transform the RDD into the (3, hello) (2, you) format so that we can sort by count.

        // Flip key and value
        JavaPairRDD<Integer, String> countWords = wordCounts.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                        return new Tuple2<Integer, String>(t._2, t._1);
                    }
                });

        // Sort by key (the count), in descending order
        JavaPairRDD<Integer, String> sortedCountWords = countWords.sortByKey(false);

        // Flip value and key back again
        JavaPairRDD<String, Integer> sortedWordCounts = sortedCountWords.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                        return new Tuple2<String, Integer>(t._2, t._1);
                    }
                });

        // Now we have the word counts sorted by occurrence count.
        // Print them out.
        sortedWordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1 + " appears " + t._2 + " times.");
            }
        });

        // Close the JavaSparkContext
        sc.close();
    }
}

Scala version

import org.apache.spark.{SparkConf, SparkContext}

object SortWordCount {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SortWordCount").setMaster("local")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("C:/Users/zhang/Desktop/hello.txt", 1)

    // Word count, as before
    val words = lines.flatMap { line => line.split(" ") }
    val pairs = words.map { word => (word, 1) }
    val wordCounts = pairs.reduceByKey(_ + _)

    // Flip to (count, word), sort by key in descending order, then flip back
    val countWords = wordCounts.map(wordCount => (wordCount._2, wordCount._1))
    val sortedCountWords = countWords.sortByKey(false)
    val sortedWordCounts = sortedCountWords.map(sortedCountWord => (sortedCountWord._2, sortedCountWord._1))

    sortedWordCounts.foreach(sortedWordCount => println(sortedWordCount._1 + " appears " + sortedWordCount._2 + " times"))
  }
}
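As a side note, on Spark 1.0 and later the RDD API also provides sortBy, which sorts by a derived key directly and avoids the two map steps that flip the tuples. A minimal sketch, reusing the wordCounts RDD from the Scala version above:

// Alternative (assuming Spark 1.0+): sort the (word, count) pairs by count directly
val sortedWordCounts = wordCounts.sortBy(wordCount => wordCount._2, ascending = false)
sortedWordCounts.foreach(wordCount => println(wordCount._1 + " appears " + wordCount._2 + " times"))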
