Oracle Coherence中文教程二十四:在高速缓存中的数据处理

在高速缓存中的数据处理

coherence提供了理想的基础设施建设数据网格服务和客户端和基于服务器的应用程序使用数据网格。在一个基本的层面上,相干可以在大量的服务器在网格管理一个巨大的数据量,它可以提供接近零延迟访问该数据,它支持跨数据的并行查询中的map-reduce方式;它支持数据库和EIS系统,作为该数据的记录系统的集成。此外,Coherence提供一些服务,这是建立有效的数据网格的理想选择。

本章包含以下各节:

有针对性的执行

并行执行

基于查询的执行

数据网格范围内执行

代理针对性,并行和基于查询的执行

数据网格聚合

基于节点的执行

工作管理

24.1有针对性的执行

Coherence提供执行能力对一个条目的代理人在任何地图数据网格管理的数据:

map.invoke(key, agent);

在分区的数据的情况下,代理上的网格节点拥有数据执行对执行。该网格节点上发生的排队,并发管理,执行代理,由代理的数据访问,和数据修改代理。 (同步所得到的备份数据的修改,如果有的话,需要额外的网络流量。 )对于很多处理的目的,它是更有效的移动代理程序的序列化形式(通常只有几百个字节,最多)比处理分布式并发控制,一致性和数据更新。

对于请求/响应处理,代理返回的结果:

Object oResult = map.invoke(key, agent);

换句话说,作为一个数据网格的连贯性决定位置执行基于数据拓扑结构的配置代理,移动代理,执行代理(自动处理并发控制的项目,同时执行代理) ,备份如有修改,并返回结果。

24.2并行执行

Coherence提供的map-reduce允许代理执行并行跨越所有节点在网格中的条目对收集的功能。并行执行允许平衡跨网格工作要处理大量的数据。 invokeAll是方法的使用方式如下:

map.invokeAll(collectionKeys, agent);

对于请求/响应处理,代理处理每个键返回一个结果:

Map mapResults = map.invokeAll(collectionKeys, agent);

相干决定执行代理程序的基础上的配置数据拓扑的最佳位置,移动代理,执行代理(自动处理并发控制项 ,同时执行代理) ,备份修改(如有的话) ,并返回聚结的结果。请参阅“数据网格聚合”的说明,对结果集进行聚集。

24.3基于查询的执行

Coherence支持查询的能力在整个数据网格。例如,在一个交易系统是可能查询所有开放Order对象为特定的交易者:

例24-1查询整个数据网格

NamedCache map    = CacheFactory.getCache("trades");

Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),

new EqualsFilter("getStatus", Status.OPEN));

Set setOpenTradeIds = mapTrades.keySet(filter);

Coherence提供此功能相结合,在数据网格中的并行执行,执行能力的代理人对查询。在上一节中,执行并行发生的,而不是返回的身份或与查询匹配的条目,相干执行代理对条目:

map.invokeAll(filter, agent);

对于请求/响应处理,代理处理每个键返回一个结果:

Map mapResults = map.invokeAll(filter, agent);

换句话说,连贯性结合的并行查询,及其并行执行对数据网格中,实现基于查询的代理调用。

24.4数据网格范围内执行

AlwaysFilter (或空)invokeAll是方法的一个实例传递导致通过代理执行对InvocableMap中的所有条目:

map.invokeAll((Filter) null, agent);

与其他类型的代理调用,请求/响应处理支持

Map mapResults = map.invokeAll((Filter) null, agent);

应用程序可以处理所有的数据分布在一个特定的地图数据网格,一个单一的代码行。

24.5针对性,并行和基于查询的执行代理

代理实现EntryProcessor的接口,通常是通过扩展AbstractProcessor类。

有几个代理与连贯性,包括:

AbstractProcessor - 一个抽象基类,用于建设EntryProcessor

ExtractorProcessor - 提取并返回一个值(如属性值)从一个对象存储在InvocableMap   的

CompositeProcessor - 捆绑了收集EntryProcessor对象对同一条目顺序调用

ConditionalProcessor - 有条件调用EntryProcessor ,如果过滤器对进入到进程的计算结 果为true

PropertyProcessor - 一个抽象基类的实现依赖于一个PropertyManipulator EntryProcessor

NumberIncrementor - 原始积分类型的任何财产,和字节,短,整型,长,浮动,双,  BigInteger的前或后递增的BigDecimal

NumberMultiplier - 原始积分类型的任何财产,和字节,短,整型,长,浮动,双,的   BigInteger , BigDecimal的,相乘,并返回以前的或新的价值

