Apache Camel系列(3)----Redis组件

Redis组件允许你从Redis接收消息,以及将消息发送给Redis。RedisProducer的功能很强大,几乎能执行所有的Redis Command,这些Command都是在Message的header中进行设置的。遗憾的是RedisConsumer仅仅支持pub/sub模式,不支持Point2Point,这意味这在Camel中,通过阻塞的方式消费Lists中的消息是不可行的。我反馈了这个问题到Apache Camel Mail List,希望以后的版本支持P2P更能。下面演示如何使用camel-spring-redis组件。

使用Spring

1,创建Maven工程,添加camel-spring-redis引用,pom.xml文件内容如下:

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>2.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring</artifactId>
            <version>2.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring-redis</artifactId>
            <version>2.17.0</version>
        </dependency>
    </dependencies>

2,在资源文件下下创建spring bean配置文件,内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="hostName" value="localhost" />
        <property name="port" value="9999" />
        <property name="password" value="1234567890" />
    </bean>
    <bean id="serializer" class="org.springframework.data.redis.serializer.StringRedisSerializer" />

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <route startupOrder="1">
            <from uri="timer://foo?fixedRate=true&amp;period=1000"/>
            <setHeader headerName="CamelRedis.Command">
                <constant>SET</constant>
            </setHeader>
            <setHeader headerName="CamelRedis.Key">
                <constant>keyOne</constant>
            </setHeader>
            <setHeader headerName="CamelRedis.Value">
                <constant>valueOne</constant>
            </setHeader>
            <to uri="spring-redis://localhost:9999?connectionFactory=#connectionFactory&amp;serializer=#serializer"/>
        </route>
    </camelContext>
</beans>

上边的beans文件中,定义了bean connectionFactory,可以设置redis服务器的相关信息,比如密码。如果你的redis服务器没有设置密码,那么这个bean定义可以省略,此时配置文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    <bean id="serializer" class="org.springframework.data.redis.serializer.StringRedisSerializer" />

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <route startupOrder="1">
            <from uri="timer://foo?fixedRate=true&amp;period=1000"/>
            <setHeader headerName="CamelRedis.Command">
                <constant>SET</constant>
            </setHeader>
            <setHeader headerName="CamelRedis.Key">
                <constant>keyOne</constant>
            </setHeader>
            <setHeader headerName="CamelRedis.Value">
                <constant>valueOne</constant>
            </setHeader>
            <to uri="spring-redis://localhost:9999?serializer=#serializer"/>
        </route>
    </camelContext>
</beans>

setHeader中设置了Redis的相关命令,RedisProducer支持几乎所有的RedisCmmand,具体可参见Redis Component官网

这里的意思是每隔1S为Redis服务器的keyOnve设置一个值valueOne。

3, 创建App3.java,启动spring。

/**
 * Created by sam on 5/10/16.
 */
public class App3 {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("context.xml");
        context.start();
        System.in.read();
    }
}

启动ClassPathXmlApplicationContext的时候,系统会自动创建并运行CamelContext。

监控Redis,得到结果如下:

[[email protected] redis-2.8.19]$ src/redis-cli -p 9999 -a 1234567890
127.0.0.1:9999> MONITOR
OK
1462931627.929228 [0 127.0.0.1:50939] "PING"
1462931686.110952 [0 127.0.0.1:50943] "AUTH" "1234567890"
1462931686.120240 [0 127.0.0.1:50943] "SET" "keyOne" "valueOne"
1462931687.065705 [0 127.0.0.1:50943] "SET" "keyOne" "valueOne"
1462931688.066442 [0 127.0.0.1:50943] "SET" "keyOne" "valueOne"
1462931689.066169 [0 127.0.0.1:50943] "SET" "keyOne" "valueOne"
1462931690.065948 [0 127.0.0.1:50943] "SET" "keyOne" "valueOne"
1462931691.065674 [0 127.0.0.1:50943] "SET" "keyOne" "valueOne"

不使用Spring

公司不是所有的项目都用了Spring,所以研究了两天,做了个非Spring的Demo,还是maven工程,pom.xml配置文件和上边的一样,创建app4.java类,代码如下:

public class App4 {
    public static void main(String[] args) throws Exception {
        JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); // 创建connectionFactory
        connectionFactory.setHostName("localhost");
        connectionFactory.setPassword("1234567890");
        connectionFactory.setPort(9999);
        SimpleRegistry registry = new SimpleRegistry();
        connectionFactory.afterPropertiesSet(); // 必须要调用该方法来初始化connectionFactory
        registry.put("connectionFactory", connectionFactory); //注册connectionFactory
        registry.put("serializer", new StringRedisSerializer()); //注册serializer

        CamelContext context = new DefaultCamelContext(registry);
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                errorHandler(deadLetterChannel("stream:out"));
                from("timer://foo?fixedRate=true&period=1000").
                        setHeader("CamelRedis.Command", constant("PUBLISH")).
                        setHeader("CamelRedis.Channel", constant("testChannel")).
                        setHeader("CamelRedis.Message", constant(new Date().toString())).
                        to("spring-redis://localhost:9999?connectionFactory=#connectionFactory&serializer=#serializer");
            }
        });
        context.setTracing(true);
        context.start();
        Thread.sleep(Integer.MAX_VALUE);
        context.stop();
    }
}

这段代码主要是使用SimpleRegistry来注册bean信息,并将SimpleRegistry作为CamelContext的参数,这样在endpoint的Url中就可以使用之前注册的bean了。

