restapi(6)- do it the functional way, 重温函数式编程

再次看了看上篇博客的源代码,发现连自己都看不懂了。想是为了赶时间交货不知不觉又回到OOP行令模式了,看看下面这段代码:

       (post &  parameters(‘pid,‘desc.?,‘width.as[Int].?,‘heigth.as[Int].?)) { (pid, optDesc, optWid, optHgh) =>
          val futCount: Future[Int] = repository.count(pid).value.value.runToFuture.map {
            eoi =>
              eoi match {
                case Right(oi) => oi match {
                  case Some(i) => i
                  case None => -1
                }
                case Left(err) => -1
              }
          }
          val count: Int = Await.result(futCount, 2 seconds)
          var doc = Document(
            "pid" -> pid,
            "seqno" -> count
          )
          if (optDesc != None)
            doc = doc + ("desc" -> optDesc.get)
          if (optWid != None)
            doc = doc + ("desc" -> optWid.get)
          if (optHgh != None)
            doc = doc + ("desc" -> optHgh.get)

          withoutSizeLimit {
            decodeRequest {
              extractDataBytes { bytes =>
                val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
                  hd ++ bs
                }
                onComplete(fut) {
                  case Success(b) =>
                    doc = doc + ("pic" -> b.toArray)
                    val futmsg: Future[String] = repository.insert(doc).value.value.runToFuture.map {
                      eoc =>
                        eoc match {
                          case Right(oc) => oc match {
                            case Some(c) => count.toString //   c.toString()
                            case None => "insert may not complete!"
                          }
                          case Left(err) => err.getMessage
                        }
                    }
                    complete(futmsg)
                  case Failure(err) => complete(err)
                }
              }
            }
          }

有人能从这段代码里理解它的功能吗?本来作者的目的很简单:前端通过httprequest提交了一张图片及产品编号pid、系统读取MongoDB查找相同pid的数量count,然后将图片和描述包括count写入数据库并在reponse里返回count。把一个简单功能的实现搞的这么复杂都是我的错,可能受OOP荼毒太深。这次希望静下心来用函数式编程模式把这段代码从新实现一次,示范一下函数式编程的代码精炼和高雅特点。首先介绍一下DBResult[A]这个类型:这是一个Monad,为了应付Future[Either[Option[R]]]这样的类型而设计的,是一个表现数据库操作比较全面的类型,但同时它又是造成上面这段代码混乱的元凶。现在我们可以用隐式转换implicit conversion方式进行代码简化重用:

  import monix.execution.Scheduler.Implicits.global
  implicit class DBResultToFuture(dbr: DBOResult[_]){
      def toFuture[R] = {
        dbr.value.value.runToFuture.map {
          eor =>
            eor match {
              case Right(or) => or match {
                case Some(r) => r.asInstanceOf[R]
                case None => throw new RuntimeException("Operation produced None result!")
              }
              case Left(err) => throw new RuntimeException(err)
            }
        }
      }
  }

用这个隐式转换类型为任何DBOResult[R]增加一个函数toFuture[R]。现在整个futCount算式可以简化成下面这样:

          val futCount: Future[Int] = repository.count(pid).value.value.runToFuture.map {
            eoi =>
              eoi match {
                case Right(oi) => oi match {
                  case Some(i) => i
                  case None => -1
                }
                case Left(err) => -1
              }
          }

futCount:Future[Int]=repository.count(pid).toFuture

真正的简单易明。

不知怎么搞的,我尽然在这段代码中间使用了Await.result。从OOP角度分析这很容易理解,下一段程序需要上一段程序的结果来继续运行。在上面的例子里我们需要先获取count然后把count塞进Document再把Document存入数据库。逻辑思路上没问题,不过这样的做法是典型的行令式编程模式。在函数式编程模式里,阶段性的运算结果是在包嵌在Monad中的。Monad本身只是一个运算计划,只有真正运算时才能获取结果。Monad本身是函数组件,可以实现多个Monad的函数组合。在这里可以形象的把Monad函数组合描述为数据库操作步骤:先count、再insert,这两个步骤产生的结果还是留在Monad里的,直到所谓的世界末日,即实际运算完成后才取出,所以Monad是一种典型的程序运算流程管道。假如我们再把insert这段程序写成addPicture(...): DBOResult[_], 如下:

   def addPicuture(pid: String,seqno: Int, optDesc: Option[String]
                         ,optWid:Option[Int],optHgh:Option[Int],
                         bytes: Array[Byte]):DBOResult[Completed] ={
      var doc = Document(
        "pid" -> pid,
        "seqno" -> seqno,
        "pic" -> bytes
      )
      if (optDesc != None)
        doc = doc + ("desc" -> optDesc.get)
      if (optWid != None)
        doc = doc + ("desc" -> optWid.get)
      if (optHgh != None)
        doc = doc + ("desc" -> optHgh.get)
      repository.insert(doc)
    }

好了,现在整篇代码变成了下面这样:

       (post &  parameters(‘pid,‘desc.?,‘width.as[Int].?,‘heigth.as[Int].?)) { (pid, optDesc, optWid, optHgh) =>
          withoutSizeLimit {
            decodeRequest {
              extractDataBytes { bytes =>
                val futBytes = bytes.runFold(ByteString()) { case (hd, bs) =>
                  hd ++ bs
                }
                val futSeqno = for {
                  cnt <- repository.count(pid).toFuture
                  barr <- futBytes
                  _ <- addPicuture(pid, cnt, optDesc, optWid, optHgh, barr.toArray).toFuture
                } yield cnt
                complete(futSeqno.map(_.toString))
              }
            }
  

现在是不是变得简单易明了?如果你觉着这样看起来更加容易理解,那么我建议你现在开始多点接触了解函数式编程。

接着用同样方式把整个项目重新实现一次。修改后的源代码如下:

MongoRepo.scala

package com.datatech.rest.mongo
import org.mongodb.scala._
import org.bson.conversions.Bson
import org.mongodb.scala.result._
import com.datatech.sdp.mongo.engine._
import MGOClasses._
import MGOEngine._
import MGOCommands._
import com.datatech.sdp.result.DBOResult.DBOResult

object MongoRepo {

  class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {
    def getAll[R](next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
      var res = Seq[ResultOptions]()
      next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
      sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
      fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
      top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}

      val ctxFind = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
        .setCommand(Find(andThen = res))
      mgoQuery[Seq[R]](ctxFind,converter)
    }

    def query[R](filtr: Bson, next:Option[String]=None,sort:Option[String]=None,fields:Option[String]=None,top:Option[Int]=None): DBOResult[Seq[R]] = {
      var res = Seq[ResultOptions]()
      next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
      sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
      fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
      top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
      val ctxFind = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
        .setCommand(Find(filter = Some(filtr),andThen = res))
      mgoQuery[Seq[R]](ctxFind,converter)
    }

    import org.mongodb.scala.model.Filters._
    def count(pid: String):DBOResult[Int] = {
      val ctxCount = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
        .setCommand(Count(filter=Some(equal("pid",pid))))
      mgoQuery[Int](ctxCount,None)
    }

    def getOneDocument(filtr: Bson): DBOResult[Document] = {
      val ctxFind = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
        .setCommand(Find(filter = Some(filtr),firstOnly = true))
      mgoQuery[Document](ctxFind,None)
    }
    def getOnePicture[R](pid: String, seqno: Int): DBOResult[R] = {
      val ctxFind = MGOContext(dbName = db, collName = coll)
        .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
        .setCommand(Find(filter = Some(and(equal("pid",pid),equal("seqno",seqno))), firstOnly = true))
      mgoQuery[R](ctxFind, converter)
    }
    def insert(doc: Document): DBOResult[Completed] = {
      val ctxInsert = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
        .setCommand(Insert(Seq(doc)))
      mgoUpdate[Completed](ctxInsert)
    }

    def delete(filter: Bson): DBOResult[DeleteResult] = {
      val ctxDelete = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
        .setCommand(Delete(filter))
      mgoUpdate[DeleteResult](ctxDelete)
    }

    def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
      val ctxUpdate = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
        .setCommand(Update(filter,update,None,!many))
      mgoUpdate[UpdateResult](ctxUpdate)
    }

    def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
      val ctxUpdate = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
        .setCommand(Replace(filter,row))
      mgoUpdate[UpdateResult](ctxUpdate)
    }

  }
  import monix.execution.Scheduler.Implicits.global
  implicit class DBResultToFuture(dbr: DBOResult[_]){
      def toFuture[R] = {
        dbr.value.value.runToFuture.map {
          eor =>
            eor match {
              case Right(or) => or match {
                case Some(r) => r.asInstanceOf[R]
                case None => throw new RuntimeException("Operation produced None result!")
              }
              case Left(err) => throw new RuntimeException(err)
            }
        }
      }
  }

}

MongoRoute.scala

package com.datatech.rest.mongoimport akka.http.scaladsl.server.Directivesimport com.datatech.sdp.file._

import scala.util._import org.mongodb.scala._import com.datatech.sdp.file.Streaming._import org.mongodb.scala.result._import MongoRepo._import akka.stream.ActorMaterializerimport com.datatech.sdp.result.DBOResult._import org.mongodb.scala.model.Filters._import com.datatech.sdp.mongo.engine.MGOClasses._import monix.execution.CancelableFutureimport akka.util._import akka.http.scaladsl.model._import akka.http.scaladsl.coding.Gzipimport akka.stream.scaladsl._import MongoModels.WebPic

import scala.concurrent._import scala.concurrent.duration._object MongoRoute {  class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(    implicit c: MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter {    import monix.execution.Scheduler.Implicits.global    var dbor: DBOResult[Seq[M]] = _    var dbou: DBOResult[UpdateResult] = _    val route = pathPrefix(pathName) {      pathPrefix("pictures") {        (post &  parameters(‘pid,‘desc.?,‘width.as[Int].?,‘heigth.as[Int].?)) { (pid, optDesc, optWid, optHgh) =>          withoutSizeLimit {            decodeRequest {              extractDataBytes { bytes =>                val futBytes = bytes.runFold(ByteString()) { case (hd, bs) =>                  hd ++ bs                }                val futSeqno = for {                  cnt <- repository.count(pid).toFuture[Int]                  barr <- futBytes                  _ <- addPicuture(pid, cnt, optDesc, optWid, optHgh, barr.toArray).toFuture[Completed]                } yield cnt                complete(futSeqno.map(_.toString))              }            }          } ~            (get & parameters(‘pid, ‘seqno.as[Int].?, ‘width.as[Int].?, ‘height.as[Int].?)) {              (pid, optSeq, optWid, optHght) =>                if (optSeq == None) {                  val futRows = repository.query(equal("pid", pid)).toFuture                  complete(futureToJson(futRows))                } else {                  val futPicRow = repository.getOnePicture(pid, optSeq.get).toFuture[WebPic]                  onComplete(futPicRow) {                    case Success(row) =>                      val width = if (optWid == None) row.width.getOrElse(128) else optWid.getOrElse(128)                      val height = if (optHght == None) row.heigth.getOrElse(128) else optHght.getOrElse(128)                      if (row.pic != None) {                        withoutSizeLimit {                          encodeResponseWith(Gzip) {                            complete(                              HttpEntity(                                ContentTypes.`application/octet-stream`,                                ByteArrayToSource(Imaging.setImageSize(row.pic.get.getData, width, height)                                ))                            )                          }                        }                      } else complete(StatusCodes.NotFound)                    case Failure(err) => complete(err)                  }                }            }        }      } ~        pathPrefix("blob") {          (get & parameter(‘filter)) { filter =>            val filtr = Document(filter)            val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).toFuture            onComplete(futOptPic) {              case Success(optBlob) => optBlob match {                case Some(blob) =>                  withoutSizeLimit {                    encodeResponseWith(Gzip) {                      complete(                        HttpEntity(                          ContentTypes.`application/octet-stream`,                          ByteArrayToSource(blob.getData)                        )                      )                    }                  }                case None => complete(StatusCodes.NotFound)              }              case Failure(err) => complete(err)            }          } ~            (post &  parameter(‘bson)) { bson =>              val bdoc = Document(bson)              withoutSizeLimit {                decodeRequest {                  extractDataBytes { bytes =>                    val futbytes = bytes.runFold(ByteString()) { case (hd, bs) =>                      hd ++ bs                    }                    val futmsg:Future[Completed] = for {                      bytes <- futbytes                      doc = Document(bson) + ("photo" -> bytes.toArray)                      c <- repository.insert(doc).toFuture[Completed]                    } yield c                    complete(futmsg.map(_.toString))                  }                }              }            }        } ~        (get & parameters(‘filter.?,‘fields.?,‘sort.?,‘top.as[Int].?,‘next.?)) {          (filter,fields,sort,top,next) => {            dbor = {              filter match {                case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)                case None => repository.getAll(next,sort,fields,top)              }            }            val futRows:Future[Seq[WebPic]] = dbor.toFuture[Seq[WebPic]]            complete(futureToJson(futRows))          }        } ~ post {        entity(as[String]) { json =>          val extractedEntity: M = fromJson[M](json)          val doc: Document = extractedEntity.to          val futmsg = repository.insert(doc).toFuture[Completed]          complete(futmsg.map(_.toString))        }      } ~ (put & parameter(‘filter,‘set.?, ‘many.as[Boolean].?)) { (filter, set, many) =>        val bson = Document(filter)        if (set == None) {          entity(as[String]) { json =>            val extractedEntity: M = fromJson[M](json)            val doc: Document = extractedEntity.to            val futmsg = repository.replace(bson, doc).toFuture            complete(futureToJson(futmsg))          }        } else {          set match {            case Some(u) =>              val ubson = Document(u)              dbou = repository.update(bson, ubson, many.getOrElse(true))            case None =>              dbou = Left(new IllegalArgumentException("missing set statement for update!"))          }          val futmsg:Future[UpdateResult] = dbou.toFuture[UpdateResult]          complete(futureToJson(futmsg.map(_.toString)))        }      } ~ (delete & parameters(‘filter, ‘many.as[Boolean].?)) { (filter,many) =>        val bson = Document(filter)        val futmsg:Future[DeleteResult] = repository.delete(bson).toFuture[DeleteResult]        complete(futureToJson(futmsg.map(_.toString)))      }    }

    def addPicuture(pid: String,seqno: Int, optDesc: Option[String]                         ,optWid:Option[Int],optHgh:Option[Int],                         bytes: Array[Byte]):DBOResult[Completed] ={      var doc = Document(        "pid" -> pid,        "seqno" -> seqno,        "pic" -> bytes      )      if (optDesc != None)        doc = doc + ("desc" -> optDesc.get)      if (optWid != None)        doc = doc + ("desc" -> optWid.get)      if (optHgh != None)        doc = doc + ("desc" -> optHgh.get)      repository.insert(doc)    }

  }

}

