从bulk.txt文件中按行读取,然后bulk导入。首先通过调用client.prepareBulk()
实例化一个BulkRequestBuilder对象,调用BulkRequestBuilder对象的add方法添加数据。实现代码:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
/**
 * Reads {@code files/bulk.txt} line by line and indexes each non-blank line as one
 * document into the {@code test} index (type {@code article}) using the bulk API.
 *
 * <p>Each line is passed unchanged to {@code setSource}, so it is expected to be a
 * complete JSON document. Documents are sent in batches of {@link #BATCH_SIZE}.
 */
public class ElasticSearchBulkIn {

    /** Number of index actions accumulated before a bulk request is sent. */
    private static final int BATCH_SIZE = 10;

    /**
     * Connects to the cluster at 127.0.0.1:9300, bulk-indexes the file contents and
     * closes the client afterwards. I/O and host-resolution errors are printed to
     * standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Client client = null;
        try {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "bropen") // cluster.name is configured in elasticsearch.yml
                    .build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300));

            File article = new File("files/bulk.txt");
            // try-with-resources closes the reader (and the wrapped FileReader) even on failure.
            try (BufferedReader bfr = new BufferedReader(new FileReader(article))) {
                BulkRequestBuilder bulkRequest = client.prepareBulk();
                String line;
                while ((line = bfr.readLine()) != null) {
                    // A blank line is not a JSON document; skip it instead of queuing a bad item.
                    if (line.trim().isEmpty()) {
                        continue;
                    }
                    bulkRequest.add(client.prepareIndex("test", "article").setSource(line));
                    if (bulkRequest.numberOfActions() >= BATCH_SIZE) {
                        flush(bulkRequest);
                        // The builder keeps its queued actions after execution, so a fresh
                        // builder is required; reusing it would resend the same documents.
                        bulkRequest = client.prepareBulk();
                    }
                }
                // Send whatever is left over from the last, partially filled batch.
                flush(bulkRequest);
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Executes the given bulk request if it contains at least one action and reports
     * item-level failures on standard error.
     *
     * @param bulkRequest the builder holding the queued index actions
     */
    private static void flush(BulkRequestBuilder bulkRequest) {
        // An empty bulk request is rejected by request validation, so there is nothing to send.
        if (bulkRequest.numberOfActions() == 0) {
            return;
        }
        BulkResponse response = bulkRequest.execute().actionGet();
        if (response.hasFailures()) {
            System.err.println(response.buildFailureMessage());
        }
    }
}
setSource 的参数其实就是一个 JSON 字符串(因此 bulk.txt 中的每一行都应是一个完整的 JSON 文档)。参见:http://www.cnblogs.com/bonelee/p/6956138.html
Settings settings=ImmutableSettings.settingsBuilder() .put("client.transport.sniff",true).put("cluster.name","myelasticsearch").build(); //设置客户端连接transport Client client=new TransportClient(settings).addTransportAddress( new InetSocketTransportAddress("192.168.1.100",9300)); //建立批量提交类 BulkRequestBuilder bulkRequest=client.prepareBulk(); while(rs.next()){ //建立批量json对象 bulkRequest.add(client.prepareIndex("ryxx","tweet",rs.getString("id")).setSource(jsonBuilder().startObject() .field("name",rs.getString("name")) .field("age",rs.getString("age")) .field("address",rs.getString("address")) .field("phone",rs.getString("phone")) .endObject() )); } //批量提交到服务器 BulkResponse bulkResponse=bulkRequest.execute().actionGet(); //提交过程是否产生错误 if(bulkResponse.hasFailures()){ System.out.println(bulkResponse.buildFailureMessage()); }
时间: 2024-10-25 19:02:07