1. elasticsearch安装
官方下载地址:https://www.elastic.co/downloads/elasticsearch
解压文件 elasticsearch-2.4.0.zip
修改配置文件
elasticsearch-2.4.0 cat config/elasticsearch.yml |grep -v "#"
cluster.name: rainbow
network.host: 127.0.0.1
http.port: 9200
配置说明
cluster.name表示es集群的名称,可以自定义一个自己需要的集群名称
http.port 表示对外提供http服务时的http端口。
network.host 表示本地监听绑定的ip地址,此处为测试环境,直接使用本机的ip地址 127.0.0.1.
启动说明
elasticsearch-2.4.0 nohup ./bin/elasticsearch &
启动后显示信息
es启动监听两个端口,9300和9200
9300端口是使用tcp客户端连接使用的端口;
9200端口是通过http协议连接es使用的端口;
2. 使用http方式增加和查询数据
增加数据(http PUT):
$ curl -XPUT localhost:9200/user_idx/type_tags/12 -d '{"name" : "Mr.YF", "tags" : ["Go","Java","Lua","C++","Tcl","..."]}'
{"_index":"user_idx","_type":"type_tags","_id":"12","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}
查询数据(http GET):
$ curl -XGET localhost:9200/user_idx/type_tags/12
3. 使用elasticsearch客户端编写java代码访问es
编写es客户端提供者类,对es连接做简单的封装
SearchClientProvider.java
package org.wyf.elasticsearch.search;

/**
 * Created by wyf on 16/9/25.
 */

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wyf.common.property.XmlProperties;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import static java.lang.System.out;

/**
 * Builds and owns the {@link TransportClient} used to reach Elasticsearch over the
 * TCP transport protocol.
 *
 * <p>Configuration is read from {@code properties/elasticsearch.xml}. The
 * {@code transport.addresses} entry (comma-separated {@code host:port} pairs) tells the
 * client which nodes to connect to; every other entry is handed to the client
 * {@link Settings} unchanged.
 */
public class SearchClientProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(SearchClientProvider.class);

    /** Location handed to {@link XmlProperties#loadFromXml(String)}. */
    private static final String CONFIG_PATH = "properties/elasticsearch.xml";

    /** Key holding the node list; it is not an Elasticsearch setting and is kept out of {@link Settings}. */
    private static final String ADDRESSES_KEY = "transport.addresses";

    private TransportClient client = null;
    private volatile boolean inited = false;

    /**
     * Returns the client created by {@link #init()}.
     *
     * @return the transport client, or {@code null} when {@link #init()} has not completed successfully
     */
    public TransportClient get() {
        return this.client;
    }

    /** Closes the client if one was created. */
    @PreDestroy
    public synchronized void close() {
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * Loads the configuration and creates the transport client. Calls made after a
     * successful initialisation are no-ops.
     *
     * <p>A missing configuration file, a missing {@code transport.addresses} entry or an
     * unresolvable host is logged and leaves the provider uninitialised. Any other failure
     * while registering the addresses (for example an entry without a port) propagates to
     * the caller. In every failure case the partially built client is closed and never
     * published through {@link #get()}.
     */
    @PostConstruct
    public synchronized void init() {
        if (inited) {
            return;
        }

        Map<String, String> config = XmlProperties.loadFromXml(CONFIG_PATH);
        if (config == null) {
            LOGGER.error("load xml err:=>path:[{}]", CONFIG_PATH);
            return;
        }

        // Validate before building the client so nothing has to be torn down when the key is absent.
        String addressList = config.get(ADDRESSES_KEY);
        if (addressList == null || addressList.trim().isEmpty()) {
            LOGGER.error("init search client err:=>msg:[no {} configured in {}]", ADDRESSES_KEY, CONFIG_PATH);
            return;
        }

        Map<String, String> settingConfig = new HashMap<String, String>(config);
        settingConfig.remove(ADDRESSES_KEY);
        Settings settings = Settings.builder().put(settingConfig).build();

        TransportClient newClient = TransportClient.builder().settings(settings).build();
        boolean ready = false;
        try {
            for (String address : addressList.split(",")) {
                String hostPort = address.trim();
                if (hostPort.isEmpty()) {
                    // Tolerate stray separators such as a trailing comma.
                    continue;
                }
                String[] hostAndPort = hostPort.split(":");
                newClient.addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName(hostAndPort[0]), Integer.parseInt(hostAndPort[1])));
            }
            ready = true;
        } catch (UnknownHostException e) {
            LOGGER.error(String.format("init search client err:=>msg:[%s]", e.getMessage()), e);
        } finally {
            // Also covers unchecked failures (bad port, missing ':'), which would otherwise
            // leave the client's resources open.
            if (!ready) {
                newClient.close();
            }
        }

        if (ready) {
            this.client = newClient;
            this.inited = true;
        }
    }
}
编写elasticsearch的配置文件加载类
XmlProperties.java
package org.wyf.common.property;

import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;

/**
 * Loads {@link Properties}-style XML documents into a plain {@code Map<String, String>}.
 */
public class XmlProperties {
    private static final Logger LOGGER = LoggerFactory.getLogger(XmlProperties.class);

