Elasticsearch Bulk API批量索引

这篇博客介绍一下Elasticsearch对多个文档进行索引的简便方法。Bulk api的支持可以实现一次请求执行批量的添加、删除、更新等操作.Bulk操作使用的是UDP协议,UDP无法确保与ElasticSearch服务器通信时不丢失数据.

一、Bulk API

使用bulk命令时,REST API以_bulk结尾,批量操作写在json文件中,官网给出的语法格式:

action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n

也就是说每一个操作都有2行数据组成,末尾要回车换行。第一行用来说明操作命令和原数据、第二行是自定义的选项.举个例子,同时执行插入2条数据、删除一条数据, 新建bulkdata.json,写入如下内容:

{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "3" }}
{ "title":"title1","posttime":"2016-07-02","content":"内容一" }

{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "4" }}
{ "title":"title2","posttime":"2016-07-03","content":"内容2" }

{ "delete":{"_index" : "blog", "_type" : "article", "_id" : "1" }}

执行:

$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json
{
  "took" : 11,
  "errors" : false,
  "items" : [ {
    "create" : {
      "_index" : "blog",
      "_type" : "article",
      "_id" : "13",
      "_version" : 1,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "failed" : 0
      },
      "status" : 201
    }
  } ]
}

返回值:

注意:行末要回车换行,不然会因为命令不能识别而出现错误.

$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json
{
  "error" : {
    "root_cause" : [ {
      "type" : "action_request_validation_exception",
      "reason" : "Validation Failed: 1: no requests added;"
    } ],
    "type" : "action_request_validation_exception",
    "reason" : "Validation Failed: 1: no requests added;"
  },
  "status" : 400
}

二、批量导出

下面的例子是把索引库中的文档以json格式批量导出到文件中,其中集群名称为”bropen”,索引库名为”blog”,type为”article”,项目根目录下新建files/bulk.txt,索引内容写入bulk.txt中:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;

public class ElasticSearchBulkOut {

    public static void main(String[] args) {

        try {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml

            Client client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300));

            QueryBuilder qb = QueryBuilders.matchAllQuery();
            SearchResponse response = client.prepareSearch("blog")
                    .setTypes("article").setQuery(QueryBuilders.matchAllQuery())
                    .execute().actionGet();
            SearchHits resultHits = response.getHits();

            File article = new File("files/bulk.txt");
            FileWriter fw = new FileWriter(article);
            BufferedWriter bfw = new BufferedWriter(fw);

            if (resultHits.getHits().length == 0) {
                System.out.println("查到0条数据!");

            } else {
                for (int i = 0; i < resultHits.getHits().length; i++) {
                    String jsonStr = resultHits.getHits()[i]
                            .getSourceAsString();
                    System.out.println(jsonStr);
                    bfw.write(jsonStr);
                    bfw.write("\n");
                }
            }
            bfw.close();
            fw.close();

        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

三、批量导入

从刚才导出的bulk.txt文件中按行读取,然后bulk导入。首先通过调用client.prepareBulk()实例化一个BulkRequestBuilder对象,调用BulkRequestBuilder对象的add方法添加数据。实现代码:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ElasticSearchBulkIn {

    public static void main(String[] args) {

        try {

            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml中配置

            Client client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300));

            File article = new File("files/bulk.txt");
            FileReader fr=new FileReader(article);
            BufferedReader bfr=new BufferedReader(fr);
            String line=null;
            BulkRequestBuilder bulkRequest=client.prepareBulk();
            int count=0;
            while((line=bfr.readLine())!=null){
                bulkRequest.add(client.prepareIndex("test","article").setSource(line));
                if (count%10==0) {
                    bulkRequest.execute().actionGet();
                }
                count++;
                //System.out.println(line);
            }
            bulkRequest.execute().actionGet();

            bfr.close();
            fr.close();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

参考文档:

  1. Elasticsearch Reference [2.3] ? Document APIs ? Bulk API
时间: 2024-11-05 19:36:24

Elasticsearch Bulk API批量索引的相关文章

[ElasticSearch]Java API 之 索引管理

ElasticSearch为了便于处理索引管理(Indices administration)请求,提供了 org.elasticsearch.client.IndicesAdminClient接口.通过如下代码从 Client 对象中获得这个接口的实现: IndicesAdminClient indicesAdminClient = client.admin().indices(); IndicesAdminClient定义了好几种prepareXXX()方法作为创建请求的入口点. 1. 索引

ElasticSearch Java Api -检索索引库

上篇博客记录了如何用java调用api把数据写入索引,这次记录下如何搜索. 一.准备数据 String data1 = JsonUtil.model2Json(new Blog(1, "git简介", "2016-06-19", "SVN与Git最主要的区别...")); String data2 = JsonUtil.model2Json(new Blog(2, "Java中泛型的介绍与简单使用", "2016-0

Elasticsearch Java API 很全的整理

Elasticsearch 的API 分为 REST Client API(http请求形式)以及 transportClient API两种.相比来说transportClient API效率更高,transportClient 是通过Elasticsearch内部RPC的形式进行请求的,连接可以是一个长连接,相当于是把客户端的请求当成 Elasticsearch 集群的一个节点.但是从Elasticsearch 7 后就会移除transportClient .主要原因是transportCl

elasticsearch 中文API bulk(六)

bulk API bulk API允许开发者在一个请求中索引和删除多个文档.下面是使用实例. import static org.elasticsearch.common.xcontent.XContentFactory.*; BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete req

ElasticSearch Java Api -创建索引

ElasticSearch JAVA API官网文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-index.html 一.生成JSON 创建索引的第一步是要把对象转换为JSON字符串.官网给出了四种创建JSON文档的方法: 1.1手写方式生成 String json = "{" + "\"user\":\"kimchy\"

es批量索引

使用Python操作Elasticsearch数据索引的教程 这篇文章主要介绍了使用Python操作Elasticsearch数据索引的教程,Elasticsearch处理数据索引非常高效,要的朋友可以参考下 Elasticsearch是一个分布式.Restful的搜索及分析服务器,Apache Solr一样,它也是基于Lucence的索引服务器,但我认为Elasticsearch对比Solr的优点在于: 轻量级:安装启动方便,下载文件之后一条命令就可以启动: Schema free:可以向服务

第08章 ElasticSearch Java API

本章内容 使用客户端对象(client object)连接到本地或远程ElasticSearch集群. 逐条或批量索引文档. 更新文档内容. 使用各种ElasticSearch支持的查询方式. 处理ElasticSearch返回的错误信息. 通过发送各种管理指令来收集集群状态信息或执行管理任务. 8.3 连接到集群 8.3.1 成为ElasticSearch节点 第一种连接到ElasticSearch节点的方式是把应用程序当成ElasticSearch集群中的一个节点. Node node=no

Elasticsearch批量操作API用法介绍

Elasticsearch的Bulk API允许批量提交index和delete请求,有如下两种用法: 用法1 BulkRequestBuilder requestBuilder = client.prepareBulk(); for(Person person : personList){ String obj = getIndexDataFromHotspotData(person); if(obj != null){ requestBuilder.add(client.prepareInd

elasticsearch系列三:索引详解(分词器、文档管理、路由详解)

一.分词器 1. 认识分词器  1.1 Analyzer   分析器 在ES中一个Analyzer 由下面三种组件组合而成: character filter :字符过滤器,对文本进行字符过滤处理,如处理文本中的html标签字符.处理完后再交给tokenizer进行分词.一个analyzer中可包含0个或多个字符过滤器,多个按配置顺序依次进行处理. tokenizer:分词器,对文本进行分词.一个analyzer必需且只可包含一个tokenizer. token filter:词项过滤器,对to