Parallel中分区器Partitioner的简单使用

Partitioner.Create(1,10,4).GetDynamicPartitions()

为长度为10的序列创建分区,每个分区至多4个元素,分区方法及结果:Partitioner.Create(0,
10, 4).GetDynamicPartitions()

得到3个前闭后开区间: [0,
4)即{0, 1, 2, 3}, [4, 8)即{4,
5, 6, 7}, [8,
10)即{8, 9},  注意被枚举的数字均为数组下标;

为长度为10的序列创建分区,至多4个分区,分区方法及结果:
Partitioner.Create(0, 10, 3).GetDynamicPartitions()

得到4个前闭后开区间: [0,3)即{0,
1, 2}, [3,
6)即{3, 4, 5}, [6, 9)即{6,
7, 8}, [9,
10)即{9}, 注意被枚举的数字均为数组下标;

定义分区个数
= 定义并发线程(笔者这样讲并不严格), 故定义方法如下:

private static void
NewMethod<T>(IList<T> array, Int32 rangeCount) {

var
rangeSize = (Int32)Math.Ceiling(array.Count / (Double)rangeCount);

var
part = Partitioner.Create(0, array.Count, rangeSize);

Parallel.ForEach(part, (range, state, rangeIndex) => {

for (Int32 index = range.Item1; index < range.Item2; index++)
{

Console.WriteLine("[{0,2}] {1} {2}",
Thread.CurrentThread.ManagedThreadId, rangeIndex, index);

}

});

}

对于分区个数定义为3,
可以看到线程ID 在1,3,4中切换, 线程[1]遍历了4个元素, 线程[3]遍历了4个元素, 线程[4]遍历了2个元素

对于分区个数定义为4,
可以看到线程ID 在1, 3, 4, 5中切换, 线程[1]遍历了3个元素, 线程[3]遍历了3个元素, 线程[4]遍历了3个元素,
线程5遍历了1个元素.


Parallel中分区器Partitioner的简单使用

时间: 2024-12-28 01:59:50

Parallel中分区器Partitioner的简单使用的相关文章

SPARK之分区器

Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数.RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的 原文地址:https://www.cnblogs.com/xiangyuguan/p/1

spark自定义分区器

1.spark中默认的分区器: Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数.RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数. 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的2. 参考博客:https://www.jianshu.

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向. StreamPartitioner StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法: public abstract StreamPartitioner<T> copy(); 但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于--各个分区器需要实现channel选择的接口方法: int[] selectChannels(T record,

在Parallel中使用DbSet.Add()发现的一系列多线程问题和解决过程

发现问题 需求很简单,大致就是要批量往数据库写数据,于是打算用Parallel并行的方式写入,希望能利用计算机多核特性加快程序执行速度.想的很美好,于是快速撸了类似下面的一串代码: using (var db = new SmsEntities()) { Parallel.For(0, 1000, (i) => { db.MemberCard.Add(new MemberCard() { CardNo = "NO_" + i.ToString(), Banlance = 0, C

Kafka 分区分配计算(分区器 Partitions )

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段.在某些应用场景下,业务逻辑需要控制每条消息落到合适的分区中,有些情形下则只要根据默认的分配规则即可.在KafkaProducer计算分配时,首先根据的是ProducerRecord中的partition字段指定的序号计算分区.读者有可能刚睡醒,看到这个ProducerRecord似

Cassandra数据分布之5分区器

分区器决定了数据在集群中节点的分布.分区器的功能是通过为每一行数据的分区键(partion key)分配一个令牌(token),然后通过这个令牌(token)将数据保存在cassandra集群中. Cassandra提供了如下如下4种分区器.Cassandra中的实现如下图: ByteOrderedPartitioner:有序分区器,它将键值数据看做是裸字节. LocalPartitioner:对分区键未做任何处理的分区器. Murmur3Partitioner:基于MurmurHash哈希算法

hadoop编程小技巧(3)---自定义分区类Partitioner

Hadoop代码测试环境:Hadoop2.4 原理:在Hadoop的MapReduce过程中,Mapper读取处理完成数据后,会把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下: /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numRe

JAVA垃圾收集器之Parallel Old收集器

Parallel Old收集器是JAVA虚拟机中垃圾收集器的一种.和Serial Old收集器一样,工作在JAV虚拟机的老年代.这种垃圾收集器使用多线程和"标记-整理"算法.它在JDK 1.6中才开始提供. 在注重吞吐量及CPU资源敏感的场合,都可以优先考虑Parallel Scavenge加Parallel Old收集器. 1.运行代码 package com.gc; import java.util.ArrayList; import java.util.List; /** * 简

JAVA垃圾收集器之Parallel Scavenge收集器

Parallel Scavenge收集器是JAVA虚拟机中垃圾收集器的一种.和ParNew收集器类似,是一个新生代收集器.使用复制算法的并行多线程收集器. 1.特点 Parallel Scavenge收集器的关注点与其他收集器不同, ParallelScavenge收集器的目标则是达到一个可控制的吞吐量(Throughput).所谓吞吐量就是CPU用于运行用户代码的时间与CPU总消耗时间的比值,即吞吐量 = 运行用户代码时间 /(运行用户代码时间 + 垃圾收集时间),虚拟机总共运行了100分钟,