elasticsearch与mongodb分布式集群环境下数据同步

1.ElasticSearch是什么

ElasticSearch 是一个基于Lucene构建的开源、分布式,RESTful搜索引擎。它的服务是为具有数据库和Web前端的应用程序提供附加的组件(即可搜索的存储库)。ElasticSearch为应用程序提供搜索算法和相关的基础架构,用户只需要将应用程序中的数据上载到ElasticSearch数据存储中,就可以通过RESTful URL与其交互。ElasticSearch的架构明显不同于它之前的其他搜索引擎架构,因为它是通过水平伸缩的方式来构建的。不同于Solr,它在设计之初的目标就是构建分布式平台,这使得它能够和云技术以及大数据技术的崛起完美吻合。ElasticSearch构建在更稳定的开源搜索引擎Lucene之上,它的工作方式与无模式的JSON文档数据非常类似。

ElasticSearch的关键特征

  • RESTful风格

在所有的ElasticSearch的介绍中都不可避免的提到了它是一种具有RESTful特点的搜索引擎。那么什么是RESTful呢?REST(Representational State Transfer表述性状态转移)是一种针对网络应用的设计和开发方式,可以降低开发的复杂性并提高系统的可伸缩性。REST有一些设计概念和准则,凡是遵循这些准则所开发的应用即具备RESTful风格。在REST风格结构中,所有的请求都必须在一个由URL制定的具体地址的对象上进行。例如,如果用/schools/代表一系列学校的话,/schools/1就代表id为1的那所学校,依次类推。这种设计风格为用户提供了一种简单便捷的操作方式,用户可以通过curl等RESTful API与ElasticSearch进行交互,避免了管理XML配置文件的麻烦。下面将简单介绍

一下通过curl工具对ElasticSearch进行CRUD(增删改查)操作。

l  索引构建

为了对一个JSON对象进行索引创建,需要向REST API提交PUT请求,在请求中指定由索引名称,type名称和ID组成的URL。即

http://localhost:9200/<index>/<type>/[<id>]

例如:curl -XPUT "http://localhost:9200/movies/movie/1" -d‘

{

"title": "The Godfather",

"director": "Francis Ford Coppola",

"year":1972

}‘

l  通过ID获得索引数据

向已经构建的索引发送GET请求,即http://localhost:9200/<index>/<type>/<id>

例如:curl -XGET "http://localhost:9200/movies/movie/1" -d‘‘

后面不带参数时 -d‘‘不要也可以

l  删除文档

通过ID指定的索引删除单个文档。URL和索引创建、获取时相同。

例如:curl -XDELETE "http://localhost:9200/movies/movie/1" -d‘‘

  • ElasticSearch采用Gateway的概念,使得全备份变得更简单。

由于ElasticSearch是专门为分布式环境设计的,所以怎么去对所有节点的索引信息进行持久化是个问题。当然,除了索引信息以外,还有集群信息,mapping和事务日志等都需要进行持久化。当你的节点出现故障或者集群重启的时候,这些信息就变得非常重要。ElasticSearch中有一个专门的gateway模块负责元信息的持久化存储。(Solr里边是不是通过Zookeeper在管理这部分?)

  • ElasticSearch支持facetting(facetedsearch,分面搜索)和precolating

分面是指事物的多维度属性。例如一本书包含主题、作者、年代等方面。而分面搜索是指通过事物的这些属性不断筛选、过滤搜索结果的方法。当然这点在Lucene中已经得到了实现,所以Solr也支持faceted searching。至于precolating特性则是ElasticSearch设计中的一大亮点。Precolator(过滤器)允许你在ElasticSearch中执行与上文档、建立索引、执行查询这样的常规操作恰恰相反的过程。通过Precolate API,可以在索引上注册许多查询,然后向指定的文档发送prelocate请求,返回匹配该文档的注册查询。举个简单的例子,假设我们想获取所有包含了”elasticsearch”这个词的tweet,则可以在索引上注册一个query语句,在每一条tweet上过滤用户注册的查询,可以获得匹配每条tweet的那些查询。下面是一个简单的示例:

首先,建立一个索引:

curl –XPUT localhost:9200/test

接着,注册一个对test索引的precolator 查询,制定的名称为kuku

---该处在本机测试不成功,还没找到原因---

curl –XPUT localhost:9200/_precolator/test/kuku –d’{

“query”:{

“term”:{

“field1”:”value1”

}

}

}’

现在,可以过滤一个文本看看哪些查询跟它是匹配的

crul –XGETlocalhost:9200/test/type/_precolate –d’{

“doc”:{

“filed1”:”value1”

}

}’