    private XmlProperties() {
    }

    /**
     * Loads an XML properties document, looking on the class path first and then on the
     * file system.
     *
     * @param xmlPropertiesPath class-path resource name or file path of the document
     * @return the key/value pairs of the document, or {@code null} when the document cannot
     *         be found or cannot be read (the failure is logged)
     */
    public static Map<String, String> loadFromXml(String xmlPropertiesPath) {
        InputStream in = null;
        try {
            in = XmlProperties.class.getClassLoader().getResourceAsStream(xmlPropertiesPath);
            if (in != null) {
                LOGGER.info("Found the xml properties [{}] in class path,use it", xmlPropertiesPath);
                return loadFromXml(in);
            }

            File file = new File(xmlPropertiesPath);
            if (!file.isFile()) {
                return null;
            }
            LOGGER.info("Found the xml properties [{}] in file path,use it", xmlPropertiesPath);
            in = new FileInputStream(file);
            return loadFromXml(in);
        } catch (Exception e) {
            LOGGER.error("Load xml properties [" + xmlPropertiesPath + "] error.", e);
            return null;
        } finally {
            // Properties.loadFromXML only closes the stream when parsing succeeds; closing here
            // covers the failure path, and a second close on success is harmless.
            closeQuietly(in);
        }
    }

    /**
     * Parses an XML properties document from the given stream.
     *
     * @param in stream positioned at the start of the XML document
     * @return a new mutable map with one entry per property
     * @throws IOException if the stream cannot be read or is not a valid properties document
     */
    public static Map<String, String> loadFromXml(InputStream in) throws IOException {
        Properties properties = new Properties();
        properties.loadFromXML(in);

        Map<String, String> map = new HashMap<String, String>();
        for (String key : properties.stringPropertyNames()) {
            map.put(key, properties.getProperty(key));
        }
        return map;
    }

    /** Closes the stream, ignoring failures so that a close error cannot mask the load result. */
    private static void closeQuietly(InputStream in) {
        if (in == null) {
            return;
        }
        try {
            in.close();
        } catch (IOException ignored) {
            // Best effort: the data has either been read already or the load has already failed.
        }
    }
}
编写elasticsearch简单的操作类,包含成员函数 save、update、get 和 delete
SearchImpl.java
package org.wyf.elasticsearch;

/**
 * Created by wyf on 16/9/25.
 */

import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wyf.elasticsearch.search.SearchClientProvider;

import javax.annotation.PostConstruct;
import java.util.Map;

/**
 * Document-level save / update / get / delete operations executed through the
 * transport client supplied by {@link SearchClientProvider}.
 */
public class SearchImpl implements ISearch {
    private static final Logger LOGGER = LoggerFactory.getLogger(SearchImpl.class);

    private SearchClientProvider searchClientProvider;

    /** Creates the client provider and initialises it immediately. */
    public SearchImpl() {
        this.searchClientProvider = new SearchClientProvider();
        this.searchClientProvider.init();
    }

    /**
     * Indexes {@code data} under the given index, type and id.
     *
     * @return the {@code data} map that was passed in
     */
    public Map<String, Object> save(String index, String type, String id, Map<String, Object> data) {
        IndexResponse indexed = transport()
                .prepareIndex(index, type, id)
                .setSource(data)
                .execute()
                .actionGet();
        LOGGER.info("save index:=>index:{}, type:{}, id:{}, data:{}, rsp:{}", index, type, id, data, indexed);
        return data;
    }

    /**
     * Merges {@code data} into an existing document.
     *
     * <p>Up to three attempts log and tolerate a version conflict; a fourth and final
     * attempt lets any exception reach the caller.
     *
     * @return 1 when the document was updated, otherwise 0
     */
    public int update(String index, String type, String id, Map<String, Object> data) {
        for (int attempt = 0; attempt < 3; attempt++) {
            try {
                if (_innerUpdate(index, type, id, data)) {
                    return 1;
                }
            } catch (VersionConflictEngineException conflict) {
                LOGGER.warn(String.format("update index:=>index:%s, type:%s, id:%s, data:%s, rsp:%s", index, type, id, data, conflict.getMessage()), conflict);
            }
        }

        if (_innerUpdate(index, type, id, data)) {
            return 1;
        }
        return 0;
    }

    /**
     * Deletes the document with the given id.
     *
     * @return 1 when the document was found, otherwise 0
     */
    public int delete(String index, String type, String id) {
        DeleteResponse deleted = transport().prepareDelete(index, type, id).execute().actionGet();
        LOGGER.info("delete index:=>index:{}, type:{}, id:{}, rsp:{}", index, type, id, deleted);
        if (deleted.isFound()) {
            return 1;
        }
        return 0;
    }

    /**
     * Fetches the source of the document with the given id.
     *
     * @return the document source, or {@code null} when the document does not exist
     */
    public Map<String, Object> get(String index, String type, String id) {
        GetResponse fetched = transport().prepareGet(index, type, id).execute().actionGet();
        if (!fetched.isExists()) {
            return null;
        }
        return fetched.getSource();
    }

