Flink流计算编程--在双流中体会joinedStream与coGroupedStream

一、joinedStream与coGroupedStream简介

在实际的流计算中,我们经常会遇到多个流进行join的情况,Flink提供了2个Transformations来实现。

如下图:

注意:Join(Cogroups) two data streams on a given key and a common window。这里很明确了,我们要在2个DataStream中指定连接的key以及window下来运算。

二、SQL比较

我们最熟悉的SQL语言中,如果想要实现2个表join,可以如下实现:

select T1.* , T2.*
from T1
join T2 on T1.key = T2.key;

这个SQL是一个inner join的形式。稍微复杂点的带有group by与order by的SQL如下:

select T1.key , sum(T2.col)
from T1
join T2 on T1.key = T2.key
group by T1.key
order by T1.col;

通过这2个SQL,我们想要在Flink中实现实时的流计算,就可以通过joinedStream或coGroupedStream来实现。但是在join之后实施更复杂的运算,例如判断、迭代等,仅仅通过SQL实现,恐怕会很麻烦,只能通过PL/SQL块来实现,而Flink提供了apply方法,用户可以自己编写复杂的函数来实现。

三、join与coGroup的区别

先来看下源码中提供的类与方法比较下:

1、join

通过结构可以发现,在JoinedStreams提供了where方法,在where类中提供了equalTo方法,下一层就是window,之后是trigger、evictor以及apply方法。

这里贴出一个代码模板共参考:

 * val one: DataStream[(String, Int)]  = ...
 * val two: DataStream[(String, Int)] = ...
 *
 * val result = one.join(two)
 *     .where {t => ... }
 *     .equal {t => ... }
 *     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
 *     .apply(new MyJoinFunction())

2、coGroup

仔细观察我们发现,实现上与join几乎一样,唯一的区别在于apply方法提供的参数类型。

代码模板如下:

 * val one: DataStream[(String, Int)]  = ...
 * val two: DataStream[(String, Int)] = ...
 *
 * val result = one.coGroup(two)
 *     .where(new MyFirstKeySelector())
 *     .equalTo(new MyFirstKeySelector())
 *     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
 *     .apply(new MyCoGroupFunction())

3、区别

刚才提到的apply方法中的参数类型不一样,join中提供的apply方法,参数是T1与T2这2种数据类型,对应到SQL中就是T1.* 与 T2.*的一行数据。而coGroup中提供的apply方法,参数是Iterator[T1]与Iterator[2]这2种集合,对应SQL中类似于Table[T1]与Table[T2]。

基于这2种方式,如果我们的逻辑不仅仅是对一条record做处理,而是要与上一record或更复杂的判断与比较,甚至是对结果排序,那么join中的apply方法显得比较困难。

四、程序实践

下面开始实际演示程序的编写与代码的打包并发布到集群,最后输出结果的一步步的过程。

说明:由于是双流,我模拟Kafka的Topic,自定义了2个socket,其中一个指定“transaction”的实时交易输入流,另一个socket指定“Market”的快照输入流,原则上每3秒(时间戳)生成1个快照。

1、join

由于是2个DataStream,且我的逻辑是要根据各自流产生的时间戳去限制window,因此这里要对2个流都分配时间戳并emit水位线(采用EventTime):

val eventMarketStream = marketDataStream.assignAscendingTimestamps(_._2)
val eventTransactionStream = transactionDataStream.assignAscendingTimestamps(_._2)

join操作后,apply方法接收的只是T1与T2的类型:

val joinedStreams = eventTransactionStream
      .join(eventMarketStream)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply{
        (t1 : (Long, Long, Long, Long), t2 : (Long, Long, Long), out : Collector[(Long,String,String,Long,Long,Long)]) =>

          val transactionTime = t1._2
          val marketTime = t2._2
          val differ = transactionTime - marketTime

          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

          if(differ >=0 && differ <= 3 * 1000) {
            out.collect(t1._1,format.format(marketTime) + ": marketTime", format.format(transactionTime) + ": transactionTime",t2._3,t1._3,t1._4)
          }
      }

这里实现的逻辑就是每个key在10秒的EventTime窗口中join,且只需要那些交易时间在快照时间之后,且在3秒的间隔内的数据。

详细的代码如下:

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * Created by zhe on 2016/6/21.
  */
object JoinedOperaion {
  case class Transaction(szWindCode:String, szCode:Long, nAction:String, nTime:String, seq:Long, nIndex:Long, nPrice:Long,nVolume:Long, nTurnover:Long, nBSFlag:Int, chOrderKind:String, chFunctionCode:String,nAskOrder:Long, nBidOrder:Long, localTime:Long)

  case class MarketInfo(szCode : Long, nActionDay : String, nTime : String, nMatch : Long)

  case class Market(szCode : Long, eventTime : String, nMatch : Long)

  def main(args: Array[String]): Unit = {
    /**
      * 参数包含3个:hostname,port1,port2
      * port1:作为Transaction的输入流(例如nc -lk 9000,然后输入参数指定9000)
      * port2:作为Market的输入流(例如nc -lk 9999,然后输入参数指定9999)
      */
    if(args.length != 3){
      System.err.println("USAGE:\nSocketTextStream <hostname> <port1> <port2>")
      return
    }
    val hostname = args(0)
    val port1 = args(1).toInt
    val port2 = args(2).toInt

    /**
      * 1、指定运行环境,设置EventTime
      * 1、Obtain an execution environment
      */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    /**
      * 2、创建初始化数据流:Transaction与market
      * 2、Load/create the initial data
      */
    val inputTransaction = env.socketTextStream(hostname, port1)
    val inputMarket = env.socketTextStream(hostname, port2)

    /**
      * 3、实施“累计资金流量”,
      *    资金流量(in) = if(当前价格>LastPrice){sum + = nTurnover}elsif(当前价格=LastPrice且最近的一次Transaction的价格<>LastPrice的价格且那次价格>LastPrice){sum += nTurnover}
      *    资金流量(out) = if(当前价格<LastPrice){sum + = nTurnover}elsif(当前价格=LastPrice且最近的一次Transaction的价格<>LastPrice的价格且那次价格<LastPrice){sum += nTurnover}
      * 3、Specify transformations on this data
      */
    val transactionDataStream = inputTransaction.map(new TransactionPrice)
    val marketDataStream = inputMarket.map(new MarketPrice)

    val eventMarketStream = marketDataStream.assignAscendingTimestamps(_._2)
    val eventTransactionStream = transactionDataStream.assignAscendingTimestamps(_._2)

    val joinedStreams = eventTransactionStream
      .join(eventMarketStream)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply{
        (t1 : (Long, Long, Long, Long), t2 : (Long, Long, Long), out : Collector[(Long,String,String,Long,Long,Long)]) =>

          val transactionTime = t1._2
          val marketTime = t2._2
          val differ = transactionTime - marketTime

          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

          if(differ >=0 && differ <= 3 * 1000) {
            out.collect(t1._1,format.format(marketTime) + ": marketTime", format.format(transactionTime) + ": transactionTime",t2._3,t1._3,t1._4)
          }
      }

      .name("JoinedStream Test")

    /**
      * 4、标准输出
      * 4、Specify where to put the results of your computations
      */
    joinedStreams.print()

    /**
      * 5、执行程序
      * 5、Trigger the program execution
      */
    env.execute("2 DataStream join")

  }

  class TransactionPrice extends MapFunction[String,(Long, Long, Long, Long)]{
    def map(transactionStream: String): (Long, Long, Long,Long) = {
      val columns = transactionStream.split(",")
      val transaction = Transaction(columns(0),columns(1).toLong,columns(2),columns(3),columns(4).toLong,columns(5).toLong,
        columns(6).toLong,columns(7).toLong,columns(8).toLong,columns(9).toInt,columns(9),columns(10),columns(11).toLong,
        columns(12).toLong,columns(13).toLong)

      val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")

      if(transaction.nTime.length == 8){
        val eventTimeString = transaction.nAction + ‘0‘ + transaction.nTime
        val eventTime : Long = format.parse(eventTimeString).getTime
        (transaction.szCode,eventTime,transaction.nPrice,transaction.nTurnover)
      }else{
        val eventTimeString = transaction.nAction + transaction.nTime
        val eventTime = format.parse(eventTimeString).getTime
        (transaction.szCode,eventTime,transaction.nPrice,transaction.nTurnover)
      }
    }
  }

