Parallel Computing–Cannon算法 (MPI 实现)

原理不解释,直接上代码

代码中被注释的源程序可用于打印中间结果,检查运算是否正确。

#include "mpi.h"
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void scatter_matrix(int* fstream,int n1,int n2,int*Q,int root,int tag){
	/*每个矩阵块的大小*/
	int rows=(n1+root-1)/root;
	int cols=(n2+root-1)/root;
	int* tmp_matrix=(int*)malloc(rows*cols*sizeof(int));

	int i,j;
	memset(Q,0,rows*cols*sizeof(int));
	for(i=0;i<root;i++){
		for(j=0;j<root;j++){
			int p=0,q=0;
			int imin=i*rows*n2;
			int jmin=j*cols;
			memset(tmp_matrix,0,sizeof(tmp_matrix));

			/*在划分矩阵时,由于地空间不连续,需要另开辟一个数组连续的保存起来,以便于调用MPI_Send*/
			for(p=0;p<rows;p++,imin+=n2){
				for(q=0;q<cols;q++){
					tmp_matrix[p*cols+q]=fstream[imin+jmin+q];
				}
			}
			if(i==0&&j==0){
				/*进程0 不需要使用MPI_Send将数据发送给自己,直接使用memcpy将结果拷贝即可*/
				memcpy(Q,tmp_matrix,rows*cols*sizeof(int));
			}else{
				/*将分块发送给位于i行,j列的进程*/
				MPI_Send(tmp_matrix,rows*cols,MPI_INT,i*root+j,tag,MPI_COMM_WORLD);
			}
		}
	}
}
/*
 *@row:矩阵所在的行
 *@col:矩阵所在的列
 *@sp:sp=root=sqrt(nprocs)
 *@return 根据行列号计算进程实际编号
*/
int get_index(int row,int col,int sp){
	int tmp=((row+sp)%sp)*sp+(col+sp)%sp;
	return tmp;
}
/*计算矩阵乘法,将结果存入C中*/
void matrix_multi(int* A,int *B,int *C,int n1,int n2,int n3,int myid){
	int i=0,j=0,k=0;
	int* tmp_C=(int*)malloc(n1*n3*sizeof(int));
	memset(tmp_C,0,sizeof(int)*n1*n3);

	for(i=0;i<n1;i++){
		for(j=0;j<n3;j++){
			for(k=0;k<n2;k++){
				tmp_C[i*n3+j]+=A[i*n2+k]*B[k*n3+j];
			}
			C[i*n3+j]+=tmp_C[i*n3+j];
		}

	}

}