另外在创建完connectionFactory后要调用afterPropertiesSet()方法来完成初始化。如果你的redis没有设置密码,并且不需要serializer,那么代码更简单,如下:

public class App4 {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                errorHandler(deadLetterChannel("stream:out"));
                from("timer://foo?fixedRate=true&period=1000").
                        setHeader("CamelRedis.Command", constant("PUBLISH")).
                        setHeader("CamelRedis.Channel", constant("testChannel")).
                        setHeader("CamelRedis.Message", constant(new Date().toString())).
                        to("spring-redis://localhost:9999");
            }
        });
        context.setTracing(true);
        context.start();
        Thread.sleep(Integer.MAX_VALUE);
        context.stop();
    }
}

以上代码经测试,都可运行。

时间: 2024-10-17 16:35:31

Apache Camel系列(3)----Redis组件的相关文章

Apache Camel系列(1)----使用场景

Apache Camel是一个基于Enterprise Integration Pattern(企业整合模式,简称EIP)的开源框架.EIP定义了一些不同应用系统之间的消息传输模型,包括常见的Point-to-Point,Pub/Sub模型.更多关于EIP的信息,可以参见这里 Apache Camel主要提供了以下功能: 1,实现了EIP的大部分模式,如果你要在不同的应用系统之间以不同的方式传递消息,那么你可以从Apache Camel中找到解决反感. 2,提供了大量Component(组件),

Apache Camel系列(2)----Hello World

下面创建一个Apache Camel的Hello World程序,该程序使用Maven,Intellij 15,运行环境是JDK 8. 1,创建一个maven工程,在pom.xml文件中添加apache camel的dependencies. <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version&g

架构设计:系统间通信(36)——Apache Camel快速入门(上)

1.本专题主旨 1-1.关于技术组件 在这个专题中,我们介绍了相当数量技术组件:Flume.Kafka.ActiveMQ.Rabbitmq.Zookeeper.Thrift .Netty.DUBBO等等,还包括本文要进行介绍的Apache Camel.有的技术组件讲得比较深入,有的技术组件则是点到为止.于是一些读者朋友发来信息向我提到,这个专题的文章感觉就像一个技术名词的大杂烩,并不清楚作者的想要通过这个专题表达什么思想. 提出这个质疑的朋友不在少数,所以我觉得有必要进行一个统一的说明.这个专题

Apache Shiro系列四:Shiro的架构

Shiro的设计目标就是让应用程序的安全管理更简单.更直观. 软件系统一般是基于用户故事来做设计.也就是我们会基于一个客户如何与这个软件系统交互来设计用户界面和服务接口.比如,你可能会说:“如果用户登录了我们的系统,我就给他们显示一个按钮,点击之后可以查看他自己的账户信息.如果没有登录,我就给他显示一个注册按钮.” 上述应用程序在很大程度上是为了满足用户的需求而编写的,即便这个“用户”不是人,而是一个其他的软件系统.你仍然是按照谁当前正在与你的系统交互的逻辑来编写你的逻辑代码. Shiro的设计

[每日一学]apache camel简介

apache camel 是轻量级esb框架.如下是它的架构图: 它有几个比较重要的概念就是: endpoint,所谓的endpoint,就是一种可以接收或发送数据的组件.可以支持多种协议,如jms,http,file等. 另一个重要的概念就是processor,它是用来处理具体业务逻辑的组件. 还有一个是:route,用来路由,指示数据从哪里来到哪里去,中间用哪个processor处理. 而processor之间用exchange对象来传送数据,有点像jms,通俗一点就像上学时传的小纸条,记住

Apache Shiro系列之五:配置

Shiro设计的初衷就是可以运行于任何环境:无论是简单的命令行应用程序还是复杂的企业集群应用.由于运行环境的多样性,所以有多种配置机制可用于配置,本节我们将介绍Shiro内核支持的这几种配置机制. 小贴士:多种配置方案: Shiro的SecurityManager是和JavaBean兼容的,所以我们可以使用诸如Java.Xml(Spring.Jboss.Guice等).YAML.Json.Groovy等配置方式.   一.基于Java代码的配置 最简单的创建并且使用SecurityManager

在Apache Tomcat 7设置redis作为session store

在Apache Tomcat 7设置redis作为session store redis已经有组件支持直接在tomcat7中设置下将redis作为tomcat默认的session存储器,下面介绍下配置过程 1.从http://redis.io/下载redis,按照redis服务端 wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make 2.启动redis

架构设计:系统间通信(38)——Apache Camel快速入门(下1)

======================= (接上文<架构设计:系统间通信(37)--Apache Camel快速入门(中)>) 3-5-2-3循环动态路由 Dynamic Router 动态循环路由的特点是开发人员可以通过条件表达式等方式,动态决定下一个路由位置.在下一路由位置处理完成后Exchange将被重新返回到路由判断点,并由动态循环路由再次做出新路径的判断.如此循环执行直到动态循环路由不能再找到任何一条新的路由路径为止.下图来源于官网(http://camel.apache.or

Apache Camel 与 Spring Boot 集成,通过FTP定时采集、处理文件

1.概要: 本项目主要是通过在Spring平台上配置Camel.FTP,实现定时从FTP服务器下载文件到本地.解析文件.存入数据库等功能. 2.搭建空项目: Spring Boot有几种自动生成空项目的机制:CLI.Spring tool suite.网站Spring Initializr,我们选择第三个. 访问网站http://start.spring.io/,如下图 在dependencies添加依赖包的时候,在框中输入camle.jdbc.mysql会自动弹出提示,确认即为选中,如下图: