cassandra高级操作之JMX操作

需求场景

  项目中有这么个需求:统计集群中各个节点的数据量存储大小,不是记录数。

  一开始有点无头绪,后面查看cassandra官方文档看到Monitoring章节,里面说到:Cassandra中的指标使用Dropwizard Metrics库进行管理。 这些指标可以通过JMX查询,也可以使用多个内置和第三方报告插件推送到外部监控系统(Jconsole)。那么数据量存储大小是不是也是cassandra的某项指标了? 带着疑问,我通过Jconsole看到了cassandra的一些指标(先启动cassandra, 运行  -> Jconsole),如下图

  数据量存储大小就在叫org.apache.cassandra.db的MBean中,具体会在之后介绍。

JMX定义

  引用JMX超详细解读中一段话:

JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。这是官方文档上的定义,我看过很多次也无法很好的理解。我个人的理解是JMX让程序有被管理的功能,例如你开发一个WEB网站,它是在24小时不间断运行,那么你肯定会对网站进行监控,如每天的UV、PV是多少;又或者在业务高峰的期间,你想对接口进行限流,就必须去修改接口并发的配置值。

  应用场景:中间件软件WebLogic的管理页面就是基于JMX开发的,而JBoss则整个系统都基于JMX构架,另外包括cassandra中各项指标的管理。

  对于一些参数的修改,网上有一段描述还是比较形象的:

  1、程序初哥一般是写死在程序中,到要改变的时候就去修改代码,然后重新编译发布。

  2、程序熟手则配置在文件中(JAVA一般都是properties文件),到要改变的时候只要修改配置文件,但还是必须重启系统,以便读取配置文件里最新的值。

  3、程序好手则会写一段代码,把配置值缓存起来,系统在获取的时候,先看看配置文件有没有改动,如有改动则重新从配置里读取,否则从缓存里读取。

  4、程序高手则懂得物为我所用,用JMX把需要配置的属性集中在一个类中,然后写一个MBean,再进行相关配置。另外JMX还提供了一个工具页,以方便我们对参数值进行修改。

  给我的感觉,jmx server进行监听,jmx client进行请求的发送,以此达到通信的目的;cassandra的jmx server已经被cassandra实现,我们只需要实现jmx client,就能从cassandra进程中拿到我们需要的指标数据。

JMX Server

  1、 首先定义一个MBean接口

    接口的命名规范为以具体的实现类为前缀(这个规范很重要),动态代理的过程中需要用到这点。

public interface HelloMBean
{
    String getName();
    void setName(String name);

    void print();
}

  2、定义一个实现类

    实现上面的接口:

public class Hello implements HelloMBean
{

    private String name;

    @Override
    public String getName()
    {
        return this.name;
    }

    @Override
    public void setName(String name)
    {
        this.name = name;
    }

    @Override
    public void print()
    {
        System.out.println("hello, print");
    }

}

  3、定义一个jmx server,并启动它:

public class HelloService
{
    private static final int RMI_PORT = 8099;
    private static final String JMX_SERVER_NAME = "TestJMXServer";
    private static final String USER_NAME = "hello";
    private static final String PASS_WORD = "world";

    public static void main(String[] args) throws Exception
    {
        HelloService service = new HelloService();
        service.startJmxServer();
    }

    private void startJmxServer() throws Exception
    {
        //MBeanServer mbs = MBeanServerFactory.createMBeanServer(jmxServerName);
        MBeanServer mbs = this.getMBeanServer(); 

        // 在本地主机上创建并输出一个注册实例,来接收特定端口的请求
        LocateRegistry.createRegistry(RMI_PORT, null, RMISocketFactory.getDefaultSocketFactory()); 

        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + RMI_PORT + "/" + JMX_SERVER_NAME);
        System.out.println("JMXServiceURL: " + url.toString());  

        Map<String, Object> env = this.putAuthenticator();

