A Study of Data Loading in Coherence

While preparing a training session for a customer recently, I noticed that Coherence supports three approaches to bulk data loading:

  • Custom application
  • InvocableMap - PreloadRequest
  • Invocation Service

The custom application approach is simple and easy to understand: it essentially comes down to calling put and putAll, so I won't dwell on it here (a minimal sketch follows below). The problem is that both put and putAll run serially from a single client, so loading a large volume of data calls for some mechanism that performs the load in parallel.
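For reference, here is a minimal sketch of that custom-application approach. The PutAllLoader class is my own illustration, not from the original post; it reuses the Person class, the SampleCache cache, and the persons table that appear later in this article, and it elides error handling and connection management:

package dataload;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

public class PutAllLoader {
    // Reads the persons table and pushes entries into the cache in putAll
    // batches. Note that this is still a serial loop driven by one client.
    public static void load(Connection con) throws Exception {
        NamedCache cache = CacheFactory.getCache("SampleCache");
        Statement s = con.createStatement();
        ResultSet rs = s.executeQuery("SELECT id, firstname, lastname, address FROM persons");
        Map<String, Person> buffer = new HashMap<String, Person>();
        while (rs.next()) {
            buffer.put(rs.getString("id"),
                       new Person(rs.getString("id"), rs.getString("firstname"),
                                  rs.getString("lastname"), rs.getString("address")));
            if (buffer.size() == 1000) { // flush in batches to bound request size
                cache.putAll(buffer);
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            cache.putAll(buffer);        // flush the remainder
        }
        s.close();
    }
}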

This article takes a closer look at the second approach, InvocableMap. PreloadRequest performs the load over a collection of entries via the CacheLoader; the core call is cache.invokeAll(keys, new PreloadRequest()), as the driver program below shows. This approach has the following characteristics:

  • All keys to be loaded must be known before the load starts.
  • The actual loading is done by the CacheLoader implementation.
  • The load runs in parallel: each storage node loads from the database the entries whose keys it owns.

The parallelism comes from invokeAll itself: the PreloadRequest processor is shipped to the storage-enabled members that own the given keys, so every member drives its own CacheLoader concurrently.

The code:

Person.java


package dataload;

import java.io.Serializable;

public class Person implements Serializable {

    private String Id;
    private String Firstname;
    private String Lastname;
    private String Address;

    public Person() {
        super();
    }

    public Person(String sId, String sFirstname, String sLastname, String sAddress) {
        Id = sId;
        Firstname = sFirstname;
        Lastname = sLastname;
        Address = sAddress;
    }

    public void setId(String Id) {
        this.Id = Id;
    }

    public String getId() {
        return Id;
    }

    public void setFirstname(String Firstname) {
        this.Firstname = Firstname;
    }

    public String getFirstname() {
        return Firstname;
    }

    public void setLastname(String Lastname) {
        this.Lastname = Lastname;
    }

    public String getLastname() {
        return Lastname;
    }

    public void setAddress(String Address) {
        this.Address = Address;
    }

    public String getAddress() {
        return Address;
    }
}

DBCacheStore.java implements the CacheStore interface (which extends CacheLoader); the key method to look at is load:


package dataload;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.CacheStore;
import com.tangosol.util.Base;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import java.util.Collection;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import javax.naming.Context;
import javax.naming.InitialContext;

/**
 * An example implementation of the CacheStore interface.
 *
 * @author erm 2003.05.01
 */
public class DBCacheStore
        extends Base
        implements CacheStore {

    // ----- constructors ---------------------------------------------------

    /**
     * Constructs a DBCacheStore for a given database table.
     *
     * @param sTableName the db table name
     */
    public DBCacheStore(String sTableName) {
        m_sTableName = sTableName;
        cache = CacheFactory.getCache("SampleCache");
    }

    // ----- accessors ------------------------------------------------------

    /**
     * Obtain the name of the table this CacheStore is persisting to.
     *
     * @return the name of the table this CacheStore is persisting to
     */
    public String getTableName() {
        return m_sTableName;
    }

    /**
     * Obtain the connection used to connect to the database. The DataSource
     * is looked up through WebLogic JNDI, and the connection is cached so
     * that repeated load() calls do not open a new connection every time.
     *
     * @return the connection used to connect to the database
     */
    public Connection getConnection() {
        try {
            if (m_con == null || m_con.isClosed()) {
                Hashtable<String, String> ht = new Hashtable<String, String>();
                ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
                ht.put(Context.PROVIDER_URL, "t3://localhost:7001");
                Context ctx = new InitialContext(ht);
                javax.sql.DataSource ds = (javax.sql.DataSource) ctx.lookup("ds");
                m_con = ds.getConnection();
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return m_con;
    }

    // ----- CacheStore interface -------------------------------------------

    /**
     * Return the value associated with the specified key, or null if the
     * key does not have an associated value in the underlying store.
     *
     * @param oKey key whose associated value is to be returned
     *
     * @return the value associated with the specified key, or
     *         <tt>null</tt> if no value is available for that key
     */
    public Object load(Object oKey) {
        Object oValue = null;
        Connection con = getConnection();
        String sSQL = "SELECT id, firstname, lastname, address FROM " + getTableName()
                + " WHERE id = ?";
        System.out.println("Enter load= " + sSQL);

        try {
            PreparedStatement stmt = con.prepareStatement(sSQL);
            stmt.setString(1, String.valueOf(oKey));
            System.out.println("key=" + String.valueOf(oKey));
            ResultSet rslt = stmt.executeQuery();
            if (rslt.next()) {
                oValue = new Person(rslt.getString("id"), rslt.getString("firstname"),
                                    rslt.getString("lastname"), rslt.getString("address"));
                if (rslt.next()) {
                    throw new SQLException("Not a unique key: " + oKey);
                }
            }
            stmt.close();
        } catch (SQLException e) {
            System.out.println("==============" + e.getMessage());
            //throw ensureRuntimeException(e, "Load failed: key=" + oKey);
        }
        return oValue;
    }

    /**
     * Store the specified value under the specified key in the underlying
     * store. This method is intended to support both key/value creation
     * and value update for a specific key. (The body is commented out:
     * this CacheStore is used read-only for preloading.)
     *
     * @param oKey   key to store the value under
     * @param oValue value to be stored
     *
     * @throws UnsupportedOperationException if this implementation or the
     *         underlying store is read-only
     */
    public void store(Object oKey, Object oValue) {
        /*
        Connection con = getConnection();
        String sTable = getTableName();
        String sSQL;

        if (load(oKey) != null) {
            sSQL = "UPDATE " + sTable + " SET value = ? where id = ?";
        } else {
            sSQL = "INSERT INTO " + sTable + " (value, id) VALUES (?,?)";
        }
        try {
            PreparedStatement stmt = con.prepareStatement(sSQL);
            int i = 0;
            stmt.setString(++i, String.valueOf(oValue));
            stmt.setString(++i, String.valueOf(oKey));
            stmt.executeUpdate();
            stmt.close();
        } catch (SQLException e) {
            throw ensureRuntimeException(e, "Store failed: key=" + oKey);
        }
        */
    }

    /**
     * Remove the specified key from the underlying store if present.
     * (Also disabled in this read-only demo.)
     *
     * @param oKey key whose mapping is to be removed from the map
     *
     * @throws UnsupportedOperationException if this implementation or the
     *         underlying store is read-only
     */
    public void erase(Object oKey) {
        /*
        Connection con = getConnection();
        String sSQL = "DELETE FROM " + getTableName() + " WHERE id=?";
        try {
            PreparedStatement stmt = con.prepareStatement(sSQL);
            stmt.setString(1, String.valueOf(oKey));
            stmt.executeUpdate();
            stmt.close();
        } catch (SQLException e) {
            throw ensureRuntimeException(e, "Erase failed: key=" + oKey);
        }
        */
    }

    /**
     * Remove the specified keys from the underlying store if present.
     *
     * @param colKeys keys whose mappings are being removed from the cache
     *
     * @throws UnsupportedOperationException if this implementation or the
     *         underlying store is read-only
     */
    public void eraseAll(Collection colKeys) {
        throw new UnsupportedOperationException();
    }

    /**
     * Return the values associated with each of the specified keys in the
     * passed collection. If a key does not have an associated value in
     * the underlying store, then the returned map will not have an entry
     * for that key. (Left unimplemented in this demo; as the console
     * output shows, the preload path goes through load() for each key.)
     *
     * @param colKeys a collection of keys to load
     *
     * @return a Map of keys to associated values for the specified keys
     */
    public Map loadAll(Collection colKeys) {
        /*
        System.out.println("Enter LoadAll Map");
        Map mapResults = new HashMap();
        for (Object entry : (Set<Object>) colKeys) {
            System.out.println(entry);
            mapResults.put(entry, load(entry));
        }
        return mapResults;
        */
        return Collections.emptyMap();
    }

    /**
     * Store the specified values under the specified keys in the underlying
     * store. This method is intended to support both key/value creation
     * and value update for the specified keys.
     *
     * @param mapEntries a Map of any number of keys and values to store
     *
     * @throws UnsupportedOperationException if this implementation or the
     *         underlying store is read-only
     */
    public void storeAll(Map mapEntries) {
        throw new UnsupportedOperationException();
    }

    /**
     * Iterate all keys in the underlying store.
     *
     * @return a read-only iterator of the keys in the underlying store
     */
    public Iterator keys() {
        Connection con = getConnection();
        String sSQL = "SELECT id FROM " + getTableName();
        List list = new LinkedList();

        try {
            PreparedStatement stmt = con.prepareStatement(sSQL);
            ResultSet rslt = stmt.executeQuery();
            while (rslt.next()) {
                Object oKey = rslt.getString(1);
                list.add(oKey);
            }
            stmt.close();
        } catch (SQLException e) {
            throw ensureRuntimeException(e, "Iterator failed");
        }

        return list.iterator();
    }

    // ----- data members ---------------------------------------------------

    /**
     * The connection.
     */
    protected Connection m_con;

    /**
     * The db table name.
     */
    protected String m_sTableName;

    protected NamedCache cache;
}

The CoherencePreLoad.java driver program:


package dataload;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

import com.tangosol.util.processor.PreloadRequest;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

import java.util.Collection;
import java.util.HashSet;
import java.util.Hashtable;

import javax.naming.Context;
import javax.naming.InitialContext;

public class CoherencePreLoad {

    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("SampleCache");

        String sql = "select id from persons order by id";
        Connection con = null;
        Statement s = null;
        ResultSet rs = null;
        int count = 0;
        Collection keys = new HashSet();
        String key = null;

        try {
            Hashtable<String, String> ht = new Hashtable<String, String>();
            ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
            ht.put(Context.PROVIDER_URL, "t3://localhost:7001");
            Context ctx = new InitialContext(ht);
            javax.sql.DataSource ds = (javax.sql.DataSource) ctx.lookup("ds");

            con = ds.getConnection();
            s = con.createStatement();
            rs = s.executeQuery(sql);
            System.out.println("Loading with SQL ");

            while (rs.next()) {
                key = rs.getString(1);
                System.out.println(key);
                keys.add(key);
                count++;

                // preload 1000 entries at a time; each storage node loads
                // the keys it owns through the CacheLoader in parallel
                if ((count % 1000) == 0) {
                    cache.invokeAll(keys, new PreloadRequest());
                    keys.clear();
                }
            }
            if (!keys.isEmpty()) {
                System.out.println("Enter");
                cache.invokeAll(keys, new PreloadRequest());
                System.out.println("finish");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getMessage());
        }
    }
}

(The original listing incremented count twice per row, once in the loop body and once in the batch test, so batches fired at the wrong intervals; that is fixed above. The keys are pushed in batches of 1000 so that each invokeAll request stays reasonably small.)

Next, this needs to be wired up in the cache configuration:


<?xml version="1.0"?>
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
  <caching-scheme-mapping>
    <!--
    The cache named "SampleCache" is mapped to the DB-backed
    distributed scheme below.
    -->
    <cache-mapping>
      <cache-name>SampleCache</cache-name>
      <scheme-name>distributed-pof</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>

  <caching-schemes>
    <!--
    DB backed distributed caching scheme.
    -->
    <distributed-scheme>
      <scheme-name>distributed-pof</scheme-name>
      <service-name>DistributedCache</service-name>
      <local-storage>true</local-storage>

      <backing-map-scheme>
        <read-write-backing-map-scheme>

          <internal-cache-scheme>
            <local-scheme/>
          </internal-cache-scheme>

          <!-- DBCacheStore is constructed with the db table name -->
          <cachestore-scheme>
            <class-scheme>
              <class-name>dataload.DBCacheStore</class-name>
              <init-params>
                <init-param>
                  <param-type>java.lang.String</param-type>
                  <param-value>persons</param-value>
                </init-param>
              </init-params>
            </class-scheme>
          </cachestore-scheme>
        </read-write-backing-map-scheme>
      </backing-map-scheme>

      <listener/>
      <autostart>true</autostart>
    </distributed-scheme>

  </caching-schemes>
</cache-config>

Note that weblogic.jar and the dataload classes must be added to the classpath when starting the cache server, because DBCacheStore uses WebLogic JNDI to look up the data source.
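As an illustration, a storage node might be started along these lines. This is a sketch: the jar paths and config file name are assumptions for your environment, while com.tangosol.net.DefaultCacheServer and the tangosol.coherence.cacheconfig system property are the standard Coherence 3.x entry point and configuration override:

java -server \
     -Dtangosol.coherence.cacheconfig=coherence-cache-config.xml \
     -cp coherence.jar:weblogic.jar:dataload.jar \
     com.tangosol.net.DefaultCacheServer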

The output (the original post showed screenshots here): the Coherence node on the JDeveloper side, the storage node on the Coherence server side, and a VisualVM view confirming that the entries were written into the cache.


大数据的基本概念: 1.大数据的产生 a.科学研究 b.物联网的应用 c.海量网络信息的产生 2.大数据概念的提出 3.大数据的"4V"特征 a.Volume(容量大):大数据巨大的数据量与数据完整性 b.Variety(种类多):要在海量.种类繁多的数据间发现其内在关联 c.Velocity(速度快):更快地满足实时性需求 d.Value(价值密度低):将信息转化为知识 4.大数据的应用领域 a.商业 b.金融 c.医疗 d.制造业 大数据的处理流程 1.数据采集 2.数据处理与集成