/*用于矩阵下标定位对齐*/
void shuffle(int*A,int*buf_A,int buf_A_size,int *B,int*buf_B,int buf_B_size,int root,int myid){
	int i,j;
	MPI_Status status;
	int cur_col=0;
	int cur_row=0;
	/*通过进程编号计算获得当前进程所在的行号和列号*/
	cur_row=myid/root;
	cur_col=myid-cur_row*root;
	/*对于矩阵A,第i行的矩阵需要向左平移i次*/
	for(i=0;i<cur_row;i++){
		/*接收来自右边的数据,并将当前矩阵发送给左边的进程*/
		MPI_Sendrecv(A,buf_A_size,MPI_INT,get_index(cur_row,cur_col-1,root),102,
			     buf_A,buf_A_size,MPI_INT,get_index(cur_row,cur_col+1,root),102,MPI_COMM_WORLD,&status);
		memcpy(A,buf_A,buf_A_size*sizeof(int));/*buf_A用于通信时缓存矩阵*/
		memset(buf_A,0,buf_A_size*sizeof(int));
	}
	/*对于矩阵B,第j列的矩阵需要向上平移j次*/
	for(j=0;j<cur_col;j++){
		/*接收来自下边的数据,并将当前矩阵发送给上边的进程*/
		MPI_Sendrecv(B,buf_B_size,MPI_INT,get_index(cur_row-1,cur_col,root),103,
			     buf_B,buf_B_size,MPI_INT,get_index(cur_row+1,cur_col,root),103,MPI_COMM_WORLD,&status);
		memcpy(B,buf_B,buf_B_size*sizeof(int));/*buf_B用于通信时缓存矩阵*/
		memset(buf_B,0,buf_B_size*sizeof(int));
	}
	/*printf("I have shuffled!\n");*/
}
void cannon(int*A,int*buf_A,int buf_A_size,int *B,int*buf_B,int buf_B_size,
	int *C,int buf_C_size,int row_a,int col_a,int col_b,int root,int myid){
	MPI_Status status;
	double elapsed_time,multiply_time=0,passdata_time=0;
	int i,j;
	memset(C,0,sizeof(int)*buf_C_size);
	int cur_col=0;
	int cur_row=0;
	/*通过进程编号计算获得当前进程所在的行号和列号*/
	cur_row=myid/root;
	cur_col=myid-cur_row*root;

	for(i=0;i<root;i++){/*一共需要循环root次,root=sqrt(nprocs)*/
		elapsed_time=MPI_Wtime();
		matrix_multi(A,B,C,row_a,col_a,col_b,myid);/*计算矩阵乘法*/
		elapsed_time=MPI_Wtime()-elapsed_time;
		multiply_time+=elapsed_time;
		/*elapsed_time=MPI_Wtime();		*/
		/*接收来自右边(row,col+1)的数据,并将当前矩阵发送给左边(row,col-1)的进程*/
		MPI_Sendrecv(A,buf_A_size,MPI_INT,get_index(cur_row,cur_col-1,root),102,
			     buf_A,buf_A_size,MPI_INT,get_index(cur_row,cur_col+1,root),102,MPI_COMM_WORLD,&status);
		/*接收来自下边(row+1,col)的数据,并将当前矩阵发送给上边(row-1,col)的进程*/
		MPI_Sendrecv(B,buf_B_size,MPI_INT,get_index(cur_row-1,cur_col,root),103,
			     buf_B,buf_B_size,MPI_INT,get_index(cur_row+1,cur_col,root),103,MPI_COMM_WORLD,&status);
		/*elapsed_time=MPI_Wtime()-elapsed_time;
		passdata_time+=elapsed_time;*/
		memcpy(B,buf_B,buf_B_size*sizeof(int));/*将buf_B中的数据拷贝至B中*/
		memcpy(A,buf_A,buf_A_size*sizeof(int));/*将buf_A中的数据拷贝至A中*/

	}
	/*将计算结果发送给数组C*/
	MPI_Send(C,row_a*col_b,MPI_INT,0,104,MPI_COMM_WORLD);

	printf("proc:%d, passdata time:%lf    multiply time:%lf\n",myid,passdata_time,multiply_time);
}

void gather_matrix(int *fstream,int n1,int n3,int*C,int root,FILE*fhc){
	MPI_Status status;
	int rows=(n1+root-1)/root;
	int cols=(n3+root-1)/root;
	int* tmp_matrix=(int*)malloc(rows*cols*sizeof(int));
	int i,j;

	for(i=0;i<root;i++){
		for(j=0;j<root;j++){
			int p,q;
			int imin=i*rows*n3;
			int jmin=j*cols;
			memset(tmp_matrix,0,sizeof(tmp_matrix));
			/*接收来自各个进程的数据*/
			MPI_Recv(tmp_matrix,rows*cols,MPI_INT,i*root+j,104,MPI_COMM_WORLD,&status);
			/*printf("I am passed proc:%d \n",i*root+j);*/
			/*将接收的矩阵tmp拼接到矩阵C中去,需要按照合理顺序拼接,否则结果会出错*/
			for(p=0;p<rows;p++,imin+=n3){
				for(q=0;q<cols;q++){
					fstream[imin+jmin+q]=tmp_matrix[p*cols+q];
					/*printf("%d ",((int*)fstream)[imin+jmin+q]);*/
				}
				/*printf("\n");*/
			}
		}
	}

	/*将结果打印到文件中*/
	for(i=0;i<n1;i++){
		for(j=0;j<n3;j++){
			fprintf(fhc,"%d ",fstream[i*n3+j]);
		}
		fprintf(fhc,"\n");
	}
}