得到的返回结构如下

{“ok”: true, “matches”: [“kuku”]}

--end--

  • ElasticSearch的分布式特点

ElasticSearch不同于Solr,从设计之初就是面向分布式的应用环境,因此具备很多便于搭建分布式应用的特点。例如索引可以被划分为多个分片,每个分片可以有多个副本,每一个节点可以持有一个或多个分片,自动实现负载均衡和分片副本的路由。另外,ElasticSearch具有self-contained的特点,不必使用Tomcat等servlet容器。ElasticSearch的集群是自发现、自管理的(通过内置的Zen discovery模块实现),配置十分简单,只要在config/elasticsearch.yml中配置相同的cluster.name即可。

  • 支持多种数据源

ElasticSearch有一个叫做river的插件式模块,可以将外部数据源中的数据导入elasticsearch并在上面建立索引。River在集群上是单例模式的,它被自动分配到一个节点上,当这个节点挂掉后,river会被自动分配到另外的一个节点上。目前支持的数据源包括:Wikipedia, MongoDB, CouchDB, RabbitMQ, RSS, Sofa, JDBC, FileSystem,Dropbox等。River有一些指定的规范,依照这些规范可以开发适合于自己的应用数据的插件。

2 elasticsearch如何建立的数据源连接?

ElasticSearch通过river建立与各个数据源之间的连接。例如mongodb,这种连接方式多半是以第三方插件的方式,由一些开源贡献者贡献出来的插件建立与各种类型的数据管理系统以及MQ等建立river,索引数据的。本文主要研究的是MONGODB与ES的结合,用的是richardwilly98开发的river。

https://github.com/richardwilly98/elasticsearch-river-mongodb

3 mongodb 集群环境搭建

详见:http://blog.csdn.net/huwei2003/article/details/40453159

4 elasticsearch 如何对真正分布式mongodb集群建立river,并且索引数据

1.  首先下载并且解压Elasticsearch

  1. unzip elasticsearch-0.90.5.zip

2 下载并且解压elasticsearch-servicewrapper-master.zip

  1. unzip elasticsearch-servicewrapper-master.zip
  1. cd elasticsearch-servicewrapper-master
  2. mv service /root/gy/elasticsearch-0.90.5/bin

3 启动elasticsearch

  1. sh elasticsearch start

4 下载river插件

  1. ./plugin --install com.github.richardwilly98.elasticsearch/elasticsearch-river-mongodb/1.7.1

这里值得一提的是river的版本必须与mongodb 和ElasticSearch匹配,如果不匹配,那么river的时候不能将mongodb里面所有的数据index进入es。

匹配规则请见下方:

本次测试用的是 es 1.1.2 + mongodb 2.4.6

https://github.com/richardwilly98/elasticsearch-river-mongodb

5 建立river

  1. curl -XPUT "http://localhost:9200/_river/mongodb/_meta" -d‘
  2. {
  3. "type":"mongodb",
  4. "mongodb":{
  5. "servers":[{"host":“192.168.225.131","port":37017}],
  6. "db":"dbname",
  7. "collection":"collectionname",
  8. "gridfs":false,
  9. "options":{
  10. "include_fields":["_id","VERSION","ACCESSION","file"]
  11. }
  12. },
  13. "index":{
  14. "name":"indexname",
  15. "type":"meta"
  16. }
  17. }‘

注: index 里 name 为索引名 要小写,type 里的meta 为 collection name

由于本次测试使用的是mongodb sharding 集群环境,所以在river连接时,使用mongos 路由,就能够正常的把mongo集群中的所有数据都建立索引。

gridfs,options 可不设置

#curl 方式建立river (并建立resume索引)
curl -XPUT "localhost:9200/_river/tbJobResume/_meta" -d ‘
{
"type": "mongodb",
"mongodb": {
    "host": "192.168.225.131",
    "port": "37017",
    "db": "MongoModelJobResume",
    "collection": "tbJobResume"
},
"index": {
    "name": "resume",
    "type": "tbJobResume"} }‘

说明:_river/tbJobResume  tbJobResume 我用的是表名,创建每个索引的时候最好不同 -d 后面的 ‘内容‘两个单引号不要丢了
type 后面是 mongodb 因为用的是 mongodb 数据库

mongodb: 分别是 ip,port,db(name),collection 就不用解释了

index: name 要建立的索引名,最好是小写(应该是必须)

index:type collection名,即该索引对应的数据集合名

验证:
curl "http://localhost:9200/_river/tbJobResume/_meta"
这样就建好了resume索引,mongodb如果有数据也会同步过来

特别注意:如果tbJobResume表中有字段是地理坐标,需要map成geo_point类型,在创建索引前设置mapping,如下:

curl -XPUT ‘http://localhost:9200/resume‘ -d ‘
{
    "mappings": {
    "tbJobResume": {
      "properties": {
        "Location": {
          "type": "geo_point"
        }
      }
    }
  }
}‘

设置完后在创建索引

---下面是建的另外一个索引---
curl -XPUT "localhost:9200/_river/tbJobPosition/_meta" -d ‘
{
"type": "mongodb",
"mongodb": {
    "host": "192.168.225.131",
    "port": "37017",
    "db": "MongoModelJob",
    "collection": "tbJobPosition"
},
"index": {
    "name": "position",
    "type": "tbJobPosition"} }‘

curl "http://localhost:9200/_river/tbJobPosition/_meta"

---------------
#curl put索引数据

curl -XPUT "http://localhost:9200/customer/tbCustomer/1" -d‘
{
   "_id": 1,
   "Name": "Francis Ford Coppola 1",
    "Sex":1
}‘
该方法会创建customer索引并put进一条数据,tbCustomer是type

curl -XPUT ‘http://192.168.225.131:9200/dept/employee/32‘ -d ‘{ "empname": "emp32"}‘
curl -XPUT ‘http://192.168.225.131:9200/dept/employee/31‘ -d ‘{ "empname": "emp31"}‘

该方法也会创建dept索引并put进一条数据,employee是type

创建river并索引的变准模版如下:

$ curl -XPUT "localhost:9200/_river/${es.river.name}/_meta" -d ‘
{
  "type": "mongodb",
  "mongodb": { 
    "servers":
    [
      { "host": ${mongo.instance1.host}, "port": ${mongo.instance1.port} },
      { "host": ${mongo.instance2.host}, "port": ${mongo.instance2.port} }
    ],
    "options": { 
      "secondary_read_preference" : true, 
      "drop_collection": ${mongo.drop.collection}, 
      "exclude_fields": ${mongo.exclude.fields},
      "include_fields": ${mongo.include.fields},
      "include_collection": ${mongo.include.collection},
      "import_all_collections": ${mongo.import.all.collections},
      "initial_timestamp": {
        "script_type": ${mongo.initial.timestamp.script.type},
        "script": ${mongo.initial.timestamp.script}
      },
      "skip_initial_import" : ${mongo.skip.initial.import},
      "store_statistics" : ${mongo.store.statistics},
    },
    "credentials":
    [
      { "db": "local", "user": ${mongo.local.user}, "password": ${mongo.local.password} },
      { "db": "admin", "user": ${mongo.db.user}, "password": ${mongo.db.password} }
    ],
    "db": ${mongo.db.name}, 
    "collection": ${mongo.collection.name}, 
    "gridfs": ${mongo.is.gridfs.collection},
    "filter": ${mongo.filter}
  }, 
  "index": { 
    "name": ${es.index.name}, 
    "throttle_size": ${es.throttle.size},
    "bulk_size": ${es.bulk.size},
    "type": ${es.type.name}
    "bulk": {
      "actions": ${es.bulk.actions},
      "size": ${es.bulk.size},
      "concurrent_requests": ${es.bulk.concurrent.requests},
      "flush_interval": ${es.bulk.flush.interval}
    }
  }
}‘

--template end--

--url--

本插件git地址:https://github.com/laigood/elasticsearch-river-mongodb

6 测试例子

连接mongo集群,meta collection数据量有22394792条数据

查看ES数据量

最后我在master1 master2 master3上都建立了ElasticSearch,并且3台es rebalance成功,并且数据的总数任然为22394792.

时间: 2024-07-29 08:52:14

elasticsearch与mongodb分布式集群环境下数据同步的相关文章

Linux安装ElasticSearch与MongoDB分布式集群环境下数据同步

ElasticSearch有一个叫做river的插件式模块,可以将外部数据源中的数据导入elasticsearch并在上面建立索引.River在集群上是单例模式的,它被自动分配到一个节点上,当这个节点挂掉后,river会被自动分配到另外的一个节点上.目前支持的数据源包括:Wikipedia, MongoDB, CouchDB, RabbitMQ, RSS, Sofa, JDBC, FileSystem,Dropbox等.River有一些指定的规范,依照这些规范可以开发适合于自己的应用数据的插件.

基于HBase Hadoop 分布式集群环境下的MapReduce程序开发

HBase分布式集群环境搭建成功后,连续4.5天实验客户端Map/Reduce程序开发,这方面的代码网上多得是,写个测试代码非常容易,可是真正运行起来可说是历经挫折.下面就是我最终调通并让程序在集群上运行起来的一些经验教训. 一.首先说一下我的环境: 1,集群的环境配置请见这篇博文. 2,开发客户机环境:操作系统是CentOS6.5,JDK版本是1.7.0-60,开发工具是Eclipse(原始安装是从google的ADT网站下载的ADT专用开发环境,后来加装了Java企业开发的工具,启动Flas

分布式集群环境下,如何实现session共享二(项目开发)

在上一篇分布式集群环境下,如何实现session共享一(应用场景)中,介绍了在分布式集群下,需要实现session共享的应用场景.并且最后留下了一个问题:在集群环境下,如何实现session的共享呢?.要解决这个问题,放在一篇中内容量有点大,还是一步一步来吧.本篇先搭建一个基础的web应用,完全基于原生态的servlet实现.思路是这样的: 1.准备一个页面index.jsp,页面中可以提交key/value对的请求参数数据数据 2.编写一个servlet,接收页面提交的请求,获取请求参数,并且

分布式集群环境下,如何实现session共享三(环境搭建)

这是分布式集群环境下,如何实现session共享系列的第三篇.在上一篇:分布式集群环境下,如何实现session共享二(项目开发)中,准备好了一个通过原生态的servlet操作session的案例.本篇需要搭建相关的环境,包括:tomcat.nginx.redis. 1.通过两个tomcat搭建集群:tomcat_1.tomcat_2 2.通过nginx实现负载均衡 3.通过redis存储session 1.安装tomcat 1.1.tomcat_1 上传tomcat_1到服务器192.168.

集群环境下定时任务调度问题与方案探讨

摘要 问题:从单机扩展到集群 方案一:不做改造,直接扩展 方案二:多处调度.一处执行 方案三:一处调度.一处执行 方案四:一处调度.多处执行 方案五:多处调度.多处执行 摘要 从改造工作量.可用性.负载均衡.资源利用等方面,简单介绍了几种集群环境下定时任务调度的方案. 问题:从单机扩展到集群 单机环境的定时任务很简单.无论是用比较原始的Timer,还是用自成体系的quartz.spring-scheduler,都可以轻松写意的实现功能. 但是,当应用水平扩展到集群环境下时, 定时任务会出现重复调

Apache shiro集群实现 (六)分布式集群系统下的高可用session解决方案---Session共享

Apache shiro集群实现 (一) shiro入门介绍 Apache shiro集群实现 (二) shiro 的INI配置 Apache shiro集群实现 (三)shiro身份认证(Shiro Authentication) Apache shiro集群实现 (四)shiro授权(Authentication)--访问控制 Apache shiro集群实现 (五)分布式集群系统下的高可用session解决方案 Apache shiro集群实现 (六)分布式集群系统下的高可用session

分布式集群系统下的高可用session解决方案

目前,为了使web能适应大规模的访问,需要实现应用的集群部署. 而实现集群部署首先要解决session的统一,即需要实现session的共享机制. 目前,在集群系统下实现session统一的有如下几种方案: (1) 应用服务器间的session复制共享(如tomcat session共享) (2) 基于cache DB缓存的session共享 应用服务器间的session复制共享 session复制共享,主要是指集群环境下,多台应用服务器之间同步session,使session保持一致,对外透明

扫码登录实现及服务器集群环境下精准推送解决方案

又是新的一年,其实我心有那么一丝期待,因为每一年都会有新的进步,比如技术方面呀,与同事的沟通方面呀等等,这篇文章就是总结一下上一年我做的扫码登录功能的一些经验.        估计大家肯定在电脑上登录过微信,中间有一步在手机上点确定登录的操作,点完之后电脑上的微信就自动登录了,看上去挺屌的,但知道具体怎么实现后你就会觉得也就那样.另外,文章标题上什么"推送解决方案",方案肯定谈不上,不过这词可以用在简历上,用来唬下面试官还是可以的,因为面试官一看你简历上写着"在公司提出了什么

Ubuntu14(64位) 集群环境下安装Hadoop2.4

经过前边的积累,今天终于实现了集群环境下部署Hadoop,并成功运行了官方的例子. 工作如下: 两台机器: NameNode:上网小本,3G内存,机器名:YP-X100e,IP:192.168.101.130. DataNode:虚拟机,Win7下载VMWare10中虚拟Ubuntu14,虚拟机器名:ph-v370,IP:192.168.101.110 确保可互相ping通,按照机器名和IP配置各自机器的/etc/hosts文件和/etc/hostname文件,我的hosts配置内容如下 127