  class MarketPrice extends MapFunction[String, (Long, Long, Long)]{
    def map(marketStream : String) : (Long, Long, Long) = {
      val columnsMK = marketStream.split(",")

      val marketInfo = MarketInfo(columnsMK(0).toLong,columnsMK(1),columnsMK(2),columnsMK(3).toLong)

      val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")

      if(marketInfo.nTime.length == 8){
        val eventTimeStringMarket = marketInfo.nActionDay + ‘0‘ + marketInfo.nTime
        val eventTimeMarket = format.parse(eventTimeStringMarket).getTime
        (marketInfo.szCode, eventTimeMarket, marketInfo.nMatch)
      }else{
        val eventTimeStringMarket = marketInfo.nActionDay  + marketInfo.nTime
        val eventTimeMarket = format.parse(eventTimeStringMarket).getTime
        (marketInfo.szCode, eventTimeMarket, marketInfo.nMatch)
      }
    }
  }

}

1.1、Maven打包

在项目的target目录下会生成一个jar包:flink-scala-project-0.1.jar

将其拷贝到Driver(这里采用master当作Driver)。

1.2、启动hdfs集群、Flink集群,并开启2个socket:

master上操作:

关于Hadoop集群、Flink集群的配置,参见各自的官方文档即可。

1.3、发布程序到Flink集群

这里通过CLI(Command-Line Interface)的方式发布:CLI

flink run -c toptrade.flink.trainning.JoinedOperaion /root/Documents/flink-scala-project-0.1.jar master 9998 9999

这里-c是指定入口类,后边的3个参数分别是:hostname、port1、port2

1.4、webUI查看当前集群状态

Flink的conf文件中,webUI的默认端口是8081。

点开Running Jobs:

当前没有任何数据流入,因此records的传输字节都是0。

1.5、进程

由于程序直接发布到了master,也就是master当作Driver来发布,因此看到的Job对应的进程名字叫:CliFrontend

1.6、输入数据

transaction的数据如下:

60000.SH,60000,20160520,93000960,1,39,173200,400,1000,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93001130,1,39,173200,200,1100,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93002300,1,41,173200,500,1200,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93003970,1,41,173200,300,1300,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93004150,1,41,173200,100,1400,66,0,0,62420,76334,93002085

60000.SH,59999,20160520,93005190,1,41,173200,500,1500,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93006100,1,41,173200,600,1600,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93007900,1,41,173200,600,1700,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93008100,1,41,173200,600,1800,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93010120,1,41,173200,600,1900,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93012000,1,41,173200,600,2000,66,0,0,62420,76334,93002085

60000.SH,60000,20160520,93015100,1,41,173200,600,2100,66,0,0,62420,76334,93002085

market的数据如下:

60000,20160520,92507000,173201

60000,20160520,92903000,173201

60000,20160520,93001000,173201

60000,20160520,93004000,173201

60000,20160520,93007000,173199

60000,20160520,93010000,173199

60000,20160520,93013000,173200

60000,20160520,93016000,173200

60000,20160520,93019000,173200

60000,20160520,93022000,173200

60000,20160520,93025000,173200

分别输入socket9998与9999:

1.7、结果:

2> (60000,2016-05-20 09:30:01.000: marketTime,2016-05-20 09:30:02.300: transactionTime,173201,173200,1200)

2> (60000,2016-05-20 09:30:04.000: marketTime,2016-05-20 09:30:04.150: transactionTime,173201,173200,1400)

2> (60000,2016-05-20 09:30:04.000: marketTime,2016-05-20 09:30:06.100: transactionTime,173201,173200,1600)

2> (60000,2016-05-20 09:30:07.000: marketTime,2016-05-20 09:30:08.100: transactionTime,173199,173200,1800)

2> (60000,2016-05-20 09:30:01.000: marketTime,2016-05-20 09:30:01.130: transactionTime,173201,173200,1100)

2> (60000,2016-05-20 09:30:01.000: marketTime,2016-05-20 09:30:03.970: transactionTime,173201,173200,1300)

2> (60000,2016-05-20 09:30:07.000: marketTime,2016-05-20 09:30:07.900: transactionTime,173199,173200,1700)

~