int main(int argc,char**argv){
	int myid,numprocs;
	int i,j;
	MPI_Status status;
	int root=0;
	int dim[3];
	double elapsed_time=0;
	int max_rows_a,max_cols_a,max_rows_b,max_cols_b;
	int buf_A_size,buf_B_size,buf_C_size;
	FILE* fhc;
	/*suppose A:n1*n2 ,B:n2*n3;n1,n2,n3 are read from input file*/
	int n1,n2,n3;
	/*buffer for matrix A,B,C will be shifted ,so they each have two buffer*/
	int *A,*B,*C,*buf_A,*buf_B;

	/*on proc0,buffers to cache matrix files of A,B and C*/
	int *fstream_a=NULL,*fstream_b=NULL,*fstream_c=NULL;
	MPI_Init(&argc,&argv);/*初始化*/
	MPI_Comm_rank(MPI_COMM_WORLD,&myid);/*获取当前进程ID*/
	MPI_Comm_size(MPI_COMM_WORLD,&numprocs);/*获取全部进程数量*/

	root=sqrt(numprocs);
	if(numprocs!=root*root){
		/*如果进程总数不是平方数,则结束程序*/
		printf("process number must be a squre!\n");
		exit(-1);
	}

	/*on proc0,preprocess the command line,read in file
	 for A,B and put their sizes in dim[]*/
	if(myid==0){
		FILE *file_a,*file_b,*file_c;
		int n1,n2,n3;
		int i,j;
		file_a=fopen(argv[1],"r");/*打开文件a,文件名从运行时给的参数中获得*/
		file_b=fopen(argv[2],"r");/*打开文件b,文件名从运行时给的参数中获得*/
		fscanf(file_a,"%d %d",&n1,&n2);/*从文件a中读取矩阵A的行数,列数*/
		fscanf(file_b,"%d %d",&n2,&n3);/*从文件b中读取矩阵B的行数,列数*/

		dim[0]=n1,dim[1]=n2,dim[2]=n3;
		fstream_a=(int*)malloc(n1*n2*sizeof(int));/*分配一块内存,用于将矩阵A读入内存*/
		fstream_b=(int*)malloc(n2*n3*sizeof(int));/*分配一块内存,用于将矩阵B读入内存*/
		/*printf("Yeah! I got n1=%d,n2=%d,n3=%d\n",n1,n2,n3);*/
		/*读入矩阵A,保存在fstream_a中*/
		for(i=0;i<n1;i++)
			for(j=0;j<n2;j++)
			fscanf(file_a,"%d",&((int*)fstream_a)[i*n2+j]);
		/*读入矩阵B,保存在fstream_b中*/
		for(i=0;i<n2;i++)
			for(j=0;j<n3;j++)
				fscanf(file_b,"%d",&((int*)fstream_b)[i*n3+j]);
	}
	/*将矩阵的行数,列数通过Bcast广播给所有进程*/
	MPI_Bcast(dim,3,MPI_INT,0,MPI_COMM_WORLD);
	n1=dim[0],n2=dim[1],n3=dim[2];

	/*begin new version*/

	max_rows_a=(n1+root-1)/root;/*子矩阵块A的行数*/
	max_cols_a=(n2+root-1)/root;/*子矩阵块A的列数*/
	max_rows_b=max_cols_a;      /*子矩阵块B的行数*/
	max_cols_b=(n3+root-1)/root;/*子矩阵块B的列数*/
	buf_A_size=max_rows_a*max_cols_a;/*子矩阵块A的大小*/
	buf_B_size=max_rows_b*max_cols_b;/*子矩阵块B的大小*/
	buf_C_size=max_rows_a*max_cols_b;/*子矩阵块C的大小*/

	/*给A,,buf_A,buf_B,B,C分配内存空间,其中buf_A,buf_B用于通讯中的缓存*/
	A=(int*)malloc(sizeof(int)*buf_A_size);
	buf_A=(int*)malloc(sizeof(int)*buf_A_size);
	B=(int*)malloc(sizeof(int)*buf_B_size);
	buf_B=(int*)malloc(sizeof(int)*buf_B_size);
	C=(int*)malloc(sizeof(int)*buf_C_size);
	if(A==NULL||buf_A==NULL||B==NULL||buf_B==NULL||C==NULL)
	{
		/*如果内存申请失败,就退出*/
		printf("Memory allocation failed!\n");
		exit(-1);
	}

	/*proc 0 scatter A,B to other procs in a 2D block distribution fashion*/
	if(myid==0){
		/*printf("max_rows_a:%d\n",max_rows_a);
		printf("max_cols_a:%d\n",max_cols_a);
		printf("max_rows_b:%d\n",max_rows_b);
		printf("max_cols_b:%d\n",max_cols_b);*/
		/*进程0 将矩阵A,B划分成小块,分发给其他进程*/
		scatter_matrix((int*)fstream_a,n1,n2,A,root,100);
		/*printf("I am debuging!\n");*/
		scatter_matrix((int*)fstream_b,n2,n3,B,root,101);
		/*printf("I am finding fault!\n");*/
	}else{
		/*其他进程接收来自进程0 发送的矩阵A,B*/
		MPI_Recv(A,max_rows_a*max_cols_a,MPI_INT,0,100,MPI_COMM_WORLD,&status);
		MPI_Recv(B,max_rows_b*max_cols_b,MPI_INT,0,101,MPI_COMM_WORLD,&status);
	}

	MPI_Barrier(MPI_COMM_WORLD);/*等待全部进程完成数据接收工作。*/

	/*printf("I am proc %d\n",myid);
	for(i=0;i<max_rows_a;i++){
		printf("%d:      ",myid);
		for(j=0;j<max_cols_a;j++){
			printf("%d ",A[i*max_cols_a+j]);
		}
		printf("\n");
	}
	printf("I am proc %d\n",myid);
	for(i=0;i<max_rows_b;i++){
		printf("%d:      ",myid);
		for(j=0;j<max_cols_b;j++){
			printf("%d ",B[i*max_cols_b+j]);
		}
		printf("\n");
	}*/

	/*compute C=A*B by Cannon algorithm*/
	/*矩阵块必须定位对齐,先做预处理*/
	shuffle(A,buf_A,buf_A_size,B,buf_B,buf_B_size,root,myid);
	elapsed_time=MPI_Wtime();
	/*包含cannon全部内容*/
	cannon(A,buf_A,buf_A_size,B,buf_B,buf_B_size,
		C,buf_C_size,max_rows_a,max_cols_a,max_cols_b,root,myid);
	MPI_Barrier(MPI_COMM_WORLD);
	elapsed_time=MPI_Wtime()-elapsed_time;/*统计cannon算法实际耗时*/

	MPI_Barrier(MPI_COMM_WORLD);/*等待所有进程完成cannon算法,将结果发送给进程0*/

	int fsize_c=sizeof(int)*n1*n3;
	if(myid==0){
		/*进程0创建文件写句柄,准备将计算结果写入文件中*/
		if(!(fhc=fopen(argv[3],"w"))){
			printf("Cant‘t open file %s\n",argv[3]);
			MPI_Finalize();
		}
		fstream_c=(int*)malloc(fsize_c);
		/*进程0 接收来自各个进程的结果矩阵,拼接成一个完整的结果,写入文件,持久化数据结果*/
		gather_matrix(fstream_c,n1,n3,C,root,fhc);
	}

	MPI_Barrier(MPI_COMM_WORLD);    /*make sure proc 0 read all it needs*/

	if(myid==0){
		int i,j;
		printf("Cannon algorithm :multiply a %d* %d with a %d*%d, use %lf(s)\n",
			n1,n2,n2,n3,elapsed_time);
		/*printf("I have finished!\n");*/
		fclose(fhc);/*关闭文件读写句柄*/
		/*释放申请的内存空间*/
		free(fstream_a);
		free(fstream_b);
		free(fstream_c);
	}

	/*释放申请的内存空间*/
	free(A);free(buf_A);
	free(B);free(buf_B);
	free(C);
	MPI_Finalize();
	return 0;
}
时间: 2024-11-08 06:59:19

