Running Spark Shell on a Single Machine


1 Download spark-2.1.0-bin-hadoop2.7.tgz

http://spark.apache.org/downloads.html

2 Extract the archive

[root@sk1 ~]# tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz -C /opt

3 Enter the Spark root directory

[root@sk1 ~]# cd /opt/spark-2.1.0-bin-hadoop2.7/
[root@sk1 spark-2.1.0-bin-hadoop2.7]# ls
bin   derby.log  LICENSE       NOTICE  README.md  yarn
conf  examples   licenses      python  RELEASE
data  jars       metastore_db  R       sbin

4 Run bin/spark-shell

[root@sk1 spark-2.1.0-bin-hadoop2.7]# bin/spark-shell 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/04/07 22:41:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/04/07 22:41:32 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/04/07 22:41:33 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/04/07 22:41:34 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.11.138:4040
Spark context available as 'sc' (master = local[*], app id = local-1491619281633).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
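
The startup banner shows that spark-shell has already created a SparkContext (bound to 'sc', master = local[*], i.e. all local CPU cores) and a SparkSession (bound to 'spark'), and that the Web UI is listening on port 4040. A quick sanity check is to query both objects directly; a minimal sketch, where the exact res numbers depend on your session:

scala> sc.master
res0: String = local[*]

scala> spark.version
res1: String = 2.1.0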

5 Simple interaction

scala> val rdd1=sc.parallelize(1 to 100,5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.count
res0: Long = 100

scala> val rdd2=rdd1.map(_ + 4)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26

scala> rdd2.take(2)
res1: Array[Int] = Array(5, 6)
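
Note that parallelize and map are lazy transformations: they only describe the RDD, while actions such as count and take actually launch a job. A minimal sketch illustrating the distinction, run in the same session (the name evens is illustrative, and the RDD/res numbers will vary):

scala> val evens = rdd1.filter(_ % 2 == 0)    // transformation: nothing runs yet
evens: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:26

scala> evens.count                            // action: triggers the computation
res2: Long = 50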

6 WordCount

6.1 Prepare the data

[root@sk1 ~]# vi /tmp/wordcount.txt
[root@sk1 ~]# cat /tmp/wordcount.txt 
zookeeper hadoop hdfs yarn hive hbase spark
hello world 
hello bigdata   

6.2 The program

scala> val rdd=sc.textFile("file:///tmp/wordcount.txt")
rdd: org.apache.spark.rdd.RDD[String] = file:///tmp/wordcount.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd.count
res2: Long = 3

scala> val mapRdd=rdd.flatMap(_.split(" "))
mapRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:26

scala> mapRdd.first
res3: String = zookeeper

scala> val kvRdd=mapRdd.map(x=>(x,1))
kvRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:28

scala> kvRdd.first
res4: (String, Int) = (zookeeper,1)

scala> kvRdd.take(2)
res5: Array[(String, Int)] = Array((zookeeper,1), (hadoop,1))

scala> val rsRdd=kvRdd.reduceByKey(_ + _)
rsRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:30

scala> rsRdd.take(2)
res6: Array[(String, Int)] = Array((spark,1), (hive,1))                    

scala> rsRdd.saveAsTextFile("file:///tmp/output")
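
This is the classic WordCount pipeline: textFile reads the lines, flatMap(_.split(" ")) tokenizes them, map(x=>(x,1)) pairs each word with a count of 1, and reduceByKey(_ + _) sums the counts per word. To inspect the most frequent words, you can sort by count before taking results; a minimal sketch in the same session (the name sortedRdd is illustrative, and the RDD/res numbers will differ):

scala> val sortedRdd = rsRdd.sortBy(_._2, ascending = false)
sortedRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at sortBy at <console>:32

scala> sortedRdd.first
res7: (String, Int) = (hello,2)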

6.3 View the results

[root@sk1 ~]# ls /tmp/output/
part-00000  _SUCCESS
[root@sk1 ~]# cat /tmp/output/part-00000 
(spark,1)
(hive,1)
(hadoop,1)
(bigdata    ,1)
(zookeeper,1)
(hello,2)
(yarn,1)
(hdfs,1)
(hbase,1)
(world,1)
[root@sk1 ~]# 
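
Two details worth noting. The entry (bigdata    ,1) keeps its trailing whitespace because split(" ") only splits on single space characters, so whitespace such as a tab after "bigdata" in the input file stays attached to the token; splitting with _.split("\\s+") instead would avoid this. Also, saveAsTextFile writes one part-NNNNN file per RDD partition plus a _SUCCESS marker; to guarantee a single output file you can coalesce to one partition first. A minimal sketch (file:///tmp/output2 is a hypothetical path, since saveAsTextFile refuses to write into an existing directory):

scala> rsRdd.coalesce(1).saveAsTextFile("file:///tmp/output2")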