我们看到,join操作没问题,而且也按照我们的逻辑输出了最终的结果,但唯一遗憾的是我无法再对这个结果进行排序操作,进而进行后续的计算。只能通过map对结果集进行自定义的排序。

这里我的逻辑是希望对结果按照transaction的时间顺序排序后,再进行复杂的计算,所以无法在一个apply中实现。

2、coGroup

这里省略打包发布的命令,直接贴上代码并看输出结果:

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object Job {

  case class Transaction(szWindCode:String, szCode:Long, nAction:String, nTime:String, seq:Long, nIndex:Long, nPrice:Long,
                         nVolume:Long, nTurnover:Long, nBSFlag:Int, chOrderKind:String, chFunctionCode:String,
                         nAskOrder:Long, nBidOrder:Long, localTime:Long
                         )

  case class MarketInfo(szCode : Long, nActionDay : String, nTime : String, nMatch : Long)

  def main(args: Array[String]): Unit = {
    /**
      * 参数包含3个:hostname,port1,port2
      * port1:作为Transaction的输入流(例如nc -lk 9000,然后输入参数指定9000)
      * port2:作为Market的输入流(例如nc -lk 9999,然后输入参数指定9999)
      */
    if(args.length != 3){
      System.err.println("USAGE:\nSocketTextStream <hostname> <port1> <port2>")
      return
    }
    val hostname = args(0)
    val port1 = args(1).toInt
    val port2 = args(2).toInt

    /**
      * 1、指定运行环境,设置EventTime
      * 1、Obtain an execution environment
      */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    /**
      * 2、创建初始化数据流:Transaction与market
      * 2、Load/create the initial data
      */
    val inputTransaction = env.socketTextStream(hostname, port1)
    val inputMarket = env.socketTextStream(hostname, port2)

    /**
      * 3、实施“累计资金流量”,
      *    资金流量(in) = if(当前价格>LastPrice){sum + = nTurnover}elsif(当前价格=LastPrice且最近的一次Transaction的价格<>LastPrice的价格且那次价格>LastPrice){sum += nTurnover}
      *    资金流量(out) = if(当前价格<LastPrice){sum + = nTurnover}elsif(当前价格=LastPrice且最近的一次Transaction的价格<>LastPrice的价格且那次价格<LastPrice){sum += nTurnover}
      * 3、Specify transformations on this data
      */
    val transactionDataStream = inputTransaction.map(new TransactionPrice)
    val marketDataStream = inputMarket.map(new MarketPrice)

    val eventMarketStream = marketDataStream.assignAscendingTimestamps(_._2)
    val eventTransactionStream = transactionDataStream.assignAscendingTimestamps(_._2)

    val coGroupedStreams = eventTransactionStream
      .coGroup(eventMarketStream)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply {
        (t1: Iterator[(Long, Long, Long, Long)], t2: Iterator[(Long, Long, Long)], out: Collector[(Long, String, String, Long, Long)]) =>

          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

          val listOut = new ListBuffer[(Long, String, String, Long, Long, Long)]

          //将Iterator的元素赋值给一个ListBuffer
          val l1 = new ListBuffer[(Long,Long,Long,Long)]
          while(t1.hasNext){
            l1.append(t1.next())
          }

          val l2 = new ListBuffer[(Long,Long,Long)]
          while(t2.hasNext){
            l2.append(t2.next())
          }

          //遍历每个ListBuffer,将coGroup后的所有结果进行判断,只取Transaction的时间-Snapshot的时间between 0 和3000(ms)
          for(e1 <- l1){
            for(e2 <- l2){

              if(e1._2 - e2._2 >=0 && e1._2 - e2._2 <= 3 * 1000){
                listOut.append((e1._1,"tranTime: "+format.format(e1._2),"markTime: "+ format.format(e2._2),e1._3,e2._3, e1._4))
                //out.collect(e1._1,"tranTime: "+format.format(e1._2),"markTime: "+ format.format(e2._2),e1._3,e2._3)
              }
            }
          }
          //需要将ListBuffer中的结果按照Transaction时间进行排序
          val l : ListBuffer[(Long, String, String, Long, Long, Long)] = listOut.sortBy(_._2)

          //测试是否按照transactionTime进行排序
          l.foreach(f => println("排序后的结果集:" + f))

          var fundFlowIn : Long = 0
          var fundFlowOut : Long= 0
          var InOutState : Int= 1

          /**
            * 实施“资金流量”的逻辑:
            * 如果交易的价格 > 上一快照的价格,则资金流入
            * 如果交易的价格 < 上一快照的价格,则资金流出
            * 如果交易的价格 = 上一快照的价格,则要看上一交易是属于流入还是流出,如果上一交易是流入,则流入,流出则流出
            * 如果第一笔交易的价格与上一快照的价格相等,则默认资金流入
            */
          for(item <- l) {
            if (item._4 > item._5) {
              fundFlowIn = fundFlowIn + item._6
              InOutState = 1
            } else if (item._4 < item._5) {
              fundFlowOut = fundFlowOut + item._6
              InOutState = 0
            } else {
              if (InOutState == 1) {
                fundFlowIn = fundFlowIn + item._6
                InOutState = 1
              } else {
                fundFlowOut = fundFlowOut + item._6
                InOutState = 0
              }
            }
            //out.collect(item._1,item._2,item._3,item._4,item._5)
          }

          if(!l.isEmpty) {
            val szCode = l.head._1
            val tranStartTime = l.head._2
            val tranEndTime = l.last._2

            out.collect(szCode,tranStartTime,tranEndTime,fundFlowIn, fundFlowOut)
          }
      }
      .name("coGroupedStream Test")

    /**
      * 4、标准输出
      * 4、Specify where to put the results of your computations
      */
    coGroupedStreams.print()

    /**
      * 5、执行程序
      * 5、Trigger the program execution
      */
    env.execute("2 DataStream coGroup")

  }

  class TransactionPrice extends MapFunction[String,(Long, Long, Long, Long)]{
    def map(transactionStream: String): (Long, Long, Long,Long) = {
      val columns = transactionStream.split(",")
      val transaction = Transaction(columns(0),columns(1).toLong,columns(2),columns(3),columns(4).toLong,columns(5).toLong,
        columns(6).toLong,columns(7).toLong,columns(8).toLong,columns(9).toInt,columns(9),columns(10),columns(11).toLong,
        columns(12).toLong,columns(13).toLong)

      val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")

      if(transaction.nTime.length == 8){
        val eventTimeString = transaction.nAction + ‘0‘ + transaction.nTime
        val eventTime : Long = format.parse(eventTimeString).getTime
        (transaction.szCode,eventTime,transaction.nPrice,transaction.nTurnover)
      }else{
        val eventTimeString = transaction.nAction + transaction.nTime
        val eventTime = format.parse(eventTimeString).getTime
        (transaction.szCode,eventTime,transaction.nPrice,transaction.nTurnover)
      }
    }
  }

  class MarketPrice extends MapFunction[String, (Long, Long, Long)]{
    def map(marketStream : String) : (Long, Long, Long) = {
      val columnsMK = marketStream.split(",")

      val marketInfo = MarketInfo(columnsMK(0).toLong,columnsMK(1),columnsMK(2),columnsMK(3).toLong)

      val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")

      if(marketInfo.nTime.length == 8){
        val eventTimeStringMarket = marketInfo.nActionDay + ‘0‘ + marketInfo.nTime
        val eventTimeMarket = format.parse(eventTimeStringMarket).getTime
        (marketInfo.szCode, eventTimeMarket, marketInfo.nMatch)
      }else{
        val eventTimeStringMarket = marketInfo.nActionDay  + marketInfo.nTime
        val eventTimeMarket = format.parse(eventTimeStringMarket).getTime
        (marketInfo.szCode, eventTimeMarket, marketInfo.nMatch)
      }
    }
  }

}

