一个简单的solrserver组件
实现了索引更新的异步处理以及查询接口;日志、线程池和队列的监控尚未加上。
SolrDocument封装
接口:
/**
 * Contract for objects that can be turned into a {@link SolrInputDocument}
 * for indexing and populated back from a {@link SolrDocument} returned by a
 * query.
 */
public interface ISolrDocument {

    /**
     * Converts this object into a Solr input document.
     *
     * @return the input document representing this object
     * @throws Exception if the conversion fails
     */
    SolrInputDocument convertToInputDocument() throws Exception;

    /**
     * Populates this object from the given Solr document.
     *
     * @param document the Solr document to read values from
     * @throws Exception if the values cannot be applied to this object
     */
    void buildSolrDocument(SolrDocument document) throws Exception;
}
实现:
public class SolrDocumentImpl implements ISolrDocument {
@Override
public SolrInputDocument convertToInputDocument() throws Exception{
final SolrInputDocument doc = new SolrInputDocument();Field[] fields = this.getClass().getDeclaredFields();
for (Field f : fields) {
f.setAccessible(true);
String idxName = f.getName();
Object fieldValue =f.get(this);
doc.addField(idxName, fieldValue);
}
return doc;
}@Override
public void buildSolrDocument(SolrDocument document) throws Exception{
Field[] fields = this.getClass().getDeclaredFields();
for (Field f : fields) {
f.setAccessible(true);String idxName = f.getName();
Object fieldValue = document.get(idxName);
f.set(this, fieldValue);}
}
查询结果集封装
/**
 * Simple mutable holder for one page of query results.
 *
 * @param <T> type of the records contained in the page
 */
public class DataResult<T> {

    /** Offset of the first record of this page. */
    private int start;

    /** Page size associated with this result. */
    private int limits;

    /** Total number of matching records. */
    private long totalRecord;

    /** Records of this page; {@code null} until set. */
    private List<T> records;

    /** @return the offset of the first record of this page */
    public int getStart() {
        return this.start;
    }

    /** @param start the offset of the first record of this page */
    public void setStart(int start) {
        this.start = start;
    }

    /** @return the page size */
    public int getLimits() {
        return this.limits;
    }

    /** @param limits the page size */
    public void setLimits(int limits) {
        this.limits = limits;
    }

    /** @return the total number of matching records */
    public long getTotalRecord() {
        return this.totalRecord;
    }

    /** @param totalRecord the total number of matching records */
    public void setTotalRecord(long totalRecord) {
        this.totalRecord = totalRecord;
    }

    /** @return the records of this page, exactly as they were set */
    public List<T> getRecords() {
        return this.records;
    }

    /** @param records the records of this page */
    public void setRecords(List<T> records) {
        this.records = records;
    }
}
SolrServer封装
接口:
/**
 * Facade over a Solr server offering index maintenance and paged search.
 */
public interface ISolrServer {

    /**
     * @return the thread pool exposed by this server
     */
    ThreadPoolExecutor getThreadPool();

    /**
     * Initializes the server.
     *
     * @throws Exception if initialization fails
     */
    void init() throws Exception;

    /**
     * Optimizes the index.
     */
    void optimize();

    /**
     * Clears documents from the index.
     *
     * @param query query selecting the documents to remove
     */
    void clearDocs(SolrQuery query);

    /**
     * Updates the index with a batch of documents.
     *
     * @param docs documents to index
     */
    void batchUpdateDocs(List<ISolrDocument> docs);

    /**
     * Runs a paged search.
     *
     * @param query the query to execute
     * @param clazz the document class each hit is converted to
     * @param <T>   the document type
     * @return the page of results
     */
    <T extends ISolrDocument> DataResult<T> pageSearch(SolrQuery query, Class<T> clazz);

    /**
     * Releases the server's resources.
     *
     * @throws Exception if shutting down fails
     */
    void destory() throws Exception;
}
实现:
/**
 * {@link ISolrServer} backed by a {@link CloudSolrServer}.
 *
 * <p>Configure the bean through its setters, then call {@link #init()} before
 * use and {@link #destory()} when done. {@link #clearDocs(SolrQuery)} runs
 * asynchronously on the internal thread pool; the other operations run on the
 * calling thread.
 */
public class SolrServerImpl implements ISolrServer {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(SolrServerImpl.class.getName());

