1 Download spark-2.1.0-bin-hadoop2.7.tgz
http://spark.apache.org/downloads.html
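The archive can also be fetched from the command line; a minimal sketch, assuming the standard Apache archive URL for this release (verify the mirror before use):
[root@sk1 ~]# wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz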
2 Extract the archive
[root@sk1 ~]# tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz -C /opt
3 Enter the Spark root directory
[root@sk1 ~]# cd /opt/spark-2.1.0-bin-hadoop2.7/
[root@sk1 spark-2.1.0-bin-hadoop2.7]# ls
bin derby.log LICENSE NOTICE README.md yarn
conf examples licenses python RELEASE
data jars metastore_db R sbin
4 Run bin/spark-shell
[root@sk1 spark-2.1.0-bin-hadoop2.7]# bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/04/07 22:41:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/04/07 22:41:32 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/04/07 22:41:33 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/04/07 22:41:34 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.11.138:4040
Spark context available as 'sc' (master = local[*], app id = local-1491619281633).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
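As the startup log shows, spark-shell pre-creates a SparkContext bound to sc and a SparkSession bound to spark, so no manual setup is needed. A quick sanity check, as a minimal sketch (exact output will vary with your environment):
scala> sc.master                 // local[*] in this local-mode shell
scala> spark.version             // 2.1.0
scala> sc.setLogLevel("ERROR")   // optional: suppress the WARN messages seen above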
5 Simple interactive session
scala> val rdd1=sc.parallelize(1 to 100,5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.count
res0: Long = 100
scala> val rdd2=rdd1.map(_ + 4)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd2.take(2)
res1: Array[Int] = Array(5, 6)
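Note that map is a transformation and runs lazily; only actions such as count and take trigger an actual job. A minimal sketch to illustrate, reusing rdd1 from above (the val name is illustrative):
scala> val evens = rdd1.filter(_ % 2 == 0)   // transformation: no job runs yet
scala> evens.count                           // action: launches the job, returns 50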
6 WordCount
6.1 Prepare the data
[root@sk1 ~]# vi /tmp/wordcount.txt
[root@sk1 ~]# cat /tmp/wordcount.txt
zookeeper hadoop hdfs yarn hive hbase spark
hello world
hello bigdata
6.2 The program
scala> val rdd=sc.textFile("file:///tmp/wordcount.txt")
rdd: org.apache.spark.rdd.RDD[String] = file:///tmp/wordcount.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd.count
res2: Long = 3
scala> val mapRdd=rdd.flatMap(_.split(" "))
mapRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:26
scala> mapRdd.first
res3: String = zookeeper
scala> val kvRdd=mapRdd.map(x=>(x,1))
kvRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:28
scala> kvRdd.first
res4: (String, Int) = (zookeeper,1)
scala> kvRdd.take(2)
res5: Array[(String, Int)] = Array((zookeeper,1), (hadoop,1))
scala> val rsRdd=kvRdd.reduceByKey(_ + _)
rsRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:30
scala> rsRdd.take(2)
res6: Array[(String, Int)] = Array((spark,1), (hive,1))
scala> rsRdd.saveAsTextFile("file:///tmp/output")
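The same pipeline can also be written as one chained expression; a minimal sketch equivalent to the step-by-step version above (file:///tmp/output2 is a hypothetical fresh path, since saveAsTextFile fails if the target directory already exists):
scala> sc.textFile("file:///tmp/wordcount.txt")
     |   .flatMap(_.split(" "))
     |   .map(word => (word, 1))
     |   .reduceByKey(_ + _)
     |   .saveAsTextFile("file:///tmp/output2")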
6.3 View the results
[root@sk1 ~]# ls /tmp/output/
part-00000 _SUCCESS
[root@sk1 ~]# cat /tmp/output/part-00000
(spark,1)
(hive,1)
(hadoop,1)
(bigdata,1)
(zookeeper,1)
(hello,2)
(yarn,1)
(hdfs,1)
(hbase,1)
(world,1)
[root@sk1 ~]#
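The counts come back unsorted because reduceByKey only groups by key; it makes no ordering guarantee. To rank words by frequency, sortBy can be applied first; a minimal sketch using the rsRdd defined above:
scala> rsRdd.sortBy(_._2, ascending = false).take(3)   // (hello,2) first; ties among the 1-counts appear in arbitrary order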