先说明,这次我们用的还不是Spark streaming,而是从hadoop hdfs拿取文件,经过计算,再把结果放回hadoop hdfs.
首先我们需要在之前的工程文件下修改我们的pom(具体参考IDEA全程搭建第一个Scala Spark streaming maven工程),增加hadoop版本号
代码语言:javascript复制<hadoop.version>2.7.6</hadoop.version>
添加两个依赖
代码语言:javascript复制<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
修改打包方式
代码语言:javascript复制<build>
<pluginManagement>
<plugins>
<!-- 编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
这样我们可以打出一个富jar包(包含所有第三方jar包的包),这个文件可能会比较大。
先来写一个单词统计的对象(Scala实现)
代码语言:javascript复制object ScalaWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("ScalaWorkCount")
val scc = new SparkContext(conf)
//从hadoop hdfs获取文件
val lines = scc.textFile(args(0))
//统计文件中的单词的个数
val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _)
//将统计结果存入hadoop hdfs
result.saveAsTextFile(args(1))
scc.stop()
}
}
用maven打包后,得到这样一组文件,而我们需要的是这个大的jar包
在linux系统中,我们随便写一个文件,假设我们命名为a.txt,内容也随便写几个单词
ice park dog fish dinsh cark balana apple fuck fool my him cry
然后将其上传到hadoop hdfs中
root@host2 bin# ./hdfs dfs -put ./a.txt /usr/file
root@host2 bin# ./hdfs dfs -lsr /
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr
drwxr-xr-x - root supergroup 0 2018-11-03 16:06 /usr/file
-rw-r--r-- 3 root supergroup 63 2018-11-03 16:06 /usr/file/a.txt
-rw-r--r-- 3 root supergroup 173271626 2018-09-14 13:50 /usr/file/jdk-8u45-linux-x64.tar.gz
我们可以查看他的内容
root@host2 bin# ./hdfs dfs -cat /usr/file/a.txt
ice park dog fish dinsh cark balana apple fuck fool my him cry
此时我们也把我们需要的jar包上传到linux系统中
执行命令spark-submit得到一串输出
./spark-submit --master spark://host2:7077,host1:7077 --class com.guanjian.ScalaWordCount ./jar/sparknew-1.0-SNAPSHOT.jar hdfs://host2:8020/usr/file/a.txt hdfs://host2:8020/usr/file/wcount
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/11/03 16:20:21 INFO SparkContext: Running Spark version 2.2.0
18/11/03 16:20:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/11/03 16:20:22 INFO SparkContext: Submitted application: ScalaWorkCount
18/11/03 16:20:22 INFO SecurityManager: Changing view acls to: root
18/11/03 16:20:22 INFO SecurityManager: Changing modify acls to: root
18/11/03 16:20:22 INFO SecurityManager: Changing view acls groups to:
18/11/03 16:20:22 INFO SecurityManager: Changing modify acls groups to:
18/11/03 16:20:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
18/11/03 16:20:22 INFO Utils: Successfully started service 'sparkDriver' on port 42065.
18/11/03 16:20:22 INFO SparkEnv: Registering MapOutputTracker
18/11/03 16:20:22 INFO SparkEnv: Registering BlockManagerMaster
18/11/03 16:20:22 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/11/03 16:20:22 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/11/03 16:20:22 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7aa12077-6316-47b4-97e9-f65b3009ac79
18/11/03 16:20:22 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
18/11/03 16:20:22 INFO SparkEnv: Registering OutputCommitCoordinator
18/11/03 16:20:23 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/11/03 16:20:23 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.5.182:4040
18/11/03 16:20:23 INFO SparkContext: Added JAR file:/usr/local/spark2.2/bin/./jar/sparknew-1.0-SNAPSHOT.jar at spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar with timestamp 1541233223259
18/11/03 16:20:23 INFO Executor: Starting executor ID driver on host localhost
18/11/03 16:20:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44491.
18/11/03 16:20:23 INFO NettyBlockTransferService: Server created on 192.168.5.182:44491
18/11/03 16:20:23 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/11/03 16:20:23 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.5.182, 44491, None)
18/11/03 16:20:23 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.5.182:44491 with 366.3 MB RAM, BlockManagerId(driver, 192.168.5.182, 44491, None)
18/11/03 16:20:23 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.5.182, 44491, None)
18/11/03 16:20:23 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.5.182, 44491, None)
18/11/03 16:20:24 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 236.5 KB, free 366.1 MB)
18/11/03 16:20:24 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 366.0 MB)
18/11/03 16:20:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.5.182:44491 (size: 22.9 KB, free: 366.3 MB)
18/11/03 16:20:24 INFO SparkContext: Created broadcast 0 from textFile at ScalaWordCount.scala:13
18/11/03 16:20:24 INFO FileInputFormat: Total input paths to process : 1
18/11/03 16:20:25 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
18/11/03 16:20:25 INFO SparkContext: Starting job: saveAsTextFile at ScalaWordCount.scala:15
18/11/03 16:20:25 INFO DAGScheduler: Registering RDD 3 (map at ScalaWordCount.scala:14)
18/11/03 16:20:25 INFO DAGScheduler: Got job 0 (saveAsTextFile at ScalaWordCount.scala:15) with 2 output partitions
18/11/03 16:20:25 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at ScalaWordCount.scala:15)
18/11/03 16:20:25 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
18/11/03 16:20:25 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
18/11/03 16:20:25 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD3 at map at ScalaWordCount.scala:14), which has no missing parents
18/11/03 16:20:25 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 366.0 MB)
18/11/03 16:20:25 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.8 KB, free 366.0 MB)
18/11/03 16:20:25 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.5.182:44491 (size: 2.8 KB, free: 366.3 MB)
18/11/03 16:20:25 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
18/11/03 16:20:25 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD3 at map at ScalaWordCount.scala:14) (first 15 tasks are for partitions Vector(0, 1))
18/11/03 16:20:25 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
18/11/03 16:20:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 4840 bytes)
18/11/03 16:20:25 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 4840 bytes)
18/11/03 16:20:25 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
18/11/03 16:20:25 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/11/03 16:20:25 INFO Executor: Fetching spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar with timestamp 1541233223259
18/11/03 16:20:25 INFO TransportClientFactory: Successfully created connection to /192.168.5.182:42065 after 51 ms (0 ms spent in bootstraps)
18/11/03 16:20:25 INFO Utils: Fetching spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar to /tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4/userFiles-5bb37b1d-3c33-4f7b-acdb-ff85603d088f/fetchFileTemp5672377923687175291.tmp
18/11/03 16:20:26 INFO Executor: Adding file:/tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4/userFiles-5bb37b1d-3c33-4f7b-acdb-ff85603d088f/sparknew-1.0-SNAPSHOT.jar to class loader
18/11/03 16:20:26 INFO HadoopRDD: Input split: hdfs://host2:8020/usr/file/a.txt:0 31
18/11/03 16:20:26 INFO HadoopRDD: Input split: hdfs://host2:8020/usr/file/a.txt:31 32
18/11/03 16:20:26 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1155 bytes result sent to driver
18/11/03 16:20:26 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1026 bytes result sent to driver
18/11/03 16:20:26 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 824 ms on localhost (executor driver) (1/2)
18/11/03 16:20:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 810 ms on localhost (executor driver) (2/2)
18/11/03 16:20:26 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/11/03 16:20:26 INFO DAGScheduler: ShuffleMapStage 0 (map at ScalaWordCount.scala:14) finished in 0.858 s
18/11/03 16:20:26 INFO DAGScheduler: looking for newly runnable stages
18/11/03 16:20:26 INFO DAGScheduler: running: Set()
18/11/03 16:20:26 INFO DAGScheduler: waiting: Set(ResultStage 1)
18/11/03 16:20:26 INFO DAGScheduler: failed: Set()
18/11/03 16:20:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD5 at saveAsTextFile at ScalaWordCount.scala:15), which has no missing parents
18/11/03 16:20:26 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 72.0 KB, free 366.0 MB)
18/11/03 16:20:26 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 25.9 KB, free 365.9 MB)
18/11/03 16:20:26 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.5.182:44491 (size: 25.9 KB, free: 366.2 MB)
18/11/03 16:20:26 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
18/11/03 16:20:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD5 at saveAsTextFile at ScalaWordCount.scala:15) (first 15 tasks are for partitions Vector(0, 1))
18/11/03 16:20:26 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
18/11/03 16:20:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 4621 bytes)
18/11/03 16:20:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 4621 bytes)
18/11/03 16:20:26 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
18/11/03 16:20:26 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
18/11/03 16:20:26 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.5.182:44491 in memory (size: 2.8 KB, free: 366.3 MB)
18/11/03 16:20:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
18/11/03 16:20:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
18/11/03 16:20:26 INFO FileOutputCommitter: Saved output of task 'attempt_20181103162025_0001_m_000000_2' to hdfs://host2:8020/usr/file/wcount/_temporary/0/task_20181103162025_0001_m_000000
18/11/03 16:20:26 INFO FileOutputCommitter: Saved output of task 'attempt_20181103162025_0001_m_000001_3' to hdfs://host2:8020/usr/file/wcount/_temporary/0/task_20181103162025_0001_m_000001
18/11/03 16:20:26 INFO SparkHadoopMapRedUtil: attempt_20181103162025_0001_m_000001_3: Committed
18/11/03 16:20:26 INFO SparkHadoopMapRedUtil: attempt_20181103162025_0001_m_000000_2: Committed
18/11/03 16:20:26 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1267 bytes result sent to driver
18/11/03 16:20:26 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1267 bytes result sent to driver
18/11/03 16:20:26 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 325 ms on localhost (executor driver) (1/2)
18/11/03 16:20:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 330 ms on localhost (executor driver) (2/2)
18/11/03 16:20:26 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/11/03 16:20:26 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at ScalaWordCount.scala:15) finished in 0.332 s
18/11/03 16:20:26 INFO DAGScheduler: Job 0 finished: saveAsTextFile at ScalaWordCount.scala:15, took 1.653739 s
18/11/03 16:20:26 INFO SparkUI: Stopped Spark web UI at http://192.168.5.182:4040
18/11/03 16:20:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/11/03 16:20:26 INFO MemoryStore: MemoryStore cleared
18/11/03 16:20:26 INFO BlockManager: BlockManager stopped
18/11/03 16:20:26 INFO BlockManagerMaster: BlockManagerMaster stopped
18/11/03 16:20:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/11/03 16:20:26 INFO SparkContext: Successfully stopped SparkContext
18/11/03 16:20:26 INFO ShutdownHookManager: Shutdown hook called
18/11/03 16:20:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4
这个时候我们来查看保存在hadoop hdfs中的结果。
root@host2 bin# ./hdfs dfs -lsr /
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr
drwxr-xr-x - root supergroup 0 2018-11-03 16:20 /usr/file
-rw-r--r-- 3 root supergroup 63 2018-11-03 16:06 /usr/file/a.txt
-rw-r--r-- 3 root supergroup 173271626 2018-09-14 13:50 /usr/file/jdk-8u45-linux-x64.tar.gz
drwxr-xr-x - root supergroup 0 2018-11-03 16:20 /usr/file/wcount
-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS
-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000
-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001
root@host2 bin# ./hdfs dfs -cat /usr/file/wcount/part-00000
(him,1)
(park,1)
(fool,1)
(dinsh,1)
(fish,1)
(dog,1)
(apple,1)
(cry,1)
(my,1)
root@host2 bin# ./hdfs dfs -cat /usr/file/wcount/part-00001
(ice,1)
(cark,1)
(balana,1)
(fuck,1)
这样我们就得到了我们需要的结果,文本文件a.txt的单词统计,当然这种处理主要是一种离线处理,跟Web程序的关联不大,要做实时处理还要用到Spark streaming。
当然Spark也支持命令行式的操作,类似于Scala一样,如下,我们给本次操作分配5G内存,16线程
root@host2 bin# ./spark-shell --master spark://host2:7077,host1:7077 --executor-memory 5g --total-executor-cores 16
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/11/03 20:59:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/11/03 21:00:01 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.5.182:4040
Spark context available as 'sc' (master = spark://host2:7077,host1:7077, app id = app-20181103205956-0001).
Spark session available as 'spark'.
Welcome to
代码语言:txt复制 ____ __
代码语言:txt复制 / __/__ ___ _____/ /__
代码语言:txt复制 _ / _ / _ `/ __/ '_/
代码语言:txt复制/___/ .__/\_,_/_/ /_/\_ version 2.2.0
代码语言:txt复制 /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
比如我们要查看a.txt里面有多少个单词
scala> val r1 = sc.textFile("hdfs://host2:8020/usr/file/a.txt")
r1: org.apache.spark.rdd.RDDString = hdfs://host2:8020/usr/file/a.txt MapPartitionsRDD1 at textFile at <console>:24
我们可以看到他的返回值是一个RDD类型,那RDD是什么呢,一张图来说明
scala> r1.count
res0: Long = 1
scala> r1.flatMap(_.split(" ")).map((_,1)).count
res1: Long = 13
我们对这个RDD操作,行数为1行,单词数为13个单词
最后我们来看一下Spark的8091端口的系统界面
我们执行过的任务在这里面都可以有一些记录。