restapi(1)- 文件上传下载服务

2019-07-19 16:41:52 浏览数 (1)

上次对restapi开了个头,设计了一个包括了身份验证和使用权限的restful服务开发框架。这是一个通用框架,开发人员只要直接往里面加新功能就行了。虽然这次的restapi是围绕着数据库表的CRUD操作设计的,但文件类数据在服务端与客户端之间的交换其实也很常用,特别是多媒体类如图片等文件类型。那我们就试着设计一个文件交换服务功能然后看看能不能很方便的加入到restapi框架内。

akka-http是以akka-stream为核心的,使用了大量的akka-stream功能。akka-stream中有一个FileIO组件库,里面提供了一系列有关文件读写功能,以数据流Source,Sink方式呈现:

代码语言:javascript复制
...
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)
...
  def toPath(
      f: Path,
      options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] =
    toPath(f, options, startPosition = 0)

可以发现,这些Source,Sink都是以ByteString为流元素进行操作的,akka-http自带了ByteString的Marshaller,可以实现数据格式自动转换,在网络传输中不需要增加什么数据格式转换动作。先用FileIO来产生一个Source[ByteString,_]:

代码语言:javascript复制
package com.datatech.restapi
import akka.stream._
import akka.stream.scaladsl._
import java.nio.file._
import akka.util._
object FileStreaming {

  def fileStreamSource(filePath: String, chunkSize: Int = 1024, dispatcherName: String = ""): Source[ ByteString,Any] = {
    def loadFile  = {
      //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
      val file = Paths.get(filePath)
      if (dispatcherName != "")
        FileIO.fromPath(file, chunkSize)
         .withAttributes(ActorAttributes.dispatcher(dispatcherName))
      else
        FileIO.fromPath(file, chunkSize)
    }
    loadFile
  }
}

注意,我们可以用akka系统之外的线程池来进行FileIO操作,可以避免影响akka系统的运行效率。dispatcherName标注了在application.conf里自定义的线程池:

代码语言:javascript复制
akka {
  http {
    blocking-ops-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        // or in Akka 2.4.2 
        fixed-pool-size = 16
      }
      throughput = 100
    }
  }
}

下面是File功能架构FileRoute的设计:

代码语言:javascript复制
package com.datatech.restapi
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.coding.Gzip
import java.nio.file._
import FileStreaming._
import AuthBase._

case class FileRoute(jwt: String)(implicit auth: AuthBase, sys: ActorSystem) {

  val destPath = "/users/tiger-macpro/cert4/meme.jpg"
  implicit val mat = ActorMaterializer()
  val route = pathPrefix("file") {
    val privatePath = auth.tempDirFromJwt(jwt)
    if (privatePath.length == 0)
      complete(StatusCodes.NotFound)

    (get & path(Remaining)) { filename =>
      withoutSizeLimit {
        encodeResponseWith(Gzip) {
          complete(
            HttpEntity(
              ContentTypes.`application/octet-stream`,
              fileStreamSource(privatePath "/download/" filename, 1024))
          )
        }
      }
    } ~
      (post &  parameters('filename)) { filename =>
        withoutSizeLimit {
          decodeRequest {
            extractDataBytes { bytes =>
              val fut = bytes.runWith(FileIO.toPath(Paths.get(privatePath "/upload/" filename)))
              onComplete(fut) { _ => complete(StatusCodes.OK)}
            }
          }
        }

      }

  }
}

每个用户在服务端都应该有个独立的文件目录,这个刚好可以放在jwt里:

代码语言:javascript复制
package com.datatech.restapi
import akka.http.scaladsl.server.directives.Credentials
import AuthBase._
object MockUserAuthService {

  case class User(username: String, password: String, userInfo: UserInfo)
  val validUsers = Seq(User("johnny", "p4ssw0rd",
       Map("shopid" -> "1101", "userid" -> "101", "tmpdir" ->"/users/tiger-macpro/1101101"))
    ,User("tiger", "secret",
      Map("shopid" -> "1101" , "userid" -> "102", "tmpdir" ->"/users/tiger-macpro/1101102")))

  def getValidUser(credentials: Credentials): Option[UserInfo] =
    credentials match {
      case p @ Credentials.Provided(_) =>
        validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match {
          case Some(user) => Some(user.userInfo)
          case _ => None
        }
      case _ => None
    }

}

个人目录tmpdir是放在UserInfo里的,我们只需要从jwt里解析分离出来:

代码语言:javascript复制
   def tempDirFromJwt(jwt: String): String = {
      val optUserInfo = getUserInfo(jwt)
      val dir: String = optUserInfo match {
        case Some(m) =>
          try {
            m("tmpdir").toString
          } catch {case err: Throwable => ""}
        case None => ""
      }
      dir
    }

文件交换服务是需要使用权限的,所以FileRoute要放在authenticateOAuth2下面:

代码语言:javascript复制
 val route =
     path("auth") {
        authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
          post { complete(authenticator.issueJwt(userinfo))}
        }
     } ~
       pathPrefix("api") {
          authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            (path("hello") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            (path("how are you") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
              FileRoute(validToken)
                .route
            // ~ ...
          }
     }

写一个客户端来测试文件交换服务:

代码语言:javascript复制
import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import com.datatech.restapi.FileStreaming._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import scala.util._


case class FileUtil(implicit sys: ActorSystem) {
  import sys.dispatcher
  implicit val mat = ActorMaterializer()
  def createEntity(file: File): RequestEntity = {
    require(file.exists())
    val formData =
      Multipart.FormData(
        Source.single(
          Multipart.FormData.BodyPart(
            "test",
            HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
            Map("filename" -> file.getName))))
    Await.result(Marshal(formData).to[RequestEntity], 3 seconds)
  }

  def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
    implicit val mat = ActorMaterializer()
    import sys.dispatcher
    val futResp = Http(sys).singleRequest(
   //   Gzip.encodeMessage(
        request.copy(entity = dataEntity)   //.addHeader(`Content-Encoding`(HttpEncodings.gzip))
   //   )
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")
      }
  }

  def downloadFileTo(request: HttpRequest, destPath: String) = {
  //  val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))
    val futResp = Http(sys).singleRequest(request)  //.map(Gzip.decodeMessage(_))
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download file!")
        case Failure(err) => println(s"Download failed: ${err.getMessage}")
      }

  }

}

object TestFileClient  {
  type UserInfo = Map[String,Any]
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/")

    val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
    val authRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/auth",
      headers = List(authorization)
    )


    val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

    val respToken = for {
      resp <- futToken
      jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s   b.utf8String}
    } yield jstr

    val jstr =  Await.result[String](respToken,2 seconds)
    println(jstr)

    scala.io.StdIn.readLine()

    val authentication = headers.Authorization(OAuth2BearerToken(jstr))

    val entity = HttpEntity(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024)
    )
    //
    val chunked = HttpEntity.Chunked.fromData(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024)
    )

    val multipart = FileUtil().createEntity(new File("/Users/tiger-macpro/cert3/ctiger.jpg"))

    val uploadRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/api/file?filename=mypic.jpg",
    ).addHeader(authentication)

    //upload file
    //Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)
    //Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)
    Await.ready(FileUtil().uploadFile(uploadRequest,multipart),2 seconds)

    val dlRequest = HttpRequest(
      HttpMethods.GET,
      uri = "http://192.168.11.189:50081/api/file/mypic.jpg",
    ).addHeader(authentication)

    FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")

    scala.io.StdIn.readLine()
    system.terminate()
  }

}

在文件上传upload时试过用entity,chunked,multipart方式构建的request-entity,服务端都能处理。好像看过很多java的httpclient图片上传,都是用multipart entity。现在这个服务端能正确处理。当然,在服务端同样可以用multipart方式提供文件下载服务,就不在这里实现了。不过可以提供一段示范代码:

代码语言:javascript复制
import akka.actor._
import akka.stream._
import java.nio.file._
import java.io._

