这里验证第三个方法,原理是将需要装载的数据分载在所有的存储节点上,不同的地方是利用了存储节点提供的InvocationService进行装载,而不是PreloadRequest,
原理如图
前提条件是:
- 需要知道所有要装载的key值
- 需要根据存储节点的数目把key值进行分摊,这里是通过
- Map<Member, List<String>> divideWork(Set members)这个方法,输入Coherence的存储节点成员,输出一个map结构,以member为key,所有的entry key值为value.
- 装载数据的任务,主要是通过驱动MyLoadInvocable的run方法,把数据在各个节点中进行装载,MyLoadInvocable必须扩展AbstractInvocable并实现PortableObject,不知何解,我尝试实现Seriable方法,结果出错
- 在拆解所有key值的任务过程中,发现list<String>数组被后面的值覆盖,后来每次放入map的时候新建一个List才避免此现象发生.
- 不需要实现CacheLoader或者CacheStore方法
Person.java
package dataload; import java.io.Serializable; public class Person implements Serializable { public void setId(String Id) { public String getId() { public void setFirstname(String Firstname) { public String getFirstname() { public void setLastname(String Lastname) { public String getLastname() { public void setAddress(String Address) { public String getAddress() { public Person() { public Person(String sId,String sFirstname,String sLastname,String sAddress) { |
MyLoadInvocable.java
装载数据的任务,主要是通过驱动这个任务的run方法,把数据在各个节点中进行装载
package dataload; import com.tangosol.io.pof.PofReader; import java.io.IOException; import java.sql.Connection; import java.util.Hashtable; import javax.naming.Context; import serp.bytecode.NameCache; public class MyLoadInvocable extends AbstractInvocable implements PortableObject { private List<String> m_memberKeys; public MyLoadInvocable() { public MyLoadInvocable(List<String> memberKeys, String cache) { public Connection getConnection() { Hashtable<String,String> ht = new Hashtable<String,String>(); m_con = ds.getConnection(); return m_con; public void run() { try for(int i = 0; i < m_memberKeys.size(); i++) String id = (String)m_memberKeys.get(i); stmt.setString(1, id); } } stmt.close(); }catch (Exception e) } public void readExternal(PofReader in) /** } |
LoadUsingEP.java
装载的客户端,负责数据分段,InvocationService查找以及驱动。
package dataload; import com.tangosol.net.CacheFactory; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import javax.naming.Context; public class LoaderUsingEP { private Connection m_con; public Connection getConnection() { Hashtable<String,String> ht = new Hashtable<String,String>(); m_con = ds.getConnection(); return m_con; protected Set getStorageMembers(NamedCache cache) protected Map<Member, List<String>> divideWork(Set members) try { int onecount = totalcount / membercount; sql = "select id from persons"; ResultSet rs1 = st.executeQuery(sql); while (rs1.next()) { if (count < onecount) { list.add(rs1.getString("id")); Member member = (Member) i.next(); ArrayList<String> list2=new ArrayList<String>(); list.clear(); /* print the list value } currentworker ++; if (currentworker == membercount-1) { } } Member member = (Member) i.next(); st.close(); for(Map.Entry<Member, List<String>> entry:mapWork.entrySet()){ } public void load() Set members = getStorageMembers(cache); Map<Member, List<String>> mapWork = divideWork(members); InvocationService service = (InvocationService) for (Map.Entry<Member, List<String>> entry : mapWork.entrySet()) Member member = entry.getKey(); MyLoadInvocable task = new MyLoadInvocable(memberKeys, cache.getCacheName()); public static void main(String[] args) { } |
需要配置的客户端schema
storage-override-client.xml
<?xml version="1.0"?> <!DOCTYPE cache-config SYSTEM "cache-config.dtd"> <cache-config> <caching-scheme-mapping> <!-- Caches with names that start with ‘DBBacked‘ will be created as distributed-db-backed. --> <cache-mapping> <cache-name>SampleCache</cache-name> <scheme-name>distributed-pof</scheme-name> </cache-mapping> </caching-scheme-mapping> <caching-schemes> <!-- DB Backed Distributed caching scheme. --> <distributed-scheme> <scheme-name>distributed-pof</scheme-name> <service-name>DistributedCache</service-name> <backing-map-scheme> <read-write-backing-map-scheme> <internal-cache-scheme> <cachestore-scheme> <listener/> <invocation-scheme> </caching-schemes> |
存储节点的Schema
<?xml version="1.0"?> <!DOCTYPE cache-config SYSTEM "cache-config.dtd"> <cache-config> <caching-scheme-mapping> <!-- Caches with names that start with ‘DBBacked‘ will be created as distributed-db-backed. --> <cache-mapping> <cache-name>SampleCache</cache-name> <scheme-name>distributed-pof</scheme-name> </cache-mapping> </caching-scheme-mapping> <caching-schemes> <!-- DB Backed Distributed caching scheme. --> <distributed-scheme> <scheme-name>distributed-pof</scheme-name> <service-name>DistributedCache</service-name> <backing-map-scheme> <read-write-backing-map-scheme> <internal-cache-scheme> <cachestore-scheme> <listener/> <invocation-scheme> </caching-schemes> |
输出结果
可见数据分片装载.