的EntryProcessor接口(皆为InvocableMap接口)只包含两个方法:

示例24-2方法在EntryProcessor接口


/**

* An invocable agent that operates against the Entry objects within a

* Map.

*/

public interface EntryProcessor

extends Serializable

{

/**

* Process a Map Entry.

*

* @param entry  the Entry to process

*

* @return the result of the processing, if any

*/

public Object process(Entry entry);

/**

* Process a Set of InvocableMap Entry objects. This method is

* semantically equivalent to:

* <pre>

*   Map mapResults = new ListMap();

*   for (Iterator iter = setEntries.iterator(); iter.hasNext(); )

*       {

*       Entry entry = (Entry) iter.next();

*       mapResults.put(entry.getKey(), process(entry));

*       }

*   return mapResults;

* </pre>

*

* @param setEntries  a read-only Set of InvocableMap Entry objects to

*                    process

*

* @return a Map containing the results of the processing, up to one

*         entry for each InvocableMap Entry that was processed, keyed

*         by the keys of the Map that were processed, with a

*         corresponding value being the result of the processing for

*         each key

*/

public Map processAll(Set setEntries);

}

(AbstractProcessor实现processAll的方法,如在前面的例子所述。 )

该InvocableMap.Entry被传递到EntryProcessor的是一个扩展的的Map.Entry接口,允许一个EntryProcessor实现获得必要的信息条目,并作出必要的修改,尽可能最有效的方式:

例24-3 InvocableMap.Entry API


/**

* An InvocableMap Entry contains additional information and exposes

* additional operations that the basic Map Entry does not. It allows

* non-existent entries to be represented, thus allowing their optional

* creation. It allows existent entries to be removed from the Map. It

* supports several optimizations that can ultimately be mapped

* through to indexes and other data structures of the underlying Map.

*/

public interface Entry

extends Map.Entry

{

// ----- Map Entry interface ------------------------------------

/**

* Return the key corresponding to this entry. The resultant key does

* not necessarily exist within the containing Map, which is to say

* that <tt>InvocableMap.this.containsKey(getKey)</tt> could return

* false. To test for the presence of this key within the Map, use

* {@link #isPresent}, and to create the entry for the key, use

* {@link #setValue}.

*

* @return the key corresponding to this entry; may be null if the

*         underlying Map supports null keys

*/

public Object getKey();

/**

* Return the value corresponding to this entry. If the entry does

* not exist, then the value is null. To differentiate between

* a null value and a non-existent entry, use {@link #isPresent}.

* <p/>

* <b>Note:</b> any modifications to the value retrieved using this

* method are not guaranteed to persist unless followed by a

* {@link #setValue} or {@link #update} call.

*

* @return the value corresponding to this entry; may be null if the

*         value is null or if the Entry does not exist in the Map

*/

public Object getValue();

/**

* Store the value corresponding to this entry. If the entry does

* not exist, then the entry is created by invoking this method,

* even with a null value (assuming the Map supports null values).

*

* @param oValue  the new value for this Entry

*

* @return the previous value of this Entry, or null if the Entry did

*         not exist

*/

public Object setValue(Object oValue);

// ----- InvocableMap Entry interface ---------------------------

/**

* Store the value corresponding to this entry. If the entry does

* not exist, then the entry is created by invoking this method,

* even with a null value (assuming the Map supports null values).

* <p/>

* Unlike the other form of {@link #setValue(Object) setValue}, this

* form does not return the previous value, and consequently may be

* significantly less expensive (in terms of cost of execution) for

* certain Map implementations.

*

* @param oValue      the new value for this Entry

* @param fSynthetic  pass true only if the insertion into or

*                    modification of the Map should be treated as a

*                    synthetic event

*/

public void setValue(Object oValue, boolean fSynthetic);

/**

* Extract a value out of the Entry‘s value. Calling this method is

* semantically equivalent to

* <tt>extractor.extract(entry.getValue())</tt>, but this method may

* be significantly less expensive because the resultant value may be

* obtained from a forward index, for example.

*

* @param extractor  a ValueExtractor to apply to the Entry‘s value

*

* @return the extracted value

*/

public Object extract(ValueExtractor extractor);

/**

* Update the Entry‘s value. Calling this method is semantically

* equivalent to:

* <pre>

*   Object oTarget = entry.getValue();

*   updater.update(oTarget, oValue);

*   entry.setValue(oTarget, false);

* </pre>

* The benefit of using this method is that it may allow the Entry

* implementation to significantly optimize the operation, such as

* for purposes of delta updates and backup maintenance.

*

* @param updater  a ValueUpdater used to modify the Entry‘s value

*/

public void update(ValueUpdater updater, Object oValue);

/**

* Determine if this Entry exists in the Map. If the Entry is not

* present, it can be created by calling {@link #setValue} or

* {@link #setValue}. If the Entry is present, it can be destroyed by

* calling {@link #remove}.

*

* @return true iff this Entry is existent in the containing Map

*/

public boolean isPresent();

/**

* Remove this Entry from the Map if it is present in the Map.

* <p/>

* This method supports both the operation corresponding to

* {@link Map#remove} and synthetic operations such as

* eviction. If the containing Map does not differentiate between

* the two, then this method must be identical to

* <tt>InvocableMap.this.remove(getKey())</tt>.

*

* @param fSynthetic  pass true only if the removal from the Map

*                    should be treated as a synthetic event

*/

public void remove(boolean fSynthetic);

}

24.6数据网格聚合

在除了标量剂,的InvocableMap接口还支持对条目的一个子集,以获得一个单一的结果,执行操作的条目聚合。条目聚合发生在两端并联工作时,用大量的数据网格提供的map-reduce支持。

例24-4汇聚在InvocableMap API


/**

* Perform an aggregating operation against the entries specified by the

* passed keys.

*

* @param collKeys  the Collection of keys that specify the entries within

*                  this Map to aggregate across

* @param agent     the EntryAggregator that is used to aggregate across

*                  the specified entries of this Map

*

* @return the result of the aggregation

*/

public Object aggregate(Collection collKeys, EntryAggregator agent);

/**

* Perform an aggregating operation against the set of entries that are

* selected by the given Filter.

* <p/>

* <b>Note:</b> calling this method on partitioned caches requires a

* Coherence Enterprise Edition (or higher) license.

*

* @param filter  the Filter that is used to select entries within this

*                Map to aggregate across

* @param agent   the EntryAggregator that is used to aggregate across

*                the selected entries of this Map

*

* @return the result of the aggregation

*/

public Object aggregate(Filter filter, EntryAggregator agent);

一个简单的EntryAggregator处理一个组的InvocableMap.Entry对象,实现了结果:

例24-5 EntryAggregator API


/**

* An EntryAggregator represents processing that can be directed to occur

* against some subset of the entries in an InvocableMap, resulting in a

* aggregated result. Common examples of aggregation include functions

* such as min(), max() and avg(). However, the concept of aggregation

* applies to any process that must evaluate a group of entries to

* come up with a single answer.

*/

public interface EntryAggregator

extends Serializable

{

/**

* Process a set of InvocableMap Entry objects to produce an

* aggregated result.

*

* @param setEntries  a Set of read-only InvocableMap Entry objects to

*                    aggregate

*

* @return the aggregated result from processing the entries

*/

public Object aggregate(Set setEntries);

}

在数据网格的高效的执行力,聚合过程的设计必须能够以并行方式运行。

例如的24-6 ParallelAwareAggregator API并行运行聚合


/**

* A ParallelAwareAggregator is an advanced extension to EntryAggregator

* that is explicitly capable of being run in parallel, for example in a

* distributed environment.

*/

public interface ParallelAwareAggregator

extends EntryAggregator

{

/**

* Get an aggregator that can take the place of this aggregator in

* situations in which the InvocableMap can aggregate in parallel.

*

* @return the aggregator that is run in parallel

*/

public EntryAggregator getParallelAggregator();

/**

* Aggregate the results of the parallel aggregations.

*

* @return the aggregation of the parallel aggregation results

*/

public Object aggregateResults(Collection collResults);

}

Coherence包含的一些诶聚合函数,还有:

Count

DistinctValues

DoubleAverage

DoubleMax

DoubleMin

DoubleSum

LongMax

LongMin

LongSum

注意事项:

所有配有连贯性的聚合是平行的感知。

见com.tangosol.util.aggregator包的列表连贯性聚合。要实现自己的聚合,请参阅AbstractAggregator的抽象基类。

24.7基于节点的执行

Coherence提供调用服务,它允许执行单通代理(称为Invocable的对象)网格内的任何地方。代理可以执行在某个节点的网格,在网格中任何一组特定的节点上并行,或平行的网格中的所有节点上。

调用服务配置缓存配置文件使用<invocation-scheme>的元素。使用的服务的名称,应用程序可以很容易地获得该服务的引用:

InvocationService service = (InvocationService)CacheFactory.getService("MyService");

代理仅仅是可运行的类是应用程序的一部分。一个简单的代理的一个例子是一个请求从JVM的GC :

例24-7简单的代理请求垃圾收集


/**

* Agent that issues a garbage collection.

*/

public class GCAgent

extends AbstractInvocable

{

public void run()

{

System.gc();

}

}

执行该代理在整个集群中,它需要一行代码:

service.execute(new GCAgent(), null, null);

下面是一个例子的一个代理,它支持一个网格范围内的请求/响应模型:

例24-8代理以支持网格范围内的请求和响应模型


/**

* 代理确定网格节点有多少空闲内存。

*/

public class FreeMemAgent

extends AbstractInvocable

{

public void run()

{

Runtime runtime = Runtime.getRuntime();

int cbFree  = runtime.freeMemory();

int cbTotal = runtime.totalMemory();

setResult(new int[] {cbFree, cbTotal});

}

}

执行该代理在整个电网和检索所有结果,但它仍然只需要一行代码:

Map map = service.query(new FreeMemAgent(), null);

虽然它很容易做一个网格范围内的请求/响应,它需要更多的代码打印结果:

例24-9从电网范围的请求或响应打印结果


Iterator iter = map.entrySet().iterator();

while (iter.hasNext())

{

Map.Entry entry  = (Map.Entry) iter.next();

Member    member = (Member) entry.getKey();

int[]     anInfo = (int[]) entry.getValue();

if (anInfo != null) // nullif member died

System.out.println("Member " + member + " has "

+ anInfo[0] + " bytes free out of "

+ anInfo[1] + " bytes total");

}

代理操作可以是有状态的,这意味着它们的调用状态序列化和传输网格节点上运行代理。

例24-10状态代理业务


/**

* Agent that carries some state with it.

*/

public class StatefulAgent

extends AbstractInvocable

{

public StatefulAgent(String sKey)

{

m_sKey = sKey;

}

public void run()

{

// the agent has the key that it was constructed with

String sKey = m_sKey;

// ...

}

private String m_sKey;

}

24.8工作管理

Coherence提供了支持网格的实施CommonJ工作管理器。使用工作管理器,应用程序可以提交收集的工作,必须执行。工作管理器工作在这样一种方式,它是并行执行的,通常在整个网格分配。换句话说,如果有10个工作项目提交和10在网格中的服务器,每个服务器有可能处理一个工作项目。此外,分布在网格工作项目可以定制,从而使某些服务器(例如,作为一个特定的主机服务网关)是第一选择,为了效率和本地运行某些工作项目的数据。

然后,应用程序可以等待要完成的工作,它可以等待多久可以提供超时。用于此目的的API是相当强大,它允许应用程序等待完成第一个工作项,或指定的一组完成的工作项目。通过这个API相结合的方法,它是可以做这样的事情:“这里有10个项目执行;这7个不重要的项目,等待不超过5秒,这3个重要的项目中,等待不超过30秒”。

例24-11使用工作管理


Work[] aWork = ...

Collection collBigItems = new ArrayList();

Collection collAllItems = new ArrayList();

for (int i = 0, c = aWork.length; i < c; ++i)

{

WorkItem item = manager.schedule(aWork[i]);

if (i < 3)

{

// the first three work items are the important ones

collBigItems.add(item);

}

collAllItems.add(item);

}

Collection collDone = manager.waitForAll(collAllItems, 5000L);

if (!collDone.containsAll(collBigItems))

{

// wait the remainder of 30 seconds for the important work to finish

manager.waitForAll(collBigItems, 25000L);

}

时间: 2024-07-31 10:04:49

Oracle Coherence中文教程二十四:在高速缓存中的数据处理的相关文章

Oracle Coherence中文教程二十:预加载缓存

预加载缓存 本章介绍了不同的模式,你可以用它来预加载缓存.该模式包括批量装载和分布载荷. 本章包含以下各节: 执行批量加载和处理 执行分布式批量加载 20.1执行批量加载和处理 例20-5, PagedQuery.java ,演示了在一个连贯缓存技术,有效地批量加载和处理项目. 20.1.1批量写入缓存 使用连贯性时,一个常见的场景是预先填充缓存应用程序使用它之前.一个简单的方法来做到这例20-1中的Java代码所示: 例20-1预加载缓存 public static void bulkLoad

Oracle Coherence中文教程二十五:Map管理操作触发器

Map管理操作触发器 Map触发补充标准的Oracle Coherence的能力,以提供高度自定义的缓存管理系统.例如,地图触发器可以防止非法交易,执行复杂安全授权或复杂的业务规则,提供透明的事件日志和审计,并收集统计数据修改.触发器的其他可能用途,包括限制行动,打击一个缓存,在应用程序重新部署时间发出. 例如,假设你有代码是与NamedCache工作,条目插入地图之前,你想改变一个条目的行为或内容.除了地图触发,而无需修改现有的代码,使你做出这种改变. 地图触发器也可以作为升级过程的一部分.除

