案例二

2023-02-25 15:49:13 浏览数 (2)

需求:1、按照文件中的第一列排序;2、如果第一列相同,则按照第二列排序。准备数据(sort.txt,每行两个以空格分隔的整数,共五行):"1 5"、"2 4"、"3 6"、"1 3"、"2 1"。

首先自定义二次排序key

代码语言:Java
/**

 * 自定义的二次排序key

 * @author Administrator

 *

 */

public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

​private static final long serialVersionUID = -2366006422945129991L;

​// 首先在自定义key里面,定义需要进行排序的列

​private int first;

​private int second;

​public SecondarySortKey(int first, int second) {

​​this.first = first;

​​this.second = second;
​}

​@Override
​public boolean $greater(SecondarySortKey other) {

​​if(this.first > other.getFirst()) {

​​​return true;

​​} else if(this.first == other.getFirst() && 

​​​​this.second > other.getSecond()) {

​​​return true;

​​}

​​return false;

​}

@Override

​public boolean $greater$eq(SecondarySortKey other) {

​​if(this.$greater(other)) {

​​​return true;

​​} else if(this.first == other.getFirst() && 

​​​​this.second == other.getSecond()) {

​​​return true;
​​}

​​return false;
​}

​@Override
​public boolean $less(SecondarySortKey other) {

​​if(this.first < other.getFirst()) {

return true;

​​} else if(this.first == other.getFirst() && 

​​​​this.second < other.getSecond()) {

return true;

​​}

​​return false;

​}

​@Override
public boolean $less$eq(SecondarySortKey other) {

​​if(this.$less(other)) {

​​​return true;

​​} else if(this.first == other.getFirst() && 

​​​​this.second == other.getSecond()) {

​​​return true;

​​}

​​return false;

​}

​@Override
​public int compare(SecondarySortKey other) {

​​if(this.first - other.getFirst() != 0) {

​​​return this.first - other.getFirst();

​​} else {

​​​return this.second - other.getSecond();
​​}

​}

​@Override
​public int compareTo(SecondarySortKey other) {

​​if(this.first - other.getFirst() != 0) {

​​​return this.first - other.getFirst();

​​} else {

​​​return this.second - other.getSecond();

​​}

​}

​// 为要进行排序的多个列,提供getter和setter方法,以及hashcode和equals方法

​public int getFirst() {

​​return first;

​}

​public void setFirst(int first) {

​​this.first = first;

​}

​public int getSecond() {

​​return second;

​}

​public void setSecond(int second) {

​​this.second = second;

​}

​@Override

​public int hashCode() {

​​final int prime = 31;

​​int result = 1;

​​result = prime * result   first;

​​result = prime * result   second;

​​return result;

​}

​@Override
​public boolean equals(Object obj) {

​​if (this == obj)

return true;

​​if (obj == null)

​​​return false;

​​if (getClass() != obj.getClass())

​​​return false;

​​SecondarySortKey other = (SecondarySortKey) obj;

if (first != other.first)

​​​return false;

​​if (second != other.second)

​​​return false;

​​return true;
 ​}
}

/**

* 二次排序

* 1、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法

* 2、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD

* 3、使用sortByKey算子按照自定义的key进行排序

* 4、再次映射,剔除自定义的key,只保留文本行

* @author Administrator

*

*/

public class SecondarySort {

​public static void main(String[] args) {

​​SparkConf conf = new SparkConf()​​​​.setAppName("SecondarySort").setMaster("local");

​​JavaSparkContext sc = new JavaSparkContext(conf);

​​JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt");
 
​​JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(

new PairFunction<String, SecondarySortKey, String>() {

​​​​​private static final long serialVersionUID = 1L;

​​​​​@Override
​​​​​public Tuple2<SecondarySortKey, String> call(String line) throws Exception {

​​​​​​String[] lineSplited = line.split(" ");  

​​​​​​SecondarySortKey key = new SecondarySortKey(

​​​​​​​​Integer.valueOf(lineSplited[0]), 

​​​​​​​​Integer.valueOf(lineSplited[1]));  

​​​​​​return new Tuple2<SecondarySortKey, String>(key, line);

​​​​​}
​​​​});

​​JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey();

​​JavaRDD<String> sortedLines = sortedPairs.map(​​​​

new Function<Tuple2<SecondarySortKey,String>, String>() {

private static final long serialVersionUID = 1L;

@Override
public String call(Tuple2<SecondarySortKey, String> v1) throws Exception {

​​​​​​return v1._2;

​​​​​}
​​​​});

​​sortedLines.foreach(new VoidFunction<String>() {

​​​private static final long serialVersionUID = 1L;

​​​@Override

​​​public void call(String t) throws Exception {

​​​​System.out.println(t);  

​​​}
​​});

sc.close();
​}
}

Scala版本

代码语言:Scala
/**
 * Composite key for a secondary sort: ordered by `first`, with ties broken
 * by `second`.
 *
 * @param first  primary sort column
 * @param second secondary sort column, consulted only when `first` is equal
 * @author Administrator
 */
class SecondSortKey(val first: Int, val second: Int) extends Ordered[SecondSortKey] with Serializable {

  /**
   * Compares by `first`, then by `second`.
   *
   * Uses `Integer.compare` instead of subtraction, because `a - b` overflows
   * when the operands are far apart (e.g. `Int.MinValue - 1`) and would then
   * report the wrong sign.
   *
   * @return a negative value, zero, or a positive value when this key is
   *         less than, equal to, or greater than `that`
   */
  def compare(that: SecondSortKey): Int = {
    val byFirst = java.lang.Integer.compare(this.first, that.first)
    if (byFirst != 0) byFirst
    else java.lang.Integer.compare(this.second, that.second)
  }
}

SecondSort.scala

代码语言:Scala
import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

/**
 * Secondary-sort driver: reads a text file, keys every line by a
 * SecondSortKey built from its first two space-separated integer columns,
 * sorts by that key and prints the lines in sorted order.
 */
object SecondSort {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondSort").setMaster("local")
    val sc = new SparkContext(conf)

    // Stop the context even when a stage fails (e.g. a malformed input line).
    try {
      val lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt", 1)

      val pairs = lines.map { line =>
        // Split once and reuse the fields instead of splitting per column.
        val fields = line.split(" ")
        (new SecondSortKey(fields(0).toInt, fields(1).toInt), line)
      }

      val sortedPairs = pairs.sortByKey()

      // Drop the key; only the original text line is kept.
      val sortedLines = sortedPairs.map(sortedPair => sortedPair._2)

      sortedLines.foreach { sortedLine => println(sortedLine) }
    } finally {
      sc.stop()
    }
  }
}

0 人点赞