    /** Page size reported when the query carries no positive row count. */
    private static final int DEFAULT_PAGE_SIZE = 20;

    /** How long {@link #destory()} waits for queued tasks before forcing shutdown. */
    private static final long SHUTDOWN_WAIT_SECONDS = 10L;

    private CloudSolrServer solrServer;
    private String zookeeperStr;
    private String coreName;
    private String idName;
    private int corePoolSize;
    private int maxPoolSize;
    private int threadQueueSize;
    private ThreadPoolExecutor threadPool;

    /**
     * Creates the worker thread pool and the Solr client from the configured
     * properties.
     *
     * @throws Exception if the pool or the Solr client cannot be created
     */
    @Override
    public void init() throws Exception {
        // Bounded queue + DiscardOldestPolicy: when the queue is full the oldest
        // queued task is dropped silently to make room for the new one.
        this.threadPool = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                10000,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(threadQueueSize),
                new SolrThreadFactory("solr-pool"),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        solrServer = new CloudSolrServer(this.zookeeperStr);
        solrServer.setDefaultCollection(coreName);
        solrServer.setIdField(idName);
        solrServer.setZkClientTimeout(10000);
        solrServer.setZkConnectTimeout(5000);
    }

    /**
     * Index optimization is not implemented; this method intentionally does nothing.
     */
    @Override
    public void optimize() {
    }