原文地址:https://www.cnblogs.com/tiger-xc/p/11403466.html

时间: 2025-01-11 23:13:19

restapi(6)- do it the functional way, 重温函数式编程的相关文章

Sth about 函数式编程(Functional Programming)

今天开会提到了函数式编程,针对不同类型的百年城方式,查阅了一部分资料,展示如下: 编程语言一直到近代,从汇编到C到Java,都是站在计算机的角度,考虑CPU的运行模式和运行效率,以求通过设计一个高效的编程语言,作为人与计算机之间沟通的桥梁.因为计算机本质上是串行执行一个个指令流,因此编程语言也被设计为命令式编程(Imperative Programming),先算什么再算什么,怎么输入怎么计算怎么输出,全部由编程人员决定. 后来,大家发现冯·诺伊曼结构将数据和指令平等化的思想能够帮助我们更好地对

Functional programming-函数式编程

A number of concepts and paradigms are specific to functional programming, and generally foreign to imperative programming (including object-oriented programming). However, programming languages are often hybrids of several programming paradigms, so

Python5 函数式编程(Functional Programming)

高阶函数Higher-order function 变量可以指向函数 以abs()函数举例,把函数本身赋值给变量: >>> f = abs >>> f <built-in function abs> 结论:函数本身也可以赋值给变量,即:变量可以指向函数.如果一个变量指向了一个函数,那么,可通过该变量来调用这个函数.直接调用abs()函数和调用变量f()完全相同. 函数名也是变量 对于abs()这个函数,完全可以把函数名abs看成变量,它指向一个可以计算绝对值

函数式编程(functional programming)

(一)什么是函数式编程? 函数式编程是种编程方式,它将电脑运算视为函数的计算.函数编程语言最重要的基础是λ演算(lambda calculus),而且λ演算的函数可以接受函数当作输入(参数)和输出(返回值).----来自百科 个人看了n篇博客过后的感觉就是: 将业务逻辑细化,抽象,封装成一个个功能函数,并借助语言自带的高阶函数api,将整个业务流程转化为函数之间的相互调用,这就是函数式编程. (二)包括什么内容? 倾向于数据就是数据,函数就是函数,函数既可以当做参数传来传去,也可以作为返回值,可

编程范式:命令式编程(Imperative)、声明式编程(Declarative)和函数式编程(Functional)

主要的编程范式有三种:命令式编程,声明式编程和函数式编程. 命令式编程:命令式编程的主要思想是关注计算机执行的步骤,即一步一步告诉计算机先做什么再做什么.比如:如果你想在一个数字集合 collection(变量名) 中筛选大于 5 的数字,你需要这样告诉计算机:第一步,创建一个存储结果的集合变量 results:第二步,遍历这个数字集合 collection:第三步:一个一个地判断每个数字是不是大于 5,如果是就将这个数字添加到结果集合变量 results 中.代码实现如下: List<int>

重温网络编程(一)

前言 近日突然发现自己忘记了网络编程,重新拾起一下. 几个概念性东西: 很多时候介绍socket连接通讯的时候,往往是上面这张图. 右边是服务器,左边是客户端. 服务器: 第一步: socket(), 是创建一个socket.这个过程我们使用的语言和操作系统?其实就是和我们调用http请求登录是一样的,会返回一个标识符,操作系统是服务端,我们的程序是客户端. 操作系统内部就做某些东西,可能是创建一个socket,然后返回一个标识码,通过这个标识码,后面调用其他的api,通过传输这个标识符,就可以

重温网络编程——协议(二)

前言 在网络传输中有两种特别出名,一种是tcp,一种是udp. 他们都是基于套接字,tcp 属于sock_stream 类型,udp 属于sock_dgram. sock_steam: 传输过程不会有数据丢失. 按序传输数据. 传输的数据不存在数据边界. sock_dgram 强调快速传输而非传输顺序. 传输的数据有数据边界. 传输的数据可能会损坏. 4 限制了每次传输数据的大小. 具体可以参考tcp和udp. 关于数据边界: https://blog.csdn.net/boiled_water

重温网络编程——常识(三)

前言 关于一些网络编程的常识整理. 正文 1.网络数据传输到我们的计算机,是如何知道传输给那个应用? 通过端口,所以端口也是不能重复占用的. 下面是sockaddr_in 的定义: typedef struct sockaddr_in { #if(_WIN32_WINNT < 0x0600) short sin_family; #else //(_WIN32_WINNT < 0x0600) ADDRESS_FAMILY sin_family; #endif //(_WIN32_WINNT &l

GitHub 上都有值得关注学习的 iOS 开源项目

GitHub上有很多不错的iOS开源项目,个人认为不错的,有这么几个: 1. ReactiveCocoa:ReactiveCocoa/ReactiveCocoa · GitHub:GitHub自家的函数式响应式编程范式的Objective-C实现,名字听着很高大上,学习曲线确实也比较陡,但是绝对会改变你对iOS编程的认知,首推之.2. Mantle:Mantle/Mantle · GitHub:又是GitHub自家的产物,轻量级建模的首选,也可以很好的配合CoreData工作.3. AFNetw