Pre-Loading the Cache
This chapter describes different patterns that you can use to pre-load a cache. These patterns include bulk loading and distributed loading.
This chapter contains the following sections:
Performing Bulk Loading and Processing
Performing Distributed Bulk Loading
20.1 Performing Bulk Loading and Processing
Example 20-5, PagedQuery.java, demonstrates techniques for efficient bulk loading and processing of items in a Coherence cache.
20.1.1 Bulk Writing to a Cache
A common scenario when using Coherence is to pre-populate a cache before the application uses it. A simple way to do this is shown in the Java code in Example 20-1:
Example 20-1 Pre-Loading a Cache
public static void bulkLoad(NamedCache cache, Connection conn)
    {
    Statement s;
    ResultSet rs;

    try
        {
        s = conn.createStatement();
        rs = s.executeQuery("select key, value from table");
        while (rs.next())
            {
            Integer key   = new Integer(rs.getInt(1));
            String  value = rs.getString(2);

            // insert entries one at a time; each put is a separate operation
            cache.put(key, value);
            }
        ...
        }
    catch (SQLException e)
        {...}
    }
This technique works, but each call to put can result in network traffic, especially for partitioned and replicated caches. In addition, each call to put returns the object that it just replaced in the cache (as defined by the java.util.Map interface), which adds still more unnecessary overhead. Loading the cache can be made much more efficient by using the ConcurrentMap.putAll method instead. This is shown in Example 20-2:
Example 20-2 Pre-Loading a Cache Using ConcurrentMap.putAll
public static void bulkLoad(NamedCache cache, Connection conn)
    {
    Statement s;
    ResultSet rs;
    Map       buffer = new HashMap();

    try
        {
        int count = 0;
        s = conn.createStatement();
        rs = s.executeQuery("select key, value from table");
        while (rs.next())
            {
            Integer key   = new Integer(rs.getInt(1));
            String  value = rs.getString(2);
            buffer.put(key, value);

            // this loads 1000 items at a time into the cache
            if ((count++ % 1000) == 0)
                {
                cache.putAll(buffer);
                buffer.clear();
                }
            }
        if (!buffer.isEmpty())
            {
            cache.putAll(buffer);
            }
        ...
        }
    catch (SQLException e)
        {...}
    }
20.1.2 Efficient Processing of Filter Results
Coherence provides the ability to query caches based on criteria through the Filter API. Here is an example (given entries with integer keys and string values):
Example 20-3 Using a Filter to Query a Cache
NamedCache c = CacheFactory.getCache("test");

// Search for entries that start with 'c'
Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true);

// Perform query, return all entries that match
Set results = c.entrySet(query);
for (Iterator i = results.iterator(); i.hasNext(); )
    {
    Map.Entry e = (Map.Entry) i.next();
    out("key: "+e.getKey() + ", value: "+e.getValue());
    }
This example works for small data sets, but if the data set is too large it may run into problems such as exhausting the heap space. Example 20-4 illustrates a pattern that processes the query results in batches to avoid this problem:
Example 20-4 Processing Query Results in Batches
public static void performQuery()
    {
    NamedCache c = CacheFactory.getCache("test");

    // Search for entries that start with 'c'
    Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true);

    // Perform query, return keys of entries that match
    Set keys = c.keySet(query);

    // The amount of objects to process at a time
    final int BUFFER_SIZE = 100;

    // Object buffer
    Set buffer = new HashSet(BUFFER_SIZE);

    for (Iterator i = keys.iterator(); i.hasNext(); )
        {
        buffer.add(i.next());

        if (buffer.size() >= BUFFER_SIZE)
            {
            // Bulk load BUFFER_SIZE number of objects from cache
            Map entries = c.getAll(buffer);

            // Process each entry
            process(entries);

            // Done processing these keys, clear buffer
            buffer.clear();
            }
        }
    // Handle the last partial chunk (if any)
    if (!buffer.isEmpty())
        {
        process(c.getAll(buffer));
        }
    }

public static void process(Map map)
    {
    for (Iterator ie = map.entrySet().iterator(); ie.hasNext(); )
        {
        Map.Entry e = (Map.Entry) ie.next();
        out("key: "+e.getKey() + ", value: "+e.getValue());
        }
    }
In this example, all of the keys for entries that match the filter are returned, but only BUFFER_SIZE (in this case, 100) entries are retrieved from the cache at a time.
Note that a LimitFilter can be used to process results in parts, similar to the example above. However, the LimitFilter is intended for scenarios where the results will be paged, such as in a user interface. It is not an efficient means of processing all of the data in a query result.
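For contrast, the following is a minimal sketch of the paging scenario that LimitFilter is designed for, reusing the "test" cache and the LikeFilter from Example 20-3. The page size of 25 and the class name PagedDisplay are illustrative assumptions, not part of the Coherence API:

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.IdentityExtractor;
import com.tangosol.util.filter.LikeFilter;
import com.tangosol.util.filter.LimitFilter;

import java.util.Set;

public class PagedDisplay
    {
    public static void main(String[] asArg)
        {
        NamedCache cache = CacheFactory.getCache("test");

        // Wrap the query from Example 20-3 in a LimitFilter that
        // returns at most 25 matching entries per request
        LimitFilter pageQuery = new LimitFilter(
                new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true), 25);

        // Fetch one page at a time, as a UI would when the user pages forward
        Set page = cache.entrySet(pageQuery);
        while (!page.isEmpty())
            {
            System.out.println("page " + pageQuery.getPage()
                    + ": " + page.size() + " entries");

            pageQuery.nextPage();
            page = cache.entrySet(pageQuery);
            }

        CacheFactory.shutdown();
        }
    }

This keeps memory use bounded to one page at a time, but iterating over all pages this way re-issues the query for every page, which is why the keySet/getAll pattern in Example 20-4 is preferred for processing an entire result set.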
20.1.3 A Bulk Loading and Processing Example
Example 20-5, PagedQuery.java, is a complete sample program that demonstrates the concepts described in the preceding sections.
To run the example, follow these steps:
1. Save the following Java file as com/tangosol/examples/PagedQuery.java.
2. Point the classpath at the Coherence libraries and the current directory.
3. Compile and run the example.
Example 20-5 A Sample Bulk Loading Program
package com.tangosol.examples;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.NearCache;

import com.tangosol.util.Base;
import com.tangosol.util.Filter;
import com.tangosol.util.filter.LikeFilter;

import java.io.Serializable;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/**
* This sample application demonstrates the following:
* <ul>
* <li>
* <b>Obtaining a back cache from a near cache for populating a cache.</b>
* Since the near cache holds a limited subset of the data in a cache it is
* more efficient to bulk load data directly into the back cache instead of
* the near cache.
* </li>
* <li>
* <b>Populating a cache in bulk using <tt>putAll</tt>.</b>
* This is more efficient than <tt>put</tt> for a large amount of entries.
* </li>
* <li>
* <b>Executing a filter against a cache and processing the results in
* bulk.</b> This sample issues a query against the cache using a filter.
* The result is a set of keys that represent the query results. Instead of
* iterating through the keys and loading each item individually with a
* <tt>get</tt>, this sample loads entries from the cache in bulk using
* <tt>getAll</tt> which is more efficient.
* </li>
* </ul>
*
* @author cp
*/
public class PagedQuery
        extends Base
    {
    /**
    * Command line execution entry point.
    */
    public static void main(String[] asArg)
        {
        NamedCache cacheContacts = CacheFactory.getCache("contacts",
                Contact.class.getClassLoader());

        populateCache(cacheContacts);

        executeFilter(cacheContacts);

        CacheFactory.shutdown();
        }

    // ----- populate the cache ---------------------------------------------

    /**
    * Populate the cache with test data. This example shows how to populate
    * the cache a chunk at a time using {@link NamedCache#putAll} which is
    * more efficient than {@link NamedCache#put}.
    *
    * @param cacheDirect  the cache to populate. Note that this should
    *                     <b>not</b> be a near cache since that thrashes the
    *                     cache if the load size exceeds the near cache
    *                     max size.
    */
    public static void populateCache(NamedCache cacheDirect)
        {
        if (cacheDirect.isEmpty())
            {
            Map mapBuffer = new HashMap();
            for (int i = 0; i < 100000; ++i)
                {
                // some fake data
                Contact contact = new Contact();
                contact.setName(getRandomName() + ' ' + getRandomName());
                contact.setPhone(getRandomPhone());
                mapBuffer.put(new Integer(i), contact);

                // this loads 1000 items at a time into the cache
                if ((i % 1000) == 0)
                    {
                    out("Adding "+mapBuffer.size()+" entries to cache");
                    cacheDirect.putAll(mapBuffer);
                    mapBuffer.clear();
                    }
                }
            if (!mapBuffer.isEmpty())
                {
                cacheDirect.putAll(mapBuffer);
                }
            }
        }

    /**
    * Creates a random name.
    *
    * @return  a random string between 4 to 11 chars long
    */
    public static String getRandomName()
        {
        Random rnd = getRandom();
        int    cch = 4 + rnd.nextInt(7);
        char[] ach = new char[cch];
        ach[0] = (char) ('A' + rnd.nextInt(26));
        for (int of = 1; of < cch; ++of)
            {
            ach[of] = (char) ('a' + rnd.nextInt(26));
            }
        return new String(ach);
        }

    /**
    * Creates a random phone number.
    *
    * @return  a random string of integers 10 chars long
    */
    public static String getRandomPhone()
        {
        Random rnd = getRandom();
        return "(" + toDecString(100 + rnd.nextInt(900), 3)
             + ") " + toDecString(100 + rnd.nextInt(900), 3)
             + "-"  + toDecString(10000, 4);
        }

    // ----- process the cache ----------------------------------------------

    /**
    * Query the cache and process the results in batches. This example
    * shows how to load a chunk at a time using {@link NamedCache#getAll}
    * which is more efficient than {@link NamedCache#get}.
    *
    * @param cacheDirect  the cache to issue the query against
    */
    private static void executeFilter(NamedCache cacheDirect)
        {
        Filter query = new LikeFilter("getName", "C%");

        // to process 100 entries at a time
        final int CHUNK_COUNT = 100;

        // Start by querying for all the keys that match
        Set setKeys = cacheDirect.keySet(query);

        // Create a collection to hold the "current" chunk of keys
        Set setBuffer = new HashSet();

        // Iterate through the keys
        for (Iterator iter = setKeys.iterator(); iter.hasNext(); )
            {
            // Collect the keys into the current chunk
            setBuffer.add(iter.next());

            // handle the current chunk when it gets big enough
            if (setBuffer.size() >= CHUNK_COUNT)
                {
                // Instead of retrieving each object with a get,
                // retrieve a chunk of objects at a time with a getAll.
                processContacts(cacheDirect.getAll(setBuffer));
                setBuffer.clear();
                }
            }

        // Handle the last partial chunk (if any)
        if (!setBuffer.isEmpty())
            {
            processContacts(cacheDirect.getAll(setBuffer));
            }
        }

    /**
    * Process the map of contacts. In a real application some sort of
    * processing for each map entry would occur. In this example each
    * entry is logged to output.
    *
    * @param map  the map of contacts to be processed
    */
    public static void processContacts(Map map)
        {
        out("processing chunk of " + map.size() + " contacts:");
        for (Iterator iter = map.entrySet().iterator(); iter.hasNext(); )
            {
            Map.Entry entry = (Map.Entry) iter.next();
            out(" [" + entry.getKey() + "]=" + entry.getValue());
            }
        }

    // ----- inner classes --------------------------------------------------

    /**
    * Sample object used to populate cache
    */
    public static class Contact
            extends Base
            implements Serializable
        {
        public Contact() {}

        public String getName()
            {
            return m_sName;
            }
        public void setName(String sName)
            {
            m_sName = sName;
            }

        public String getPhone()
            {
            return m_sPhone;
            }
        public void setPhone(String sPhone)
            {
            m_sPhone = sPhone;
            }

        public String toString()
            {
            return "Contact{"
                   + "Name=" + getName()
                   + ", Phone=" + getPhone()
                   + "}";
            }

        public boolean equals(Object o)
            {
            if (o instanceof Contact)
                {
                Contact that = (Contact) o;
                return equals(this.getName(), that.getName())
                    && equals(this.getPhone(), that.getPhone());
                }
            return false;
            }

        public int hashCode()
            {
            int result;
            result = (m_sName != null ? m_sName.hashCode() : 0);
            result = 31 * result + (m_sPhone != null ? m_sPhone.hashCode() : 0);
            return result;
            }

        private String m_sName;
        private String m_sPhone;
        }
    }
Example 20-6 illustrates the terminal output from Coherence when you compile and run the example:
Example 20-6 Terminal Output from the Bulk Loading Program
$ export COHERENCE_HOME=[Coherence install directory]
$ export CLASSPATH=$COHERENCE_HOME/lib/coherence.jar:.
$ javac com/tangosol/examples/PagedQuery.java
$ java com.tangosol.examples.PagedQuery
2008-09-15 12:19:44.156 Oracle Coherence 3.4/405 <Info> (thread=main, member=n/a): Loaded operational configuration from resource "jar:file:/C:/coherence/lib/coherence.jar!/tangosol-coherence.xml"
2008-09-15 12:19:44.171 Oracle Coherence 3.4/405 <Info> (thread=main, member=n/a): Loaded operational overrides from resource "jar:file:/C:/coherence/lib/coherence.jar!/tangosol-coherence-override-dev.xml"
2008-09-15 12:19:44.171 Oracle Coherence 3.4/405 <D5> (thread=main, member=n/a): Optional configuration override "/tangosol-coherence-override.xml" is not specified

Oracle Coherence Version 3.4/405
 Grid Edition: Development mode
Copyright (c) 2000-2008 Oracle. All rights reserved.

2008-09-15 12:19:44.812 Oracle Coherence GE 3.4/405 <D5> (thread=Cluster, member=n/a): Service Cluster joined the cluster with senior service member n/a
2008-09-15 12:19:48.062 Oracle Coherence GE 3.4/405 <Info> (thread=Cluster, member=n/a): Created a new cluster with Member(Id=1, Timestamp=2008-09-15 12:19:44.609, Address=xxx.xxx.x.xxx:8088, MachineId=26828, Edition=Grid Edition, Mode=Development, CpuCount=2, SocketCount=1) UID=0xC0A800CC00000112B9BC9B6168CC1F98

Adding 1024 entries to cache
Adding 1024 entries to cache
...repeated many times...
Adding 1024 entries to cache
Adding 1024 entries to cache
Adding 1024 entries to cache

processing chunk of 100 contacts:
 [25827]=Contact{Name=Cgkyleass Kmknztk, Phone=(285) 452-0000}
 [4847]=Contact{Name=Cyedlujlc Ruexrtgla, Phone=(255) 296-0000}
 ...repeated many times
 [33516]=Contact{Name=Cjfwlxa Wsfhrj, Phone=(683) 968-0000}
 [71832]=Contact{Name=Clfsyk Dwncpr, Phone=(551) 957-0000}
processing chunk of 100 contacts:
 [38789]=Contact{Name=Cezmcxaokf Kwztt, Phone=(725) 575-0000}
 [87654]=Contact{Name=Cuxcwtkl Tqxmw, Phone=(244) 521-0000}
 ...repeated many times
 [96164]=Contact{Name=Cfpmbvq Qaxty, Phone=(596) 381-0000}
 [29502]=Contact{Name=Cofcdfgzp Nczpdg, Phone=(563) 983-0000}
...
processing chunk of 80 contacts:
 [49179]=Contact{Name=Czbjokh Nrinuphmsv, Phone=(140) 353-0000}
 [84463]=Contact{Name=Cyidbd Rnria, Phone=(571) 681-0000}
 ...
 [2530]=Contact{Name=Ciazkpbos Awndvrvcd, Phone=(676) 700-0000}
 [9371]=Contact{Name=Cpqo Rmdw, Phone=(977) 729-0000}
20.2 Performing Distributed Bulk Loading
When pre-populating a partitioned cache with a large data set, it may be more efficient to distribute the work across the Coherence cluster members. Distributed loading allows for a higher data throughput rate into the cache by leveraging the aggregate network bandwidth and CPU power of the cluster. When performing a distributed load, the application must decide on the following:
Which cluster members will perform the load
How to divide the data set among the members
The application should consider the load that will be placed on the underlying data source (such as a database or file system) when selecting members and dividing the work. For example, a single database can easily be overwhelmed if too many members execute queries against it concurrently.
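As an illustration of that trade-off, a hypothetical helper such as the following could cap how many members perform the load at the same time; the method name selectLoadMembers and the cMaxLoaders parameter are assumptions for this sketch, not part of the Coherence API:

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Choose at most cMaxLoaders members to perform the load so that the
// underlying data source is not queried by every member at once.
protected Set selectLoadMembers(Set storageMembers, int cMaxLoaders)
    {
    Set setLoaders = new HashSet();
    for (Iterator iter = storageMembers.iterator();
            setLoaders.size() < cMaxLoaders && iter.hasNext(); )
        {
        setLoaders.add(iter.next());
        }
    return setLoaders;
    }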
20.2.1 A Distributed Bulk Loading Example
This section outlines the general steps for performing a simple distributed load. The example assumes that the data is stored in files and will be distributed to all storage-enabled members of a cluster.
1. Retrieve the set of storage-enabled members. For example, the following method uses the PartitionedService.getOwnershipEnabledMembers method to retrieve the storage-enabled members of a distributed cache.
Example 20-7 Retrieving the Storage-Enabled Members of the Cache
protected Set getStorageMembers(NamedCache cache)
    {
    return ((PartitionedService) cache.getCacheService())
            .getOwnershipEnabledMembers();
    }
2. Divide the work among the storage-enabled cluster members. For example, the following routine returns a map, keyed by member, that contains the list of files assigned to that member.
Example 20-8 Routine to Assign a List of Files to Each Cache Member
protected Map<Member, List<String>> divideWork(Set members, List<String> fileNames)
    {
    Iterator i = members.iterator();
    Map<Member, List<String>> mapWork = new HashMap(members.size());
    for (String sFileName : fileNames)
        {
        Member member = (Member) i.next();
        List<String> memberFileNames = mapWork.get(member);
        if (memberFileNames == null)
            {
            memberFileNames = new ArrayList();
            mapWork.put(member, memberFileNames);
            }
        memberFileNames.add(sFileName);

        // recycle through the members
        if (!i.hasNext())
            {
            i = members.iterator();
            }
        }
    return mapWork;
    }
3. Launch a task on each member to perform its portion of the load. For example, the tasks can be launched with the Coherence InvocationService. In this case, the implementation of LoaderInvocable must iterate through memberFileNames and process each file, loading its contents into the cache. Cache operations that are normally performed on the client must be executed through the LoaderInvocable.
Example 20-9 Routine to Load the Cache on Each Member
public void load()
    {
    NamedCache cache = getCache();

    Set members = getStorageMembers(cache);

    List<String> fileNames = getFileNames();

    Map<Member, List<String>> mapWork = divideWork(members, fileNames);

    InvocationService service = (InvocationService)
            CacheFactory.getService("InvocationService");

    for (Map.Entry<Member, List<String>> entry : mapWork.entrySet())
        {
        Member member = entry.getKey();
        List<String> memberFileNames = entry.getValue();

        LoaderInvocable task = new LoaderInvocable(memberFileNames, cache.getCacheName());
        service.execute(task, Collections.singleton(member), this);
        }
    }
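The LoaderInvocable implementation itself is not shown in this chapter. The following is a minimal sketch of what such a class might look like, assuming each file contains one key=value pair per line; the file format, the 1000-entry batch size, and the field names are illustrative assumptions:

import com.tangosol.net.AbstractInvocable;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LoaderInvocable
        extends AbstractInvocable
    {
    public LoaderInvocable(List<String> fileNames, String cacheName)
        {
        m_fileNames = fileNames;
        m_cacheName = cacheName;
        }

    /**
    * Runs on the target member: load the assigned files into the cache,
    * batching the entries with putAll as in Example 20-2.
    */
    public void run()
        {
        NamedCache cache  = CacheFactory.getCache(m_cacheName);
        Map        buffer = new HashMap();
        try
            {
            for (String fileName : m_fileNames)
                {
                BufferedReader reader = new BufferedReader(new FileReader(fileName));
                try
                    {
                    String line;
                    while ((line = reader.readLine()) != null)
                        {
                        // assumed file format: one "key=value" pair per line
                        int of = line.indexOf('=');
                        buffer.put(line.substring(0, of), line.substring(of + 1));

                        if (buffer.size() >= 1000)
                            {
                            cache.putAll(buffer);
                            buffer.clear();
                            }
                        }
                    }
                finally
                    {
                    reader.close();
                    }
                }
            if (!buffer.isEmpty())
                {
                cache.putAll(buffer);
                }
            }
        catch (IOException e)
            {
            throw new RuntimeException(e);
            }
        }

    // Invocable extends Serializable, so these fields travel with the task
    private List<String> m_fileNames;
    private String       m_cacheName;
    }

Because the task executes on a storage-enabled member, the putAll calls stay within the cluster rather than crossing in from a client, which is the point of distributing the load.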