Parallel Computing–Cannon算法 (MPI 实现)的相关文章

分布式并行计算方案:parallel computing by kafka-storm 发布了

HighAvailabilityToolkit1.3 发布了 version 1.3,如何在分布式集群中,充分利用多节点,对大数据进行拆分,实现并行计算,"parallel computing by kafka-storm " 提供了一种很好的思路. 源码 : https://github.com/yfwangpeng/HighAvailabilityToolkit 微博:http://weibo.com/58wp58

Method and apparatus for an atomic operation in a parallel computing environment

A method and apparatus for a?atomic?operation?is described. A method comprises receiving a first program unit in a parallel computing environment, the first program unit including a memory update?operation?to be performed atomically, the memory updat

General mistakes in parallel computing

这是2013年写的一篇旧文,放在gegahost.net上面  http://raison.gegahost.net/?p=97 March 11, 2013 General mistakes in parallel computing Filed under: concurrency,software — Tags: atomic, cocurrency, data race, dead lock, parellel — Raison @ 2:51 am (Original Work by P

Parallel.For 平行算法 使用

之前看到Parallel的用法,觉得很高深,很腻害,今天专门抽空研究了一下,发现还是很easy的. .NET Framework 4.0 新加的功能,所以4.0之前的无法使用哦. 下面介绍一下,Parallel称为 平行算法,用白话说,就是为了充分利用电脑多核处理器的优势,使得每隔核心都可以努力干活,不让他们闲着,来提高运行效率. 不过使用需要注意几点: 1:Parallel 并行处理时 如果涉及到共享资源的话,使用要很小心,因为并行同时访问共享资源,就会出现不确定的状态,非要使用,可以加锁来解

