Akka(39): Http:File streaming-文件交换

所谓文件交换指的是Http协议中服务端和客户端之间文件的上传和下载。Akka-http作为一种系统集成工具应该具备高效率的数据交换方式包括文件交换和数据库表行的上传下载。Akka-http的数据交换模式支持流式操作:代表交换数据可以是一种无限长度流的元素。这种模式首先解决了纯Http大数据通过Multipart传输所必须进行的数据分段操作和复杂的消息属性设定等需要的技术门槛,再者用户还可以很方便的使用Akka-stream对数据进行深度处理,免去了数据转换的麻烦。更重要的是:Akka-http还支持reactive-stream,可以避免由传输速率所产生的种种问题。在本篇我们讨论利用Akka-http进行文件的双向传递。

任何文件的内容储存格式无论在硬盘、内存或者数据线上都是一堆bytes。文件交换流程包括读取文件里的bytes,传送这些bytes,最终把这些bytes写入文件。我们看到这里每个环节操作目标都是bytes,所以可能在程序里是不需要任何数据转换过程的。Akka提供了一组文件读写函数,如下:

  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)

  def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] =
    Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource")))

  def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] =
    toPath(f, options, startPosition = 0)

  def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] =
    Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))

我们看到:fromPath类型是Source[ByteSgtring,_],toPath类型是Sink[ByteString,_],直接就是流型式,应该可以直接放入Http消息的Entity中,如下:

  def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {
    def loadFile = {
      //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
      val file = Paths.get(filePath)
      FileIO.fromPath(file, chunkSize)
        .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
    }
    limitableByteSource(loadFile)
  }

fileStream是Source[ByteString,_]可以直接放进Entity:

  val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/text")
  val textData = HttpEntity(
    ContentTypes.`application/octet-stream`,
    fileStream("/Users/tiger-macpro/downloads/A4.TIF",256)
  )

我们把fileStream放入了HttpRequest中。对于HttpResponse可以用下面的方式:

 val route = pathPrefix("file") {
    (get & path("text" / Remaining)) { fp =>
      withoutSizeLimit {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            fileStream("/users/tiger-macpro/" + fp, 256))
        )
      }

注意:complete进行了HttpResponse的构建。因为Entity.dataByes就是Source[ByteString,_],所以我们可以直接把它导入Sink:

          entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }

上面我们提过FileIO.toPath就是一个Sink。由于我们的目的是大型的文件交换,所以无论上传下载都使用了withoutSizeLimit:

 val route = pathPrefix("file") {
    (get & path("exchange" / Remaining)) { fp =>
      withoutSizeLimit {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            fileStream("/users/tiger-macpro/" + fp, 256))
        )
      }
    } ~
      (post & path("exchange")) {
        withoutSizeLimit {
          extractDataBytes { bytes =>
            val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))
            onComplete(fut) { _ =>
              complete(s"Save upload file to: $destPath")
            }
          }
        }

      }
 

好了下面的示范代码里对字符型或二进制文件都进行了交换的示范操作:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity._
import java.nio.file._

object FileServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  def fileStream(filePath: String, chunkSize: Int) = {
     def loadFile = {
       //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
       val file = Paths.get(filePath)
       FileIO.fromPath(file, chunkSize)
         .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
     }
    limitableByteSource(loadFile)
  }
  val destPath = "/users/tiger-macpro/downloads/A4-1.TIF"
  val route = pathPrefix("file") {
    (get & path("exchange" / Remaining)) { fp =>
      withoutSizeLimit {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            fileStream("/users/tiger-macpro/" + fp, 256))
        )
      }
    } ~
      (post & path("exchange")) {
        withoutSizeLimit {
          extractDataBytes { bytes =>
            val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))
            onComplete(fut) { _ =>
              complete(s"Save upload file to: $destPath")
            }
          }
        }

      }
  }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._
import java.nio.file._
import akka.util.ByteString
import scala.util._