import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.util.ByteString

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.Future

object TestMultipartFileUpload extends App {
  val testConf: Config = ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.log-dead-letters = off""")
  implicit val system = ActorSystem("ServerTest", testConf)
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val testFile: File = new File("/users/tiger-macpro/downloads/uploadFileDemo.scala")  //args(0))

  def startTestServer(): Future[ServerBinding] = {
    import akka.http.scaladsl.server.Directives._

    val route: Route =
      path("upload") {
        entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) ⇒

          val fileNamesFuture = formdata.parts.mapAsync(1) { p ⇒
            println(s"Got part. name: ${p.name} filename: ${p.filename}")

            println("Counting size...")
            @volatile var lastReport = System.currentTimeMillis()
            @volatile var lastSize = 0L
            def receiveChunk(counter: (Long, Long), chunk: ByteString): (Long, Long) = {
              val (oldSize, oldChunks) = counter
              val newSize = oldSize   chunk.size
              val newChunks = oldChunks   1

              val now = System.currentTimeMillis()
              if (now > lastReport   1000) {
                val lastedTotal = now - lastReport
                val bytesSinceLast = newSize - lastSize
                val speedMBPS = bytesSinceLast.toDouble / 1000000 /* bytes per MB */ / lastedTotal * 1000 /* millis per second */

                println(f"Already got $newChunks} chunks with total size $newSized bytes avg chunksize ${newSize / newChunks}} bytes/chunk speed: $speedMBPS%6.2f MB/s")

                lastReport = now
                lastSize = newSize
              }
              (newSize, newChunks)
            }

            p.entity.dataBytes.runFold((0L, 0L))(receiveChunk).map {
              case (size, numChunks) ⇒
                println(s"Size is $size")
                (p.name, p.filename, size)
            }
          }.runFold(Seq.empty[(String, Option[String], Long)])(_ :  _).map(_.mkString(", "))

          complete {
            fileNamesFuture
          }
        }
      }
    Http().bindAndHandle(route, interface = "localhost", port = 0)
  }

  def createEntity(file: File): Future[RequestEntity] = {
    require(file.exists())
    val formData =
      Multipart.FormData(
        Source.single(
          Multipart.FormData.BodyPart(
            "test",
            HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
            Map("filename" -> file.getName))))
    Marshal(formData).to[RequestEntity]
  }

  def createRequest(target: Uri, file: File): Future[HttpRequest] =
    for {
      e ← createEntity(file)
    } yield HttpRequest(HttpMethods.POST, uri = target, entity = e)

  try {
    val result =
      for {
        ServerBinding(address) ← startTestServer()
        _ = println(s"Server up at $address")
        port = address.getPort
        target = Uri(scheme = "http", authority = Uri.Authority(Uri.Host("localhost"), port = port), path = Uri.Path("/upload"))
        req ← createRequest(target, testFile)
        _ = println(s"Running request, uploading test file of size ${testFile.length} bytes")
        response ← Http().singleRequest(req)
        responseBodyAsString ← Unmarshal(response).to[String]
      } yield responseBodyAsString

    result.onComplete { res ⇒
      println(s"The result was $res")
      system.terminate()
    }

    system.scheduler.scheduleOnce(60.seconds) {
      println("Shutting down after timeout...")
      system.terminate()
    }
  } catch {
    case _: Throwable ⇒ system.terminate()
  }
}

上面这个示范里包括了服务端,客户端对multipart的数据处理。

在上面这个例子里我们先设计了一个独立的包括文件交换服务功能的FileRoute类,然后直接把FileRoute.route贴在主菜单后面就完成了文件交换服务功能的添加。比较接近实现restapi设计初衷。

下面是本次示范源代码:

build.sbt

代码语言:javascript复制
name := "restapi"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies   = Seq(
  "com.typesafe.akka" %% "akka-http"   % "10.1.8",
  "com.typesafe.akka" %% "akka-stream" % "2.5.23",
  "com.pauldijou" %% "jwt-core" % "3.0.1",
  "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
  "org.json4s" %% "json4s-native" % "3.6.1",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "org.json4s" %% "json4s-jackson" % "3.6.7",
  "org.json4s" %% "json4s-ext" % "3.6.7"
)

auth/AuthBase.scala

代码语言:javascript复制
package com.datatech.restapi

import akka.http.scaladsl.server.directives.Credentials
import pdi.jwt._
import org.json4s.native.Json
import org.json4s._
import org.json4s.jackson.JsonMethods._
import pdi.jwt.algorithms._
import scala.util._

object AuthBase {
  type UserInfo = Map[String, Any]
  case class AuthBase(
                       algorithm: JwtAlgorithm = JwtAlgorithm.HMD5,
                       secret: String = "OpenSesame",
                       getUserInfo: Credentials => Option[UserInfo] = null) {
    ctx =>
    def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm=algo)
    def withSecretKey(key: String): AuthBase = ctx.copy(secret = key)
    def withUserFunc(f: Credentials => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f)

    def authenticateToken(credentials: Credentials): Option[String] =
      credentials match {
        case Credentials.Provided(token) =>
          algorithm match {
            case algo: JwtAsymmetricAlgorithm =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtAsymmetricAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
            case _ =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtHmacAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
          }
        case _ => None
      }

    def getUserInfo(token: String): Option[UserInfo] = {
      algorithm match {
        case algo: JwtAsymmetricAlgorithm =>
          Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtAsymmetricAlgorithm])) match {
            case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject])  "userinfo").values.asInstanceOf[UserInfo])
            case Failure(err) => None
          }
        case _ =>
          Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtHmacAlgorithm])) match {
            case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject])  "userinfo").values.asInstanceOf[UserInfo])
            case Failure(err) => None
          }
      }
    }

    def issueJwt(userinfo: UserInfo): String = {
      val claims = JwtClaim()   Json(DefaultFormats).write(("userinfo", userinfo))
      Jwt.encode(claims, secret, algorithm)
    }

    def tempDirFromJwt(jwt: String): String = {
      val optUserInfo = getUserInfo(jwt)
      val dir: String = optUserInfo match {
        case Some(m) =>
          try {
            m("tmpdir").toString
          } catch {case err: Throwable => ""}
        case None => ""
      }
      dir
    }

  }

}

file/FileStreaming.scala

代码语言:javascript复制
package com.datatech.restapi
import akka.stream._
import akka.stream.scaladsl._
import java.nio.file._
import akka.util._
object FileStreaming {

  def fileStreamSource(filePath: String, chunkSize: Int = 1024, dispatcherName: String = ""): Source[ ByteString,Any] = {
    def loadFile  = {
      //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
      val file = Paths.get(filePath)
      if (dispatcherName != "")
        FileIO.fromPath(file, chunkSize)
         .withAttributes(ActorAttributes.dispatcher(dispatcherName))
      else
        FileIO.fromPath(file, chunkSize)
    }
    loadFile
  }
}

file/FileRoute.scala

代码语言:javascript复制
package com.datatech.restapi
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.coding.Gzip
import java.nio.file._
import FileStreaming._
import AuthBase._

case class FileRoute(jwt: String)(implicit auth: AuthBase, sys: ActorSystem) {

  val destPath = "/users/tiger-macpro/cert4/meme.jpg"
  implicit val mat = ActorMaterializer()
  val route = pathPrefix("file") {
    val privatePath = auth.tempDirFromJwt(jwt)
    if (privatePath.length == 0)
      complete(StatusCodes.NotFound)

    (get & path(Remaining)) { filename =>
      withoutSizeLimit {
        encodeResponseWith(Gzip) {
          complete(
            HttpEntity(
              ContentTypes.`application/octet-stream`,
              fileStreamSource(privatePath "/download/" filename, 1024))
          )
        }
      }
    } ~
      (post &  parameters('filename)) { filename =>
        withoutSizeLimit {
          decodeRequest {
            extractDataBytes { bytes =>
              val fut = bytes.runWith(FileIO.toPath(Paths.get(privatePath "/upload/" filename)))
              onComplete(fut) { _ => complete(StatusCodes.OK)}
            }
          }
        }

      }

  }
}

MockUserAuthService.scala

代码语言:javascript复制
package com.datatech.restapi
import akka.http.scaladsl.server.directives.Credentials
import AuthBase._
object MockUserAuthService {

  case class User(username: String, password: String, userInfo: UserInfo)
  val validUsers = Seq(User("johnny", "p4ssw0rd",
       Map("shopid" -> "1101", "userid" -> "101", "tmpdir" ->"/users/tiger-macpro/1101101"))
    ,User("tiger", "secret",
      Map("shopid" -> "1101" , "userid" -> "102", "tmpdir" ->"/users/tiger-macpro/1101102")))

  def getValidUser(credentials: Credentials): Option[UserInfo] =
    credentials match {
      case p @ Credentials.Provided(_) =>
        validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match {
          case Some(user) => Some(user.userInfo)
          case _ => None
        }
      case _ => None
    }

}

RestApiServer.scala

代码语言:javascript复制
package com.datatech.restapi

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._

object RestApiServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher



  implicit val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

  val route =
     path("auth") {
        authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
          post { complete(authenticator.issueJwt(userinfo))}
        }
     } ~
       pathPrefix("api") {
          authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            (path("hello") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            (path("how are you") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
              FileRoute(validToken)
                .route
            // ~ ...
          }
     }


  val (port, host) = (50081,"192.168.11.189")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()


  bindingFuture.flatMap(_.unbind())
  .onComplete(_ => httpSys.terminate())


}

TestFileClient.scala

代码语言:javascript复制
import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import com.datatech.restapi.FileStreaming._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import scala.util._


case class FileUtil(implicit sys: ActorSystem) {
  import sys.dispatcher
  implicit val mat = ActorMaterializer()
  def createEntity(file: File): RequestEntity = {
    require(file.exists())
    val formData =
      Multipart.FormData(
        Source.single(
          Multipart.FormData.BodyPart(
            "test",
            HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
            Map("filename" -> file.getName))))
    Await.result(Marshal(formData).to[RequestEntity], 3 seconds)
  }

  def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
    implicit val mat = ActorMaterializer()
    import sys.dispatcher
    val futResp = Http(sys).singleRequest(
   //   Gzip.encodeMessage(
        request.copy(entity = dataEntity)   //.addHeader(`Content-Encoding`(HttpEncodings.gzip))
   //   )
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")
      }
  }

  def downloadFileTo(request: HttpRequest, destPath: String) = {
  //  val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))
    val futResp = Http(sys).singleRequest(request)  //.map(Gzip.decodeMessage(_))
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download file!")
        case Failure(err) => println(s"Download failed: ${err.getMessage}")
      }

  }

}

object TestFileClient  {
  type UserInfo = Map[String,Any]
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/")

    val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
    val authRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/auth",
      headers = List(authorization)
    )


    val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

    val respToken = for {
      resp <- futToken
      jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s   b.utf8String}
    } yield jstr

    val jstr =  Await.result[String](respToken,2 seconds)
    println(jstr)

    scala.io.StdIn.readLine()

    val authentication = headers.Authorization(OAuth2BearerToken(jstr))

    val entity = HttpEntity(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024)
    )
    //
    val chunked = HttpEntity.Chunked.fromData(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024)
    )

    val multipart = FileUtil().createEntity(new File("/Users/tiger-macpro/cert3/ctiger.jpg"))

    val uploadRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/api/file?filename=mypic.jpg",
    ).addHeader(authentication)

    //upload file
    //Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)
    //Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)
    Await.ready(FileUtil().uploadFile(uploadRequest,multipart),2 seconds)

    val dlRequest = HttpRequest(
      HttpMethods.GET,
      uri = "http://192.168.11.189:50081/api/file/mypic.jpg",
    ).addHeader(authentication)

    FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")

    scala.io.StdIn.readLine()
    system.terminate()
  }

}

0 人点赞