Submitting our first Spark word-count program, working with Hadoop HDFS


A note up front: this time we are still not using Spark Streaming. Instead we fetch a file from Hadoop HDFS, run a computation over it, and write the result back into Hadoop HDFS.

First, we need to modify the pom of the project built earlier (see "IDEA全程搭建第一个Scala Spark streaming maven工程" for the full setup) and add a Hadoop version property:

<hadoop.version>2.7.6</hadoop.version>

Add two dependencies:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>

Change the packaging configuration:

<build>
  <pluginManagement>
    <plugins>
      <!-- plugin for compiling Scala -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
      </plugin>
      <!-- plugin for compiling Java -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
      </plugin>
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <phase>process-resources</phase>
          <goals>
            <goal>add-source</goal>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <phase>process-test-resources</phase>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <executions>
        <execution>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>


    <!-- plugin for building the jar -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

With this we can build a fat jar (one that bundles all third-party dependencies), so the file may be fairly large. The filter that excludes META-INF/*.SF, *.DSA and *.RSA strips signature files from the bundled dependencies, which avoids signature-verification errors when the shaded jar is run.

Now let's write a word-count object (implemented in Scala):

import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("ScalaWorkCount")
    val scc = new SparkContext(conf)
    // read the input file from Hadoop HDFS
    val lines = scc.textFile(args(0))
    // count the occurrences of each word in the file
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // write the counts back to Hadoop HDFS
    result.saveAsTextFile(args(1))
    scc.stop()
  }
}
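One thing worth noting: because the code hard-codes setMaster("local[4]"), the job runs in local mode on the driver even when spark-submit is later given --master spark://... (the log below indeed shows the tasks running on "localhost, executor driver"). A common alternative, shown here only as a sketch with a hypothetical object name, is to set a local master only when spark-submit has not supplied one:

import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCountConfigurable {
  def main(args: Array[String]): Unit = {
    // Use the master supplied by spark-submit (--master); fall back to local
    // mode only when none is set, e.g. when running directly from the IDE.
    val conf = new SparkConf().setAppName("ScalaWordCount")
    if (!conf.contains("spark.master")) conf.setMaster("local[*]")
    val scc = new SparkContext(conf)

    val result = scc.textFile(args(0))     // read from HDFS (args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    result.saveAsTextFile(args(1))         // write back to HDFS (args(1))
    scc.stop()
  }
}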

After packaging with Maven we get a set of output files; the one we need is the large, shaded jar.

On the Linux machine, write an arbitrary file, say a.txt, containing a few random words:

ice park dog fish dinsh cark balana apple fuck fool my him cry

Then upload it to Hadoop HDFS:

root@host2 bin# ./hdfs dfs -put ./a.txt /usr/file

root@host2 bin# ./hdfs dfs -lsr /

lsr: DEPRECATED: Please use 'ls -R' instead.

drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr

drwxr-xr-x - root supergroup 0 2018-11-03 16:06 /usr/file

-rw-r--r-- 3 root supergroup 63 2018-11-03 16:06 /usr/file/a.txt

-rw-r--r-- 3 root supergroup 173271626 2018-09-14 13:50 /usr/file/jdk-8u45-linux-x64.tar.gz

We can check its contents:

root@host2 bin# ./hdfs dfs -cat /usr/file/a.txt

ice park dog fish dinsh cark balana apple fuck fool my him cry

Now also upload the jar we built to the Linux machine.

Run spark-submit, which produces a long stream of output:

./spark-submit --master spark://host2:7077,host1:7077 --class com.guanjian.ScalaWordCount ./jar/sparknew-1.0-SNAPSHOT.jar hdfs://host2:8020/usr/file/a.txt hdfs://host2:8020/usr/file/wcount

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

18/11/03 16:20:21 INFO SparkContext: Running Spark version 2.2.0

18/11/03 16:20:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

18/11/03 16:20:22 INFO SparkContext: Submitted application: ScalaWorkCount

18/11/03 16:20:22 INFO SecurityManager: Changing view acls to: root

18/11/03 16:20:22 INFO SecurityManager: Changing modify acls to: root

18/11/03 16:20:22 INFO SecurityManager: Changing view acls groups to:

18/11/03 16:20:22 INFO SecurityManager: Changing modify acls groups to:

18/11/03 16:20:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()

18/11/03 16:20:22 INFO Utils: Successfully started service 'sparkDriver' on port 42065.

18/11/03 16:20:22 INFO SparkEnv: Registering MapOutputTracker

18/11/03 16:20:22 INFO SparkEnv: Registering BlockManagerMaster

18/11/03 16:20:22 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information

18/11/03 16:20:22 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up

18/11/03 16:20:22 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7aa12077-6316-47b4-97e9-f65b3009ac79

18/11/03 16:20:22 INFO MemoryStore: MemoryStore started with capacity 366.3 MB

18/11/03 16:20:22 INFO SparkEnv: Registering OutputCommitCoordinator

18/11/03 16:20:23 INFO Utils: Successfully started service 'SparkUI' on port 4040.

18/11/03 16:20:23 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.5.182:4040

18/11/03 16:20:23 INFO SparkContext: Added JAR file:/usr/local/spark2.2/bin/./jar/sparknew-1.0-SNAPSHOT.jar at spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar with timestamp 1541233223259

18/11/03 16:20:23 INFO Executor: Starting executor ID driver on host localhost

18/11/03 16:20:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44491.

18/11/03 16:20:23 INFO NettyBlockTransferService: Server created on 192.168.5.182:44491

18/11/03 16:20:23 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy

18/11/03 16:20:23 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.5.182, 44491, None)

18/11/03 16:20:23 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.5.182:44491 with 366.3 MB RAM, BlockManagerId(driver, 192.168.5.182, 44491, None)

18/11/03 16:20:23 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.5.182, 44491, None)

18/11/03 16:20:23 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.5.182, 44491, None)

18/11/03 16:20:24 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 236.5 KB, free 366.1 MB)

18/11/03 16:20:24 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 366.0 MB)

18/11/03 16:20:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.5.182:44491 (size: 22.9 KB, free: 366.3 MB)

18/11/03 16:20:24 INFO SparkContext: Created broadcast 0 from textFile at ScalaWordCount.scala:13

18/11/03 16:20:24 INFO FileInputFormat: Total input paths to process : 1

18/11/03 16:20:25 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

18/11/03 16:20:25 INFO SparkContext: Starting job: saveAsTextFile at ScalaWordCount.scala:15

18/11/03 16:20:25 INFO DAGScheduler: Registering RDD 3 (map at ScalaWordCount.scala:14)

18/11/03 16:20:25 INFO DAGScheduler: Got job 0 (saveAsTextFile at ScalaWordCount.scala:15) with 2 output partitions

18/11/03 16:20:25 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at ScalaWordCount.scala:15)

18/11/03 16:20:25 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)

18/11/03 16:20:25 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)

18/11/03 16:20:25 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at ScalaWordCount.scala:14), which has no missing parents

18/11/03 16:20:25 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 366.0 MB)

18/11/03 16:20:25 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.8 KB, free 366.0 MB)

18/11/03 16:20:25 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.5.182:44491 (size: 2.8 KB, free: 366.3 MB)

18/11/03 16:20:25 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

18/11/03 16:20:25 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at ScalaWordCount.scala:14) (first 15 tasks are for partitions Vector(0, 1))

18/11/03 16:20:25 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks

18/11/03 16:20:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 4840 bytes)

18/11/03 16:20:25 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 4840 bytes)

18/11/03 16:20:25 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)

18/11/03 16:20:25 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

18/11/03 16:20:25 INFO Executor: Fetching spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar with timestamp 1541233223259

18/11/03 16:20:25 INFO TransportClientFactory: Successfully created connection to /192.168.5.182:42065 after 51 ms (0 ms spent in bootstraps)

18/11/03 16:20:25 INFO Utils: Fetching spark://192.168.5.182:42065/jars/sparknew-1.0-SNAPSHOT.jar to /tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4/userFiles-5bb37b1d-3c33-4f7b-acdb-ff85603d088f/fetchFileTemp5672377923687175291.tmp

18/11/03 16:20:26 INFO Executor: Adding file:/tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4/userFiles-5bb37b1d-3c33-4f7b-acdb-ff85603d088f/sparknew-1.0-SNAPSHOT.jar to class loader

18/11/03 16:20:26 INFO HadoopRDD: Input split: hdfs://host2:8020/usr/file/a.txt:0+31

18/11/03 16:20:26 INFO HadoopRDD: Input split: hdfs://host2:8020/usr/file/a.txt:31+32

18/11/03 16:20:26 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1155 bytes result sent to driver

18/11/03 16:20:26 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1026 bytes result sent to driver

18/11/03 16:20:26 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 824 ms on localhost (executor driver) (1/2)

18/11/03 16:20:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 810 ms on localhost (executor driver) (2/2)

18/11/03 16:20:26 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

18/11/03 16:20:26 INFO DAGScheduler: ShuffleMapStage 0 (map at ScalaWordCount.scala:14) finished in 0.858 s

18/11/03 16:20:26 INFO DAGScheduler: looking for newly runnable stages

18/11/03 16:20:26 INFO DAGScheduler: running: Set()

18/11/03 16:20:26 INFO DAGScheduler: waiting: Set(ResultStage 1)

18/11/03 16:20:26 INFO DAGScheduler: failed: Set()

18/11/03 16:20:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at ScalaWordCount.scala:15), which has no missing parents

18/11/03 16:20:26 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 72.0 KB, free 366.0 MB)

18/11/03 16:20:26 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 25.9 KB, free 365.9 MB)

18/11/03 16:20:26 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.5.182:44491 (size: 25.9 KB, free: 366.2 MB)

18/11/03 16:20:26 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006

18/11/03 16:20:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at ScalaWordCount.scala:15) (first 15 tasks are for partitions Vector(0, 1))

18/11/03 16:20:26 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks

18/11/03 16:20:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 4621 bytes)

18/11/03 16:20:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 4621 bytes)

18/11/03 16:20:26 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)

18/11/03 16:20:26 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)

18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks

18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks

18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms

18/11/03 16:20:26 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms

18/11/03 16:20:26 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.5.182:44491 in memory (size: 2.8 KB, free: 366.3 MB)

18/11/03 16:20:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

18/11/03 16:20:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

18/11/03 16:20:26 INFO FileOutputCommitter: Saved output of task 'attempt_20181103162025_0001_m_000000_2' to hdfs://host2:8020/usr/file/wcount/_temporary/0/task_20181103162025_0001_m_000000

18/11/03 16:20:26 INFO FileOutputCommitter: Saved output of task 'attempt_20181103162025_0001_m_000001_3' to hdfs://host2:8020/usr/file/wcount/_temporary/0/task_20181103162025_0001_m_000001

18/11/03 16:20:26 INFO SparkHadoopMapRedUtil: attempt_20181103162025_0001_m_000001_3: Committed

18/11/03 16:20:26 INFO SparkHadoopMapRedUtil: attempt_20181103162025_0001_m_000000_2: Committed

18/11/03 16:20:26 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1267 bytes result sent to driver

18/11/03 16:20:26 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1267 bytes result sent to driver

18/11/03 16:20:26 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 325 ms on localhost (executor driver) (1/2)

18/11/03 16:20:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 330 ms on localhost (executor driver) (2/2)

18/11/03 16:20:26 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

18/11/03 16:20:26 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at ScalaWordCount.scala:15) finished in 0.332 s

18/11/03 16:20:26 INFO DAGScheduler: Job 0 finished: saveAsTextFile at ScalaWordCount.scala:15, took 1.653739 s

18/11/03 16:20:26 INFO SparkUI: Stopped Spark web UI at http://192.168.5.182:4040

18/11/03 16:20:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

18/11/03 16:20:26 INFO MemoryStore: MemoryStore cleared

18/11/03 16:20:26 INFO BlockManager: BlockManager stopped

18/11/03 16:20:26 INFO BlockManagerMaster: BlockManagerMaster stopped

18/11/03 16:20:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

18/11/03 16:20:26 INFO SparkContext: Successfully stopped SparkContext

18/11/03 16:20:26 INFO ShutdownHookManager: Shutdown hook called

18/11/03 16:20:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-d1405a32-85be-40d6-ba26-7abb1632abc4

Now let's check the result saved in Hadoop HDFS.

root@host2 bin# ./hdfs dfs -lsr /

lsr: DEPRECATED: Please use 'ls -R' instead.

drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr

drwxr-xr-x - root supergroup 0 2018-11-03 16:20 /usr/file

-rw-r--r-- 3 root supergroup 63 2018-11-03 16:06 /usr/file/a.txt

-rw-r--r-- 3 root supergroup 173271626 2018-09-14 13:50 /usr/file/jdk-8u45-linux-x64.tar.gz

drwxr-xr-x - root supergroup 0 2018-11-03 16:20 /usr/file/wcount

-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS

-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000

-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001

root@host2 bin# ./hdfs dfs -cat /usr/file/wcount/part-00000

(him,1)

(park,1)

(fool,1)

(dinsh,1)

(fish,1)

(dog,1)

(apple,1)

(cry,1)

(my,1)

root@host2 bin# ./hdfs dfs -cat /usr/file/wcount/part-00001

(ice,1)

(cark,1)

(balana,1)

(fuck,1)
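A side note on the output format: saveAsTextFile writes each RDD element's toString, which is why the lines look like "(him,1)". If plainer output is wanted, the pairs can be formatted before saving; below is a sketch of how the last lines of ScalaWordCount could be changed (hypothetical, not what was run above):

// Format each (word, count) pair as "word<TAB>count" before writing,
// instead of relying on the tuple's default toString.
val formatted = result.map { case (word, count) => s"$word\t$count" }
formatted.saveAsTextFile(args(1))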

This gives us the result we wanted: the word count for the text file a.txt. (The counts are spread over two part files because textFile read a.txt as two input splits, so the job ran with two output partitions.) Of course this is essentially offline (batch) processing and has little to do with a web application; for real-time processing we will still need Spark Streaming.

Spark also supports an interactive, command-line style of use, much like the Scala REPL. Below we launch it, giving this session 5 GB of executor memory and 16 executor cores in total:

root@host2 bin# ./spark-shell --master spark://host2:7077,host1:7077 --executor-memory 5g --total-executor-cores 16

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

18/11/03 20:59:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

18/11/03 21:00:01 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

Spark context Web UI available at http://192.168.5.182:4040

Spark context available as 'sc' (master = spark://host2:7077,host1:7077, app id = app-20181103205956-0001).

Spark session available as 'spark'.

Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)

Type in expressions to have them evaluated.

Type :help for more information.

scala>

For example, suppose we want to see how many words a.txt contains:

scala> val r1 = sc.textFile("hdfs://host2:8020/usr/file/a.txt")

r1: org.apache.spark.rdd.RDD[String] = hdfs://host2:8020/usr/file/a.txt MapPartitionsRDD[1] at textFile at <console>:24

We can see that the return value is an RDD. So what exactly is an RDD? In short, it is Spark's resilient distributed dataset: an immutable, partitioned collection of records whose transformations are recorded lazily and only computed when an action is called.
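As a quick illustration of that laziness (a sketch in the same shell session, not part of the original transcript), transformations such as flatMap and map only record lineage; nothing is actually read from HDFS until an action such as count runs:

// r1 is the RDD created above from a.txt.
val words = r1.flatMap(_.split(" "))   // transformation: lazy, nothing read yet
val pairs = words.map((_, 1))          // transformation: still lazy
pairs.count()                          // action: triggers the read and the computation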

scala> r1.count

res0: Long = 1

scala> r1.flatMap(_.split(" ")).map((_,1)).count

res1: Long = 13

Operating on this RDD, we see the file has 1 line and 13 words.
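To reproduce the packaged job's result interactively, the same pipeline can be completed with reduceByKey and collected back to the driver (a sketch; the output is omitted here):

// Same word count as the packaged job, run in the shell; collect brings the
// (word, count) pairs back to the driver as an Array.
r1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)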

Finally, let's take a look at Spark's web UI on port 8091.

Every application we have run leaves a record there.