object FileClient extends App {

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  def downloadFileTo(request: HttpRequest, destPath: String) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success([email protected](StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }
        case Success([email protected](code, _, _, _)) =>
          println(s"Download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download file!")
        case Failure(err) => println(s"Download failed: ${err.getMessage}")
      }

  }

  val dlFile = "Downloads/readme.txt"
  val downloadText = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile)

  downloadFileTo(downloadText, "/users/tiger-macpro/downloads/sample.txt")
  scala.io.StdIn.readLine()

  val dlFile2 = "Downloads/image.png"
  val downloadText2 = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile2)
  downloadFileTo(downloadText2, "/users/tiger-macpro/downloads/sample.png")
  scala.io.StdIn.readLine()

  def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
        request.copy(entity = dataEntity)
      )
    futResp
      .andThen {
        case Success([email protected](StatusCodes.OK, _, entity, _)) =>
        entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success([email protected](code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }

  def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {
    def loadFile = {
      //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
      val file = Paths.get(filePath)
      FileIO.fromPath(file, chunkSize)
        .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
    }
    limitableByteSource(loadFile)
  }

  val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/exchange")
  val textData = HttpEntity(
    ContentTypes.`application/octet-stream`,
    fileStream("/Users/tiger-macpro/downloads/readme.txt",256)
  )

  uploadFile(uploadText,textData)

  scala.io.StdIn.readLine()

  sys.terminate()

}
时间: 2024-08-02 15:12:52

Akka(39): Http:File streaming-文件交换的相关文章

Linux CentOS 7 中find命令、三个Time、快捷键及file判断文件类型

一. find命令 locate 查找命令,从本地生成的数据库中查找文件 如果没有locate命令,安装软件包:mlocate [[email protected]_46_188_centos ~]# which locate /usr/bin/locate [[email protected]_46_188_centos ~]# rpm -qf /usr/bin/locatemlocate-0.26-5.el7.x86_64 [[email protected]_46_188_centos ~

Hyper-V初涉_Hyper-V虚拟机文件交换

使用虚拟机时,文件交互就显得十分重要.如果能实现物理机与虚拟机之间的文件交互,将会节省大量的时间.比较可惜的是,Hyper-V虚拟机并不支持USB存储设备,所以在文件交换上略显麻烦. 与Hyper-V虚拟机实现文件共享可以通过网上邻居或FTP服务器方式,均为通过网络进行数据传输,架构在Hyper-V虚拟机与物理机进行网络互通的基础之上.这里将以网上邻居的形式进行演示. 在物理机上的"系统"(WinKey + Pause)中点击"远程设置",勾选"允许远程链

ajax input file 提交文件

<!DOCTYPE html> <html xmlns="http://www.w3.org/1999/xhtml"><head runat="server"><meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>        <title>Html5 Ajax 上传文件</ti

File / Directory 文件的操作及远程下载

//文件远程下载 WebClient client = new WebClient();    Uri uri = new Uri(url);    client.DownloadFile(Uri uri,String filename); //文件一定得存在不然会报错所以在删除文件时先判断这个文件是否存在 File.Exists(filePath) File.Delete(filePath); //File对文件的操作 File.Move(sourceFile,destFile);//文件的移

java学习一目了然&mdash;&mdash;File类文件处理

java学习一目了然--File类文件处理 File类(java.io.File) 构造函数: File(String path) File(String parent,String child) File(File parent,String child) 创建文件: boolean createNewFile(); 创建文件夹: boolean mkdir(); 用于创建一层未定义文件夹 boolean mkdirs(); 用于创建多层未定义文件夹,相当于多个mkdir() 删除文件/文件夹

SQL Server -&gt;&gt; Sparse File(稀疏文件)

Sparse File(稀疏文件)不是SQL Server的特性.它属于Windows的NTFS文件系统的一个特性.如果某个大文件中的数据包含着大量“0数据”(这个应该从二进制上看),这样的文件就可以被称之为稀疏文件.如果从二维图上看这个文件你会发现文件就像很多很多洞一样,这就是“稀疏”的由来.这种文件造成的问题是空间浪费.比如说如果你现在用VMWare Workstatation创建了一个虚拟机,初始化磁盘大小为40G,VM必然会为虚拟机生成一个或者多个.vmdk文件.如果文件系统真的分配40

Java——File(文件)

 public static void main(String[] args) { // getFile(); /* * 需求:  对指定目录进行所有内容的列出,(包含子目录中的内容) * */ File dir = new File("E:\\HB_JAVA解压"); listAll(dir, 0); } public static void listAll(File dir, int len) { System.out.println(getSpace(len) + dir.g

GRT Recover My File误删文件恢复

在日常的电脑操作中,我们可能会由于硬盘硬件损坏.硬盘格式化导致文件的丢失,在这种情况下,推荐一款误删文件的恢复软件给大家使用. GRT Recover My File是由GRTSoft公司推出的一款数据恢复软件,能够帮助你恢复误删的照片.电影.歌曲等资源. 功能介绍:1.GRT Recover My File支持FAT12.FAT16.FAT32.NTFS文件系统:2.可以帮助你快速的恢复被误删除的文件和文件夹,即使回收站已被清空或使用SHIFT + Del键彻底删除也可以恢复:3.程序提供了易

sql点滴39—解决数据库日志文件过大的问题

原文:sql点滴39-解决数据库日志文件过大的问题 随着数据库使用时间增长,日志文件也在不停的增大,这里介绍几种方法减小这个文件的方法. 1.直接删除log文件 分离数据库.分离数据库之前一定要做好数据库的全备份,选择数据库——右键——任务——分离,如下图 将日志文件和数据文件复制粘贴到另外一个文件夹中以防万一.删除链接,如下图 直接删除日志文件,然后再附加数据库,如下图 附加的时候会自动将ldf文件和mdf文件都附加上,但是会提示找不到ldf文件,没关系,选中ldf文件这一行,点击下面的删除按