一.使用logstash同步订单数据(订单表和订单项表)到ElasticSearch:
1.到官网下载logstash:https://www.elastic.co/cn/downloads/logstash
2.安装logstash前,确保需要先安装java的jdk环境
3.下载后解压;解压之后千万别到bin目录下双击logstash.bat来启动,这样会报错的
4.接下来,在logstash安装目录找到config文件夹,在那里新增一个文件夹,我新建的为shop文件夹,然后在里面添加如下文件:
5.开始时,last_run_item.txt和last_run_order.txt文件是没数据的
6.logstash_order.conf文件的配置如下:
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
#
# Two jdbc inputs poll the order table and the order-item table; the output
# section routes each event to its own index based on the "type" field.

input {
  jdbc {
    # Used by the output section to tell the two tables apart.
    # May be omitted when only a single table is synchronised.
    type => "order_mast"
    # JDBC driver jar located in the shop folder.
    jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "2000"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Adjust the JDBC URL, user and password to the target environment.
    jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
    jdbc_user => "shop"
    jdbc_password => "shop"
    # Run the sync once every minute.
    schedule => "* * * * *"
    # SQL file located in the shop folder.
    statement_filepath => "../config/shop/order_mast.sql"
    record_last_run => true
    use_column_value => false
    # Stores the time of the previous run, which is what makes the sync incremental.
    last_run_metadata_path => "../config/shop/last_run_order.txt"
    clean_run => false
    # Whether column names are converted to lower case.
    lowercase_column_names => false
  }

  jdbc {
    # Used by the output section to tell the two tables apart.
    # May be omitted when only a single table is synchronised.
    type => "order_item"
    # JDBC driver jar located in the shop folder.
    jdbc_driver_library => "../config/shop/mysql-connector-java-5.1.6-bin.jar"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "2000"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Adjust the JDBC URL, user and password to the target environment.
    jdbc_connection_string => "jdbc:mysql://192.168.50.117:3306/shop_dm?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false"
    jdbc_user => "shop"
    jdbc_password => "shop"
    # Run the sync once every minute.
    schedule => "* * * * *"
    # SQL file located in the shop folder.
    statement_filepath => "../config/shop/order_item.sql"
    record_last_run => true
    use_column_value => false
    # Stores the time of the previous run, which is what makes the sync incremental.
    last_run_metadata_path => "../config/shop/last_run_item.txt"
    clean_run => false
    # Whether column names are converted to lower case.
    lowercase_column_names => false
  }
}

filter {
  # The jdbc input emits JSON by default; no way to change that was found yet,
  # so the json filter below stays disabled.
  # json {
  #   source => "message"
  #   remove_field => ["message"]
  # }
  mutate {
    # Fields that must not be written to Elasticsearch.
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  if [type] == "order_mast" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      # If the cluster requires credentials, uncomment and fill in the two lines below.
      # user => "elastic"
      # password => "your_password"
      index => "shop_order_mast"
      # Mapping types no longer exist from ES 7.0 onward.
      document_type => "order_mast"
      document_id => "%{cod_order_id}"
    }
  }

  if [type] == "order_item" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      # If the cluster requires credentials, uncomment and fill in the two lines below.
      # user => "elastic"
      # password => "your_password"
      index => "shop_order_item"
      document_type => "order_item"
      document_id => "%{cod_order_item_id}"
    }
  }

  stdout {
    codec => json_lines
  }
}
//如果只有一张表的时候,单表output的配置:
# Output section to use when only one table (the order table) is synchronised:
# no "type" routing is needed, every event goes to the same index.
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    # If the cluster requires credentials, uncomment and fill in the two lines below.
    # user => "elastic"
    # password => "your_password"
    index => "shop_order_mast"
    # Mapping types no longer exist from ES 7.0 onward.
    document_type => "order_mast"
    document_id => "%{cod_order_id}"
  }

  stdout {
    codec => json_lines
  }
}
//sql的写法,这里只提供orderItem
-- Incremental export of shop_order_item for the Logstash jdbc input.
--
-- The first, un-aliased cod_order_item_id column is not a duplicate: it supplies the
-- Elasticsearch document id and is matched by document_id => "%{cod_order_item_id}"
-- in logstash_order.conf. The aliased copy below it becomes the orderItemId field.
--
-- :sql_last_value is read from last_run_item.txt in the shop folder. On the very first
-- sync that file has no value, so it defaults to 1970-01-01 and the run is effectively
-- a full load. The explanation is kept up here so the statement does not end in a comment.
SELECT
    `cod_order_item_id`,
    `cod_order_item_id`    AS "orderItemId",
    `cod_order_id`         AS "orderId",
    `flg_item_type`        AS "itemType",
    `cod_market_id`        AS "marketId",
    `cod_item_id`          AS "itemId",
    `cod_item_id_main`     AS "mainItemId",
    `txt_name`             AS "itemTitle",
    `cod_item_quantity`    AS "quantity",
    `amt_item`             AS "itemPrice",
    `cod_score_total`      AS "scoreTotal",
    `amt_score`            AS "scoreAmount",
    `amt_charge`           AS "chargeAmount",
    `amt_standard_price`   AS "standardPrice",
    `amt_balance_discount` AS "balanceDiscountAmount",
    `amt_payment_total`    AS "itemTotalAmount",
    `amt_coupon_total`     AS "couponTotalAmount",
    `amt_act_discount`     AS "actDiscountAmount",
    `cod_order_parent_id`  AS "parentOrderId",
    `cod_merchant_no`      AS "shopId",
    `cod_create_user`      AS "createUserId",
    DATE_FORMAT(`dat_modify`, '%Y-%m-%d %T') AS "updateTime",
    DATE_FORMAT(`dat_create`, '%Y-%m-%d %T') AS "createTime",
    `cod_modify_user`      AS "updateUserId"