        //JMXConnectorServer jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);   // 不加认证
        JMXConnectorServer jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(url, env, mbs);      // 加认证
        jmxConnServer.start();
    }

    private MBeanServer getMBeanServer() throws Exception
    {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName objName = new ObjectName(JMX_SERVER_NAME + ":name=" + "hello");
        mbs.registerMBean(new Hello(), objName);
        return mbs;
    }

    private Map<String, Object> putAuthenticator()
    {
        Map<String,Object> env = new HashMap<String,Object>();
        JMXAuthenticator auth = createJMXAuthenticator();
        env.put(JMXConnectorServer.AUTHENTICATOR, auth);

        env.put("com.sun.jndi.rmi.factory.socket", RMISocketFactory.getDefaultSocketFactory());
        return env;
    }

    private JMXAuthenticator createJMXAuthenticator()
    {
        return new JMXAuthenticator()
        {
            public Subject authenticate(Object credentials)
            {
                String[] sCredentials = (String[]) credentials;
                if (null == sCredentials || sCredentials.length != 2)
                {
                    throw new SecurityException("Authentication failed!");
                }
                String userName = sCredentials[0];
                String password = sCredentials[1];
                if (USER_NAME.equals(userName) && PASS_WORD.equals(password))
                {
                    Set<JMXPrincipal> principals = new HashSet<JMXPrincipal>();
                    principals.add(new JMXPrincipal(userName));
                    return new Subject(true, principals, Collections.EMPTY_SET,
                            Collections.EMPTY_SET);
                }
                throw new SecurityException("Authentication failed!");
            }
        };
    }
}

  

  点下print按钮,你会发现控制台会打印:hello, print。

  cassandra的jmx server已经自己实现了,我们不需要实现它,我们需要实现的是调用它。

JMX client

  这个是我们需要关注和实现的

  1、client端接口定义

    接口中定义的方法是我们需要调用的,方法名必须与server端暴露的方法一样,通过server端动态生成client端的实例,实例中的方法只包括client端接口中定义的方法(若server端暴露的是属性,那么直接在属性前加get,后面cassandra部分会讲到)

public interface HelloClientMBean
{
    void print();          // 方法定义与server端暴露的方法一致
}

  2、连接jmx server

public class HelloClient implements AutoCloseable
{
    private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://[%s]:%d/TestJMXServer";
    private static final String ssObjName = "TestJMXServer:name=hello";
    private static final int defaultPort = 1099;                       // cassandra默认端口是7199
    final String host;
    final int port;
    private String username;
    private String password;

    private JMXConnector jmxc;
    private MBeanServerConnection mbeanServerConn;
    private HelloMBean hmProxy;

    /**
     * Creates a connection using the specified JMX host, port, username, and password.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public HelloClient(String host, int port, String username, String password) throws IOException
    {
        assert username != null && !username.isEmpty() && password != null && !password.isEmpty()
               : "neither username nor password can be blank";

        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        connect();
    }

    /**
     * Creates a connection using the specified JMX host and port.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public HelloClient(String host, int port) throws IOException
    {
        this.host = host;
        this.port = port;
        connect();
    }

    /**
     * Creates a connection using the specified JMX host and default port.
     *
     * @param host hostname or IP address of the JMX agent
     * @throws IOException on connection failures
     */
    public HelloClient(String host) throws IOException
    {
        this.host = host;
        this.port = defaultPort;
        connect();
    }

    /**
     * Create a connection to the JMX agent and setup the M[X]Bean proxies.
     *
     * @throws IOException on connection failures
     */
    private void connect() throws IOException
    {
        JMXServiceURL jmxUrl = new JMXServiceURL(String.format(fmtUrl, host, port));
        Map<String,Object> env = new HashMap<String,Object>();
        if (username != null)
        {
            String[] creds = { username, password };
            env.put(JMXConnector.CREDENTIALS, creds);
        }

        env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory());

        jmxc = JMXConnectorFactory.connect(jmxUrl, env);
        mbeanServerConn = jmxc.getMBeanServerConnection();

        try
        {
            ObjectName name = new ObjectName(ssObjName);
            hmProxy = JMX.newMBeanProxy(mbeanServerConn, name, HelloMBean.class);
        }
        catch (MalformedObjectNameException e)
        {
            throw new RuntimeException(
                    "Invalid ObjectName? Please report this as a bug.", e);
        }
    }

    private RMIClientSocketFactory getRMIClientSocketFactory() throws IOException
    {
        if (Boolean.parseBoolean(System.getProperty("ssl.enable")))
            return new SslRMIClientSocketFactory();
        else
            return RMISocketFactory.getDefaultSocketFactory();
    }

    public void print()
    {
        hmProxy.print();
    }

    @Override
    public void close() throws Exception
    {
        jmxc.close();
    }
}

  3、接口调用

public class JmxClient
{
    public static void main(String[] args) throws Exception
    {
        HelloClient client = new HelloClient("localhost", 8099, "hello", "world");
        client.print();
        client.close();
    }

}

    会在控制台打印:hello, print。

