kafka+windows+java+springboot中的配置

1.百度kafka+zookeeper+windows配置

1.1  zookeeper配置

dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

1.2 kafka server.properties配置

advertised.host.name=IP

log.dirs=D:/kafka_2.11-1.0.0/log

zookeeper.connect=IP:2181

1.3 windows hosts配置

IP localhost

2.maven构建springboot项目

2.1 intellij idea 新建kafka项目

2.2 kafka配置pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tangxin.kafka</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0</version>
    <name>kafka</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.29</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.2.0.RELEASE</version>
        </dependency>

    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <!--<excludes>-->
                <!--<exclude>*</exclude>-->
                <!--</excludes>-->
            </resource>
        </resources>
        <plugins>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>

                    <compilerArguments>
                        <extdirs>lib</extdirs>
                    </compilerArguments>
                </configuration>
                <version>2.3.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>utf-8</encoding>
                </configuration>
                <version>2.4.3</version>
            </plugin>
        </plugins>
    </build>
</project>

2.3 新建springboot启动类Application

package com.tangxin.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

2.4 新建springboot项目中resources目录的配置文件

application.yml

server:
  display-name: kafka
  port: 8888
  contextPath: /kafka

spring:
    profiles:
        active: dev

application-dev.properties

kafka.bootstrap-servers=x.x.x.x:9092

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="trace" dest="/data/logs/work/log.log">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout>
                <charset>UTF-8</charset>
                <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
            </PatternLayout>
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
        </Console>
        <RollingFile name="RollingFile" fileName="/data/logs/work/work.log"
                     filePattern="/data/logs/work/work-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout>
                <charset>UTF-8</charset>
                <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
            </PatternLayout>
            <Policies>
                <TimeBasedTriggeringPolicy/>
                <SizeBasedTriggeringPolicy size="1000 MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
        </RollingFile>

        <RollingFile name="ErrorFile" fileName="/data/logs/work/error.log"  filePattern="/data/logs/work/error.%d{yyyy-MM-dd}.%i.log">
            <PatternLayout>
                <charset>UTF-8</charset>
                <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%M%n</Pattern>
            </PatternLayout>
            <Filters>
                <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy />
                <SizeBasedTriggeringPolicy size="50 MB"/>
            </Policies>
            <DefaultRolloverStrategy fileIndex="min" max="100"/>
        </RollingFile>
    </appenders>
    <loggers>
        <Root level="info">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"  level="info"/>
        </Root>
        <Logger name="com.tangxin.kafka">
            <appender-ref ref="ErrorFile" />
        </Logger>
    </loggers>
</Configuration>

2.5 kafka配置类

package com.tangxin.kafka.service;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }
}

2.6 controller层调用kafka发送

package com.tangxin.kafka.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @RequestMapping(value = "/send", method = { RequestMethod.GET, RequestMethod.POST })
    public String callFeedInfo() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            try {
                kafkaTemplate.send("feed-info","1000");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        return "send done!";
    }

}

3.windows启动zookeeper和kafka

4.遇到的问题

2017-11-27 17:55:38.484 [kafka-producer-network-thread | producer-1] ERROR org.springframework.kafka.support.LoggingProducerListener - Exception thrown when sending a message with key=‘null‘ and payload=‘1‘ to topic mytopic:
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for mytopic-1

之所以写这个随笔就是因为这个问题,本地访问没有问题因为本机localhost和ip映射估计没问题,如果你两台电脑,一台想做server,一台想做开发就可能会遇到这样的问题,开始可能你各种官方各种百度可能都无法解决这个问题,这个问题涉及hostname主机名,说实话网络这块确实不熟悉,之前弄hadoop和spark入门时也遇到类似问题纠结很久。总结下可能存在的。

1. 防火墙是否关闭

2.windows下是否安装了vmware软件,控制面板\网络和 Internet\网络连接 禁用vmware network adapter

3.kafka配置

advertised.host.name=IP

log.dirs=D:/kafka_2.11-1.0.0/log

zookeeper.connect=IP:2181

windows hosts配置
IP localhost
时间: 2024-08-14 01:31:11

kafka+windows+java+springboot中的配置的相关文章

JDK在windows和linux中安装配置指南

1 Windows下安装.配置jdk 1.1 准备工作 到http://pan.baidu.com/s/1i3l6MDR下载 jdk-7u55-windows-x64.exe 1.2 安装 双击运行jdk-7u55-windows-x64.exe 1.3 配置环境变量 新建JAVA_HOME变量,值为 C:\JAVA\jdk1.7.0_55 在Path变量最后追加: %JAVA_HOME%\bin;%JAVA_HOME%\jre\bin 新建CLASSPATH变量,值为 .;%JAVA_HOME

springboot中数据库配置加密

在springboot中,配置数据库等信息时,用户名和密码明文显示会大大降低安全性,在此介绍一种加密方式,简单易用. 添加依赖: <dependency>    <groupId>com.github.ulisesbocchio</groupId>    <artifactId>jasypt-spring-boot-starter</artifactId>    <version>1.8</version> </de

springboot中xml配置之@ImportResource

springboot中进行相关的配置往往有java配置和xml配置两种方式. 使用java的方式配置只需要使用@configuration注解即可,而使用xml的方式配置的话需要使用@ImportResource来加载配置文件 不过多描述,直接以一个很简单的通过xml配置注入bean的例子来展示@ImportResource注解的使用 xml配置放在resources目录下 <?xml version="1.0" encoding="UTF-8"?> &

springboot中切换配置(多个配置文件--生产、开发、测试)

问题描述: 在springboot项目中可能有测试环境.开发环境.生产环境,在这些环境中我们可能要使用不同的配置,如果每次切换环境的时候都要重新写一份配置文件就很麻烦了,所以下面提供一种方法可以快速且简便的切换不同环境下的配置. 解决方案: 1.首先在resources目录下创建完整的配置文件(包括测试.开发.生产环境下的相关配置文件),然后创建一个application.yml文件,因为springboot项目在启动的时候会默认加载该配置文件,解析其中的内容 2.在application.ym

在SpringBoot中配置定时任务

前言 之前在spring中使用过定时任务,使用注解的方式配置很方便,在SpringBoot中的配置基本相同,只是原来在spring中的xml文件的一些配置需要改变,在SpringBoot中也非常简单. 已经加入我的github模版中:https://github.com/LinkinStars/springBootTemplate 定时任务的分类 所谓定时任务,就是在项目启动之后,定时的去执行一个任务,从而满足业务的需要. 定时任务分为下面几种,串行,并行,同步,异步 串行,并行:当配置了多个定

JAVA框架中XML文件

其实在JAVA开发中servlet配置,映射注入配置等等都可以用xml来配置 在此处的department是实体类的名字,而不是对应的数据库表的名字 数据库表的字段名=#{实体类属性名} 逆向工程生成的XML文件有查找更新等功能,但是当我们查找的时候需要返回一个类, 我们应该在开头写返回结果 resultMap id="自己起的名字" type="返回的结果类型,此处为Department实体类"  <id property="实体类主键名"

Springboot中SpringMvc拦截器配置与应用(实战)

一.什么是拦截器,及其作用 拦截器(Interceptor): 用于在某个方法被访问之前进行拦截,然后在方法执行之前或之后加入某些操作,其实就是AOP的一种实现策略.它通过动态拦截Action调用的对象,允许开发者定义在一个action执行的前后执行的代码,也可以在一个action执行前阻止其执行.同时也是提供了一种可以提取action中可重用的部分的方式. 拦截器的使用场景越来越多,尤其是面向切片编程流行之后.那通常拦截器可以做什么呢? 之前我们在Agent介绍中,提到过统计函数的调用耗时.这

使用kafka bin目录中的zookeeper-shell.sh来查看kafka在zookeeper中的配置

cd kafka_2.11-0.10.2.1\bin\windowsecho ls /brokers/ids |  zookeeper-shell.bat localhost:2181 使用kafka bin目录中的zookeeper-shell.sh来查看kafka在zookeeper中的配置. 连接zookeeper: bin/zookeeper-shell.sh 127.0.0.1:2181 https://my.oschina.net/tongyufu/blog/1806196 http

在Windows电脑中安装配置多个jdk

一.在电脑中安装配置多个jdk 背景:我的是win10,预先装好的是jdk1.8,想再安装个jdk7方便使用. jdk1.7 百度网盘链接:https://pan.baidu.com/s/10_qWdOzSm3csJaKWy0x4RA  提取码:h0wf jdk1.8 百度网盘链接:https://pan.baidu.com/s/1uBYCy2jhPGrwcMTGoW2nYg 提取码:fwx7  1.先打开DOS命令行输入“java -version”查看自己的java版本,查出是jdk1.8: