ES transport client批量导入

从bulk.txt文件中按行读取,然后bulk导入。首先通过调用client.prepareBulk()实例化一个BulkRequestBuilder对象,调用BulkRequestBuilder对象的add方法添加数据。实现代码:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ElasticSearchBulkIn {

    public static void main(String[] args) {

        try {

            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml中配置

            Client client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300));

            File article = new File("files/bulk.txt");
            FileReader fr=new FileReader(article);
            BufferedReader bfr=new BufferedReader(fr);
            String line=null;
            BulkRequestBuilder bulkRequest=client.prepareBulk();
            int count=0;
            while((line=bfr.readLine())!=null){
                bulkRequest.add(client.prepareIndex("test","article").setSource(line));
                if (count%10==0) {
                    bulkRequest.execute().actionGet();
                }
                count++;
                //System.out.println(line);
            }
            bulkRequest.execute().actionGet();

            bfr.close();
            fr.close();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

setSource里其实就是json的字符串!!!!见:http://www.cnblogs.com/bonelee/p/6956138.html

Settings settings=ImmutableSettings.settingsBuilder()
        .put("client.transport.sniff",true).put("cluster.name","myelasticsearch").build();
//设置客户端连接transport
        Client client=new TransportClient(settings).addTransportAddress(
                new InetSocketTransportAddress("192.168.1.100",9300));
//建立批量提交类
BulkRequestBuilder bulkRequest=client.prepareBulk();
                while(rs.next()){
//建立批量json对象
                    bulkRequest.add(client.prepareIndex("ryxx","tweet",rs.getString("id")).setSource(jsonBuilder().startObject()
                                                        .field("name",rs.getString("name"))
                            .field("age",rs.getString("age"))
                            .field("address",rs.getString("address"))
                            .field("phone",rs.getString("phone"))
                            .endObject()
                    ));
                }
//批量提交到服务器
                BulkResponse bulkResponse=bulkRequest.execute().actionGet();
//提交过程是否产生错误
                if(bulkResponse.hasFailures()){
                    System.out.println(bulkResponse.buildFailureMessage());  

                }  
 
时间: 2024-10-25 19:02:07

ES transport client批量导入的相关文章

ES transport client使用

ES transport client bulk insert 传输(transport)客户端 TransportClient利用transport模块远程连接一个elasticsearch集群.它并不加入到集群中,只是简单的获得一个或者多个初始化的transport地址,并以轮询的方式与这些地址进行通信. // on startup Client client = new TransportClient() .addTransportAddress(new InetSocketTranspo

Elasticsearch5.0 Java Api(六) -- 批量导入索引

将计算机本地的.json格式文件中的数据,批量导入到索引库中 1 package com.juyun.test; 2 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileNotFoundException; 6 import java.io.FileReader; 7 import java.io.IOException; 8 import java.net.InetAddress; 9 imp

HBase结合MapReduce批量导入(HDFS中的数据导入到HBase)

HBase结合MapReduce批量导入 1 package hbase; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.hbase.client.Put; 8 import org.apache.hadoop.hbase.mapreduce.TableOutput

Hadoop之——HBASE结合MapReduce批量导入数据

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46463889 废话不多说.直接上代码,你懂得 package hbase; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import

hbase 结合MapReduce 批量导入

hbase结合Mapreduce的批量导入: 直接给出代码讲述:(具体操作结合代码中的注释) package hbase; import java.io.IOException; import java.net.URISyntaxException; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.had

数据批量导入HBase

测试数据: datas 1001 lilei 17 13800001111 1002 lily 16 13800001112 1003 lucy 16 13800001113 1004 meimei 16 13800001114 数据批量导入使用mr,先生成HFile文件然后在用completebulkload工具导入. 1.需要先在hbase 创建表名: hbase> create 'student', {NAME => 'info'} maven pom.xml配置文件如下: <de

使用python向Redis批量导入数据

1.使用pipeline进行批量导入数据.包含先使用rpush插入数据,然后使用expire改动过期时间 class Redis_Handler(Handler): def connect(self): #print self.host,self.port,self.table self.conn = Connection(self.host,self.port,self.table) def execute(self, action_name): filename = "/tmp/temp.t

HBase结合MapReduce批量导入

2016年5月14日13:17:05 作者:数据分析玩家 Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据. 开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出: 图片格式描述: 先介绍一个日期格式的转换: public class TestDate

Hbase调用JavaAPI实现批量导入操作

将手机上网日志文件批量导入到Hbase中,操作步骤: 1.将日志文件(请下载附件)上传到HDFS中,利用hadoop的操作命令上传:hadoop  fs -put input  / 2.创建Hbase表,通过Java操作 Java代码   package com.jiewen.hbase; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.