实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 2

hazelcast 提供了3中方法调用startCleanup:

第一种是在ConcuurentMapManager的构造函数中,通过调用node的executorManager中的ScheduledExecutorService来创建每秒执行一次cleanup操作的线程(代码如下)。由于这是ConcuurentMapManager构造函数的代码,所以这种调用startCleanup的操作是默认就会有的。

node.executorManager.getScheduledExecutorService().scheduleAtFixedRate(new Runnable() {
           publicvoid run() {
               for (CMap cMap : maps.values()) {
                   cMap.startCleanup(false);
               }
           }
        }, 1, 1, TimeUnit.SECONDS);

第二种是通过配置文件来触发startCleanup的执行,配置 PutOperationhandlerif overcapacity policy,我们系统的配置文件没有配置这方面的policy,所有这种方式在我们系统中没有使用。

第三种是自己直接写代码去调用startCleanup函数(public方法,线程安全的). 这个没有实现在我们的系统中。

所以我的调查方向放在了第一种调用的情况,hazelcast里面的ScheduledExecutorService是通过java.util.ScheduledThreadPoolExecutor 来实现的.

esScheduled = new ScheduledThreadPoolExecutor(5, new ExecutorThreadFactory(node.threadGroup,
                node.getThreadPoolNamePrefix("scheduled"), classLoader), new RejectionHandler()) {
            protected void beforeExecute(Thread t, Runnable r) {
                threadPoolBeforeExecute(t, r);
            }
        }

查看ScheduledThreadPoolExecutor的实现,它把线程实现分成了3个部分: runnable tasks可执行任务, workers to execute the tasks执行任务的具体线程 以及 ScheduledThreadPoolExecutor 调度workers按照要求执行runnable tasks。我们通过scheduleAtFixdRate提交了task,scheduleAtFixedRate先把它打包成重复执行的ScheduleFutureTask

<pre name="code" class="java">    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new <strong>ScheduledFutureTas</strong>k<Object>(command,
                                            null,
                                            triggerTime(initialDelay, unit),
                                            unit.toNanos(period)));
        delayedExecute(t);
        return t;
}

ScheduleFutureTask的run方法实现重新schedule:

public void  run() {
 boolean periodic = isPeriodic();
 if (!canRunInCurrentRunState(periodic))
  cancel(false);
 else if (!periodic)
  ScheduledFutureTask.super.run();
 else if (ScheduledFutureTask.super.runAndReset()) {
  setNextRunTime();
 <strong> reExecutePeriodic(outerTask);</strong>
 }
}

delayedExecute里面如果当前worker的数目小于初始化定义的CorePool的数目,就创建新的worker线程,然后把task放到queue里面

private void delayedExecute(Runnable command) {
        if (isShutdown()) {
            reject(command);
            return;
        }
        // Prestart a thread if necessary. We cannot prestart it
        // running the task because the task (probably) shouldn't be
        // run yet, so thread will just idle until delay elapses.
        if (getPoolSize() < getCorePoolSize())
            prestartCoreThread();

       <strong> super.getQueue().add(command);</strong>
}
public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }
private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        boolean workerStarted = false;
        if (t != null) {
            if (t.isAlive()) // precheck that t is startable
                throw new IllegalThreadStateException();
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
            try {
                t.start();
                workerStarted = true;
            }
            finally {
                if (!workerStarted)
                    workers.remove(w);
            }
        }
        return t;
}

所有启动的worker就做一件事情,从queue中取task执行

     try {
                hasRun = true;
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = <strong>getTask</strong>()) != null) {
                    <strong>runTask(task);</strong>
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }
    }
    Runnable getTask() {
       <strong> for (;;) {</strong>
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                   <strong> r = workQueue.take();</strong>
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
}
private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                if ((runState >= STOP ||
                    (Thread.interrupted() && runState >= STOP)) &&
                    hasRun)
                    thread.interrupt();
                boolean ran = false;
                beforeExecute(thread, task);
               <strong> try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }</strong>
            } finally {
                runLock.unlock();
            }
        }

了解了java threadpool的工作原理之后,我们可以知道,startCleanup是代码pass给ScheduledThreadPoolExecutor的runnable task,它不被执行,可能的原因有:

1. ScheduledThreadPoolExecutor初始化时候出错,task完全没有提交成功。由于lastCleanup并不是系统应用的启动时间,已经过了几个月了,所以,很明显在系统初始化的时候,esScheduled(ScheduledThreadPoolExecutor)还是正常工作的,只是突然在2月4号停止了工作,所以这种可能性可以排除。

2.    Worker 没有正常工作,不在从ScheduledThreadPoolExecutor的queue里面取数据,这个很快就被我排除了:

首先heap dump中有5个pending workers in esScheduled (0/2/3/5/9):

其次从thread dump中可以看出,这五个线程都是在等着从queue里面取数据:

    ……
   <strong> at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)[optimiz</strong>ed]
    at java/util/concurrent/DelayQueue.take(DelayQueue.java:164)[optimized]
    at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)[inlined]
    at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)[optimized]
    at java/util/concurrent/ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)[optimized]
    at java/util/concurrent/ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
    at java/lang/Thread.run(Thread.java:662)
    at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)
    -- end of trace
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-2" id=51 idx=0xd8 tid=32639 prio=5 alive, parked, native_blocked
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-3" id=52 idx=0xdc tid=32640 prio=5 alive, parked, native_blocked
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-4" id=53 idx=0xe0 tid=32641 prio=5 alive, parked, native_blocked
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-5" id=75590 idx=0x3cc tid=3308 prio=5 alive, parked, native_blocked

