What is Elasticsearch?
Elasticsearch是一个基于Apache Lucene(TM)的开源搜索引擎。无论在开源还是专有领域,Lucene可以被认为是迄今为止最先进、性能最好的、功能最全的搜索引擎库。
但是,Lucene只是一个库。想要使用它,你必须使用Java来作为开发语言并将其直接集成到你的应用中,更糟糕的是,Lucene非常复杂,你需要深入了解检索的相关知识来理解它是如何工作的。
Elasticsearch也使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单。
不过,Elasticsearch不仅仅是Lucene和全文搜索,我们还能这样去描述它:
- 分布式的实时文件存储,每个字段都被索引并可被搜索
- 分布式的实时分析搜索引擎
- 可以扩展到上百台服务器,处理PB级结构化或非结构化数据
而且,所有的这些功能被集成到一个服务里面,你的应用可以通过简单的RESTful API、各种语言的客户端甚至命令行与之交互。
上手Elasticsearch非常容易。它提供了许多合理的缺省值,并对初学者隐藏了复杂的搜索引擎理论。它开箱即用(安装即可使用),只需很少的学习即可在生产环境中使用。
Elasticsearch在Apache 2 license下许可使用,可以免费下载、使用和修改。
随着你对Elasticsearch的理解加深,你可以根据不同的问题领域定制Elasticsearch的高级特性,这一切都是可配置的,并且配置非常灵活。
Config elasticSearch.properties
# Elasticsearch index name
csg_bill_index_name=hsc_test_index
# Elasticsearch type name
csg_bill_type_name=billdetails
# Elasticsearch transport address(es) in host:port form; separate multiple entries with commas
csg_bill_es_address=l-test.h.beta.cn0:9300
# Elasticsearch cluster name
csg_bill_es_cluster=elasticsearch_f
Create elasticSearch TransportSessionFactory
/**
 * Spring-managed holder of a single Elasticsearch {@link TransportClient}.
 *
 * <p>The client is built once in {@link #init()} from the injected
 * {@code csg_bill_es_cluster} and {@code csg_bill_es_address} properties and is
 * closed in {@link #destroy()}.
 *
 * Created by xueping.you on 15-7-30.
 */
@Service
public class ESTransportSessionFactory {

    private static final Logger LOGGER = LoggerFactory.getLogger(ESTransportSessionFactory.class);

    /** Splits the configured address list into individual {@code host:port} entries. */
    private static final Splitter ADDRESS_LIST_SPLITTER =
            Splitter.on(",").trimResults().omitEmptyStrings();

    /** Splits one {@code host:port} entry into its two parts. */
    private static final Splitter HOST_PORT_SPLITTER =
            Splitter.on(":").omitEmptyStrings().trimResults();

    @Value("${csg_bill_es_address}")
    private String elasticAddress;

    @Value("${csg_bill_es_cluster}")
    private String elasticCluster;

    private TransportClient transportClient;

    /**
     * Validates the injected configuration and creates the transport client.
     *
     * @throws IllegalArgumentException if the cluster name is empty, the address
     *         list is null/empty, or an entry is not of the form {@code host:port}
     *         with a numeric port
     */
    @PostConstruct
    public void init() {
        checkArgument(!Strings.isEmpty(elasticCluster),
                "ElasticSearch Cluster Name Null or EmptyString");
        // Validate before splitting: Guava's Splitter rejects a null sequence with a
        // bare NullPointerException, which hides the real configuration problem.
        checkArgument(!Strings.isEmpty(elasticAddress),
                "ElasticSearch Cluster Address Can't be Empty");

        List<String> addressList = Lists.newArrayList(ADDRESS_LIST_SPLITTER.split(elasticAddress));
        // Still needed: input such as "," or " , " is non-empty but yields no entries.
        checkArgument(!CollectionUtils.isEmpty(addressList),
                "ElasticSearch Cluster Address Can't be Empty");

        TransportAddress[] transportAddresses = new TransportAddress[addressList.size()];
        for (int i = 0; i < addressList.size(); i++) {
            transportAddresses[i] = parseAddress(addressList.get(i));
        }

        Settings settings = ImmutableSettings.builder()
                .put("cluster.name", elasticCluster)
                .build();
        transportClient = new TransportClient(settings);
        transportClient.addTransportAddresses(transportAddresses);
        LOGGER.info("ElasticSearch Init Done ElasticAddress={} , ElasticClusterName={}",
                elasticAddress, elasticCluster);
    }

    /** Closes the transport client if {@link #init()} managed to create one. */
    @PreDestroy
    public void destroy() {
        if (transportClient != null) {
            transportClient.close();
        }
    }

    /**
     * @return the shared transport client, or {@code null} if {@link #init()} has
     *         not completed
     */
    public TransportClient getTransportClient() {
        return transportClient;
    }

    /** Converts one {@code host:port} entry into a transport address. */
    private static TransportAddress parseAddress(String address) {
        List<String> hostAndPort = Lists.newArrayList(HOST_PORT_SPLITTER.split(address));
        checkArgument(hostAndPort.size() == 2,
                "ElasticSearch Address format address:port error" + address);
        final int port;
        try {
            port = Integer.parseInt(hostAndPort.get(1));
        } catch (NumberFormatException e) {
            // Re-throw with the offending entry so the misconfiguration is easy to locate.
            throw new IllegalArgumentException(
                    "ElasticSearch Address port is not a number: " + address, e);
        }
        return new InetSocketTransportAddress(hostAndPort.get(0), port);
    }
}
Service Method Interface
/**
* Created by xueping.you on 15-7-30.
*/
@Service
public class ESBillDetailServiceBase {
private final static Logger LOGGER = LoggerFactory.getLogger(ESBillDetailServiceBase.class);
@Resource
private ESTransportSessionFactory esTransportSessionFactory;
@Value("${csg_bill_index_name}")
private String ES_INDEX;
@Value("${csg_bill_type_name}")
private String ES_TYPE;
private static ObjectMapper objectMapper = new ObjectMapper();
static {
objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
/**
* Method1 将订单号 , 对账单号 , 产品Id , 账单号 ,结算供应商ID,结算对象Id,create_ts进行索引
* @param esBillDetails
*/
public void insertAllOnDuplicateIdCover(List<ESBillDetail> esBillDetails){
Stopwatch stopwatch = Stopwatch.createStarted();
if(CollectionUtils.isEmpty(esBillDetails)){
return;
}
TransportClient client = esTransportSessionFactory.getTransportClient();
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
for(ESBillDetail esBillDetail : esBillDetails){
String source;
try {
source = objectMapper.writeValueAsString(esBillDetail);
} catch (Exception e1) {
LOGGER.error("create billDetail index error source : " + esBillDetail, e1);
QMonitor.recordOne(QMonitorConstants.ES_BILL_DETAIL_INDEXING_ERROR);
continue;
}
IndexRequestBuilder indexRequestBuilder =
client.prepareIndex(ES_INDEX, ES_TYPE,
esBillDetail.getId()).setSource(source);
bulkRequestBuilder.add(indexRequestBuilder);
}
BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
if(bulkResponse.hasFailures()){
Iterator<BulkItemResponse> itemResponseIterator = bulkResponse.iterator();
while(itemResponseIterator.hasNext()){
BulkItemResponse itemResponse = itemResponseIterator.next();
if(itemResponse!=null && itemResponse.isFailed()){
LOGGER.info("indexing billDetail error indexMessage={},
errorMassage={}",Joiner.on(‘,‘) .join(itemResponse.getIndex(),itemResponse.getType(),itemResponse.getId()),
JsonUtils.toJSONString(itemResponse));
}
}
QMonitor.recordOne(QMonitorConstants.ES_BILL_DETAIL_INDEXING_ERROR);
}
QMonitor.recordOne(QMonitorConstants.ES_BILL_DETAIL_INDEXING_DONE , stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
/**
* Method2 通过订单号 ,对账单号, 产品Id , 账单号 的任意条件进行查询,支持分页
*/
public List<ESBillDetail> query(ESBillDetailQuery esBillDetailQuery , Pagination pagination){
try {
Stopwatch stopwatch = Stopwatch.createStarted();
List<ESBillDetail> esBillDetails = Lists.newArrayList();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
for(Map.Entry entry : esBillDetailQuery.entrySet()){
ESBillDetailQuery.Param param = (ESBillDetailQuery.Param)entry.getKey();
param.decorateBoolQueryBuilder(entry.getValue(),boolQueryBuilder);
}
SearchRequestBuilder requestBuilder = esTransportSessionFactory.getTransportClient()
.prepareSearch(ES_INDEX)
.setTypes(ES_TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(boolQueryBuilder);
if(pagination!=null){
requestBuilder.setFrom(pagination.getStartIndex())
.setSize(pagination.getPageSize());
}
SearchResponse searchResponse = requestBuilder.execute().actionGet();
SearchHits searchHits = searchResponse.getHits();
for(SearchHit searchHit : searchHits){
Map<String , Object> map = searchHit.getSource();
ESBillDetail esBillDetail = objectMapper.readValue(
JsonUtils.toJSONString(map) ,
new TypeReference<ESBillDetail>() {});
esBillDetails.add(esBillDetail);
}
LOGGER.info("Query ESDetail done query={} , time={}" , esBillDetailQuery , stopwatch.elapsed(TimeUnit.MILLISECONDS));
QMonitor.recordOne(QMonitorConstants.ES_BILL_QUERY , stopwatch.elapsed(TimeUnit.MILLISECONDS));
return esBillDetails;
}catch (Exception e){
LOGGER.error("elastic search query Error query={} , page={}" , esBillDetailQuery , pagination , e);
QMonitor.recordOne(QMonitorConstants.ES_BILL_DETAIL_QUERY_ERROR);
throw new RuntimeException("ES 查询异常");
}
}
/**
* 此方法慎用!!!!!!!!!!!!!!!!!!!
* Method3 通过匹配条件删除ES 中的记录
*/
public boolean delete(ESBillDetailQuery esBillDetailQuery){
try {
Stopwatch stopwatch = Stopwatch.createStarted();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
for(Map.Entry entry : esBillDetailQuery.entrySet()){
ESBillDetailQuery.Param param = (ESBillDetailQuery.Param)entry.getKey();
param.decorateBoolQueryBuilder(entry.getValue(),boolQueryBuilder);
}
DeleteByQueryResponse deleteByQueryResponse = esTransportSessionFactory.getTransportClient()
.prepareDeleteByQuery(ES_INDEX )
.setTypes(ES_TYPE)
.setQuery(boolQueryBuilder).execute().actionGet();
LOGGER.info("Delete ESDetail done query={} , time={}" , esBillDetailQuery , stopwatch.elapsed(TimeUnit.MILLISECONDS));
QMonitor.recordOne(QMonitorConstants.ES_BILL_DELETE , stopwatch.elapsed(TimeUnit.MILLISECONDS));
return deleteByQueryResponse.status().equals(RestStatus.OK) ? true : false;
}catch (Exception e){
LOGGER.error("elastic search delete Error query={} , page={}" , esBillDetailQuery , e);
QMonitor.recordOne(QMonitorConstants.ES_BILL_DETAIL_QUERY_ERROR);
throw new RuntimeException("ES 删除记录异常");
}
}
ESBillDetailQuery
/**
 * Query criteria for bill-detail documents. Each {@link Param} key knows how to
 * add its own clause to a bool query.
 *
 * Created by xueping.you on 15-7-30.
 */
public class ESBillDetailQuery extends GenericQuery<ESBillDetailQuery.Param> {

    /**
     * Supported criteria. Every value is expected to be a {@code List<String>};
     * the list is OR-ed together (one {@code should} match clause per element)
     * and the resulting group is added as a {@code must} clause.
     */
    public enum Param {
        // NOTE(review): "ids" targets the "billNo" field exactly like "billNos" does;
        // this looks like a copy-paste slip - confirm the intended field before changing it.
        ids("billNo"),
        billNos("billNo"),
        orderNos("orderNo"),
        productIds("productId"),
        reconcileIds("reconcileId");

        /** Name of the document field this criterion matches against. */
        private final String fieldName;

        Param(String fieldName) {
            this.fieldName = fieldName;
        }

        /**
         * Adds this criterion to the given bool query.
         *
         * @param param        the values to match; must be a {@code List<String>}.
         *                     A {@code null} or empty list adds nothing.
         * @param queryBuilder the target query; must be a {@link BoolQueryBuilder}
         * @throws ClassCastException if either argument has an unexpected type
         */
        public <T, Q extends QueryBuilder> void decorateBoolQueryBuilder(T param, Q queryBuilder) {
            // Callers pass the raw map value, so the element type cannot be verified here.
            @SuppressWarnings("unchecked")
            List<String> values = (List<String>) param;
            BoolQueryBuilder target = (BoolQueryBuilder) queryBuilder;
            if (CollectionUtils.isEmpty(values)) {
                return;
            }
            BoolQueryBuilder anyOf = QueryBuilders.boolQuery();
            for (String value : values) {
                anyOf.should(QueryBuilders.matchQuery(fieldName, value)
                        .operator(MatchQueryBuilder.Operator.AND));
            }
            target.must(anyOf);
        }
    }

    /** Creates an empty query. */
    public ESBillDetailQuery() {
    }

    /**
     * Creates a query with a paging window; both arguments are passed straight to
     * the {@code GenericQuery} constructor.
     */
    public ESBillDetailQuery(Integer startIndex, Integer maxCount) {
        super(startIndex, maxCount);
    }

    /** Creates a query for the given key type; the argument is passed straight to {@code GenericQuery}. */
    public ESBillDetailQuery(Class<Param> paramType) {
        super(paramType);
    }
}
时间: 2024-10-10 06:33:28