Flink Stream Computing Notes (6)

Generate and compile the template project

MacBook-Air:SocketWindowWordCount myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11510  100 11510    0     0   4499      0  0:00:02  0:00:02 --:--:--  4508
This script creates a Flink project using Scala and SBT.

Project name (Flink Project): SocketWindowWordCount
Organization (org.example): myhaspl.com
Version (0.1-SNAPSHOT):
Scala version (2.11.12):
Flink version (1.6.0): 

-----------------------------------------------
Project Name: SocketWindowWordCount
Organization: myhaspl.wordcount
Version: 0.1-SNAPSHOT
Scala version: 2.11.12
Flink version: 1.6.0
-----------------------------------------------
Create Project? (Y/n): y
Creating Flink project under socketwindowwordcount
MacBook-Air:SocketWindowWordCount myhaspl$ ls
socketwindowwordcount
$cd  socketwindowwordcount
$sbt clean assembly
MacBook-Air:socketwindowwordcount myhaspl$ sbt run
[info] Loading settings for project socketwindowwordcount-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaaa/Documents/scala/learn_2/socketwindowwordcount/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/aaaaaaaa/Documents/scala/learn_2/socketwindowwordcount/)
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list

Multiple main classes detected, select one to run:

 [1] myhaspl.wordcount.Job
 [2] myhaspl.wordcount.SocketTextStreamWordCount
 [3] myhaspl.wordcount.WordCount

Enter number: 3

[info] (and,1)
[info] (arrows,1)
[info] (be,2)
[info] (is,1)
[info] (nobler,1)
[info] (of,2)
[info] (a,1)
[info] (in,1)
[info] (mind,1)
[info] (or,2)
[info] (slings,1)
[info] (suffer,1)
[info] (against,1)
[info] (arms,1)
[info] (not,1)
[info] (outrageous,1)
[info] (sea,1)
[info] (the,3)
[info] (tis,1)
[info] (troubles,1)
[info] (whether,1)
[info] (fortune,1)
[info] (question,1)
[info] (take,1)
[info] (that,1)
[info] (to,4)
[success] Total time: 8 s, completed Oct 11, 2018 8:56:09 AM
MacBook-Air:learn2 myhaspl$ sbt run
[info] Loading settings for project learn2-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaaaa/Documents/scala/learn_2/learn2/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/aaaaaaa/Documents/scala/learn_2/learn2/)
[info] Running (fork) learn
[info] 16
[info] 2 add 5 =7
[info] 2 add 0 =2
[info] 15
[success] Total time: 2 s, completed Oct 11, 2018 11:18:48 AM
MacBook-Air:learn_2 myhaspl$ pwd
/Users/A/Documents/scala/learn_2
MacBook-Air:learn_2 myhaspl$ vim learn_2.scala
object learn {

  def main(args: Array[String]): Unit = {
      println(myPower(2,4))
      println(myAdd(2,5))
      println(myAdd(2))
      println(mySum(1,2,3,4,5))
  }

  @annotation.tailrec
  def myPower(x:Int,n:Int,t:Int=1):Int={
    if (n<1) t
    else myPower(x,n-1,x*t)
  }
  def myAdd(x:Int,y:Int=0)={
     val result:Int=x+y
     s"$x add $y =$result"
  }
  def mySum(nums:Int*)={ // varargs (variable-length parameter list)
     var sumNum=0
     for (num<-nums){
       sumNum+=num
     }
     sumNum
  }

}
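For comparison, the same helpers can be written more idiomatically — a sketch, with the object renamed (learnAlt is a made-up name) to avoid clashing with the original:

```scala
// Illustrative idiomatic variants of the functions above.
object learnAlt {
  // Power via an inner tail-recursive helper, hiding the accumulator parameter
  // instead of exposing it as a default argument.
  def myPower(x: Int, n: Int): Int = {
    @annotation.tailrec
    def loop(k: Int, acc: Int): Int = if (k < 1) acc else loop(k - 1, acc * x)
    loop(n, 1)
  }

  // Varargs sum without a mutable accumulator: the collection's built-in sum.
  def mySum(nums: Int*): Int = nums.sum
}
```

Both behave like the originals: myPower(2, 4) yields 16 and mySum(1, 2, 3, 4, 5) yields 15.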
MacBook-Air:learn_2 myhaspl$ ls
learn_2.scala
MacBook-Air:learn_2 myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11510  100 11510    0     0   3185      0  0:00:03  0:00:03 --:--:--  3189
This script creates a Flink project using Scala and SBT.

Project name (Flink Project): learn2
Organization (org.example): myhaspl
Version (0.1-SNAPSHOT):
Scala version (2.11.12):
Flink version (1.6.0): 

-----------------------------------------------
Project Name: learn2
Organization: myhaspl
Version: 0.1-SNAPSHOT
Scala version: 2.11.12
Flink version: 1.6.0
-----------------------------------------------
Create Project? (Y/n): y
Creating Flink project under learn2
MacBook-Air:learn_2 myhaspl$ ls
learn2      learn_2.scala
MacBook-Air:learn_2 myhaspl$ cd learn2
MacBook-Air:learn2 myhaspl$ ls
README      build.sbt   idea.sbt    project     src
MacBook-Air:learn2 myhaspl$ cd sr
-bash: cd: sr: No such file or directory
MacBook-Air:learn2 myhaspl$ cd src
MacBook-Air:src myhaspl$ ls
main
MacBook-Air:src myhaspl$ cd main
MacBook-Air:main myhaspl$ ls
resources   scala
MacBook-Air:main myhaspl$ cd scala
MacBook-Air:scala myhaspl$ ls
myhaspl
MacBook-Air:scala myhaspl$ cd myhaspl
MacBook-Air:myhaspl myhaspl$ ls
Job.scala           WordCount.scala
SocketTextStreamWordCount.scala
MacBook-Air:myhaspl myhaspl$ rm *.scala
MacBook-Air:myhaspl myhaspl$ ls
MacBook-Air:myhaspl myhaspl$ cp /Users/aaaaa/Documents/scala/learn_1/src/learn.scala learn.scala
MacBook-Air:myhaspl myhaspl$ ls
learn.scala
MacBook-Air:myhaspl myhaspl$ pwd
/Users/aaaaa/Documents/scala/learn_2/learn2/src/main/scala/myhaspl
MacBook-Air:myhaspl myhaspl$ sbt clean assembly

MacBook-Air:learn2 myhaspl$ pwd
/Users/aaaaa/Documents/scala/learn_2/learn2
MacBook-Air:learn2 myhaspl$  sbt clean assembly
[info] Updated file /Users/bbbb/Documents/scala/learn_2/learn2/project/build.properties: set sbt.version to 1.2.4
[info] Loading settings for project learn2-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaa/Documents/scala/learn_2/learn2/project
[info] Updating ProjectRef(uri("file:/Users/aassfdfsdaxg/Documents/scala/learn_2/learn2/project/"), "learn2-build")...
[info] Done updating.
[warn] There may be incompatibilities among your library dependencies.
[warn] Run 'evicted' to see detailed eviction warnings
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/cccccc/Documents/scala/learn_2/learn2/)
[success] Total time: 0 s, completed Oct 11, 2018 11:03:29 AM
[info] Updating ...
[info] Done updating.
[info] Compiling 1 Scala source to /Users/AAA/Documents/scala/learn_2/learn2/target/scala-2.11/classes ...
[info] Done compiling.
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[info] SHA-1: eaaa2f651ba4387defc6282c2de36e6dd4402f32
[info] Packaging /Users/aaaaaasdf/Documents/scala/learn_2/learn2/target/scala-2.11/Flink Project-assembly-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 12 s, completed Oct 11, 2018 11:03:41 AM
MacBook-Air:learn2 myhaspl$ sbt
[info] Loading settings for project learn2-build from assembly.sbt ...
[info] Loading project definition from /Users/aaaaaa/Documents/scala/learn_2/learn2/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/aaaaaa/Documents/scala/learn_2/learn2/)
[info] sbt server started at local:///Users/aaaaaa/.sbt/1.0/server/e759170bdd67731a9bda/sock
sbt:Flink Project> compile
[success] Total time: 1 s, completed Oct 11, 2018 11:27:46 AM
sbt:Flink Project> package
[info] Packaging /Users/aaaaaa/Documents/scala/learn_2/learn2/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed Oct 11, 2018 11:27:53 AM

MacBook-Air:target myhaspl$ cd scala-2.11
MacBook-Air:scala-2.11 myhaspl$ ls
Flink Project-assembly-0.1-SNAPSHOT.jar flink-project_2.11-0.1-SNAPSHOT.jar
classes                 resolution-cache

Run the official example: receive lines of text from a socket port, then perform a word count.

MacBook-Air:flink myhaspl$ ./libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Air.local.
Starting taskexecutor daemon on host MacBook-Air.local.

MacBook-Air:Documents myhaspl$ nc -l 9800
aa ss dd ff gg
bye
^C

MacBook-Air:flink myhaspl$ flink run libexec/examples/streaming/SocketWindowWordCount.jar  --port 9800
Starting execution of program
Program execution finished
Job with JobID 1625d3a29cfdf8fa8a77c3e5c8e9d30e has finished.
Job Runtime: 9181 ms
Words are counted over 5-second processing-time windows and printed to standard output. Watch the TaskManager's output file and type some text into nc (each line is sent to Flink when you press Enter). Note that the example code shown further below actually uses a sliding window (5-second size, 1-second slide) rather than a strict tumbling window.
After starting nc -l 9800 you need to type quickly — the input has to arrive within the 5-second window.
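For orientation, the window is configured with timeWindow in the example source (reproduced in full further down). A non-runnable fragment contrasting the two variants:

```scala
// Fragment (not standalone): processing-time window variants on a keyed stream.
// Tumbling: one result per non-overlapping 5-second window.
//   .timeWindow(Time.seconds(5))
// Sliding (what the example uses): 5-second windows, re-evaluated every second.
//   .timeWindow(Time.seconds(5), Time.seconds(1))
```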

MacBook-Air:flink myhaspl$ ls -la libexec/log/*
-rw-r--r--  1 myhaspl  admin  19733 10 11 14:41 libexec/log/flink-myhaspl-standalonesession-0-MacBook-Air.local.log
-rw-r--r--  1 myhaspl  admin      0 10 11 14:41 libexec/log/flink-myhaspl-standalonesession-0-MacBook-Air.local.out
-rw-r--r--  1 myhaspl  admin  32693 10 11 14:41 libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.log
-rw-r--r--  1 myhaspl  admin     43 10 11 14:41 libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
MacBook-Air:flink myhaspl$ cat libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
aa : 1
gg : 1
ff : 1
dd : 1
ss : 1
bye : 1
MacBook-Air:flink myhaspl$ 

Clear the files under the log directory and restart the cluster, then type a few lines of text. Remember to type fast — within 5 seconds — or the results will not appear in the .out file, because that is how the program's window is configured.


import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

    def main(args: Array[String]) : Unit = {

        // the port to connect to
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                return
            }
        }

        // get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // get input data by connecting to the socket
        val text = env.socketTextStream("localhost", port, '\n')

        // parse the data, group it, window it, and aggregate the counts
        val windowCounts = text
            .flatMap { w => w.split("\\s") }
            .map { w => WordWithCount(w, 1) }
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .sum("count")

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1)

        env.execute("Socket Window WordCount")
    }

    // Data type for words with count
    case class WordWithCount(word: String, count: Long)
}
MacBook-Air:flink myhaspl$ ./libexec/bin/stop-cluster.sh
MacBook-Air:flink myhaspl$ ./libexec/bin/start-cluster.sh
MacBook-Air:Documents myhaspl$ nc -l 9800
ss
ss
ff
gg
ss
^C
MacBook-Air:flink myhaspl$ flink run libexec/examples/streaming/SocketWindowWordCount.jar  --port 9800
Starting execution of program
Program execution finished
Job with JobID 3cda8e474ed0050ef05828fc91cd8302 has finished.
Job Runtime: 3509 ms

MacBook-Air:flink myhaspl$ cat libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
ss : 2
gg : 1
ff : 1
MacBook-Air:flink myhaspl$ 

Next we'll change that 5-second window.
Create another Flink template project:

MacBook-Air:scala myhaspl$ mkdir learn_3
MacBook-Air:scala myhaspl$ cd learn_3
MacBook-Air:learn_3 myhaspl$ ls
MacBook-Air:learn_3 myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11510  100 11510    0     0   5130      0  0:00:02  0:00:02 --:--:--  5274
This script creates a Flink project using Scala and SBT.

Project name (Flink Project): learn3
Organization (org.example): myhaspl
Version (0.1-SNAPSHOT):
Scala version (2.11.12):
Flink version (1.6.0): 

-----------------------------------------------
Project Name: learn3
Organization: myhaspl
Version: 0.1-SNAPSHOT
Scala version: 2.11.12
Flink version: 1.6.0
-----------------------------------------------
Create Project? (Y/n): y
Creating Flink project under learn3
MacBook-Air:learn_3 myhaspl$
MacBook-Air:learn_3 myhaspl$ cd learn3
MacBook-Air:learn3 myhaspl$ ls
README      build.sbt   idea.sbt    project     src
MacBook-Air:learn3 myhaspl$ cd src
MacBook-Air:src myhaspl$ ls
main
MacBook-Air:src myhaspl$ cd main
MacBook-Air:main myhaspl$ ls
resources   scala
MacBook-Air:main myhaspl$ cd scala
MacBook-Air:scala myhaspl$ ls
myhaspl
MacBook-Air:scala myhaspl$ cd myhaspl
MacBook-Air:myhaspl myhaspl$ ls
Job.scala           WordCount.scala
SocketTextStreamWordCount.scala
MacBook-Air:myhaspl myhaspl$ rm W*.scala
MacBook-Air:myhaspl myhaspl$ rm J*.scala
MacBook-Air:myhaspl myhaspl$ ls
SocketTextStreamWordCount.scala
MacBook-Air:myhaspl myhaspl$
MacBook-Air:myhaspl myhaspl$ pwd
/Users/xxxxx/Documents/scala/learn_3/learn3/src/main/scala/myhaspl
MacBook-Air:myhaspl myhaspl$ cd ../../..
MacBook-Air:src myhaspl$ ls
main
MacBook-Air:src myhaspl$ cd ../..
MacBook-Air:learn_3 myhaspl$ ls
learn3
MacBook-Air:learn_3 myhaspl$ cd learn3
MacBook-Air:learn3 myhaspl$ ls
README      build.sbt   idea.sbt    project     src
MacBook-Air:learn3 myhaspl$ sbt clean assembly
MacBook-Air:learn3 myhaspl$ sbt
[info] Loading settings for project learn3-build from assembly.sbt ...
[info] Loading project definition from /Users/zzzzzz/Documents/scala/learn_3/learn3/project
[info] Loading settings for project root from idea.sbt,build.sbt ...
[info] Set current project to Flink Project (in build file:/Users/zzzzzz/Documents/scala/learn_3/learn3/)
[info] sbt server started at local:///Users/lzz/.sbt/1.0/server/1d8f10f02b1fbf814396/sock
sbt:Flink Project> compile
[success] Total time: 1 s, completed Oct 11, 2018 3:30:19 PM
sbt:Flink Project> package
[info] Packaging /Users/zzz/Documents/scala/learn_3/learn3/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed Oct 11, 2018 3:30:29 PM
sbt:Flink Project> exit
[info] shutting down server
MacBook-Air:learn3 myhaspl$ 

Stop the cluster and restart it. This time the time window has been removed, so input can be typed slowly.
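The SocketTextStreamWordCount template kept in learn3 is, roughly, the windowless word count below — a sketch of the quickstart file, not its verbatim contents. Host and port are read from the command line, which is why the job is launched with "127.0.0.1 9800":

```scala
import org.apache.flink.streaming.api.scala._

object SocketTextStreamWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE: SocketTextStreamWordCount <hostname> <port>")
      return
    }
    val hostName = args(0)
    val port = args(1).toInt

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // No window: every incoming word updates its running count immediately,
    // so the input can be typed at any pace.
    val counts = env
      .socketTextStream(hostName, port)
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    counts.print()
    env.execute("Scala SocketTextStreamWordCount Example")
  }
}
```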

MacBook-Air:learn3 myhaspl$ ~/Documents/flink/libexec/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 29505) on host MacBook-Air.local.
Stopping standalonesession daemon (pid: 29093) on host MacBook-Air.local.
MacBook-Air:learn3 myhaspl$ ls ~/Documents/flink/libexec/log
flink-myhaspl-standalonesession-0-MacBook-Air.local.log flink-myhaspl-taskexecutor-0-MacBook-Air.local.log
flink-myhaspl-standalonesession-0-MacBook-Air.local.out flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
MacBook-Air:learn3 myhaspl$ rm ~/Documents/flink/libexec/log/*
MacBook-Air:learn3 myhaspl$ ls ~/Documents/flink/libexec/log
MacBook-Air:learn3 myhaspl$
MacBook-Air:learn3 myhaspl$ flink run  ~/Documents/scala/learn_3/learn3/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar   127.0.0.1 9800
MacBook-Air:learn3 myhaspl$ ~/Documents/flink/libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Air.local.
Starting taskexecutor daemon on host MacBook-Air.local.
MacBook-Air:learn3 myhaspl$ 

MacBook-Air:learn_3 myhaspl$ nc -l 9800
Unbounded streams have a start but no defined end. They do not terminate and provide data as it is generated. Unbounded streams must be continuously processed, i.e., events must be promptly handled after they have been ingested. It is not possible to wait for all input data to arrive because the input is unbounded and will not be complete at any point in time. Processing unbounded data often requires that events are ingested in a specific order, such as the order in which events occurred, to be able to reason about result completeness.
Bounded streams have a defined start and end. Bounded streams can be processed by ingesting all data before performing any computations. Ordered ingestion is not required to process bounded streams because a bounded data set can always be sorted. Processing of bounded streams is also known as batch processing.
^C

MacBook-Air:learn3 myhaspl$ flink run  ~/Documents/scala/learn_3/learn3/target/scala-2.11/flink-project_2.11-0.1-SNAPSHOT.jar   127.0.0.1 9800
Starting execution of program
Program execution finished
Job with JobID 7f66cd617236d64520a08a44b0544234 has finished.
Job Runtime: 63502 ms
MacBook-Air:learn3 myhaspl$ 

MacBook-Air:learn_3 myhaspl$ ls ~/Documents/flink/libexec/log
flink-myhaspl-standalonesession-0-MacBook-Air.local.log
flink-myhaspl-standalonesession-0-MacBook-Air.local.out
flink-myhaspl-taskexecutor-0-MacBook-Air.local.log
flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
MacBook-Air:learn_3 myhaspl$ cat ~/Documents/flink/libexec/log/flink-myhaspl-taskexecutor-0-MacBook-Air.local.out
...
(bounded,4)
(data,5)
(set,1)
(can,2)
(always,1)
(be,6)
(sorted,1)
(processing,2)
(of,1)
(bounded,5)
(streams,6)
(is,5)
(also,1)
(known,1)
(as,3)
(batch,1)
(processing,3)
(bye,1)

Start the Flink Scala REPL

MacBook-Air:learn3 myhaspl$ ~/Documents/flink/libexec/bin/start-scala-shell.sh local
Starting Flink Shell:
log4j:WARN No appenders could be found for logger (org.apache.flink.configuration.GlobalConfiguration).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Starting local Flink cluster (host: localhost, port: 8081).

Connecting to Flink cluster (host: localhost, port: 8081).


              F L I N K - S C A L A - S H E L L

NOTE: Use the prebound Execution Environments to implement batch or streaming programs.

  Batch - Use the 'benv' variable

    * val dataSet = benv.readTextFile("/path/to/data")
    * dataSet.writeAsText("/path/to/output")
    * benv.execute("My batch program")

    HINT: You can use print() on a DataSet to print the contents to the shell.

  Streaming - Use the 'senv' variable

    * val dataStream = senv.fromElements(1, 2, 3, 4)
    * dataStream.countWindowAll(2).sum(0).print()
    * senv.execute("My streaming program")

    HINT: You can only print a DataStream to the shell in local mode.

scala> 
scala> val text = benv.fromElements(
     |   "To be, or not to be,--that is the question:--",
     |   "Whether 'tis nobler in the mind to suffer",
     |   "The slings and arrows of outrageous fortune",
     |   "Or to take arms against a sea of troubles,")
scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@...

scala> counts.print()
(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)

The print() command automatically submits the job to the JobManager for execution and displays the result in the terminal.

You can write the result to a file instead. In that case, however, you need to call execute to run your program:

 benv.execute("MyProgram")
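Put together in the shell, that might look as follows (the output path is hypothetical). Unlike print(), a file sink is lazy, so nothing runs until benv.execute is called:

```scala
// Sketch, assuming the shell's prebound batch environment `benv`.
val counts = benv
  .fromElements("to be or not to be")
  .flatMap(_.toLowerCase.split("\\W+"))
  .map((_, 1))
  .groupBy(0)
  .sum(1)

// writeAsText is a lazy sink: the job only runs when execute() is called.
counts.writeAsText("/tmp/wordcount-out")  // hypothetical path
benv.execute("MyProgram")
```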
scala> val text = benv.fromElements("Each task slot represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of reserved managed memory. Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks.")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@...

scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@...
scala> counts.print()
(1,1)
(3,1)
(a,4)
(amount,1)
(but,1)
(certain,1)
(compete,1)
(cpu,1)
(currently,1)
(dedicate,1)
(each,2)
(example,1)
(fixed,1)
(for,2)
(from,1)
(happens,1)
(has,1)
(here,1)
(instead,1)
(isolation,1)
(its,1)
(jobs,1)
(managed,4)
(means,1)
(memory,4)
(no,1)
(not,1)
(note,1)
(of,5)
(only,1)
(other,1)
(represents,1)
(reserved,1)
(resources,2)
(separate,1)
(slot,2)
(slots,2)
(slotting,1)
(subset,1)
(subtask,1)
(subtasks,1)
(task,1)
(taskmanager,2)
(tasks,1)
(that,2)
(the,3)
(three,1)
(to,1)
(will,2)
(with,2)
scala> :q
 good bye ..
MacBook-Air:learn3 myhaspl$ 

Original source: http://blog.51cto.com/13959448/2316212