    /**
     * Reads the current document, overlays {@code data} on its source and re-indexes it
     * with the version that was read.
     *
     * @return always {@code true} when the method returns normally
     * @throws RuntimeException when the document does not exist
     */
    private boolean _innerUpdate(String index, String type, String id, Map<String, Object> data) {
        TransportClient transport = transport();
        GetResponse current = transport.prepareGet(index, type, id).execute().actionGet();
        if (!current.isExists()) {
            throw new RuntimeException(String.format("can not get document:=>index:%s, type:%s, id:%s ", index, type, id));
        }

        long readVersion = current.getVersion();
        Map<String, Object> merged = current.getSource();
        merged.putAll(data);

        IndexRequestBuilder reindex = transport.prepareIndex(index, type, id);
        reindex.setVersion(readVersion);
        reindex.setSource(merged);
        IndexResponse written = reindex.execute().actionGet();
        LOGGER.info("update index:=>index:{}, type:{}, id:{}, data:{}, rsp:{}", index, type, id, data, written);
        return true;
    }

    /** Shorthand for the provider's current transport client. */
    private TransportClient transport() {
        return searchClientProvider.get();
    }
}
编写单元测试类
ElasticsearchTest.java
package org.wyf;

import org.junit.Before;
import org.junit.Test;
import org.wyf.elasticsearch.ISearch;
import org.wyf.elasticsearch.SearchImpl;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * Created by wyf on 16/9/25.
 *
 * Integration tests for {@link SearchImpl}; they need a reachable Elasticsearch node
 * configured in properties/elasticsearch.xml.
 */
public class ElasticsearchTest {
    private static final String INDEX = "a";
    private static final String TYPE = "type1";
    private static final String ID = "123";

    ISearch search;

    @Before
    public void before() {
        search = new SearchImpl();
    }

    /** Reads back a document and checks that the stored fields are returned. */
    @Test
    public void get() {
        // Index the document here: JUnit does not guarantee that save() runs before get().
        Map<String, Object> expected = sampleDocument();
        search.save(INDEX, TYPE, ID, expected);

        Map<String, Object> map = search.get(INDEX, TYPE, ID);
        assertContainsAll(expected, map);
        print(map);
    }

    /** Indexes a document and checks the map handed back by save(). */
    @Test
    public void save() {
        Map<String, Object> values = sampleDocument();

        Map<String, Object> map = search.save(INDEX, TYPE, ID, values);
        assertContainsAll(values, map);
        print(map);
    }

    /** Builds the two-field document shared by both tests. */
    private static Map<String, Object> sampleDocument() {
        Map<String, Object> values = new HashMap<String, Object>();
        values.put("k1", "v1");
        values.put("k2", "v2");
        return values;
    }

    /** Fails the test when {@code actual} is null or lacks any entry of {@code expected}. */
    private static void assertContainsAll(Map<String, Object> expected, Map<String, Object> actual) {
        if (actual == null) {
            throw new AssertionError("no document returned for " + INDEX + "/" + TYPE + "/" + ID);
        }
        for (Map.Entry<String, Object> entry : expected.entrySet()) {
            Object value = actual.get(entry.getKey());
            if (!entry.getValue().equals(value)) {
                throw new AssertionError("field " + entry.getKey() + ": expected "
                        + entry.getValue() + " but was " + value);
            }
        }
    }

    /** Prints every field as key:value, one per line. */
    private static void print(Map<String, Object> map) {
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
    }
}
elasticsearch的配置文件
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<!-- Client configuration loaded by SearchClientProvider from properties/elasticsearch.xml. -->
<properties>
    <comment>elasticsearch的配置</comment>
    <!-- Every entry except transport.addresses is passed to the client Settings unchanged. -->
    <!-- NOTE(review): presumably lets the client discover the other cluster nodes; confirm in the Elasticsearch docs. -->
    <entry key="client.transport.sniff">true</entry>
    <!-- Same value as cluster.name in config/elasticsearch.yml. -->
    <entry key="cluster.name">rainbow</entry>
    <!-- Comma-separated host:port list. Use the TCP transport port (9300), not the HTTP port (9200). -->
    <entry key="transport.addresses">127.0.0.1:9300</entry>
</properties>
测试结果:
测试save函数结果
http接口查询结果页面
遇到的问题
最开始调试时返回如下错误
NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{127.0.0.1:9200}] ]
    at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:290)
    at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:207)
    at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55)
    at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:283)
    at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:347)
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:85)
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:59)
    at org.wyf.elasticsearch.SearchImpl.save(SearchImpl.java:36)
    at org.wyf.ElasticsearchTest.save(ElasticsearchTest.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
解决办法是检查es集群的名称和端口是否正确
由于端口配置错误(transport.addresses 中误配成了 http 端口 9200,见上面异常信息中的 127.0.0.1:9200),导致如上问题
正确的端口是9300,正如最开始说的那样,使用tcp方式访问es时,使用端口9300,使用http方式访问es使用端口9200.
代码git地址:
https://github.com/gityf/java_demo/tree/master/demo/src/main/java/org/wyf/elasticsearch
Done.