统计cassandra集群中各个节点的数据量存储大小

  也分3步:

  1、client端接口定义

    因为我们只关心数据量存储大小,所以我们只需要在接口定义一个方法

public interface StorageServiceMBean
{
    /** Human-readable load value.  Keys are IP addresses. */
    public Map<String, String> getLoadMap();            // cassandra端暴露的是属性LoadMap,那么此方法名由get加LoadMap组成, 那么getLoad方法就可以获取LoadMap的值
}

  2、连接jmx server    

    cassandra-env.sh配置文件中有cassandra的JMX默认端口:JMX_PORT="7199"

public class CassNodeProbe implements AutoCloseable
{
    private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://[%s]:%d/jmxrmi";
    private static final String ssObjName = "org.apache.cassandra.db:type=StorageService";
    private static final int defaultPort = 7199;
    final String host;
    final int port;
    private String username;
    private String password;

    private JMXConnector jmxc;
    private MBeanServerConnection mbeanServerConn;
    private StorageServiceMBean ssProxy;

    /**
     * Creates a NodeProbe using the specified JMX host, port, username, and password.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public CassNodeProbe(String host, int port, String username, String password) throws IOException
    {
        assert username != null && !username.isEmpty() && password != null && !password.isEmpty()
               : "neither username nor password can be blank";

        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        connect();
    }

    /**
     * Creates a NodeProbe using the specified JMX host and port.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public CassNodeProbe(String host, int port) throws IOException
    {
        this.host = host;
        this.port = port;
        connect();
    }

    /**
     * Creates a NodeProbe using the specified JMX host and default port.
     *
     * @param host hostname or IP address of the JMX agent
     * @throws IOException on connection failures
     */
    public CassNodeProbe(String host) throws IOException
    {
        this.host = host;
        this.port = defaultPort;
        connect();
    }

    /**
     * Create a connection to the JMX agent and setup the M[X]Bean proxies.
     *
     * @throws IOException on connection failures
     */
    private void connect() throws IOException
    {
        JMXServiceURL jmxUrl = new JMXServiceURL(String.format(fmtUrl, host, port));
        Map<String,Object> env = new HashMap<String,Object>();
        if (username != null)
        {
            String[] creds = { username, password };
            env.put(JMXConnector.CREDENTIALS, creds);
        }

        env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory());

        jmxc = JMXConnectorFactory.connect(jmxUrl, env);
        mbeanServerConn = jmxc.getMBeanServerConnection();

        try
        {
            ObjectName name = new ObjectName(ssObjName);
            ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
        }
        catch (MalformedObjectNameException e)
        {
            throw new RuntimeException(
                    "Invalid ObjectName? Please report this as a bug.", e);
        }
    }

    private RMIClientSocketFactory getRMIClientSocketFactory() throws IOException
    {
        if (Boolean.parseBoolean(System.getProperty("ssl.enable")))
            return new SslRMIClientSocketFactory();
        else
            return RMISocketFactory.getDefaultSocketFactory();
    }

    public Map<String, String> getCassClusterStorage()
    {
        return ssProxy.getLoadMap();
    }

    @Override
    public void close() throws Exception
    {
        jmxc.close();
    }
}

  3、接口调用

public class JMXTest
{

    public static void main(String[] args) throws Exception
    {
        CassNodeProbe prode = new CassNodeProbe("127.0.0.1");
        Map<String, String> nodeStorages = prode.getCassClusterStorage();
        System.out.println(nodeStorages);
        prode.close();
    }

}

  最后得到结果:{127.0.0.1=266.36 KB}

  cassandra的jmx 认证访问我就不做演示了,大家自己去实现。

cassandra jmx client实现:cassandra-all

  cassandra给我们提供了工具jar,jmx server暴露的在这个工具jar中都有对应的请求方式;

  如若大家用到的很少则可以自己实现,而不需要用cassandra-all,当然我们可以拷贝cassandra-all中我们需要的代码到我们的工程中,那么我们就可以不用引用此jar,但是又满足了我们的需求

<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
    <version>2.1.14</version>
</dependency>

  工程附件

参考:

  http://www.cnblogs.com/FlyAway2013/p/jmx.html

  http://www.cnblogs.com/dongguacai/p/5900507.html

时间: 2024-11-06 15:08:38

cassandra高级操作之JMX操作的相关文章

高级dml操作,insert操作

drop table e1 purge; drop table e2 purge; create table e1 as select ename,sal,hiredate from emp where 9=0; create table e2 as select ename,deptno,mgr from emp where 9=0; insert all into e1 values(ename,sal,hiredate) into e2 values(ename,depno,mgr) se

atitit.浏览器web gui操作类库 和 操作chrome浏览器的类库使用总结

atitit.浏览器web gui操作类库 和 操作chrome浏览器的类库使用总结 1. 常见标准 1 1.1. 录制重放 1 1.2. 一个窗体一个proxy cookie 1 1.3. exec js 1 1.4. js 调用java 1 1.5. 修改dom属性 2 1.6. 关键字驱动 2 1.7. 加载js类库 2 1.8. 一个窗口一个代理 2 1.9. 独立窗口cookie 2 1.10. 无图模式支持 2 1.11. 支持自定义路径 2 2. 框架选型selenium2(web

Python之路-(Django(csrf,中间件,缓存,信号,Model操作,Form操作))

csrf 中间件 缓存 信号 Model操作 Form操作 csrf: 用 django 有多久,我跟 csrf 这个概念打交道就有久了. 每次初始化一个项目时都能看到 django.middleware.csrf.CsrfViewMiddleware 这个中间件 每次在模板里写 form 时都知道要加一个 {% csrf_token %} tag 每次发 ajax POST 请求,都需要加一个 X_CSRFTOKEN 的 header 什么是 CSRF CSRF, Cross Site Req

ubuntu命令行操作mysql常用操作

登陆mysql [email protected]:~/ruby/mydiary$ mysql -u root -p Enter password: Welcome to the MySQL monitor. Commands end with ; or \g. 查看所有的数据库 mysql> show databases; --注意必须要用;结尾否则不会立即执行代码 +--------------------+ | Database | +--------------------+ | inf

ThinkPHP - 前置操作+后置操作

前置操作和后置操作 系统会检测当前操作(不仅仅是index操作,其他操作一样可以使用)是否具有前置和后置操作,如果存在就会按照顺序执行,前置和后置操作的方法名是在要执行的方法前面加 _before_和_after_,例如: class CityAction extends Action{ //前置操作方法 public function _before_index(){ echo 'before<br/>'; } public function index(){ echo 'index<

JAVA的IO操作:内存操作流

掌握内存操作流 输入和输出都是从文件中来的,当然,也可将输出的位置设置在内存上,这就需要ByteArrayInputStream和ByteArrayOutputStream ByteArrayInputStream:将内容写入到内存中, ByteArrayOutputStream:将内存中数据输出 此时的操作应该以内存为操作点. 利用此类 完成一些功能. 常用方法 ByteArrayInputStream :是InputStream子类. public class ByteArrayInputS

hdu5795 A Simple Nim 求nim求法,打表找sg值规律 给定n堆石子,每堆有若干石子,两个人轮流操作,每次操作可以选择任意一堆取走任意个石子(不可以为空) 或者选择一堆,把它分成三堆,每堆不为空。求先手必胜,还是后手必胜。

/** 题目:A Simple Nim 链接:http://acm.hdu.edu.cn/showproblem.php?pid=5795 题意:给定n堆石子,每堆有若干石子,两个人轮流操作,每次操作可以选择任意一堆取走任意个石子(不可以为空) 或者选择一堆,把它分成三堆,每堆不为空.求先手必胜,还是后手必胜. 思路: 组合游戏Nim: 计算出每一堆的sg值,然后取异或.异或和>0那么先手,否则后手. 对于每一堆的sg值求解方法: 设:sg(x)表示x个石子的sg值.sg(x) = mex{sg

jqm文件上传,上传图片,jqm的表单操作,jqm的ajax的使用,jqm文件操作大全,文件操作demo

最近在论坛中看到,在使用html5中上传图片或文件,出现各种问题.这一方面,我也一直没有做过,今天就抽出了一点时间来学习一下.现在的示例已经ok了,我就给大家分享一下,希望对大家有帮助. 好吧,我们先看看效果截图吧: 还行吧,来看页面代码: <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <h

Node.js学习笔记【3】NodeJS基础、代码的组织和部署、文件操作、网络操作、进程管理、异步编程

一.表 学生表 CREATE TABLE `t_student` ( `stuNum` int(11) NOT NULL auto_increment, `stuName` varchar(20) default NULL, `birthday` date default NULL, PRIMARY KEY  (`stuNum`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 学生分数表 CREATE TABLE `t_stu_score` ( `id` int(11