kafka-connect-hdfs重启,进去RECOVERY状态,从hadoop hdfs拿租约,很正常,但是也太久了吧

虽说这个算是正常现象,等的时间也太久了吧。分钟级了。这个RECOVERY里面的WAL有点多余。有这么久的时间,早从新读取kafka写入hdfs了。纯属个人见解。

@SuppressWarnings("fallthrough")
    public boolean recover() {
        try {
            switch (state) {
            case RECOVERY_STARTED:
                log.info("Started recovery for topic partition {}", tp);
                pause();
                nextState();
            case RECOVERY_PARTITION_PAUSED:
                applyWAL();
                nextState();
            case WAL_APPLIED:
                truncateWAL();
                nextState();
            case WAL_TRUNCATED:
                resetOffsets();
                nextState();
            case OFFSET_RESET:
                resume();
                nextState();
                log.info("Finished recovery for topic partition {}", tp);
                break;
            default:
                log.error("{} is not a valid state to perform recovery for topic partition {}.", state, tp);
            }
        } catch (ConnectException e) {
            log.error("Recovery failed at state {}", state, e);
            setRetryTimeout(timeoutMs);
            return false;
        }
        return true;
    }
2017-08-18 01:35:53,716 ERROR [io.confluent.connect.hdfs.TopicPartitionWriter][215] - <Recovery failed at state RECOVERY_PARTITION_PAUSED>
org.apache.kafka.connect.errors.ConnectException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.RecoveryInProgressException): Failed to close file ******/log. Lease recovery is in progress. Try again later.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3113)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2905)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)

    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:88)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:550)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:198)
    at io.confluent.connect.hdfs.DataWriter.recover(DataWriter.java:247)
    at io.confluent.connect.hdfs.DataWriter.open(DataWriter.java:289)
    at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:118)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:428)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:54)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:464)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:234)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:255)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:250)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:459)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:445)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:702)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:266)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:975)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.RecoveryInProgressException): Failed to close file ******/log. Lease recovery is in progress. Try again later.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3113)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2905)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)

    at org.apache.hadoop.ipc.Client.call(Client.java:1468)
    at org.apache.hadoop.ipc.Client.call(Client.java:1399)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
    at com.sun.proxy.$Proxy50.append(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy51.append(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1767)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1803)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1796)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:323)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:319)
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1173)
    at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:221)
    at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67)
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
    ... 45 more
时间: 2024-12-18 13:13:39

kafka-connect-hdfs重启,进去RECOVERY状态,从hadoop hdfs拿租约,很正常,但是也太久了吧的相关文章

Hadoop HA HDFS启动错误之org.apache.hadoop.ipc.Client: Retrying connect to server问题解决

近日,在搭建Hadoop HA QJM集群的时候,出现一个问题,如本文标题. 网上有很多HA的博文,其实比较好的博文就是官方文档,讲的已经非常详细.所以,HA的搭建这里不再赘述. 本文就想给出一篇org.apache.hadoop.ipc.Client: Retrying connect to server错误的解决的方法. 因为在搜索引擎中输入了错误问题,没有找到一篇解决问题的.这里写一篇备忘,也可以给出现同样问题的朋友一个提示. 一.问题描述 HA按照规划配置好,启动后,NameNode不能

Hadoop HDFS: the directory item limit is exceed: limit=1048576问题的解决

问题描述: 1.文件无法写入hadoop hdfs文件系统: 2.hadoop namenode日志记录 the directory item limit is exceed: limit=1048576 3.hadoop单个目录下文件超1048576个,默认limit限制数为1048576,所以要调大limit限制数 解决办法: hdfs-site.xml配置文件添加配置参数 <property>   <name>dfs.namenode.fs-limits.max-direct

Kafka Connect HDFS

概述 Kafka 的数据如何传输到HDFS?如果仔细思考,会发现这个问题并不简单. 不妨先想一下这两个问题? 1)为什么要将Kafka的数据传输到HDFS上? 2)为什么不直接写HDFS而要通过Kafka? HDFS一直以来是为离线数据的存储和计算设计的,因此对实时事件数据的写入并不友好,而Kafka生来就是为实时数据设计的,但是数据在Kafka上无法使用离线计算框架来作批量离线分析. 那么,Kafka为什么就不能支持批量离线分析呢?想象我们将Kafka的数据按天拆分topic,并建足够多的分区

Kafka Connect Details 详解

目录 1. Kafka Connect Details 详解 1.1. 概览 1.2. 启动和配置 1.2.1. Standalone 单机模式 1.2.2. Distribute 分布式模式 1.2.3. Connector的配置 1.3. Transformations 转换器 1.4. REST API 1.5. Kafka Connect 开发详解 1.6. Kafka Connect VS Producer Consumer 1.6.1. Kafka Connect的优点 1.7. 第

Kafka: Connect

Kafka Connect 简介 Kafka Connect 是一个可以在Kafka与其他系统之间提供可靠的.易于扩展的数据流处理工具.使用它能够使得数据进出Kafka变得很简单.Kafka Connect有如下特性: ·是一个通用的构造kafka connector的框架 ·有单机.分布式两种模式.开发时建议使用单机模式,生产环境下使用分布式模式. ·提供restful的管理connector的API. ·自动化的offset管理.Kafka Connect自动的管理offset提交. ·分布

Apache Kafka系列(五) Kafka Connect及FileConnector示例

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 Apache Kafka系列(五) Kafka Connect及FileConnector示例 一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消

Build an ETL Pipeline With Kafka Connect via JDBC Connectors

This article is an in-depth tutorial for using Kafka to move data from PostgreSQL to Hadoop HDFS via JDBC connections. Read this eGuide to discover the fundamental differences between iPaaS and dPaaS and how the innovative approach of dPaaS gets to t

Kafka connect in practice(2): distributed mode mysql binlog -&gt;kafka-&gt;hive

In the previous post Kafka connect in practice(1): standalone, I have introduced about the basics of kafka connect  configuration and demonstrate a local standalone demo. In this post we will show the knowledge about distributed data pull an sink. To

Kafka Connect Architecture

Kafka Connect's goal of copying data between systems has been tackled by a variety of frameworks, many of them still actively developed and maintained. This section explains the motivation behind Kafka Connect, where it fits in the design space, and