Oracle Coherence中文教程二十六:使用Coherence Query语言

使用Coherence Query语言 本章介绍如何使用连贯性的查询语言(CohQL)互动与连贯性高速缓存. CohQL是一个重量轻语法(SQL的传统),用于执行高速缓存操作上的连贯群集.语言可用于以编程方式或从一个命令行工具. 本章包含以下各节: 了解连贯性查询语言语法 使用命令行工具CohQL 大厦过滤器在Java程序 其他相干查询语言范例 注意事项: 虽然可能会出现CohQL语法类似于SQL,重要的是要记住,不是SQL的语法,实际上是更多的上下文相关的Java持久化查询语言(JPQL)标准

Oracle Coherence中文教程二:安装Oracle Coherence

安装Oracle Coherence 本章提供说明安装Oracle Coherence的java(简称为Coherence).本章不包括安装连贯性的说明*扩展的客户端分发(C ++和.NET)或相干*网站.请参阅Oracle Coherence的客户指南和Oracle Coherence的Oracle Coherence的网络,用户指南,安装这些组件的说明. 本章包含以下各节: 系统要求 提取分派 设置环境变量 初次运行连贯性 2.1系统需求 以下是建议的最低系统要求安装在开发环境中的连贯性:

Oracle Coherence中文教程二十一:使用缓存事件

使用缓存事件 Coherence提供缓存使用JavaBean事件模型的事件.收到你的需要,你需要他们的地方,不管变化实际上是发生在集群中的事件,这是非常简单的.熟悉JavaBean模型的开发应与事件工作有没有困难,即使在复杂的群集. 本章包含以下各节: 监听器接口和事件对象 了解事件担保 支持活动的缓存和类 注册的所有活动 使用内部类作为MapListener 配置一个MapListener一个Cache 具体身份签约活动 过滤事件 "精简版"的活动 高级:听查询 高级:合成事件 高级

Oracle Coherence中文教程二十三:使用连续查询缓存

使用连续查询缓存 虽然有可能取得时间从连贯缓存查询结果中的一个点,并有可能以接收事件,将改变该查询的结果,连贯性提供了一个功能,它结合了与一个连续的数据流中的相关事件的查询结果在一个实时的方式保持一个最新的日期的查询结果.这种能力被称作连续查询,因为它具有相同的效果,如果想要查询的零延迟和查询被执行多次,每毫秒!点时间查询结果和事件的更多信息,请参见第22章,"查询缓存中的数据." 相干物化成一个连续查询缓存查询结果,然后,缓存保持最新实时查询使用事件侦听器实现连续查询功能.换句话说,

二十四、Struts2中的UI标签

二十四.Struts2中的UI标签 Struts2中UI标签的优势: 数据回显 页面布局和排版(Freemark),struts2提供了一些常用的排版(主题:xhtml默认 simple ajax) 可以使用OGNL表达式 模板: 常量设置的:struts.ui.theme=xhtml 开发中建议设置为struts.ui.theme=simple;

Oracle Coherence中文教程十四:缓存数据来源

缓存数据来源 本章提供了用于缓存数据源使用作为临时记录系统的连贯性.本章包括样品和实施注意事项. 本章包含以下各节: 的缓存数据来源概述 选择一个高速缓存策略 创建一个缓存存储实现 在缓存存储实施堵漏 样品的缓存存储 可控的缓存存储范例 实施注意事项 14.1缓存数据源概述 Coherence支持透明,读/写缓存的任何数据源,包括数据库,Web服务,打包应用程序和文件系统,数据库是最常见的使用案例.作为速记, "数据库"是用来形容任何后端数据源.有效的缓存必须同时支持密集只读和读/写操

Oracle Coherence中文教程十:调整TCMP行为

调整TCMP行为 本章提供了指令TCMP默认设置改变. TCMP还提供简要概述.参见"了解TCMP TCMP其他细节.此外,请参阅Oracle Coherence的管理员指南,其中包括许多微调的建议和说明. 本章包含以下各节: 概述TCMP数据传输 节流数据传输 减少负载的捆绑包 更改数据包重传行为 配置传输数据包池大小 配置的数据包缓冲区的大小 调整数据包的最大尺寸 更改分组扬声器音量阈值的 更改消息处理行为 更改TCMP插座供应商实施 10.1概述TCMP数据传输 集群成员使用Tangoso