    /**
     * Asynchronously deletes every document matching the query string of
     * {@code query} and commits. Failures are logged, not propagated.
     *
     * @param query query whose query string selects the documents to delete
     */
    @Override
    public void clearDocs(final SolrQuery query) {
        this.threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    SolrServerImpl.this.solrServer.deleteByQuery(query.getQuery());
                    SolrServerImpl.this.solrServer.commit(false, false, true);
                } catch (Exception e) {
                    LOG.log(java.util.logging.Level.SEVERE, "clearDocs failed", e);
                }
            }
        });
    }

    /**
     * Converts the given documents, adds them to the index and commits, all on
     * the calling thread. Failures are logged, not propagated. A {@code null}
     * or empty list is ignored.
     *
     * @param docs documents to index
     */
    @Override
    public void batchUpdateDocs(List<ISolrDocument> docs) {
        if (docs == null || docs.isEmpty()) {
            // Nothing to send: skip the add request and the commit entirely.
            return;
        }
        try {
            final List<SolrInputDocument> inputDocs = new ArrayList<SolrInputDocument>(docs.size());
            for (ISolrDocument doc : docs) {
                inputDocs.add(doc.convertToInputDocument());
            }
            this.solrServer.add(inputDocs);
            this.solrServer.commit(false, false, true);
        } catch (Exception e) {
            LOG.log(java.util.logging.Level.SEVERE,
                    "batchUpdateDocs failed for " + docs.size() + " document(s)", e);
        }
    }

    /**
     * Executes {@code query} and converts each hit into a new instance of
     * {@code clazz}.
     *
     * <p>The returned {@code limits}/{@code start} mirror the query's rows and
     * start; when rows is unset or not positive, {@code limits} is reported as
     * {@value #DEFAULT_PAGE_SIZE} and when start is unset it is reported as 0.
     * The query itself is sent unmodified. On failure the error is logged and
     * a result with an empty record list is returned.
     *
     * @param query the query to execute
     * @param clazz document class; must have a no-argument constructor
     * @param <T>   the document type
     * @return the page of results, never {@code null}
     */
    @Override
    public <T extends ISolrDocument> DataResult<T> pageSearch(SolrQuery query, Class<T> clazz) {
        // rows/start may be absent on the query; read them as boxed values so an
        // unset parameter does not blow up on auto-unboxing.
        final Integer rows = query.getRows();
        final Integer offset = query.getStart();
        final int limit = (rows == null || rows <= 0) ? DEFAULT_PAGE_SIZE : rows;
        final int start = (offset == null) ? 0 : offset;

        final DataResult<T> datas = new DataResult<T>();
        datas.setLimits(limit);
        datas.setStart(start);
        try {
            final QueryResponse response = this.solrServer.query(query);
            final SolrDocumentList solrDocumentList = response.getResults();
            datas.setTotalRecord(solrDocumentList.getNumFound());

            final List<T> dataResults = new ArrayList<T>(limit);
            for (SolrDocument doc : solrDocumentList) {
                final T item = clazz.getDeclaredConstructor().newInstance();
                item.buildSolrDocument(doc);
                dataResults.add(item);
            }
            datas.setRecords(dataResults);
        } catch (Exception e) {
            LOG.log(java.util.logging.Level.SEVERE, "pageSearch failed", e);
            // Hand back an empty list rather than null so callers can iterate safely.
            datas.setRecords(new ArrayList<T>());
        }
        return datas;
    }

    /**
     * Stops the worker pool (waiting briefly for queued tasks, which may still
     * need the Solr client) and then shuts the Solr client down.
     *
     * @throws Exception if interrupted while waiting or if the client fails to shut down
     */
    @Override
    public void destory() throws Exception {
        try {
            if (this.threadPool != null) {
                this.threadPool.shutdown();
                if (!this.threadPool.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
                    this.threadPool.shutdownNow();
                }
            }
        } finally {
            if (this.solrServer != null) {
                this.solrServer.shutdown();
            }
        }
    }

    public CloudSolrServer getSolrServer() {
        return solrServer;
    }

    public void setSolrServer(CloudSolrServer solrServer) {
        this.solrServer = solrServer;
    }

    public String getZookeeperStr() {
        return zookeeperStr;
    }

    public void setZookeeperStr(String zookeeperStr) {
        this.zookeeperStr = zookeeperStr;
    }

    public String getCoreName() {
        return coreName;
    }

    public void setCoreName(String coreName) {
        this.coreName = coreName;
    }

    public String getIdName() {
        return idName;
    }

    public void setIdName(String idName) {
        this.idName = idName;
    }

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getThreadQueueSize() {
        return threadQueueSize;
    }

    public void setThreadQueueSize(int threadQueueSize) {
        this.threadQueueSize = threadQueueSize;
    }

    @Override
    public ThreadPoolExecutor getThreadPool() {
        return threadPool;
    }

    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = threadPool;
    }
}
任务处理封装
/**
 * Buffers documents in a bounded queue and pushes them to an
 * {@link ISolrServer} in batches.
 *
 * <p>Producers call {@link #offerDoc(ISolrDocument)}. After {@link #init()} a
 * single dispatcher thread drains the queue into batches of at most
 * {@code batchSize} documents and hands each batch to the server's thread
 * pool. A semaphore with {@code workThreadSize} permits bounds the number of
 * batches in flight. Call {@link #shutdown()} to stop dispatching.
 */
public class BatchTaskExecutor {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(BatchTaskExecutor.class.getName());

    private ISolrServer iSolrServer;
    private int batchSize = 100;
    private int queueLength = 2000;
    private int workThreadSize = 2;
    private Semaphore semaphore;
    private LinkedBlockingQueue<ISolrDocument> docQueue;

    /** Cleared by {@link #shutdown()} to make the dispatcher exit. */
    private volatile boolean running;
    private Thread dispatcher;

    /**
     * Creates the queue and semaphore and starts the dispatcher thread.
     * Returns immediately; calling it again while already started is a no-op.
     *
     * @throws IllegalStateException if no Solr server has been set
     */
    public void init() {
        if (this.iSolrServer == null) {
            throw new IllegalStateException("iSolrServer must be set before init()");
        }
        if (this.dispatcher != null) {
            return;
        }
        this.semaphore = new Semaphore(this.workThreadSize);
        this.docQueue = new LinkedBlockingQueue<ISolrDocument>(this.queueLength);
        this.running = true;

        final Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                BatchTaskExecutor.this.dispatchLoop();
            }
        }, "solr-batch-dispatcher");
        t.setDaemon(true);
        this.dispatcher = t;
        t.start();
    }

    /**
     * Stops the dispatcher thread. Batches already handed to the thread pool
     * are not cancelled; documents still in the queue are not sent.
     */
    public void shutdown() {
        this.running = false;
        final Thread t = this.dispatcher;
        if (t != null) {
            t.interrupt();
            this.dispatcher = null;
        }
    }

    /**
     * Tries to obtain one in-flight permit, waiting up to 10 seconds.
     *
     * @return {@code true} if a permit was obtained; {@code false} on timeout
     *         or if the thread was interrupted (the interrupt flag is restored)
     */
    public boolean acquire() {
        try {
            return this.semaphore.tryAcquire(1, 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns one in-flight permit. Call only after a successful {@link #acquire()}. */
    public void release() {
        this.semaphore.release();
    }

    /**
     * Enqueues a document for indexing, waiting up to one second for space.
     *
     * @param doc the document to enqueue
     * @return {@code true} if enqueued; {@code false} if {@code doc} is
     *         {@code null}, the executor is not initialized, the queue stayed
     *         full, or the thread was interrupted
     */
    public boolean offerDoc(ISolrDocument doc) {
        if (doc == null || this.docQueue == null) {
            return false;
        }
        try {
            return this.docQueue.offer(doc, 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Collects up to {@code batchSize} documents from the queue. Collection
     * stops early as soon as no document arrives within one second or the
     * thread is interrupted.
     *
     * @return the collected documents, possibly empty, never {@code null}
     */
    public List<ISolrDocument> batchPollDocs() {
        final List<ISolrDocument> docs = new ArrayList<ISolrDocument>(this.batchSize);
        while (docs.size() < this.batchSize) {
            final ISolrDocument doc;
            try {
                doc = this.docQueue.poll(1000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (doc == null) {
                break;
            }
            docs.add(doc);
        }
        return docs;
    }

    public void setSolrServer(ISolrServer solrServer) {
        this.iSolrServer = solrServer;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public void setQueueLength(int queueLength) {
        this.queueLength = queueLength;
    }

    public void setWorkThreadSize(int workThreadSize) {
        this.workThreadSize = workThreadSize;
    }

    /** Dispatcher body: drain a batch, wait for a permit, submit; repeat until stopped. */
    private void dispatchLoop() {
        while (this.running && !Thread.currentThread().isInterrupted()) {
            final List<ISolrDocument> batch = batchPollDocs();
            if (batch.isEmpty()) {
                // batchPollDocs already waited up to a second, so this does not spin.
                continue;
            }
            if (!awaitPermit()) {
                LOG.warning("Stopped before dispatch; dropping " + batch.size() + " document(s)");
                return;
            }
            submit(batch);
        }
    }

    /** Retries {@link #acquire()} until it succeeds or the executor is stopped. */
    private boolean awaitPermit() {
        while (this.running && !Thread.currentThread().isInterrupted()) {
            if (acquire()) {
                return true;
            }
        }
        return false;
    }

    /** Hands one batch to the server's thread pool; the permit is released when the task ends. */
    private void submit(final List<ISolrDocument> batch) {
        try {
            // NOTE(review): if the pool silently discards the task (e.g. a
            // discarding rejection policy), the task's finally block never runs
            // and this permit is not returned — confirm the pool is sized so
            // that cannot happen.
            this.iSolrServer.getThreadPool().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        BatchTaskExecutor.this.iSolrServer.batchUpdateDocs(batch);
                    } catch (Exception e) {
                        LOG.log(java.util.logging.Level.SEVERE,
                                "Batch update of " + batch.size() + " document(s) failed", e);
                    } finally {
                        BatchTaskExecutor.this.release();
                    }
                }
            });
        } catch (RuntimeException e) {
            // The task never started, so give the permit back here.
            release();
            LOG.log(java.util.logging.Level.SEVERE,
                    "Could not submit batch of " + batch.size() + " document(s)", e);
        }
    }
}
封装一个简单的solrserver组件,布布扣,bubuko.com