Akka(41): Http:DBTable-rows streaming - 数据库表行交换

在前面一篇讨论里我们介绍了通过http进行文件的交换。因为文件内容是以一堆bytes来表示的,而http消息的数据部分也是byte类型的,所以我们可以直接用Source[ByteString,_]来读取文件然后放进HttpEntity中。我们还提到:如果需要进行数据库数据交换的话,可以用Source[ROW,_]来表示库表行,但首先必须进行ROW -> ByteString的转换。在上期讨论我们提到过这种转换其实是ROW->Json->ByteString或者反方向的转换,在Akka-http里称之为Marshalling和Unmarshalling。Akka-http的Marshalling实现采用了type-class编程模式,需要为每一种类型与Json的转换在可视域内提供Marshaller[A,B]类型的隐式实例。Akka-http默认的Json工具库是Spray-Json,着重case class,而且要提供JsonFormat?(case-class),其中?代表case class的参数个数,用起来略显复杂。不过因为是Akka-http的配套库,在将来Akka-http的持续发展中具有一定的优势,所以我们还是用它来进行下面的示范。

下面就让我们开始写些代码吧。首先,我们用一个case class代表数据库表行结构,然后用它作为流元素来构建一个Source,如下:

  case class County(id: Int, name: String)
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

我们先设计服务端的数据下载部分:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }
  implicit val countyFormat = jsonFormat2(County)
}

object HttpDBServer extends App {
  import Converters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  val route =
    path("rows") {
      get {
        complete {
          source
        }
      }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

在上面的代码里我们直接把source放进了complete(),然后期望这个directive能通过ToEntityMarshaller[County]类实例用Spray-Json把Source[County,NotUsed]转换成Source[ByteString,NotUsed]然后放入HttpResponse的HttpEntity里。转换结果只能在客户端得到证实。我们知道HttpResponse里的Entity.dataBytes就是一个Source[ByteString,_],我们可以把它Unmarshall成Source[County,_],然后用Akka-stream来操作:

     case Success([email protected](StatusCodes.OK, _, entity, _)) =>
          val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
          futSource.onSuccess {
            case source => source.runForeach(println)
          }
 

上面这个Unmarshal调用了下面这个FromEntityUnmarshaller[County]隐式实例:

  // support for as[Source[T, NotUsed]]
  implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =
    Unmarshaller.withMaterializer { implicit ec ? implicit mat ? e ?
      if (support.supported.matches(e.contentType)) {
        val frames = e.dataBytes.via(support.framingDecoder)
        val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)
        val unmarshallingFlow =
          if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)
          else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)
        val elements = frames.viaMat(unmarshallingFlow)(Keep.right)
        FastFuture.successful(elements)
      } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
    }

这个隐式实例是由Spray-Jason提供的,在SprayJsonSupport.scala里。
下面是这部分客户端的完整代码:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling.Unmarshal

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  implicit val countyFormat = jsonFormat2(County)
}

object HttpDBClient extends App {
  import Converters._

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  def downloadRows(request: HttpRequest) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success([email protected](StatusCodes.OK, _, entity, _)) =>
          val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
          futSource.onSuccess {
            case source => source.runForeach(println)
          }
        case Success([email protected](code, _, _, _)) =>
          println(s"download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download rows!")
        case Failure(err) => println(s"download failed: ${err.getMessage}")

      }
  }
  downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))

  scala.io.StdIn.readLine()

  sys.terminate()

}

以上我们已经实现了客户端从服务端下载一段数据库表行,然后以Akka-stream的操作方式来处理下载数据。那么反向交换即从客户端上传一段表行的话就需要把一个Source[T,_]转换成Source[ByteString,_]然后放进HttpRequest的HttpEntity里。服务端收到数据后又要进行反向的转换即把Request.Entity.dataBytes从Source[ByteString,_]转回Source[T,_]。Akka-http在客户端没有提供像complete这样的强大的自动化功能。我们可能需要自定义并提供像ToRequestMarshaller[Source[T,_]]这样的隐式实例。但Akka-http的Marshalling-type-class是个非常复杂的系统。如果我们的目的是简单提供一个Source[ByteString,_],我们是否可以直接调用Spray-Json的函数来进行ROW->Son->ByteString转换呢?如下:

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

我们直接用toJson函数进行County->Json转换实现了flowCountyToByteString。toJason是Spray-Json提供的一个函数:
package json {

  case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause)
  class SerializationException(msg: String) extends RuntimeException(msg)

  private[json] class PimpedAny[T](any: T) {
    def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any)
  }

  private[json] class PimpedString(string: String) {
    @deprecated("deprecated in favor of parseJson", "1.2.6")
    def asJson: JsValue = parseJson
    def parseJson: JsValue = JsonParser(string)
  }
}

假设服务端收到数据后以Akka-stream方式再转换成一个List返回,我们用下面的方法来测试功能:

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
      request.copy(entity = dataEntity)
    )
    futResp
      .andThen {
        case Success([email protected](StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success([email protected](code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }

服务端接收数据处理方法如下:

     post {
        withoutSizeLimit {
          entity(asSourceOf[County]) { source =>
            val futofNames: Future[List[String]] =
              source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
            complete {
              futofNames
            }
          }
        }
      }

考虑到在数据转换的过程中可能会出现异常。需要异常处理方法来释放backpressure:

  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))
        }
    }

      post {
        withoutSizeLimit {
          handleExceptions(postExceptionHandler) {
            entity(asSourceOf[County]) { source =>
              val futofNames: Future[List[String]] =
                source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
              complete {
                futofNames
              }
            }
          }
        }
      }

在客户端试运行返回结果显示:

  uploadRows(request,data)

["","广西壮族自治区地市县编号 #1","广西壮族自治区地市县编号 #2","广西壮族自治区地市县编号 #3","广西壮族自治区地市县编号 #4","广西壮族自治区地市县编号 #5"]

正是我们期待的结果。

下面是本次讨论的示范代码:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }
  implicit val countyFormat = jsonFormat2(County)
}

object HttpDBServer extends App {
  import Converters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))
        }
    }

  val route =
    path("rows") {
      get {
        complete {
          source
        }
      } ~
      post {
        withoutSizeLimit {
          handleExceptions(postExceptionHandler) {
            entity(asSourceOf[County]) { source =>
              val futofNames: Future[List[String]] =
                source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
              complete {
                futofNames
              }
            }
          }
        }
      }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling._

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  implicit val countyFormat = jsonFormat2(County)
}

object HttpDBClient extends App {
  import Converters._

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  def downloadRows(request: HttpRequest) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success([email protected](StatusCodes.OK, _, entity, _)) =>
          val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
          futSource.onSuccess {
            case source => source.runForeach(println)
          }
        case Success([email protected](code, _, _, _)) =>
          println(s"download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download rows!")
        case Failure(err) => println(s"download failed: ${err.getMessage}")

      }
  }
  downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
      request.copy(entity = dataEntity)
    )
    futResp
      .andThen {
        case Success([email protected](StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success([email protected](code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }

  uploadRows(request,data)

  scala.io.StdIn.readLine()

  sys.terminate()

}
时间: 2024-10-04 00:49:37

Akka(41): Http:DBTable-rows streaming - 数据库表行交换的相关文章

DBMS_RLS包实现数据库表行级安全控制

DBMS_RLS 是实现数据库表行级别安全控制的,这个包包含精细访问控制管理接口,这个接口是用来实现VPD(Virtual Private Database),虚拟私有数据库.DBMS_RLS只能在ORACLE的企业版(Enterpris Edition Only)本才可以用.oracle ebs 的权限是用这个来管理的.在数据库的数据安全访问的解决上,有很多的方法来解决权限的问题,有的是通过功能模块来控制访问权限的,有的是用建立视图的方法控制,例如查询语句中加where语句来控制.但是用vie

php基础:while循环查出所有数据库表行

$conn = mysql_connect('localhost','root','root');mysql_query('use ctx',$conn);mysql_query('set names utf8',$conn); $sql = "select * from girl";$rs = mysql_query($sql); $date = array();while($row = mysql_fetch_row($rs)){    $date[] = $row;}print_

MS SQL查询所有表行数,获取所有数据库名,表名,字段名

1.获取所有数据库名 --SELECT Name FROM Master..SysDatabases ORDER BY Name -- 2.获取所有表名: --SELECT Name NAMEtemp,* FROM TEST..SysObjects Where XType='U' ORDER BY Name --表名  ----XType='U':表示所有用户表; ----XType='S':表示所有系统表; 3.获取所有字段名: SELECT Name FROM SysColumns WHER

2、数据库和数据库表操作

一.(dos下)数据库的创建.删除.查看和使用 1.1 普通方式登录后再使用数据库 1 Microsoft Windows [版本 6.1.7601] 2 版权所有 (c) 2009 Microsoft Corporation.保留所有权利. 3 4 C:\Users\Dell>cd c: 5 6 c:\>d: 7 8 D:\>cd D:\Program Files\MySQL\MySQL Server 5.0\bin 9 10 D:\Program Files\MySQL\MySQL

使用Sqoop1.4.4将MySQL数据库表中数据导入到HDFS中

问题导读:         1.--connect参数作用? 2.使用哪个参数从控制台读取数据库访问密码? 3.Sqoop将关系型数据库表中数据导入HDFS基本参数要求及命令? 4.数据默认导入HDFS文件系统中的路径? 5.--columns参数的作用? 6.--where参数的作用? 一.部分关键参数介绍 参数介绍 --connect <jdbc-uri> 指定关系型数据库JDBC连接字符串 --connection-manager <class-name> 指定数据库使用的管

将Hive统计分析结果导入到MySQL数据库表中(一)——Sqoop导入方式

最近在做一个交通流的数据分析,需求是对于海量的城市交通数据,需要使用MapReduce清洗后导入到HBase中存储,然后使用Hive外部表关联HBase,对HBase中数据进行查询.统计分析,将分析结果保存在一张Hive表中,最后使用Sqoop将该表中数据导入到MySQL中.整个流程大概如下: 下面我主要介绍Hive关联HBase表--Sqoop导出Hive表到MySQL这些流程,原始数据集收集.MapReduce清洗及WEB界面展示此处不介绍. 一.HBase数据库表 hbase(main):

数据库表的查询操作实践演练(实验三),数据库演练

继前两次的实验,本次实验以熟练掌握利用select语句进行各种查询操作:单表查询.多表连接及查询.嵌套查询.集合查询等,巩固数据库查询操作.下面就跟着小编一起练习吧!在实验一创建并插入数据的表(Student, Course,SC,Teacher,TC)的基础上,完成以下操作.(1)将教师‘罗莉'的名字改为‘罗莉莉'.复制代码 代码如下:update Teacher set tname='罗莉莉' where tname='罗莉'(2)将两个同学(数据自己临时设置,用后即删除)的两门课程的成绩以

对于多个数据库表对应一个Model问题的思考

最近做项目遇到一个场景,就是客户要求为其下属的每一个分支机构建一个表存储相关数据,而这些表的结构都是一样的,只是分属于不同的机构.这个问题抽象一下就是多个数据库表对应一个Model(或者叫实体类).有了这个问题,我就开始思考在现有的代码中解决问题,最早数据采集部分是用EF来做数据存储的,我查了一下,资料并不多,问了一下对EF比较熟悉的朋友,得出的结论是EF实现这个功能比较复杂,不易实现.EF不能实现就要去找其他的框架,在PDF.NET的讨论群跟大家讨论这个问题的时候,@深蓝医生说PDF.NET可

Shell脚本实现DB2数据库表导出到文件

该Shell脚本用于实现将DB2数据库表导出到文件,将在另一篇博文<Java代码调用Shell脚本并传入参数实现DB2数据库表导出到文件>中通过Java代码实现调用该脚本并传入参数. 1 #!/usr/bin/env sh 2 3 DBSCHEMA=$1 4 DBUSER=$2 5 DBPASSWORD=$3 6 TABLENAME=$4 7 FILEPATH=$5 8 DELIMITER=$6 9 EXPORTLIMIT=$7 10 11 SQLERR="NO ERROR MSG&