FROM shop_order_item
WHERE dat_modify >= :sql_last_value
7.如果运行过一次后,打开last_run_item.txt可以看到
8.启动logstash:需要保证你的ES已经启动了,并创建了对应的index和type
Windows环境:在安装目录的bin文件夹下打开命令窗口(或者先打开命令窗口,再切换到该路径),然后执行: logstash -f ../config/shop/logstash_order.conf
如果是在linux环境,切换到安装目录下的bin目录执行:
# Run from the Logstash bin directory; "./" is required because the current directory
# is normally not on PATH, and 2>&1 sends error output to the same log file.
nohup ./logstash -f ../config/shop/logstash_order.conf > ../logs/logstash.out 2>&1 &
9.之后打开ES查询数据
可以看到数据已经同步过来了
10.之后可以在项目中进行对应的数据操作了,因为该同步是一分钟同步一次,所以对于实时性要求特别高的,可以在代码中使用ES的crud操作也进行同步,这样就可以保证万无一失了
11.ES相关操作可以参考:https://www.cnblogs.com/yangxiaohui227/p/11237268.html
12.附上一个orderItem表的(ES版本为6.4.3)操作
@Configuration public class ElasticsearchConfig implements InitializingBean{ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class); @Value("${elasticsearch.cluster.name}") private String clusterName; @Value("${elasticsearch.port}") private Integer port; @Value("${elasticsearch.host}") private String host; /** * Springboot整合Elasticsearch 在项目启动前设置一下的属性,防止报错 * 解决netty冲突后初始化client时还会抛出异常 * java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4] */ @PostConstruct void init() { System.setProperty("es.set.netty.runtime.available.processors", "false"); } // @Before @Bean public TransportClient getTransportClient() { TransportClient client=null; LOGGER.info("elasticsearch init."); try { Settings settings = Settings.builder() .put("cluster.name", clusterName) //集群名字 .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群 .put("thread_pool.search.size", 5).build();//增加线程池个数 client = new PreBuiltTransportClient(settings); TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(host), port); client.addTransportAddresses(transportAddress); LOGGER.info("elasticsearch init success."); return client; } catch (Exception e) { throw new RuntimeException("elasticsearch init fail."+ e); } }}
//高级查询对象 public class EsQueryObject { private String orderId; private String customerId; private String txtOrderTitle; private Integer orderStatus; private Integer paymentStatus; private String phone; private String recieveName; private String addresss; private String orderSubmitTime_S; private String orderSubmitTime_E; private String payTime_S; private String payTime_E; private BigDecimal minPayAmount; private BigDecimal maxPayAmount; private String shopId; private String itemId; private String itemTile; private Page page; public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getCustomerId() { return customerId; } public void setCustomerId(String customerId) { this.customerId = customerId; } public String getTxtOrderTitle() { return txtOrderTitle; } public void setTxtOrderTitle(String txtOrderTitle) { this.txtOrderTitle = txtOrderTitle; } public Integer getOrderStatus() { return orderStatus; } public void setOrderStatus(Integer orderStatus) { this.orderStatus = orderStatus; } public Integer getPaymentStatus() { return paymentStatus; } public void setPaymentStatus(Integer paymentStatus) { this.paymentStatus = paymentStatus; } public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } public String getRecieveName() { return recieveName; } public void setRecieveName(String recieveName) { this.recieveName = recieveName; } public String getAddresss() { return addresss; } public void setAddresss(String addresss) { this.addresss = addresss; } public String getOrderSubmitTime_S() { return orderSubmitTime_S; } public void setOrderSubmitTime_S(String orderSubmitTime_S) { this.orderSubmitTime_S = orderSubmitTime_S; } public String getOrderSubmitTime_E() { return orderSubmitTime_E; } public void setOrderSubmitTime_E(String orderSubmitTime_E) { this.orderSubmitTime_E = orderSubmitTime_E; } public String getPayTime_S() { return payTime_S; } public void 
setPayTime_S(String payTime_S) { this.payTime_S = payTime_S; } public String getPayTime_E() { return payTime_E; } public void setPayTime_E(String payTime_E) { this.payTime_E = payTime_E; } public BigDecimal getMinPayAmount() { return minPayAmount; } public void setMinPayAmount(BigDecimal minPayAmount) { this.minPayAmount = minPayAmount; } public BigDecimal getMaxPayAmount() { return maxPayAmount; } public void setMaxPayAmount(BigDecimal maxPayAmount) { this.maxPayAmount = maxPayAmount; } public String getShopId() { return shopId; } public void setShopId(String shopId) { this.shopId = shopId; } public String getItemId() { return itemId; } public void setItemId(String itemId) { this.itemId = itemId; } public String getItemTile() { return itemTile; } public void setItemTile(String itemTile) { this.itemTile = itemTile; } public Page getPage() { return page; } public void setPage(Page page) { this.page = page; } }
package com.tft.shop.service.order; import com.alibaba.fastjson.JSON; import com.bootcrabframework.cloud.core.common.base.GenericBaseService; import com.bootcrabframework.cloud.core.util.CommonUtil; import com.bootcrabframework.cloud.core.util.DateUtil; import com.google.common.collect.Lists; import com.tft.shop.constant.order.OrderConstant; import com.tft.shop.entity.es.EsShopOrderItem; import com.tft.shop.entity.es.EsShopOrderItemRequestDTO; import com.tft.shop.entity.order.ShopOrderItem; import com.tft.shop.util.StringUtil; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; 
import java.util.stream.Collectors; @Service public class EsShopOrderItemService extends GenericBaseService { @Resource private TransportClient transportClient; //批量新增 public void batchInsert(List<EsShopOrderItem> list){ if(CommonUtil.isNull(list)){ return; } BulkRequest bulkRequest = new BulkRequest(); list.forEach(a->{ IndexRequest indexRequest = new IndexRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, a.getOrderItemId()); indexRequest.source(JSON.toJSONString(a), XContentType.JSON); bulkRequest.add(indexRequest); }); ActionFuture<BulkResponse> bulk = transportClient.bulk(bulkRequest); boolean failures = bulk.actionGet().hasFailures(); if(!failures){ return; //没有失败 } //如果有失败,输出哪一条是失败的 try { BulkResponse bulkItemResponses = bulk.get(); if(bulkItemResponses==null){ return; } if(CommonUtil.isNull(bulkItemResponses.getItems())){ return; } for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) { boolean failed = bulkItemResponse.isFailed(); if(failed){ logger.error("订单项插入ES失败,错误信息{},对应订单项编号{}",bulkItemResponse.getId(),bulkItemResponse.getFailureMessage()); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } //单条新增 public void insertOne(EsShopOrderItem item){ if(null==item){ return; } List<EsShopOrderItem> list =Lists.newArrayList(); list.add(item); this.batchInsert(list); } //单条新增 public void insertOne(ShopOrderItem orderItem){ this.insertOne(shopOrderItemChangeToEsOrderItem(orderItem)); } private EsShopOrderItem shopOrderItemChangeToEsOrderItem(ShopOrderItem orderItem){ if(null==orderItem){ return null; } EsShopOrderItem shopOrderItem = new EsShopOrderItem(); shopOrderItem.setOrderItemId(orderItem.getCodOrderItemId()); shopOrderItem.setOrderId(orderItem.getCodOrderId()); shopOrderItem.setItemType(orderItem.getFlgItemType()); shopOrderItem.setMarketId(orderItem.getCodMarketId()); shopOrderItem.setItemId(orderItem.getCodItemId()); 
shopOrderItem.setMainItemId(orderItem.getCodItemIdMain()); shopOrderItem.setItemTitle(orderItem.getTxtName()); shopOrderItem.setQuantity(orderItem.getCodItemQuantity()); shopOrderItem.setItemPrice(orderItem.getAmtItem()); shopOrderItem.setScoreTotal(orderItem.getCodScoreTotal()); shopOrderItem.setScoreAmount(orderItem.getAmtScore()); shopOrderItem.setChargeAmount(orderItem.getAmtCharge()); shopOrderItem.setStandardPrice(orderItem.getAmtStandardPrice()); shopOrderItem.setBalanceDiscountAmount(orderItem.getAmtBalanceDiscount()); shopOrderItem.setItemTotalAmount(orderItem.getAmtPaymentTotal()); shopOrderItem.setActDiscountAmount(orderItem.getAmtActDiscount()); shopOrderItem.setCouponTotalAmount(orderItem.getAmtCouponTotal()); shopOrderItem.setParentOrderId(orderItem.getCodOrderParentId()); shopOrderItem.setShopId(orderItem.getCodMerchantNo()); shopOrderItem.setCreateUserId(orderItem.getCodCreateUser()); if(null!=orderItem.getDatCreate()){ shopOrderItem.setCreateTime(DateUtil.dateFormat(orderItem.getDatCreate(),DateUtil.TIME_FORMAT_FULL)); } if(null!=orderItem.getDatModify()){ shopOrderItem.setUpdateTime(DateUtil.dateFormat(orderItem.getDatModify(),DateUtil.TIME_FORMAT_FULL)); } shopOrderItem.setUpdateUserId(orderItem.getCodModifyUser()); return shopOrderItem; } //删除 public void deleteOne(String orderItemId){ if(CommonUtil.isNull(orderItemId)){ return; } ActionFuture<DeleteResponse> actionFuture = transportClient.delete(new DeleteRequest(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId)); if(actionFuture==null){ return; } DeleteResponse deleteResponse = actionFuture.actionGet(); if(null==deleteResponse || null==deleteResponse.status()){ return; } if(deleteResponse.status().getStatus()!=200){ logger.error("删除ES订单项,编号为{},删除失败",orderItemId); } } //修改 public void updateOne(EsShopOrderItem esShopOrderItem){ if(null==esShopOrderItem){ return; } UpdateResponse updateResponse = transportClient.prepareUpdate(OrderConstant.ES_ORDER_ITEM_INDEX, 
OrderConstant.ES_ORDER_ITEM_TYPE, esShopOrderItem.getOrderItemId()) .setDoc(JSON.toJSONString(esShopOrderItem), XContentType.JSON).execute().actionGet(); if(null==updateResponse || null==updateResponse.status()){ return; } if(updateResponse.status().getStatus()!=200){ logger.error("修改ES订单项失败,编号为{}",esShopOrderItem.getOrderItemId()); } } //修改 public void updateOne(ShopOrderItem orderItem){ this.updateOne(this.shopOrderItemChangeToEsOrderItem(orderItem)); } //查询单个 public EsShopOrderItem selectById(String orderItemId){ if(StringUtil.isEmpty(orderItemId)){ return null; } GetRequestBuilder ret = transportClient.prepareGet(OrderConstant.ES_ORDER_ITEM_INDEX, OrderConstant.ES_ORDER_ITEM_TYPE, orderItemId); if(null==ret || null==ret.get()){ return null; } GetResponse response = ret.get(); if(StringUtil.isEmpty(response.getSourceAsString())){ return null; } return JSON.parseObject(response.getSourceAsString(),EsShopOrderItem.class); } /** * * * @param req 高级查询对象,当用商品标题查询的时候,限制只返回最大2000条 * @return */ public List<EsShopOrderItem> queryAdvanced(EsShopOrderItemRequestDTO req){ if(null==req){ return null; } SearchRequest searchRequest = new SearchRequest(OrderConstant.ES_ORDER_ITEM_INDEX); searchRequest.types(OrderConstant.ES_ORDER_ITEM_TYPE); // 构造查询器 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); if(!StringUtils.isEmpty(req.getItemTitle())){ boolQueryBuilder.must(QueryBuilders.matchQuery("itemTitle",req.getItemTitle())); } if(!StringUtils.isEmpty(req.getItemId())){ boolQueryBuilder.must(QueryBuilders.termQuery("itemId",req.getItemId())); } if(!StringUtils.isEmpty(req.getShopId())){ boolQueryBuilder.must(QueryBuilders.termQuery("shopId",req.getShopId())); } if(!StringUtils.isEmpty(req.getCustomerId())){ boolQueryBuilder.must(QueryBuilders.termQuery("createUserId",req.getCustomerId())); } if(!StringUtils.isEmpty(req.getParentOrderId())){ 
boolQueryBuilder.must(QueryBuilders.termQuery("parentOrderId",req.getParentOrderId())); } if(!StringUtils.isEmpty(req.getOrderId())){ boolQueryBuilder.must(QueryBuilders.termQuery("orderId",req.getOrderId())); } if(null!=req.getItemType() && req.getItemType()>=0){ boolQueryBuilder.must(QueryBuilders.termQuery("itemType",req.getItemType())); } if(!StringUtils.isEmpty(req.getCreateStartTime())){ boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").gte(req.getCreateStartTime())); } if(!StringUtils.isEmpty(req.getCreateEndTime())){ boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").lte(req.getCreateEndTime())); } sourceBuilder.query(boolQueryBuilder); sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); if(!StringUtils.isEmpty(req.getItemTitle())){ sourceBuilder.from(0).size(2000); } sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.DESC)); searchRequest.source(sourceBuilder); searchRequest.searchType(SearchType.QUERY_THEN_FETCH); SearchResponse searchResponse = transportClient.search(searchRequest).actionGet(); if(null==searchResponse || null==searchResponse.getHits() || searchResponse.getHits().totalHits<=0){ return null; } List<EsShopOrderItem> list = new ArrayList<>(); SearchHits hits = searchResponse.getHits(); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); EsShopOrderItem orderItem = JSON.parseObject(sourceAsString, EsShopOrderItem.class); list.add(orderItem); } return list; } public List<String> queryOrderIdList(EsShopOrderItemRequestDTO req){ if(null==req){ return null; } List<EsShopOrderItem> shopOrderItems = this.queryAdvanced(req); if(CommonUtil.isNull(shopOrderItems)){ return null; } return shopOrderItems.stream().map(a->a.getOrderId()).collect(Collectors.toList()); } }
//附上shop_order_item的mapping配置:
put shop_order_item
{
"settings": {
"analysis": {
"analyzer": {
"thai_analyzer": {
"type": "custom",
"tokenizer": "thai",
"filter": [
"lowercase",
"asciifolding"
]
},
"caseSensitive": {
"filter": "lowercase",
"type": "custom",
"tokenizer": "keyword"
}
}
}
},
"mappings": {
"order_item": {
"properties": {
"orderId": {
"type": "keyword"
},
"parentOrderId": {
"type": "keyword"
},
"shopId": {
"type": "keyword"
},
"orderItemId": {
"type": "keyword"
},
"itemTitle": {
"type": "text",
"analyzer": "thai_analyzer",
"search_analyzer": "thai_analyzer"
},
"itemId": {
"type": "keyword"
},
"mainItemId": {
"type": "keyword"
},
"marketId": {
"type": "keyword"
},
"itemType": {
"type": "integer"
},
"quantity": {
"type": "integer"
},
"scoreTotal": {
"type": "integer"
},
"scoreAmount": {
"type": "double"
},
"chargeAmount": {
"type": "double"
},
"itemPrice": {
"type": "double"
},
"standardPrice": {
"type": "double"
},
"itemTotalAmount": {
"type": "double"
},
"couponTotalAmount": {
"type": "double"
},
"balanceDiscountAmount": {
"type": "double"
},
"actDiscountAmount": {
"type": "double"
},
"createTime": {
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
"type": "date"
},
"updateTime": {
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",
"type": "date"
},
"createUserId": {
"type": "keyword"
},
"updateUserId": {
"type": "keyword"
}
}
}
}
}
原文地址:https://www.cnblogs.com/yangxiaohui227/p/11511213.html