Oracle Coherence中文教程二十:预加载缓存

预加载缓存

本章介绍了不同的模式,你可以用它来预加载缓存。该模式包括批量装载和分布载荷。

本章包含以下各节:

执行批量加载和处理

执行分布式批量加载

20.1执行批量加载和处理

例20-5, PagedQuery.java ,演示了在一个连贯缓存技术,有效地批量加载和处理项目。

20.1.1批量写入缓存

使用连贯性时,一个常见的场景是预先填充缓存应用程序使用它之前。一个简单的方法来做到这例20-1中的Java代码所示:

例20-1预加载缓存


public static void bulkLoad(NamedCache cache, Connection conn)

{

Statement s;

ResultSet rs;

try

{

s = conn.createStatement();

rs = s.executeQuery("select key, value from table");

while (rs.next())

{

Integer key   = new Integer(rs.getInt(1));

String  value = rs.getString(2);

cache.put(key, value);

}

...

}

catch (SQLException e)

{...}

}

这种技术的工作原理,但每次通话将可能导致网络流量,特别是分区和复制缓存。此外,把每个调用返回的对象,它只是替换缓存(每java.util.Map接口)增加了更多不必要的开销。加载缓存可以进行更高效而不是通过使用ConcurrentMap.putAll的方法。这一点在实施例20-2 :

例20-2预加载缓存使用ConcurrentMap.putAll


public static void bulkLoad(NamedCache cache, Connection conn)

{

Statement s;

ResultSet rs;

Map       buffer = new HashMap();

try

{

int count = 0;

s = conn.createStatement();

rs = s.executeQuery("select key, value from table");

while (rs.next())

{

Integer key   = new Integer(rs.getInt(1));

String  value = rs.getString(2);

buffer.put(key, value);

// this loads 1000 items at a time into the cache

if ((count++ % 1000) == 0)

{

cache.putAll(buffer);

buffer.clear();

}

}

if (!buffer.isEmpty())

{

cache.putAll(buffer);

}

...

}

catch (SQLException e)

{...}

}

20.1.2高效处理过滤结果

Coherence提供基于标准的查询缓存的能力,通过使用过滤器API 。下面是一个例子(给定的整数键和字符串值项) :

例20-3使用过滤器来查询缓存


NamedCache c = CacheFactory.getCache("test");

// Search for entries that start with ‘c‘

Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", ‘\\‘, true);

// Perform query, return all entries that match

Set results = c.entrySet(query);

for (Iterator i = results.iterator(); i.hasNext();)

{

Map.Entry e = (Map.Entry) i.next();

out("key: "+e.getKey() + ", value: "+e.getValue());

}

此示例适用于小的数据集,但它可能会遇到的问题,如运行的堆空间,如果数据集太大。例20-4说明了一种模式来处理查询结果分批来避免这个问题:

示例20-4批量处理查询结果


public static void performQuery()

{

NamedCache c = CacheFactory.getCache("test");

// Search for entries that start with ‘c‘

Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", ‘\\‘, true);

// Perform query, return keys of entries that match

Set keys = c.keySet(query);

// The amount of objects to process at a time

final int BUFFER_SIZE = 100;

// Object buffer

Set buffer = new HashSet(BUFFER_SIZE);

for (Iterator i = keys.iterator(); i.hasNext();)

{

buffer.add(i.next());

if (buffer.size() >= BUFFER_SIZE)

{

// Bulk load BUFFER_SIZE number of objects from cache

Map entries = c.getAll(buffer);

// Process each entry

process(entries);

// Done processing these keys, clear buffer

buffer.clear();

}

}

// Handle the last partial chunk (if any)

if (!buffer.isEmpty())

{

process(c.getAll(buffer));

}

}

public static void process(Map map)

{

for (Iterator ie = map.entrySet().iterator(); ie.hasNext();)

{

Map.Entry e = (Map.Entry) ie.next();

out("key: "+e.getKey() + ", value: "+e.getValue());

}

}

在这个例子中,过滤器匹配的条目的所有密钥都返回,但只有BUFFER_SIZE (在这种情况下, 100 )是在同一时间从缓存中检索条目。

请注意,的可以处理LimitFilter部分中的结果,类似于上面的例子。然而LimitFilter是指,例如在用户界面中的结果的情况下,寻呼。它不是一种有效的方法来处理一个查询结果中的所有数据。

20.1.3批量加载和处理实例

示例20-5说明PagedQuery.java,有示例程序,演示在上一节所描述的概念。

若要运行该示例,请按照下列步骤操作:

保存下面的Java文件,com/tangosol/examples/PagedQuery.java

点的classpath的连贯库和当前目录

编译并运行示例

示例20-5 A样品批量加载程序


package com.tangosol.examples;

import com.tangosol.net.CacheFactory;

import com.tangosol.net.NamedCache;

import com.tangosol.net.cache.NearCache;

import com.tangosol.util.Base;

import com.tangosol.util.Filter;

import com.tangosol.util.filter.LikeFilter;

import java.io.Serializable;

import java.util.HashMap;

import java.util.Iterator;

import java.util.Map;

import java.util.Random;

import java.util.Set;

import java.util.HashSet;

/**

* This sample application demonstrates the following:

* <ul>

* <li>

* <b>Obtaining a back cache from a near cache for populating a cache.</b>

* Since the near cache holds a limited subset of the data in a cache it is

* more efficient to bulk load data directly into the back cache instead of

* the near cache.

* </li>

* <li>

* <b>Populating a cache in bulk using <tt>putAll</tt>.</b>

* This is more efficient than <tt>put</tt> for a large amount of entries.

* </li>

* <li>

* <b>Executing a filter against a cache and processing the results in bulk.</b>

* This sample issues a query against the cache using a filter.  The result is

* a set of keys that represent the query results.  Instead of iterating

* through the keys and loading each item individually with a <tt>get</tt>,

* this sample loads entries from the cache in bulk using <tt>getAll</tt> which

* is more efficient.

* </li>

*

* @author cp

*/

public class PagedQuery

extends Base

{

/**

* Command line execution entry point.

*/

public static void main(String[] asArg)

{

NamedCache cacheContacts = CacheFactory.getCache("contacts",

Contact.class.getClassLoader());

populateCache(cacheContacts);

executeFilter(cacheContacts);

CacheFactory.shutdown();

}

// ----- populate the cache ---------------------------------------------

/**

* Populate the cache with test data.  This example shows how to populate

* the cache a chunk at a time using {@link NamedCache#putAll} which is more

* efficient than {@link NamedCache#put}.

*

* @param cacheDirect  the cache to populate. Note that this should <b>not</b>

*                     be a near cache since that thrashes the cache

*                     if the load size exceeds the near cache max size.

*/

public static void populateCache(NamedCache cacheDirect)

{

if (cacheDirect.isEmpty())

{

Map mapBuffer = new HashMap();

for (int i = 0; i < 100000; ++i)

{

// some fake data

Contact contact = new Contact();

contact.setName(getRandomName() + ‘ ‘ + getRandomName());

contact.setPhone(getRandomPhone());

mapBuffer.put(new Integer(i), contact);

// this loads 1000 items at a time into the cache

if ((i % 1000) == 0)

{

out("Adding "+mapBuffer.size()+" entries to cache");

cacheDirect.putAll(mapBuffer);

mapBuffer.clear();

}

}

if (!mapBuffer.isEmpty())

{

cacheDirect.putAll(mapBuffer);

}

}

}

/**

* Creates a random name.

*

* @return  a random string between 4 to 11 chars long

*/

public static String getRandomName()

{

Random rnd = getRandom();

int    cch = 4 + rnd.nextInt(7);

char[] ach = new char[cch];

ach[0] = (char) (‘A‘ + rnd.nextInt(26));

for (int of = 1; of < cch; ++of)

{

ach[of] = (char) (‘a‘ + rnd.nextInt(26));

}

return new String(ach);

}

/**

* Creates a random phone number

*

* @return  a random string of integers 10 chars long

*/

public static String getRandomPhone()

{

Random rnd = getRandom();

return "("

+ toDecString(100 + rnd.nextInt(900), 3)

+ ") "

+ toDecString(100 + rnd.nextInt(900), 3)

+ "-"

+ toDecString(10000, 4);

}

// ----- process the cache ----------------------------------------------

/**

* Query the cache and process the results in batches.  This example

* shows how to load a chunk at a time using {@link NamedCache#getAll}

* which is more efficient than {@link NamedCache#get}.

*

* @param cacheDirect  the cache to issue the query against

*/

private static void executeFilter(NamedCache cacheDirect)

{

Filter query = new LikeFilter("getName", "C%");

// to process 100 entries at a time

final int CHUNK_COUNT = 100;

// Start by querying for all the keys that match

Set setKeys = cacheDirect.keySet(query);

// Create a collection to hold the "current" chunk of keys

Set setBuffer = new HashSet();

// Iterate through the keys

for (Iterator iter = setKeys.iterator(); iter.hasNext(); )

{

// Collect the keys into the current chunk

setBuffer.add(iter.next());

// handle the current chunk when it gets big enough

if (setBuffer.size() >= CHUNK_COUNT)

{

// Instead of retrieving each object with a get,

// retrieve a chunk of objects at a time with a getAll.

processContacts(cacheDirect.getAll(setBuffer));

setBuffer.clear();

}

}

// Handle the last partial chunk (if any)

if (!setBuffer.isEmpty())

{

processContacts(cacheDirect.getAll(setBuffer));

}

}

/**

* Process the map of contacts. In a real application some sort of

* processing for each map entry would occur. In this example each

* entry is logged to output.

*

* @param map  the map of contacts to be processed

*/

public static void processContacts(Map map)

{

out("processing chunk of " + map.size() + " contacts:");

for (Iterator iter = map.entrySet().iterator(); iter.hasNext(); )

{

Map.Entry entry = (Map.Entry) iter.next();

out("  [" + entry.getKey() + "]=" + entry.getValue());

}

}

// ----- inner classes --------------------------------------------------

/**

* Sample object used to populate cache

*/

public static class Contact

extends Base

implements Serializable

{

public Contact() {}

public String getName()

{

return m_sName;

}

public void setName(String sName)

{

m_sName = sName;

}

public String getPhone()

{

return m_sPhone;

}

public void setPhone(String sPhone)

{

m_sPhone = sPhone;

}

public String toString()

{

return "Contact{"

+ "Name=" + getName()

+ ", Phone=" + getPhone()

+ "}";

}

public boolean equals(Object o)

{

if (o instanceof Contact)

{

Contact that = (Contact) o;

return equals(this.getName(), that.getName())

&& equals(this.getPhone(), that.getPhone());

}

return false;

}

public int hashCode()

{

int result;

result = (m_sName != null ? m_sName.hashCode() : 0);

result = 31 * result + (m_sPhone != null ? m_sPhone.hashCode() : 0);

return result;

}

private String m_sName;

private String m_sPhone;

}

}

例20-6说明了终端输出的连贯性,当您编译和运行示例:

例20-6终端输出批量加载程序

$ export COHERENCE_HOME=[**Coherence install directory**]

$ export CLASSPATH=$COHERENCE_HOME/lib/coherence.jar:.

$ javac com/tangosol/examples/PagedQuery.java

$ java com.tangosol.examples.PagedQuery


2008-09-15 12:19:44.156 Oracle Coherence 3.4/405 <Info> (thread=main, member=n/a): Loaded operational configuration from

resource "jar:file:/C:/coherence/lib/coherence.jar!/tangosol-coherence.xml"

2008-09-15 12:19:44.171 Oracle Coherence 3.4/405 <Info> (thread=main, member=n/a): Loaded operational overrides from

resource "jar:file:/C:/coherence/lib/coherence.jar!/tangosol-coherence-override-dev.xml"

2008-09-15 12:19:44.171 Oracle Coherence 3.4/405 <D5> (thread=main, member=n/a): Optional configuration override

"/tangosol-coherence-override.xml" is not specified

Oracle Coherence Version 3.4/405

Grid Edition: Development mode

Copyright (c) 2000-2008 Oracle. All rights reserved.

2008-09-15 12:19:44.812 Oracle Coherence GE 3.4/405 <D5> (thread=Cluster, member=n/a): Service Cluster joined the cluster

with senior service member n/a

2008-09-15 12:19:48.062 Oracle Coherence GE 3.4/405 <Info> (thread=Cluster, member=n/a): Created a new cluster with

Member(Id=1, Timestamp=2008-09-15 12:19:44.609, Address=xxx.xxx.x.xxx:8088, MachineId=26828, Edition=Grid Edition,

Mode=Development, CpuCount=2, SocketCount=1) UID=0xC0A800CC00000112B9BC9B6168CC1F98

Adding 1024 entries to cache

Adding 1024 entries to cache

...repeated many times...

Adding 1024 entries to cache

Adding 1024 entries to cache

Adding 1024 entries to cache

processing chunk of 100 contacts:

[25827]=Contact{Name=Cgkyleass Kmknztk, Phone=(285) 452-0000}

[4847]=Contact{Name=Cyedlujlc Ruexrtgla, Phone=(255) 296-0000}

...repeated many times

[33516]=Contact{Name=Cjfwlxa Wsfhrj, Phone=(683) 968-0000}

[71832]=Contact{Name=Clfsyk Dwncpr, Phone=(551) 957-0000}

processing chunk of 100 contacts:

[38789]=Contact{Name=Cezmcxaokf Kwztt, Phone=(725) 575-0000}

[87654]=Contact{Name=Cuxcwtkl Tqxmw, Phone=(244) 521-0000}

...repeated many times

[96164]=Contact{Name=Cfpmbvq Qaxty, Phone=(596) 381-0000}

[29502]=Contact{Name=Cofcdfgzp Nczpdg, Phone=(563) 983-0000}

...

processing chunk of 80 contacts:

[49179]=Contact{Name=Czbjokh Nrinuphmsv, Phone=(140) 353-0000}

[84463]=Contact{Name=Cyidbd Rnria, Phone=(571) 681-0000}

...

[2530]=Contact{Name=Ciazkpbos Awndvrvcd, Phone=(676) 700-0000}

[9371]=Contact{Name=Cpqo Rmdw, Phone=(977) 729-0000}

20.2执行分布式批量加载

预填充一个大的数据集分区的高速缓存相干时,它可能是更有效的分发工作的连贯性群集成员。分布式负载允许更高的数据吞吐率利用总的网络带宽和CPU功率的集群缓存。当执行一个分布式的负载,应用程序必须决定在以下方面:

群集成员执行负载

如何划分各成员之间的数据集

应用程序应该考虑被放置在选择成员时,底层的数据源(如数据库或文件系统)和除以工作负载。例如,一个单一的数据库可以很容易被淹没,如果有太多的成员同时执行查询。

20.2.1分布式批量加载示例

本节概述了一般的步骤来执行一个简单的分布式负载。该示例假定该数据被存储在文件中,并分发给所有的存储功能的群集的成员。

1、检索存储功能的成员的组。例如,下面的方法使用的getStorageEnabledMembers方法来检索存储功能的会员提供了一个分布式缓存。

示例20-7检索存储启用缓存成员


protected Set getStorageMembers(NamedCache cache)

{

return ((PartitionedService) cache.getCacheService())

.getOwnershipEnabledMembers();

}

2、除存储功能的集群成员之间的工作。例如,下面的程序返回一个地图,键入成员,包含分配给该成员的文件列表。

例20-8常规分配到一个缓存成员的文件列表


protected Map<Member, List<String>> divideWork(Set members, List<String> fileNames)

{

Iterator i = members.iterator();

Map<Member, List<String>> mapWork = new HashMap(members.size());

for (String sFileName : fileNames)

{

Member member = (Member) i.next();

List<String> memberFileNames = mapWork.get(member);

if (memberFileNames == null)

{

memberFileNames = new ArrayList();

mapWork.put(member, memberFileNames);

}

memberFileNames.add(sFileName);

// recycle through the members

if (!i.hasNext())

{

i = members.iterator();

}

}

return mapWork;

}

3、启动执行负载对每个成员的任务。例如,使用相干InvocationService “启动任务。在这种情况下,的实施LoaderInvocable必须遍历memberFileNames的和处理的每个文件,其内容加载到高速缓存中。通常在客户端上进行高速缓存操作必须执行通过LoaderInvocable 。

例20-9类加载缓存的每个成员


public void load()

{

NamedCache cache = getCache();

Set members = getStorageMembers(cache);

List<String> fileNames = getFileNames();

Map<Member, List<String>> mapWork = divideWork(members, fileNames);

InvocationService service = (InvocationService)

CacheFactory.getService("InvocationService");

for (Map.Entry<Member, List<String>> entry : mapWork.entrySet())

{

Member member = entry.getKey();

List<String> memberFileNames = entry.getValue();

LoaderInvocable task = new LoaderInvocable(memberFileNames, cache.getCacheName());

service.execute(task, Collections.singleton(member), this);

}

}

时间: 2024-10-10 16:34:30

Oracle Coherence中文教程二十:预加载缓存的相关文章

Oracle Coherence中文教程二十五:Map管理操作触发器

Map管理操作触发器 Map触发补充标准的Oracle Coherence的能力,以提供高度自定义的缓存管理系统.例如,地图触发器可以防止非法交易,执行复杂安全授权或复杂的业务规则,提供透明的事件日志和审计,并收集统计数据修改.触发器的其他可能用途,包括限制行动,打击一个缓存,在应用程序重新部署时间发出. 例如,假设你有代码是与NamedCache工作,条目插入地图之前,你想改变一个条目的行为或内容.除了地图触发,而无需修改现有的代码,使你做出这种改变. 地图触发器也可以作为升级过程的一部分.除

Oracle Coherence中文教程二十四:在高速缓存中的数据处理

在高速缓存中的数据处理 coherence提供了理想的基础设施建设数据网格服务和客户端和基于服务器的应用程序使用数据网格.在一个基本的层面上,相干可以在大量的服务器在网格管理一个巨大的数据量,它可以提供接近零延迟访问该数据,它支持跨数据的并行查询中的map-reduce方式;它支持数据库和EIS系统,作为该数据的记录系统的集成.此外,Coherence提供一些服务,这是建立有效的数据网格的理想选择. 本章包含以下各节: 有针对性的执行 并行执行 基于查询的执行 数据网格范围内执行 代理针对性,并

Oracle Coherence中文教程二十六:使用Coherence Query语言

使用Coherence Query语言 本章介绍如何使用连贯性的查询语言(CohQL)互动与连贯性高速缓存. CohQL是一个重量轻语法(SQL的传统),用于执行高速缓存操作上的连贯群集.语言可用于以编程方式或从一个命令行工具. 本章包含以下各节: 了解连贯性查询语言语法 使用命令行工具CohQL 大厦过滤器在Java程序 其他相干查询语言范例 注意事项: 虽然可能会出现CohQL语法类似于SQL,重要的是要记住,不是SQL的语法,实际上是更多的上下文相关的Java持久化查询语言(JPQL)标准

Oracle Coherence中文教程二十一:使用缓存事件

使用缓存事件 Coherence提供缓存使用JavaBean事件模型的事件.收到你的需要,你需要他们的地方,不管变化实际上是发生在集群中的事件,这是非常简单的.熟悉JavaBean模型的开发应与事件工作有没有困难,即使在复杂的群集. 本章包含以下各节: 监听器接口和事件对象 了解事件担保 支持活动的缓存和类 注册的所有活动 使用内部类作为MapListener 配置一个MapListener一个Cache 具体身份签约活动 过滤事件 "精简版"的活动 高级:听查询 高级:合成事件 高级

Oracle Coherence中文教程二:安装Oracle Coherence

安装Oracle Coherence 本章提供说明安装Oracle Coherence的java(简称为Coherence).本章不包括安装连贯性的说明*扩展的客户端分发(C ++和.NET)或相干*网站.请参阅Oracle Coherence的客户指南和Oracle Coherence的Oracle Coherence的网络,用户指南,安装这些组件的说明. 本章包含以下各节: 系统要求 提取分派 设置环境变量 初次运行连贯性 2.1系统需求 以下是建议的最低系统要求安装在开发环境中的连贯性:

Oracle Coherence中文教程二十三:使用连续查询缓存

使用连续查询缓存 虽然有可能取得时间从连贯缓存查询结果中的一个点,并有可能以接收事件,将改变该查询的结果,连贯性提供了一个功能,它结合了与一个连续的数据流中的相关事件的查询结果在一个实时的方式保持一个最新的日期的查询结果.这种能力被称作连续查询,因为它具有相同的效果,如果想要查询的零延迟和查询被执行多次,每毫秒!点时间查询结果和事件的更多信息,请参见第22章,"查询缓存中的数据." 相干物化成一个连续查询缓存查询结果,然后,缓存保持最新实时查询使用事件侦听器实现连续查询功能.换句话说,

Oracle Coherence中文教程十四:缓存数据来源

缓存数据来源 本章提供了用于缓存数据源使用作为临时记录系统的连贯性.本章包括样品和实施注意事项. 本章包含以下各节: 的缓存数据来源概述 选择一个高速缓存策略 创建一个缓存存储实现 在缓存存储实施堵漏 样品的缓存存储 可控的缓存存储范例 实施注意事项 14.1缓存数据源概述 Coherence支持透明,读/写缓存的任何数据源,包括数据库,Web服务,打包应用程序和文件系统,数据库是最常见的使用案例.作为速记, "数据库"是用来形容任何后端数据源.有效的缓存必须同时支持密集只读和读/写操

SWFObject使用方法和中文教程及IE无法加载播放的问题

SWFObject使用方法和中文教程 SWFObject的使用是非常简单的,只需要包含 swfobject.js这个js文件,然后在DOM中插入一些简单的JS代码,就能嵌入Flash媒体资源了. 下面是一个最简单的范例: 1.文件顶部需加载swfobject.js <script type="text/javascript" src="swfobject.js"></script> <script type="text/jav

Oracle Coherence中文教程十九:使用便携式对象格式

使用便携式对象格式 使用便携式对象格式(POF)具有许多优点,包括语言独立性的性能优势.建议你仔细看您的系列化解决方案时,在POF工作具有连贯性.对于如何使用POF建设.NET时,延长客户的信息,请参阅"楼宇集成对象NET客户端"Oracle Coherence的客户端指南.对于如何构建C++扩展客户时,与POF的信息,请参阅"楼宇集成对象的C + +客户端"Oracle Coherence的客户端指南. 本章包含以下各节: POF系列化概述 使用POF API序列