Running a Beam example with Kafka and Elasticsearch on Flink

The example implements a Beam pipeline in Java that listens to the Kafka topic testmsg, counts the words it receives in fixed 5-second windows, writes the results to the Kafka topic outputmessage, and also syncs them to Elasticsearch. For example, if the words hello hello world arrive within one window, the pipeline emits {"hello":"2"} and {"world":"1"}.

Kafka must be running

Start ZooKeeper and the Kafka broker:
 cd /root/kafuka/kafka_2.12-0.11.0.0
 nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
 nohup bin/kafka-server-start.sh config/server.properties &

Create the topic:
 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testmsg
 bin/kafka-topics.sh --list --zookeeper localhost:2181

Console producer (feeds the testmsg topic the pipeline reads from):
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testmsg

Console consumer (watches the outputmessage topic the pipeline writes to):
 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic outputmessage --from-beginning
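
If you prefer to feed the pipeline from code rather than the console producer, the kafka-clients dependency that the pom.xml below already declares is enough. This is only a testing sketch, not part of the original article (the class name TestMsgProducer is made up here); it sends a few words to the testmsg topic:

package org.apache.beam.examples;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestMsgProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // adjust to your broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Each record becomes one element of the Beam pipeline's input PCollection
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String word : new String[] { "hello", "hello", "world" }) {
                producer.send(new ProducerRecord<>("testmsg", word));
            }
        }
    }
}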

Elasticsearch must also be running

Create an index: PUT http://192.168.11.100:9200/myindex?pretty
List all indexes: GET http://192.168.11.100:9200/_cat/indices?v

Query documents: GET http://192.168.11.100:9200/myindex/_search?q=*&pretty
GET http://192.168.11.100:9200/myindex/_search?q=*&sort=_id:desc&pretty
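
The same check can be scripted from Java with nothing but the JDK, for instance to verify that the pipeline has written documents into myindex. A minimal sketch, not part of the original article (the class name EsQueryCheck is made up; the address is the one used above):

package org.apache.beam.examples;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class EsQueryCheck {
    public static void main(String[] args) throws Exception {
        // Fetch all documents from the index the pipeline writes to
        URL url = new URL("http://192.168.11.100:9200/myindex/_search?q=*&pretty");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            conn.disconnect();
        }
    }
}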

Generate the project skeleton with the Maven archetype:

On Windows, run in PowerShell:
 mvn archetype:generate `
 -D archetypeGroupId=org.apache.beam `
 -D archetypeArtifactId=beam-sdks-java-maven-archetypes-examples `
 -D archetypeVersion=2.8.0 `
 -D groupId=org.example `
 -D artifactId=word-count-beam `
 -D version="0.1" `
 -D package=org.apache.beam.examples `
 -D interactiveMode=false

For other platforms, see the official Beam quickstart: <https://beam.apache.org/get-started/quickstart-java/>

Replace the generated pom.xml with the following:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>word-count-beam</artifactId>
  <version>0.1</version>

  <packaging>jar</packaging>

  <properties>
    <beam.version>2.8.0</beam.version>

    <bigquery.version>v2-rev402-1.24.1</bigquery.version>
    <google-clients.version>1.24.1</google-clients.version>
    <guava.version>20.0</guava.version>
    <hamcrest.version>1.3</hamcrest.version>
    <jackson.version>2.9.5</jackson.version>
    <joda.version>2.4</joda.version>
    <junit.version>4.12</junit.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
    <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
    <mockito.version>1.10.19</mockito.version>
    <pubsub.version>v1-rev399-1.24.1</pubsub.version>
    <slf4j.version>1.7.25</slf4j.version>
    <spark.version>2.3.2</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>${maven-surefire-plugin.version}</version>
        <configuration>
          <parallel>all</parallel>
          <threadCount>4</threadCount>
          <redirectTestOutputToFile>true</redirectTestOutputToFile>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.apache.maven.surefire</groupId>
            <artifactId>surefire-junit47</artifactId>
            <version>${maven-surefire-plugin.version}</version>
          </dependency>
        </dependencies>
      </plugin>

      <!-- Ensure that the Maven jar plugin runs before the Maven
        shade plugin by listing the plugin higher within the file. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
      </plugin>

      <!--
        Configures `mvn package` to produce a bundled jar ("fat jar") for runners
        that require this for job submission to a cluster.
      -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>${maven-shade-plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/LICENSE</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>direct-runner</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <!-- Makes the DirectRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-direct-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>apex-runner</id>
      <!-- Makes the ApexRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-apex</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <!--
          Apex depends on httpclient version 4.3.6, project has a transitive dependency to httpclient 4.0.1 from
          google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
          can be removed when the project no longer has a dependency on a different httpclient version.
        -->
        <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
          <version>4.3.6</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>commons-codec</groupId>
              <artifactId>commons-codec</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <!--
          Apex 3.6 is built against YARN 2.6. Version in the fat jar has to match
          what's on the cluster, hence we need to repeat the Apex Hadoop dependencies here.
        -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-yarn-client</artifactId>
          <version>${hadoop.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>flink-runner</id>
      <!-- Makes the FlinkRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-flink_2.11</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>spark-runner</id>
      <!-- Makes the SparkRunner available when running a pipeline. Additionally,
           overrides some Spark dependencies to Beam-compatible versions. -->
      <properties>
        <netty.version>4.1.17.Final</netty.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-spark</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>jul-to-slf4j</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.module</groupId>
          <artifactId>jackson-module-scala_2.11</artifactId>
          <version>${jackson.version}</version>
          <scope>runtime</scope>
        </dependency>
        <!-- [BEAM-3519] GCP IO exposes netty on its API surface, causing conflicts with runners -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
          <version>${beam.version}</version>
          <exclusions>
            <exclusion>
              <groupId>io.grpc</groupId>
              <artifactId>grpc-netty</artifactId>
            </exclusion>
            <exclusion>
              <groupId>io.netty</groupId>
              <artifactId>netty-handler</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>gearpump-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-gearpump</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>

  <dependencies>
    <!-- Adds a dependency on the Beam SDK. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>${google-clients.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-bigquery</artifactId>
      <version>${bigquery.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client</artifactId>
      <version>${google-clients.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>${joda.version}</version>
    </dependency>

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>

    <!-- Add slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
      <scope>runtime</scope>
    </dependency>

    <!-- Hamcrest and JUnit are required dependencies of PAssert,
         which is used in the main code of DebuggingWordCount example. -->
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-core</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>

    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-library</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
    </dependency>

    <!-- The DirectRunner is needed for unit tests. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>${mockito.version}</version>
      <scope>test</scope>
    </dependency>

    <!-- kafka -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-kafka</artifactId>
        <version>${beam.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

    <!-- elasticsearch -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
        <version>${beam.version}</version>
    </dependency>

  </dependencies>
</project>

Add the following class (KafkaSample.java) under src/main/java in the package org.apache.beam.examples:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.examples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;
//import org.apache.beam.runners.flink.FlinkRunner;

public class KafkaSample {

    public static void main(String[] args) {
        String hosts = "211.100.75.227:9092";// 192.168.1.110:11092,192.168.1.119:11092,192.168.1.120:11092

        String sourceTopic = "testmsg";
        // Build the pipeline options from the command-line arguments
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);

        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // The type parameters declare that both the Kafka key and value are Strings
        PCollection<KafkaRecord<String, String>> lines = pipeline.apply(KafkaIO.<String, String> read()
                .withBootstrapServers(hosts) // required: Kafka broker address(es) and port
                .withTopic(sourceTopic) // required: the Kafka topic to read from
                .withKeyDeserializer(StringDeserializer.class) // required: key deserializer
                .withValueDeserializer(StringDeserializer.class) // required: value deserializer
                // consumer property: "latest" reads only new messages; "earliest" is the other common setting
                .updateConsumerProperties(ImmutableMap.<String, Object> of("auto.offset.reset", "latest")));

        // Strip the Kafka metadata and keep only the message value
        PCollection<String> kafkadata = lines.apply("Remove Kafka Metadata", ParDo.of(new DoFn<KafkaRecord<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext ctx) {
                System.out.println("get from topic:" + ctx.element().getKV());
                ctx.output(ctx.element().getKV().getValue()); // emit just the value of the received Kafka message
            }
        }));
        PCollection<String> windowedEvents = kafkadata.apply(Window.<String> into(FixedWindows.of(Duration.standardSeconds(5))));
        // Count the occurrences of each distinct message within the 5-second window
        PCollection<KV<String, Long>> wordcount = windowedEvents.apply(Count.<String> perElement());
        // Format each (word, count) pair as a small JSON string for output
        PCollection<String> wordtj = wordcount.apply("ConcatResultKVs", MapElements.via(
                new SimpleFunction<KV<String, Long>, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String apply(KV<String, Long> input) {
                        System.out.println("key=" + input.getKey());
                        System.out.println("value=" + input.getValue());
                        String ret = " {\"" + input.getKey() + "\":\"" + input.getValue() + "\"}";
                        System.out.println(ret);
                        return ret;
                    }
                }));

        /* Sink to Kafka */
        wordtj.apply(KafkaIO.<Void, String> write()
                .withBootstrapServers(hosts) // brokers to write the results back to
                .withTopic("outputmessage") // output topic for the formatted counts
                // .withKeySerializer(StringSerializer.class) // not needed: the key type is Void
                .withValueSerializer(StringSerializer.class)
                // withEOS (exactly-once) is supported by the Dataflow and Spark runners; on Flink
                // it requires Kafka 0.11+, so it is left commented out here
                // .withEOS(20, "eos-sink-group-id")
                .values() // write values only; the key defaults to null
        );

        /* Sink to Elasticsearch */
        String[] addresses = { "http://192.168.11.100:9200" };
        wordtj.apply(ElasticsearchIO.write().withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration.create(addresses, "myindex", "testdoc")));

        pipeline.run().waitUntilFinish();
    }

    public interface WordCountOptions extends PipelineOptions {

        /**
         * By default, this example reads from a public dataset containing the
         * text of King Lear. Set this option to choose a different input file
         * or glob.
         */
        @Description("Path of the file to read from")
        @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
        String getInputFile();

        void setInputFile(String value);

        /** Set this required option to specify where to write the output. */
        @Description("Path of the file to write to")
        @Required
        String getOutput();

        void setOutput(String value);
    }

    private static final Logger logger = LoggerFactory.getLogger(KafkaSample.class);

    /**
     * Extra options used to configure the Kafka connection.
     *
     * <p>
     * Defining your own configuration options: you can add your own arguments
     * to be processed by the command-line parser and specify default values
     * for them, then access the option values in your pipeline code.
     *
     * <p>
     * Inherits standard configuration options.
     */
    public interface KFOptions extends PipelineOptions {

        /**
         * Comma-separated list of Kafka brokers ("host:port") to connect to.
         */
        @Description("Kafka broker list")
        @Default.String("211.100.75.227:9092")
        String getBrokers();

        void setBrokers(String value);

    }
}
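
The pom's test-scoped dependencies (JUnit, Hamcrest and the DirectRunner) are enough to exercise the counting logic without Kafka. The following test class is not part of the original article, only a sketch of how the Count step could be verified:

package org.apache.beam.examples;

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class KafkaSampleTest {

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Test
    public void countsEachWordPerElement() {
        // A bounded stand-in for the Kafka input
        PCollection<String> words = p.apply(Create.of("hello", "hello", "world"));
        PCollection<KV<String, Long>> counts = words.apply(Count.<String> perElement());
        PAssert.that(counts).containsInAnyOrder(KV.of("hello", 2L), KV.of("world", 1L));
        p.run().waitUntilFinish();
    }
}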

Change the Kafka and Elasticsearch addresses in the code to match your environment, and the example is ready to run.
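
Note that the KFOptions interface above is defined but never used, so the broker address stays hard-coded. As a sketch (not in the original code), main() could read it from a --brokers argument instead; the flag name comes from the getBrokers() getter:

        // Register KFOptions so the parser recognizes --brokers, then read it
        PipelineOptionsFactory.register(KFOptions.class);
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
        String hosts = options.as(KFOptions.class).getBrokers();
        Pipeline pipeline = Pipeline.create(options);

The broker address would then be supplied on the command line, e.g. -D exec.args="--brokers=211.100.75.227:9092 --output=counts".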

Run it locally with the DirectRunner:

mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.KafkaSample `
 -D exec.args="--inputFile=pom.xml --output=counts" -P direct-runner

--inputFile and --output are not actually used by this pipeline (they are leftovers from the WordCount archetype), but --output must still be supplied because WordCountOptions marks it as @Required.

Run it on an embedded local Flink started by the runner itself:

 mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.KafkaSample `
  -D exec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -P flink-runner 

To package the job and submit it to an already running local Flink cluster:

mvn package -Pflink-runner

Upload the resulting bundled jar (target\word-count-beam-bundled-0.1.jar) to Flink, set the entry class to org.apache.beam.examples.KafkaSample, and pass the program arguments:

--runner=FlinkRunner --inputFile=C:\path\to\quickstart\pom.xml --output=C:\tmp\counts --filesToStage=.\target\word-count-beam-bundled-0.1.jar

Original post (in Chinese): https://www.cnblogs.com/bigben0123/p/10072427.html