这里边唯一的不同就是apply方法的参数,coGroup是iterator,我就可以直接在apply中进行排序,并计算了。

结果如下:

可以看到,结果按照transaction排序并生成了最终的资金流量(流入与流出)。

其实我想输出的就是每个窗口内的:股票代码、窗口内最早的交易时间、窗口内最后的交易时间、资金流量流入、资金流量流出。

通过coGroup的诸多方法,实现了我的需求。

五、总结

join操作与coGroup操作在Flink流处理中很有用,其中coGroup相对来讲功能更强大一点。

但是,相对于Spark提供了Spark SQL而言,Flink在DataStream中队SQL的支持显然不够,在即将到来的Flink1.1以及未来的Flink1.2版本中,DataStream中会有对SQL的支持,那时候写起程序会容易的多。

参考文档:

CLI

官方文档

Window Join与Window coGroup

官方文档

SQL for

Flink官方博客

JoinedStream源码

CoGroupedStream源码

Streaming Join

implementation

Streaming Left outer

join

时间: 2024-08-12 13:07:00

Flink流计算编程--在双流中体会joinedStream与coGroupedStream的相关文章

Flink流计算编程--在WindowedStream中体会EventTime与ProcessingTime

一.Flink流处理简介 Flink流处理的API叫做DataStream,可以在保证Exactly-Once的前提下提供高吞吐.低延时的实时流处理.用Flink作为流处理框架成功的案例可参考Flink母公司–Data Artisans官方blog中的2篇文章: How we selected Apache Flink as our Stream Processing Framework at the Otto Group Business Intelligence Department RBE

flink流计算随笔(3)

Stateful Computations over Data Streams(在数据流的有状态计算)Apache Flink是一个用于分布式流和批处理数据的开源平台.Flink的核心是一个流数据流引擎,它为数据流上的分布式计算提供数据分布.通信和容错能力.Flink在流引擎之上构建批处理,覆盖本地迭代支持.托管内存和程序优化.通常在程序中的转换和数据流中的操作符之间存在一对一的对应关系.然而,有时一个转换可能包含多个转换操作符. 在串流连接器和批处理连接器文档中记录了源和汇(Sources a

Flink流计算随笔(1)

相比 Spark Stream.Kafka Stream.Storm 等,为什么阿里会选择 Flink 作为新一代流式计算引擎?前期经过了哪些调研和对比? 大沙:我们是 2015 年开始调研新一代流计算引擎的.我们当时的目标就是要设计一款低延迟.exactly once.流和批统一的,能够支撑足够大体量的复杂计算的引擎.Spark streaming 的本质还是一款基于 microbatch 计算的引擎.这种引擎一个天生的缺点就是每个 microbatch 的调度开销比较大,当我们要求越低的延迟

flink流计算随笔(4)

Flink中的程序本质上是并行的和分布式的.在执行期间,流有一个或多个流分区,每个操作符有一个或多个操作符子任务.操作符子任务相互独立,在不同的线程中执行,可能在不同的机器或容器上执行. 运算符子任务的数量是特定运算符的并行度.一个流的并行性总是它的生产操作符的并行性.同一程序的不同运算符可能具有不同级别的并行性. 流可以在两个操作符之间以一对一(或转发)模式传输数据,也可以在重分发模式中传输数据: 一对一One-to-one流(例如上图中源和map()运算符之间的流)保持元素的分区和顺序.这意

flink流计算随笔(6)

?生成,编译模板工程 MacBook-Air:SocketWindowWordCount myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh) % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 11510 100 11510 0 0 4499 0 0:00:02

flink流计算随笔(2)

MACOS下安装flink: $ brew install apache-flink ... $ flink --version $brew upgrade MACOS下启动flink: $cd /usr/local/Cellar/apache-flink/1.6.0 $./libexec/bin/start-cluster.sh /* * Licensed to the Apache Software Foundation (ASF) under one * or more contribut

流计算及在特来电监控引擎中的实践

随着云计算的深入落地,大数据技术有了坚实的底层支撑,不断向前发展并日趋成熟,无论是传统企业还是互联网公司,都不再满足于离线批处理计算,而是更倾向于应用实时流计算,要想在残酷的企业竞争中立于不败之地,企业数据必须被快速处理并输出结果,流计算无疑将是企业Must Have的大杀器.作为充电生态网的领军企业,特来电在流计算方面很早便开始布局,下面笔者抛砖引玉的谈一下流计算及在特来电监控引擎中的应用实践. 一.由Bit说开去 作为计算机信息中的最小单位,Bit就像工蚁一样忙碌,任一时刻都只能处于以下三种

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向. StreamPartitioner StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法: public abstract StreamPartitioner<T> copy(); 但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于--各个分区器需要实现channel选择的接口方法: int[] selectChannels(T record,

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP