傻白甜,约不约?

2020-10-29 14:29:01 浏览数 (1)

首先来介绍下,今天的主角“傻白甜”(SBT:Simple Build Tools), 其功能与 Maven 和 Gradle 类似。其是由 Scala 编写,对于新手入门不是太友好,如果只是写纯 Java 的 Bug ,大可不必和自己过不去,但是如果你经常使用 Spark 等大数据工具,还是有点必要学学使用的。而且 Sbt 默认会从一些奇奇怪怪的地方下载依赖,相信大家的第一次,都不会很美好( Sbt 的项目构建异常缓慢,而且还经常会失败),笔者也不例外,所以有了这篇文章,希望对你有些帮助。

提要:本文首先会介绍如果要使用sbt,需要完成那些配置,然后使用sbt,构建2个helloworld,一个基于akka的,一个是基于delta的。

配置全局仓库

在用户目录下,修改文件 C:Usersdafei.sbtrepositories , 如果没有,可以自行创建目录及文件。

添加内容:

代码语言:javascript复制
[repositories]
local
ali: https://maven.aliyun.com/repository/central/
huaweicloud-maven: https://repo.huaweicloud.com/repository/maven/
maven-central: https://repo1.maven.org/maven2/
sbt-plugin-repo: https://repo.scala-sbt.org/scalasbt/sbt-plugin-releases, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]

IDEA 配置

File -> Settings -> Build,Execution,Deployment -> Build Tools -> sbt

添加Vm参数 -Dsbt.override.build.repos=true , 也可以通过设置环境变量 SBT_OPTS="-Dsbt.override.build.repos=true" 实现。这里需要说明,如果需要指定自己的 sbt ,并不能像其他软件,设置倒跟目录就行,这里需要指定倒 bin 目录下的 sbt-launch.jar

编译一下,会发现舒爽很多了。

sbt 项目依赖

在使用 scala 的时候,大家一定要注意自己的sdk版本以及配置的依赖包的版本要一致,如果不符,就会出现各种奇怪的问题

代码语言:javascript复制
libraryDependencies  = "org.apache.spark" %% "spark-core" % "3.0.1"
libraryDependencies  = "org.apache.spark" %% "spark-sql" % "3.0.1"
libraryDependencies  = "org.apache.spark" %% "spark-catalyst" % "3.0.1"
libraryDependencies  = "org.apache.spark" %% "spark-streaming" % "3.0.1"

libraryDependencies  = "io.delta" %% "delta-core" % "0.7.0"


libraryDependencies  = "com.typesafe.akka" %% "akka-actor" % "2.6.10"
libraryDependencies  = "com.typesafe.akka" %% "akka-remote" % "2.6.10"
libraryDependencies  = "com.typesafe.akka" %% "akka-stream" % "2.6.10"

akka

Akka是JAVA虚拟机平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言编写,同时提供了Scala和Java的开发接口。Akka处理并发的方法基于Actor模型,Actor之间通信的唯一机制就是消息传递。

Akka特点:

  1. 对并发模型进行了更高的抽象
  2. 是异步、非阻塞、高性能的事件驱动编程模型
  3. 是轻量级事件处理(1GB内存可容纳百万级别个Actor)
  4. 它提供了一种称为Actor的并发模型,其粒度比线程更小,你可以在系统中启用大量的Actor。
  5. 它提供了一套容错机制,允许在Actor出现异常时进行一些恢复或重置操作。
  6. Akka既可以在单机上构建高并发程序,也可以在网络中构建分布式程序,并提供位置透明的Actor定位服务。

代码案例:

代码语言:javascript复制
package cn.datahub

import akka.actor.{Actor, ActorSystem, Props}

import scala.io.StdIn

class HelloActor extends Actor{
  // 重写接受消息的偏函数,其功能是接受消息并处理
  override def receive: Receive = {
    case "你好帅" => println("竟说实话,我喜欢你这种人!")
    case "丑八怪" => println("滚犊子 !")
    case "stop" => {
      context.stop(self) // 停止自己的actorRef
      context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService)
    }
  }
}

object HelloActor {
  /**
   * 创建线程池对象MyFactory,用来创建actor的对象的
   */
  private val MyFactory = ActorSystem("myFactory")    //里面的"myFactory"参数为线程池的名称
  /**
   *     通过MyFactory.actorOf方法来创建一个actor,注意,Props方法的第一个参数需要传递我们自定义的HelloActor类,
   * 第二个参数是给actor起个名字
   */
  private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor")

  def main(args: Array[String]): Unit = {
    var flag = true
    while(flag){
      /**
       * 接受用户输入的字符串
       */
      print("请输入您想发送的消息:")
      val consoleLine:String = StdIn.readLine()
      /**
       * 使用helloActorRef来给自己发送消息,helloActorRef有一个叫做感叹号("!")的方法来发送消息
       */
      helloActorRef ! consoleLine
      if (consoleLine.equals("stop")){
        flag = false
        println("程序即将结束!")
      }
      /**
       * 为了不让while的运行速度在receive方法之上,我们可以让他休眠0.1秒
       */
      Thread.sleep(100)
    }
  }
}

执行效果:

Delta Lake

Delta Lake 是一个存储层,为 Apache Spark 和大数据 workloads 提供 ACID 事务能力,其通过写和快照隔离之间的乐观并发控制(optimistic concurrency control),在写入数据期间提供一致性的读取,从而为构建在 HDFS 和云存储上的数据湖(data lakes)带来可靠性。Delta Lake 还提供内置数据版本控制,以便轻松回滚。

代码案例:

代码语言:javascript复制
package cn.datahub

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object Delta {
  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "E:\devlop\workspace\wanwansbt3")

    val spark = SparkSession.builder.appName("MyApp").master("local")
      //      .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // create table
//    val data = spark.range(0, 50)
//    data.write.format("delta").save("./data/delta-table")

    // load table
    val deltaTable = DeltaTable.forPath(spark, "./data/delta-table")
    deltaTable.toDF.show();

    // update
    deltaTable.update(
      condition = expr("id % 2 == 0"),
      set = Map("id" -> expr("id   100"))
    )
    deltaTable.toDF.show()

  }
}

执行效果:

代码仓库 https://github.com/dafei1288/wanwansbt3

0 人点赞