STROME --realtime &amp; online parallel computing

Data Collections ---> Stream to Channel (as source input) ----> Parallel Computing---> Results (as source ouput) -----> To DB ( Presentation) 原文地址:https://www.cnblogs.com/iiiDragon/p/9388824.html

Python—kmeans算法学习笔记

一.   什么是聚类 聚类简单的说就是要把一个文档集合根据文档的相似性把文档分成若干类,但是究竟分成多少类,这个要取决于文档集合里文档自身的性质.下面这个图就是一个简单的例子,我们可以把不同的文档聚合为3类.另外聚类是典型的无指导学习,所谓无指导学习是指不需要有人干预,无须人为文档进行标注. 二.聚类算法:from sklearn.cluster import KMeans def __init__(self, n_clusters=8, init='k-means++', n_init=10,

何时使用 Parallel.ForEach,何时使用 PLINQ

翻译自:When Should I Use Parallel.ForEach? When Should I Use PLINQ? 原作者: Pamela Vagata, Parallel Computing Platform Group, Microsoft Corporation 原文pdf:http://download.csdn.net/detail/sqlchen/7509513 ======================================================

Notes of Principles of Parallel Programming - partial

0.1 TopicNotes of Lin C., Snyder L.. Principles of Parallel Programming. Beijing: China Machine Press. 2008. (1) Parallel Computer Architecture - done 2015/5/24(2) Parallel Abstraction(3) Scable Algorithm Techniques(4) PP Languages: Java(Thread), MPI

[High Performance Computing] {Udacity} L4: Intro to OpenMP

Getting started with OpenMP These instructions for getting started with OpenMP are repeated in Project 0. They are included here for those students who wish to program with OpenMP now. Vagrant Your first task is to set-up Vagrant on your machine if y