所以worker不正常也被排除了。

3.  我们提交给系统的runner task自动从queue里面消失了,从memory dump中确实发现queue没有tasks了

而没有task的原因很明显是因为当前task执行完之后没有重新reschedule,至于原因,由于scheduledFutrueTask已经不存在,无法从memory dump和thread dump中分析出结果,成为了一个谜。。。。。。

public void  run() {
 boolean periodic = isPeriodic();
 if (!canRunInCurrentRunState(periodic))
  cancel(false);
 else if (!periodic)
  ScheduledFutureTask.super.run();
 else if (ScheduledFutureTask.super.runAndReset()) {
  setNextRunTime();
 <strong> reExecutePeriodic(outerTask);</strong>
 }
}

实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 2

时间: 2024-08-06 03:47:08

实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 2的相关文章

实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 1

公司当年有一个自己缓存集群用户session的Java library,是基于hazlcast2.0.3实现的,最近在customer site集群环境中某个blade报了Out of Memory Exception, 其他blades都正常,马上用jrockit jrcmd命令dump了堆和线程进行分析. printf "##################### heap ##################\n" su -p occas -c "/opt/jrocki

Java 中无返回值的方法在使用时应该注意的问题

Java 中的方法是形态多样的.无返回值的方法在使用时应该规避哪些问题呢? 一.不可以打印调用或是赋值调用,只能是单独调用(非常重要): 二.返回值没有,不代表参数就没有: 三.不能return一个具体的值,否则会报错: 四.return分号可以直接省略: 五.无返回值方法应该使用类型void. 原文地址:https://www.cnblogs.com/yanglongbo/p/10981842.html

JAVA 从GC日志分析堆内存 第七节

JAVA 从GC日志分析堆内存 第七节 在上一章中,我们只设置了整个堆的内存大小.但是我们知道,堆又分为了新生代,年老代.他们之间的内存怎么分配呢?新生代又分为Eden和Survivor,他们的比例大小能改变吗?其实这些都是可控的,以前没有讲到是因为就算讲了也只是讲讲而已,看不到实质性的东西.因此这章我们通过分析GC日志来一步步讲解如何细化设置堆内存. 首先我们来了解几个相关的参数: -XX:+PrintGCDetails:用于告诉虚拟机回收垃圾的时候顺便打印日志. -Xloggc:路径 :将打

java内存查看与分析

业界有很多强大的java profile的工具,比如Jporfiler,yourkit,这些收费的东西我就不想说了,想说的是,其实java自己就提供了很多内存监控的小工具,下面列举的工具只是一小部分,仔细研究下jdk的工具,还是蛮有意思的呢:) 1:gc日志输出 在jvm启动参数中加入 -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimestamps -XX:+PrintGCApplicationStopedTime,jvm将会按照这些参数顺序输出g

Java 内存查看与分析

1:gc日志输出 在jvm启动参数中加入 -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimestamps -XX:+PrintGCApplicationStopedTime,jvm将会按照这些参数顺序输出gc概要信息,详细信息,gc时间信息,gc造成的应用暂停时间.如果在刚才的参数后面加入参数 -Xloggc:文件路径,gc信息将会输出到指定的文件中.其他参数还有 -verbose:gc和-XX:+PrintTenuringDistribution

通过 thread dump 分析找到高CPU耗用与内存溢出的Java代码

http://heylinux.com/archives/1085.html通过 thread dump 分析找到高CPU耗用与内存溢出的Java代码 首先,要感谢我的好朋友 钊花 的经验分享. 相信大家在实际的工作当中,肯定会遇到由代码所导致的高CPU耗用以及内存溢出的情况. 通常这种情况发生时,我们会认为这些问题理所当然的该由开发人员自己去解决,因为操作系统环境是没有任何问题的. 但实际上,我们是可以帮助他们的,效果好的话还可以定位到具体出问题的代码行数,思路如下: 1.通过对CPU与内存的

java内存管理的分析

java 中的内存分为四个部分: stack(栈):存放基本类型的数据和对象的引用,即存放局部变量. Note: 如果存放的是基本类型数据(非静态变量),则直接将变量名和值存入stack中. 如果存放的是引用类型,则将变量名存入栈,然后指向它new出的对象(存放在堆中). heap(堆)存放 new 出来的东西. data segment(数据区):分为静态区和常量区(常量池) 静态区(static segment): 存放在对象中用 static 定义的静态成员(即静态变量,如果该静态变量是基

记一次内存泄漏DUMP分析

自从进入一家创业公司以后,逐渐忙成狗,却无所收获,感觉自身的技术能力用武之地很少,工作生活都在业务逻辑中颠倒. 前些天线上服务内存吃紧,让运维把DUMP拿下来,分析一下聊以自慰. 先来统计一下大对象信息 0:000> !dumpheap -min 85000 -stat Statistics: MT Count TotalSize Class Name 000007feec34c168 7 57734750 System.Char[] 000007feec34aee0 14 115469904

Java虚拟机6:垃圾收集(GC)-1(内存溢出和内存泄漏的区别)

1.前言 在进行垃圾收集之前需要普及几个比较重要的概念. 2.内存溢出和内存泄露的概念和区别: (1):内存溢出(out of memory):是指程序在申请内存时,没有足够的内存空间可以分配,系统不能满足需求,出现了out of memory:比如申请了一个int,但是它存了long才能存下的数,那就是内存溢出. (2):内存泄露(memory leak):是指程序在申请内存之后,无法释放掉已经申请到的内存空间,它始终占用着内存,这样越积越多,最后内存被占光.内存泄露一般都是因为内存中有一块很