/**
@file parUtils.txx
@brief Definitions of the templated functions in the par module.
@author Rahul S. Sampath, rahul.sampath@gmail.com
@author Hari Sundar, hsundar@gmail.com
@author Shravan Veerapaneni, shravan@seas.upenn.edu
@author Santi Swaroop Adavani, santis@gmail.com
*/

#include "dtypes.h"
#include <cassert>
#include <cstdlib> // for rand, srand, llabs
#include <iostream>
#include <algorithm>
#include <cstring>
#include "ompUtils.h"
#include <mpi.h>

namespace pvfmm{
namespace par{
template <typename T>
int Mpi_Alltoallv_sparse(T* sendbuf, int* sendcnts, int* sdispls,
    T* recvbuf, int* recvcnts, int* rdispls, const MPI_Comm& comm) {
#ifndef ALLTOALLV_FIX
  return Mpi_Alltoallv
    (sendbuf, sendcnts, sdispls,
     recvbuf, recvcnts, rdispls, comm);
#else
  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);

  int commCnt = 0;
  #pragma omp parallel for reduction(+:commCnt)
  for(int i = 0; i < rank; i++) {
    if(sendcnts[i] > 0) {
      commCnt++;
    }
    if(recvcnts[i] > 0) {
      commCnt++;
    }
  }
  #pragma omp parallel for reduction(+:commCnt)
  for(int i = (rank+1); i < npes; i++) {
    if(sendcnts[i] > 0) {
      commCnt++;
    }
    if(recvcnts[i] > 0) {
      commCnt++;
    }
  }

  MPI_Request* requests = new MPI_Request[commCnt];
  assert(requests);
  MPI_Status* statuses = new MPI_Status[commCnt];
  assert(statuses);
  commCnt = 0;

  //First place all recv requests. Do not recv from self.
  for(int i = 0; i < rank; i++) {
    if(recvcnts[i] > 0) {
      MPI_Irecv( &(recvbuf[rdispls[i]]), recvcnts[i], par::Mpi_datatype<T>::value(), i, 1,
          comm, &(requests[commCnt]) );
      commCnt++;
    }
  }
  for(int i = (rank + 1); i < npes; i++) {
    if(recvcnts[i] > 0) {
      MPI_Irecv( &(recvbuf[rdispls[i]]), recvcnts[i], par::Mpi_datatype<T>::value(), i, 1,
          comm, &(requests[commCnt]) );
      commCnt++;
    }
  }

  //Next send the messages. Do not send to self.
  for(int i = 0; i < rank; i++) {
    if(sendcnts[i] > 0) {
      MPI_Issend( &(sendbuf[sdispls[i]]), sendcnts[i], par::Mpi_datatype<T>::value(), i, 1,
          comm, &(requests[commCnt]) );
      commCnt++;
    }
  }
  for(int i = (rank + 1); i < npes; i++) {
    if(sendcnts[i] > 0) {
      MPI_Issend( &(sendbuf[sdispls[i]]), sendcnts[i], par::Mpi_datatype<T>::value(), i, 1,
          comm, &(requests[commCnt]) );
      commCnt++;
    }
  }

  //Now copy local portion.
  #pragma omp parallel for
  for(int i = 0; i < sendcnts[rank]; i++) {
    recvbuf[rdispls[rank] + i] = sendbuf[sdispls[rank] + i];
  }

  MPI_Waitall(commCnt, requests, statuses);

  delete [] requests;
  delete [] statuses;
  return 0;
#endif
}
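
// Usage sketch (illustrative, not part of the library): each rank sends one
// double to every other rank. Counts and displacements follow the standard
// MPI_Alltoallv convention (in units of T); the routine is intended for
// patterns where most send counts are zero, but any valid arguments work.
// Assumes MPI_Init has already been called by the application:
//
//   int np; MPI_Comm_size(MPI_COMM_WORLD, &np);
//   std::vector<double> sendbuf(np, 1.0), recvbuf(np);
//   std::vector<int> cnt(np, 1), dsp(np);
//   for(int i = 0; i < np; i++) dsp[i] = i;
//   pvfmm::par::Mpi_Alltoallv_sparse<double>(&sendbuf[0], &cnt[0], &dsp[0],
//       &recvbuf[0], &cnt[0], &dsp[0], MPI_COMM_WORLD);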
template <typename T>
int Mpi_Alltoallv_dense(T* sbuff_, int* s_cnt_, int* sdisp_,
    T* rbuff_, int* r_cnt_, int* rdisp_, const MPI_Comm& comm){
#ifndef ALLTOALLV_FIX
  return Mpi_Alltoallv
    (sbuff_, s_cnt_, sdisp_,
     rbuff_, r_cnt_, rdisp_, comm);
#else
  int np, pid;
  MPI_Comm_size(comm, &np);
  MPI_Comm_rank(comm, &pid);
  int range[2]={0,np-1};
  int split_id, partner;

  std::vector<int> s_cnt(np);
  #pragma omp parallel for
  for(int i=0;i<np;i++){
    s_cnt[i]=s_cnt_[i]*sizeof(T)+2*sizeof(int);
  }
  std::vector<int> sdisp(np); sdisp[0]=0;
  omp_par::scan(&s_cnt[0],&sdisp[0],np);

  char* sbuff=new char[sdisp[np-1]+s_cnt[np-1]];
  #pragma omp parallel for
  for(int i=0;i<np;i++){
    ((int*)&sbuff[sdisp[i]])[0]=s_cnt[i];
    ((int*)&sbuff[sdisp[i]])[1]=pid;
    memcpy(&sbuff[sdisp[i]]+2*sizeof(int),&sbuff_[sdisp_[i]],s_cnt[i]-2*sizeof(int));
  }

  while(range[0]<range[1]){
    split_id=(range[0]+range[1])/2;

    int new_range[2]={(pid<=split_id?range[0]:split_id+1),
                      (pid<=split_id?split_id:range[1]  )};
    int cmp_range[2]={(pid> split_id?range[0]:split_id+1),
                      (pid> split_id?split_id:range[1]  )};
    int new_np=new_range[1]-new_range[0]+1;
    int cmp_np=cmp_range[1]-cmp_range[0]+1;

    partner=pid+cmp_range[0]-new_range[0];
    if(partner>range[1]) partner=range[1];
    assert(partner>=range[0]);
    bool extra_partner=( (range[1]-range[0])%2==0 &&
                         range[1]            ==pid );
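
    // Pairing example (illustrative): for range=[0,4], split_id=2 the lower
    // half {0,1,2} pairs with the upper half {3,4} as 0<->3 and 1<->4, while
    // rank 2's partner clamps to 4. The range size is odd, so rank 4 also
    // exchanges with split_id=2 via the extra_partner path; every rank in the
    // opposite half is therefore covered exactly once.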
    //Communication.
    {
      int* s_lengths=&s_cnt[cmp_range[0]-range[0]];
      std::vector<int> s_len_ext(cmp_np,0);
      std::vector<int> r_cnt    (new_np,0);
      std::vector<int> r_cnt_ext(new_np,0);
      MPI_Status status;

      //Exchange send sizes.
      MPI_Sendrecv                  (&s_lengths[0],cmp_np,MPI_INT, partner,0, &r_cnt    [0],new_np,MPI_INT, partner, 0,comm,&status);
      if(extra_partner) MPI_Sendrecv(&s_len_ext[0],cmp_np,MPI_INT,split_id,0, &r_cnt_ext[0],new_np,MPI_INT,split_id, 0,comm,&status);

      //Allocate receive buffer.
      std::vector<int> rdisp    (new_np,0);
      std::vector<int> rdisp_ext(new_np,0);
      omp_par::scan(&r_cnt    [0],&rdisp    [0],new_np);
      omp_par::scan(&r_cnt_ext[0],&rdisp_ext[0],new_np);
      int rbuff_size    =rdisp    [new_np-1]+r_cnt    [new_np-1];
      int rbuff_size_ext=rdisp_ext[new_np-1]+r_cnt_ext[new_np-1];
      char* rbuff   = new char[rbuff_size];
      char* rbuffext=(extra_partner? new char[rbuff_size_ext]: NULL);

      //Sendrecv data.
      {
        int*  s_cnt_tmp=&s_cnt[cmp_range[0]-range[0]];
        int*  sdisp_tmp=&sdisp[cmp_range[0]-range[0]];
        char* sbuff_tmp=&sbuff[sdisp_tmp[0]];
        int   sbuff_size=sdisp_tmp[cmp_np-1]+s_cnt_tmp[cmp_np-1]-sdisp_tmp[0];
        MPI_Sendrecv                  (sbuff_tmp,sbuff_size,MPI_BYTE, partner,0, &rbuff   [0],rbuff_size    ,MPI_BYTE, partner, 0,comm,&status);
        if(extra_partner) MPI_Sendrecv(     NULL,         0,MPI_BYTE,split_id,0, &rbuffext[0],rbuff_size_ext,MPI_BYTE,split_id, 0,comm,&status);
      }

      //Rearrange received data.
      {
        //assert(!extra_partner);
        int* s_cnt_old=&s_cnt[new_range[0]-range[0]];
        int* sdisp_old=&sdisp[new_range[0]-range[0]];

        std::vector<int> s_cnt_new(&s_cnt_old[0],&s_cnt_old[new_np]);
        std::vector<int> sdisp_new(new_np,0);
        #pragma omp parallel for
        for(int i=0;i<new_np;i++){
          s_cnt_new[i]+=r_cnt[i]+r_cnt_ext[i];
        }
        omp_par::scan(&s_cnt_new[0],&sdisp_new[0],new_np);

        //Copy data to sbuff_new.
        char* sbuff_new=new char[sdisp_new[new_np-1]+s_cnt_new[new_np-1]];
        #pragma omp parallel for
        for(int i=0;i<new_np;i++){
          memcpy(&sbuff_new[sdisp_new[i]                      ],&sbuff   [sdisp_old[i]],s_cnt_old[i]);
          memcpy(&sbuff_new[sdisp_new[i]+s_cnt_old[i]         ],&rbuff   [rdisp    [i]],r_cnt    [i]);
          memcpy(&sbuff_new[sdisp_new[i]+s_cnt_old[i]+r_cnt[i]],&rbuffext[rdisp_ext[i]],r_cnt_ext[i]);
        }

        //Free memory.
        if(sbuff   !=NULL) delete[] sbuff   ;
        if(rbuff   !=NULL) delete[] rbuff   ;
        if(rbuffext!=NULL) delete[] rbuffext;

        //Substitute data for next iteration.
        s_cnt=s_cnt_new;
        sdisp=sdisp_new;
        sbuff=sbuff_new;
      }
    }

    range[0]=new_range[0];
    range[1]=new_range[1];
  }

  //Copy data to rbuff_.
  std::vector<char*> buff_ptr(np);
  char* tmp_ptr=sbuff;
  for(int i=0;i<np;i++){
    int& blk_size=((int*)tmp_ptr)[0];
    buff_ptr[i]=tmp_ptr;
    tmp_ptr+=blk_size;
  }
  #pragma omp parallel for
  for(int i=0;i<np;i++){
    int& blk_size=((int*)buff_ptr[i])[0];
    int& src_pid =((int*)buff_ptr[i])[1];
    assert(blk_size-2*sizeof(int)<=r_cnt_[src_pid]*sizeof(T));
    memcpy(&rbuff_[rdisp_[src_pid]],buff_ptr[i]+2*sizeof(int),blk_size-2*sizeof(int));
  }

  //Free memory.
  if(sbuff!=NULL) delete[] sbuff;
  return 0;
#endif
}
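
// Usage sketch (illustrative, not part of the library): the call signature is
// identical to Mpi_Alltoallv_sparse above. The recursive-halving exchange in
// the loop above trades extra local copying for O(log p) communication
// rounds, which typically pays off when most ranks exchange data with most
// other ranks:
//
//   int np; MPI_Comm_size(MPI_COMM_WORLD, &np);
//   std::vector<double> sendbuf(np, 1.0), recvbuf(np);
//   std::vector<int> cnt(np, 1), dsp(np);
//   for(int i = 0; i < np; i++) dsp[i] = i;
//   pvfmm::par::Mpi_Alltoallv_dense<double>(&sendbuf[0], &cnt[0], &dsp[0],
//       &recvbuf[0], &cnt[0], &dsp[0], MPI_COMM_WORLD);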
template<typename T>
int partitionW(Vector<T>& nodeList, long long* wts, const MPI_Comm& comm){
  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);
  long long npesLong = npes;

  long long nlSize = nodeList.Dim();
  long long off1= 0, off2= 0, localWt= 0, totalWt = 0;

  // First construct arrays of wts.
  Vector<long long> wts_(nlSize);
  if(wts == NULL) {
    wts=&wts_[0];
    #pragma omp parallel for
    for (long long i = 0; i < nlSize; i++){
      wts[i] = 1;
    }
  }
  #pragma omp parallel for reduction(+:localWt)
  for (long long i = 0; i < nlSize; i++){
    localWt+=wts[i];
  }

  // compute the total weight of the problem ...
  MPI_Allreduce(&localWt, &totalWt, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
  MPI_Scan(&localWt, &off2, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm );
  off1=off2-localWt;

  // perform a local scan on the weights first ...
  Vector<long long> lscn(nlSize);
  if(nlSize) {
    lscn[0]=off1;
    omp_par::scan(&wts[0],&lscn[0],nlSize);
  }

  Vector<int> int_buff(npesLong*4);
  Vector<int> sendSz (npesLong,&int_buff[0]+npesLong*0,false);
  Vector<int> recvSz (npesLong,&int_buff[0]+npesLong*1,false);
  Vector<int> sendOff(npesLong,&int_buff[0]+npesLong*2,false);
  Vector<int> recvOff(npesLong,&int_buff[0]+npesLong*3,false);

  // compute the partition offsets and sizes so that All2Allv can be performed.
  // initialize ...
  #pragma omp parallel for
  for (size_t i = 0; i < npesLong; i++) {
    sendSz[i] = 0;
  }

  //The Heart of the algorithm....
  if(nlSize>0 && totalWt>0) {
    long long pid1=( off1   *npesLong)/totalWt;
    long long pid2=((off2+1)*npesLong)/totalWt+1;
    assert((totalWt*pid2)/npesLong>=off2);
    pid1=(pid1<       0?       0:pid1);
    pid2=(pid2>npesLong?npesLong:pid2);
    #pragma omp parallel for
    for(int i=pid1;i<pid2;i++){
      long long wt1=(totalWt*(i  ))/npesLong;
      long long wt2=(totalWt*(i+1))/npesLong;
      long long start = std::lower_bound(&lscn[0], &lscn[0]+nlSize, wt1, std::less<long long>())-&lscn[0];
      long long end   = std::lower_bound(&lscn[0], &lscn[0]+nlSize, wt2, std::less<long long>())-&lscn[0];
      if(i==         0) start=0     ;
      if(i==npesLong-1) end  =nlSize;
      sendSz[i]=end-start;
    }
  }else sendSz[0]=nlSize;
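
  // Worked example (illustrative): with totalWt=100 and npesLong=4, rank i
  // should own the global weight range [25*i, 25*(i+1)). The lower_bound
  // searches above locate, within this rank's prefix-scanned weights (lscn),
  // the slice of local elements destined for each such rank i.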
  // communicate with other procs how many you shall be sending and get how
  // many to receive from whom.
  MPI_Alltoall(&sendSz[0], 1, par::Mpi_datatype<int>::value(),
               &recvSz[0], 1, par::Mpi_datatype<int>::value(), comm);

  // compute offsets ...
  sendOff[0] = 0; omp_par::scan(&sendSz[0],&sendOff[0],npesLong);
  recvOff[0] = 0; omp_par::scan(&recvSz[0],&recvOff[0],npesLong);

  // new value of nlSize, ie the local nodes.
  long long nn = recvSz[npesLong-1] + recvOff[npesLong-1];

  // allocate memory for the new arrays ...
  Vector<T> newNodes;
  {
    if(nodeList.Capacity()>nn+std::max(nn,nlSize)){
      newNodes.ReInit(nn,&nodeList[0]+std::max(nn,nlSize),false);
    //}else if(buff!=NULL && buff->Dim()>nn*sizeof(T)){
    //  newNodes.ReInit(nn,(T*)&(*buff)[0],false);
    }else newNodes.Resize(nn);
  }

  // perform All2All ...
  par::Mpi_Alltoallv_sparse<T>(&nodeList[0], &sendSz[0], &sendOff[0],
      &newNodes[0], &recvSz[0], &recvOff[0], comm);

  // reset the pointer ...
  nodeList=newNodes;
  return 0;
}//end function
template<typename T>
int partitionW(std::vector<T>& nodeList, long long* wts, const MPI_Comm& comm){
  Vector<T> nodeList_=nodeList;
  int ret = par::partitionW<T>(nodeList_, wts, comm);
  nodeList.assign(&nodeList_[0],&nodeList_[0]+nodeList_.Dim());
  return ret;
}
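
// Usage sketch (illustrative, not part of the library): rebalancing a
// deliberately skewed distribution. Passing NULL weights assigns unit weight
// per element, so the result is an (almost) equal element count per rank.
// Assumes MPI_Init has already been called:
//
//   int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);
//   std::vector<int> nodes(rank+1, rank);  // rank i starts with i+1 elements
//   pvfmm::par::partitionW<int>(nodes, NULL, MPI_COMM_WORLD);
//   // nodes.size() now differs by at most one across ranks.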
template<typename T>
int HyperQuickSort(const Vector<T>& arr_, Vector<T>& SortedElem, const MPI_Comm& comm_){ // O( ((N/p)+log(p))*(log(N/p)+log(p)) )
  // Copy communicator.
  MPI_Comm comm=comm_;

  // Get comm size and rank.
  int npes, myrank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &myrank);
  int omp_p=omp_get_max_threads();
  srand(myrank);

  // Local and global sizes. O(log p)
  long long totSize, nelem = arr_.Dim();
  //assert(nelem); // TODO: Check if this is needed.
  MPI_Allreduce(&nelem, &totSize, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);

  // Local sort.
  Vector<T> arr=arr_;
  omp_par::merge_sort(&arr[0], &arr[0]+nelem);

  // Allocate memory.
  Vector<T> nbuff;
  Vector<T> nbuff_ext;
  Vector<T> rbuff    ;
  Vector<T> rbuff_ext;

  // Binary split and merge in each iteration.
  while(npes>1 && totSize>0){ // O(log p) iterations.

    //Determine splitters. O( log(N/p) + log(p) )
    T split_key;
    long long totSize_new;
    //while(true)
    {
      // Take random splitters. O( 1 ) -- Let p * splt_count = glb_splt_count = const = 100~1000
      int splt_count=(1000*nelem)/totSize;
      if(npes>1000) splt_count=(((float)rand()/(float)RAND_MAX)*totSize<(1000*nelem)?1:0);
      if(splt_count>nelem) splt_count=nelem;
      std::vector<T> splitters(splt_count);
      for(int i=0;i<splt_count;i++)
        splitters[i]=arr[rand()%nelem];

      // Gather all splitters. O( log(p) )
      int glb_splt_count;
      std::vector<int> glb_splt_cnts(npes);
      std::vector<int> glb_splt_disp(npes,0);
      MPI_Allgather(&splt_count      , 1, par::Mpi_datatype<int>::value(),
                    &glb_splt_cnts[0], 1, par::Mpi_datatype<int>::value(), comm);
      omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);
      glb_splt_count=glb_splt_cnts[npes-1]+glb_splt_disp[npes-1];
      std::vector<T> glb_splitters(glb_splt_count);
      MPI_Allgatherv(&    splitters[0], splt_count, par::Mpi_datatype<T>::value(),
                     &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
                     par::Mpi_datatype<T>::value(), comm);

      // Determine split key. O( log(N/p) + log(p) )
      std::vector<long long> disp(glb_splt_count,0);
      if(nelem>0){
        #pragma omp parallel for
        for(int i=0;i<glb_splt_count;i++){
          disp[i]=std::lower_bound(&arr[0], &arr[0]+nelem, glb_splitters[i])-&arr[0];
        }
      }
      std::vector<long long> glb_disp(glb_splt_count,0);
      MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);

      long long* split_disp=&glb_disp[0];
      for(int i=0;i<glb_splt_count;i++)
        if(llabs(glb_disp[i]-totSize/2)<llabs(*split_disp-totSize/2)) split_disp=&glb_disp[i];
      split_key=glb_splitters[split_disp-&glb_disp[0]];

      totSize_new=(myrank<=(npes-1)/2?*split_disp:totSize-*split_disp);
      //double err=(((double)*split_disp)/(totSize/2))-1.0;
      //if(fabs(err)<0.01 || npes<=16) break;
      //else if(!myrank) std::cout<<err<<'\n';
    }

    // Split problem into two. O( N/p )
    int split_id=(npes-1)/2;
    {
      int new_p0=(myrank<=split_id?0:split_id+1);
      int cmp_p0=(myrank> split_id?0:split_id+1);
      int partner = myrank+cmp_p0-new_p0;
      if(partner>=npes) partner=npes-1;
      assert(partner>=0);
      bool extra_partner=( npes%2==1 && npes-1==myrank );

      // Exchange send sizes.
      char *sbuff, *lbuff;
      int     rsize=0,     ssize=0, lsize=0;
      int ext_rsize=0, ext_ssize=0;
      size_t split_indx=(nelem>0?std::lower_bound(&arr[0], &arr[0]+nelem, split_key)-&arr[0]:0);
      ssize=       (myrank> split_id? split_indx: nelem-split_indx)*sizeof(T);
      sbuff=(char*)(myrank> split_id? &arr[0]   : &arr[0]+split_indx);
      lsize=       (myrank<=split_id? split_indx: nelem-split_indx)*sizeof(T);
      lbuff=(char*)(myrank<=split_id? &arr[0]   : &arr[0]+split_indx);
      MPI_Status status;
      MPI_Sendrecv                  (&    ssize,1,MPI_INT, partner,0, &    rsize,1,MPI_INT, partner, 0,comm,&status);
      if(extra_partner) MPI_Sendrecv(&ext_ssize,1,MPI_INT,split_id,0, &ext_rsize,1,MPI_INT,split_id, 0,comm,&status);

      // Exchange data.
      rbuff    .Resize(    rsize/sizeof(T));
      rbuff_ext.Resize(ext_rsize/sizeof(T));
      MPI_Sendrecv                  (sbuff,ssize,MPI_BYTE, partner,0, &rbuff    [0],    rsize,MPI_BYTE, partner, 0,comm,&status);
      if(extra_partner) MPI_Sendrecv( NULL,    0,MPI_BYTE,split_id,0, &rbuff_ext[0],ext_rsize,MPI_BYTE,split_id, 0,comm,&status);

      int nbuff_size=lsize+rsize+ext_rsize;
      nbuff.Resize(nbuff_size/sizeof(T));
      omp_par::merge<T*>((T*)lbuff, (T*)(lbuff+lsize), &rbuff[0], &rbuff[0]+(rsize/sizeof(T)), &nbuff[0], omp_p, std::less<T>());
      if(ext_rsize>0 && nbuff.Dim()>0){
        nbuff_ext.Resize(nbuff_size/sizeof(T));
        omp_par::merge<T*>(&nbuff[0], &nbuff[0]+((lsize+rsize)/sizeof(T)), &rbuff_ext[0], &rbuff_ext[0]+(ext_rsize/sizeof(T)), &nbuff_ext[0], omp_p, std::less<T>());
        nbuff.Swap(nbuff_ext);
        nbuff_ext.Resize(0);
      }

      // Copy new data.
      totSize=totSize_new;
      nelem = nbuff_size/sizeof(T);
      arr.Swap(nbuff);
      nbuff.Resize(0);
    }

    {// Split comm. O( log(p) ) ??
      MPI_Comm scomm;
      MPI_Comm_split(comm, myrank<=split_id, myrank, &scomm );
      comm=scomm;
      npes  =(myrank<=split_id? split_id+1: npes-split_id-1);
      myrank=(myrank<=split_id? myrank    : myrank-split_id-1);
    }
  }

  SortedElem.Resize(nelem);
  memcpy(&SortedElem[0], &arr[0], nelem*sizeof(T));
  par::partitionW<T>(SortedElem, NULL, comm_);
  return 0;
}//end function
template<typename T>
int HyperQuickSort(const std::vector<T>& arr_, std::vector<T>& SortedElem_, const MPI_Comm& comm_){
  Vector<T> SortedElem;
  const Vector<T> arr(arr_.size(),(T*)&arr_[0],false);

  int ret = HyperQuickSort(arr, SortedElem, comm_);

  SortedElem_.assign(&SortedElem[0],&SortedElem[0]+SortedElem.Dim());
  return ret;
}
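
// Usage sketch (illustrative, not part of the library): globally sorting a
// distributed array. On return every rank holds a locally sorted block and
// the blocks are ordered across ranks; partitionW (called internally)
// rebalances the block sizes. Assumes MPI_Init has already been called:
//
//   std::vector<double> in(1000), out;
//   for(size_t i = 0; i < in.size(); i++) in[i] = rand()/(double)RAND_MAX;
//   pvfmm::par::HyperQuickSort(in, out, MPI_COMM_WORLD);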
template<typename T>
int SortScatterIndex(const Vector<T>& key, Vector<size_t>& scatter_index, const MPI_Comm& comm, const T* split_key_){
  typedef SortPair<T,size_t> Pair_t;

  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);
  long long npesLong = npes;

  //Vector<char> buff;
  //if(buff_!=NULL && buff_->Dim()>0){
  //  buff.ReInit(buff_->Dim(),&(*buff_)[0],false);
  //}

  Vector<Pair_t> parray;
  { // Allocate memory
    //size_t parray_size=key.Dim()*sizeof(Pair_t);
    //if(buff.Dim()>parray_size){
    //  parray.ReInit(key.Dim(),(Pair_t*)&buff[0],false);
    //  buff.ReInit(buff.Dim()-parray_size,&buff[0]+parray_size,false);
    //}else
    parray.Resize(key.Dim());
  }
  { // Build global index.
    long long glb_dsp=0;
    long long loc_size=key.Dim();
    MPI_Scan(&loc_size, &glb_dsp, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    glb_dsp-=loc_size;
    #pragma omp parallel for
    for(size_t i=0;i<loc_size;i++){
      parray[i].key =key[i];
      parray[i].data=glb_dsp+i;
    }
  }

  Vector<Pair_t> psorted;
  { // Allocate memory
    //if(buff.Dim()>0){
    //  psorted.ReInit(buff.Dim()/sizeof(Pair_t), (Pair_t*)&buff[0], false);
    //}
  }
  HyperQuickSort(parray, psorted, comm);

  if(split_key_!=NULL){ // Partition data
    Vector<T> split_key(npesLong);
    MPI_Allgather((void*)split_key_, 1, par::Mpi_datatype<T>::value(),
                  &split_key[0]    , 1, par::Mpi_datatype<T>::value(), comm);

    Vector<int> int_buff(npesLong*4);
    Vector<int> sendSz (npesLong,&int_buff[0]+npesLong*0,false);
    Vector<int> recvSz (npesLong,&int_buff[0]+npesLong*1,false);
    Vector<int> sendOff(npesLong,&int_buff[0]+npesLong*2,false);
    Vector<int> recvOff(npesLong,&int_buff[0]+npesLong*3,false);
    long long nlSize = psorted.Dim();

    // compute the partition offsets and sizes so that All2Allv can be performed.
    // initialize ...
    #pragma omp parallel for
    for (size_t i = 0; i < npesLong; i++) {
      sendSz[i] = 0;
    }

    //The Heart of the algorithm....
    if(nlSize>0) {
      // Determine processor range.
      long long pid1=std::lower_bound(&split_key[0], &split_key[0]+npesLong, psorted[       0].key)-&split_key[0]-1;
      long long pid2=std::upper_bound(&split_key[0], &split_key[0]+npesLong, psorted[nlSize-1].key)-&split_key[0]+0;
      pid1=(pid1<       0?       0:pid1);
      pid2=(pid2>npesLong?npesLong:pid2);
      #pragma omp parallel for
      for(int i=pid1;i<pid2;i++){
        Pair_t p1; p1.key=split_key[i];
        Pair_t p2; p2.key=split_key[i+1<npesLong?i+1:i];
        long long start = std::lower_bound(&psorted[0], &psorted[0]+nlSize, p1, std::less<Pair_t>())-&psorted[0];
        long long end   = std::lower_bound(&psorted[0], &psorted[0]+nlSize, p2, std::less<Pair_t>())-&psorted[0];
        if(i==         0) start=0     ;
        if(i==npesLong-1) end  =nlSize;
        sendSz[i]=end-start;
      }
    }
    // communicate with other procs how many you shall be sending and get how
    // many to receive from whom.
    MPI_Alltoall(&sendSz[0], 1, par::Mpi_datatype<int>::value(),
                 &recvSz[0], 1, par::Mpi_datatype<int>::value(), comm);

    // compute offsets ...
    sendOff[0] = 0; omp_par::scan(&sendSz[0],&sendOff[0],npesLong);
    recvOff[0] = 0; omp_par::scan(&recvSz[0],&recvOff[0],npesLong);

    // new value of nlSize, ie the local nodes.
    long long nn = recvSz[npesLong-1] + recvOff[npesLong-1];

    // allocate memory for the new arrays ...
    Vector<Pair_t> newNodes;
    {
      if(psorted.Capacity()>nn+std::max(nn,nlSize)){
        newNodes.ReInit(nn,&psorted[0]+std::max(nn,nlSize),false);
      }else newNodes.Resize(nn);
    }

    // perform All2All ...
    par::Mpi_Alltoallv_sparse<Pair_t>(&psorted[0], &sendSz[0], &sendOff[0],
        &newNodes[0], &recvSz[0], &recvOff[0], comm);

    // reset the pointer ...
    psorted=newNodes;
  }

  scatter_index.Resize(psorted.Dim());
  #pragma omp parallel for
  for(size_t i=0;i<psorted.Dim();i++){
    scatter_index[i]=psorted[i].data;
  }
  return 0;
}
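
// Usage sketch (illustrative, not part of the library): computing a scatter
// index from a key vector and then applying it. scatter_index[i] holds the
// original global index of the i-th key in sorted order, which is exactly
// what ScatterForward below consumes. Assumes the NULL default for the
// split_key_ argument declared in the header:
//
//   pvfmm::Vector<long long> key;   // local keys, filled by the caller
//   pvfmm::Vector<size_t> sidx;
//   pvfmm::par::SortScatterIndex(key, sidx, MPI_COMM_WORLD);
//   pvfmm::par::ScatterForward(key, sidx, MPI_COMM_WORLD);
//   // key is now globally sorted across ranks.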
template<typename T>
int ScatterForward(Vector<T>& data_, const Vector<size_t>& scatter_index, const MPI_Comm& comm){
  typedef SortPair<size_t,size_t> Pair_t;

  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);
  long long npesLong = npes;

  size_t data_dim=0;
  long long send_size=0;
  long long recv_size=0;
  {
    recv_size=scatter_index.Dim();
    long long glb_size[2]={0,0};
    long long loc_size[2]={(long long)(data_.Dim()*sizeof(T)), recv_size};
    MPI_Allreduce(&loc_size, &glb_size, 2, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    if(glb_size[0]==0 || glb_size[1]==0) return 0; //Nothing to be done.
    data_dim=glb_size[0]/glb_size[1];
    assert(glb_size[0]==data_dim*glb_size[1]);
    send_size=(data_.Dim()*sizeof(T))/data_dim;
  }
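
  // Example of the size inference above (illustrative): if every scatter unit
  // is a point carrying 3 doubles, glb_size[0] counts all bytes and
  // glb_size[1] counts all points, so data_dim = 3*sizeof(double) = 24 bytes
  // per unit, regardless of how T partitions those bytes locally.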
  Vector<char> recv_buff(recv_size*data_dim);
  Vector<char> send_buff(send_size*data_dim);

  // Global scan of data size.
  Vector<long long> glb_scan(npesLong);
  {
    long long glb_rank=0;
    MPI_Scan(&send_size, &glb_rank, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    glb_rank-=send_size;
    MPI_Allgather(&glb_rank   , 1, par::Mpi_datatype<long long>::value(),
                  &glb_scan[0], 1, par::Mpi_datatype<long long>::value(), comm);
  }

  // Sort scatter_index.
  Vector<Pair_t> psorted(recv_size);
  {
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      psorted[i].key =scatter_index[i];
      psorted[i].data=i;
    }
    omp_par::merge_sort(&psorted[0], &psorted[0]+recv_size);
  }

  // Exchange send, recv indices.
  Vector<size_t> recv_indx(recv_size);
  Vector<size_t> send_indx(send_size);
  Vector<int> sendSz (npesLong);
  Vector<int> sendOff(npesLong);
  Vector<int> recvSz (npesLong);
  Vector<int> recvOff(npesLong);
  {
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      recv_indx[i]=psorted[i].key;
    }
    #pragma omp parallel for
    for(size_t i=0;i<npesLong;i++){
      size_t start=              std::lower_bound(&recv_indx[0], &recv_indx[0]+recv_size, glb_scan[i  ])-&recv_indx[0];
      size_t end  =(i+1<npesLong?std::lower_bound(&recv_indx[0], &recv_indx[0]+recv_size, glb_scan[i+1])-&recv_indx[0]:recv_size);
      recvSz [i]=end-start;
      recvOff[i]=start;
    }
    MPI_Alltoall(&recvSz[0], 1, par::Mpi_datatype<int>::value(),
                 &sendSz[0], 1, par::Mpi_datatype<int>::value(), comm);
    sendOff[0] = 0; omp_par::scan(&sendSz[0],&sendOff[0],npesLong);
    assert(sendOff[npesLong-1]+sendSz[npesLong-1]==send_size);

    par::Mpi_Alltoallv_dense<size_t>(&recv_indx[0], &recvSz[0], &recvOff[0],
        &send_indx[0], &sendSz[0], &sendOff[0], comm);
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      assert(send_indx[i]>=glb_scan[rank]);
      send_indx[i]-=glb_scan[rank];
      assert(send_indx[i]<send_size);
    }
  }

  // Prepare send buffer
  {
    char* data=(char*)&data_[0];
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      size_t src_indx=send_indx[i]*data_dim;
      size_t trg_indx=i*data_dim;
      for(size_t j=0;j<data_dim;j++)
        send_buff[trg_indx+j]=data[src_indx+j];
    }
  }

  // All2Allv
  {
    #pragma omp parallel for
    for(size_t i=0;i<npesLong;i++){
      sendSz [i]*=data_dim;
      sendOff[i]*=data_dim;
      recvSz [i]*=data_dim;
      recvOff[i]*=data_dim;
    }
    par::Mpi_Alltoallv_dense<char>(&send_buff[0], &sendSz[0], &sendOff[0],
        &recv_buff[0], &recvSz[0], &recvOff[0], comm);
  }

  // Build output data.
  {
    data_.Resize(recv_size*data_dim/sizeof(T));
    char* data=(char*)&data_[0];
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      size_t src_indx=i*data_dim;
      size_t trg_indx=psorted[i].data*data_dim;
      for(size_t j=0;j<data_dim;j++)
        data[trg_indx+j]=recv_buff[src_indx+j];
    }
  }
  return 0;
}
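
// Usage sketch (illustrative, not part of the library): forwarding a payload
// through a precomputed scatter index. Continuing the SortScatterIndex
// example above, each key could carry a 3-double coordinate payload; the
// payload must have the same global unit count as the scatter index:
//
//   pvfmm::Vector<double> coord;   // 3 doubles per key, filled by the caller
//   pvfmm::par::ScatterForward(coord, sidx, MPI_COMM_WORLD);
//   // coord is now ordered consistently with the sorted keys.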
template<typename T>
int ScatterReverse(Vector<T>& data_, const Vector<size_t>& scatter_index, const MPI_Comm& comm, size_t loc_size){
  typedef SortPair<size_t,size_t> Pair_t;

  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);
  long long npesLong = npes;

  size_t data_dim=0;
  long long send_size=0;
  long long recv_size=0;
  {
    send_size=scatter_index.Dim();
    recv_size=loc_size;
    long long glb_size[3]={0,0,0};
    long long loc_size_[3]={(long long)(data_.Dim()*sizeof(T)), send_size, recv_size}; // renamed to avoid shadowing the loc_size parameter
    MPI_Allreduce(&loc_size_, &glb_size, 3, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    if(glb_size[0]==0 || glb_size[1]==0) return 0; //Nothing to be done.
    data_dim=glb_size[0]/glb_size[1];
    assert(glb_size[0]==data_dim*glb_size[1]);
    if(glb_size[1]!=glb_size[2]){
      recv_size=(((rank+1)*glb_size[1])/npesLong)-
                (( rank   *glb_size[1])/npesLong);
    }
  }
  Vector<char> recv_buff(recv_size*data_dim);
  Vector<char> send_buff(send_size*data_dim);

  // Global data size.
  Vector<long long> glb_scan(npesLong);
  {
    long long glb_rank=0;
    MPI_Scan(&recv_size, &glb_rank, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    glb_rank-=recv_size;
    MPI_Allgather(&glb_rank   , 1, par::Mpi_datatype<long long>::value(),
                  &glb_scan[0], 1, par::Mpi_datatype<long long>::value(), comm);
  }

  // Sort scatter_index.
  Vector<Pair_t> psorted(send_size);
  {
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      psorted[i].key =scatter_index[i];
      psorted[i].data=i;
    }
    omp_par::merge_sort(&psorted[0], &psorted[0]+send_size);
  }

  // Exchange send, recv indices.
  Vector<size_t> recv_indx(recv_size);
  Vector<size_t> send_indx(send_size);
  Vector<int> sendSz (npesLong);
  Vector<int> sendOff(npesLong);
  Vector<int> recvSz (npesLong);
  Vector<int> recvOff(npesLong);
  {
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      send_indx[i]=psorted[i].key;
    }
    #pragma omp parallel for
    for(size_t i=0;i<npesLong;i++){
      size_t start=              std::lower_bound(&send_indx[0], &send_indx[0]+send_size, glb_scan[i  ])-&send_indx[0];
      size_t end  =(i+1<npesLong?std::lower_bound(&send_indx[0], &send_indx[0]+send_size, glb_scan[i+1])-&send_indx[0]:send_size);
      sendSz [i]=end-start;
      sendOff[i]=start;
    }
    MPI_Alltoall(&sendSz[0], 1, par::Mpi_datatype<int>::value(),
                 &recvSz[0], 1, par::Mpi_datatype<int>::value(), comm);
    recvOff[0] = 0; omp_par::scan(&recvSz[0],&recvOff[0],npesLong);
    assert(recvOff[npesLong-1]+recvSz[npesLong-1]==recv_size);

    par::Mpi_Alltoallv_dense<size_t>(&send_indx[0], &sendSz[0], &sendOff[0],
        &recv_indx[0], &recvSz[0], &recvOff[0], comm);
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      assert(recv_indx[i]>=glb_scan[rank]);
      recv_indx[i]-=glb_scan[rank];
      assert(recv_indx[i]<recv_size);
    }
  }

  // Prepare send buffer
  {
    char* data=(char*)&data_[0];
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      size_t src_indx=psorted[i].data*data_dim;
      size_t trg_indx=i*data_dim;
      for(size_t j=0;j<data_dim;j++)
        send_buff[trg_indx+j]=data[src_indx+j];
    }
  }

  // All2Allv
  {
    #pragma omp parallel for
    for(size_t i=0;i<npesLong;i++){
      sendSz [i]*=data_dim;
      sendOff[i]*=data_dim;
      recvSz [i]*=data_dim;
      recvOff[i]*=data_dim;
    }
    par::Mpi_Alltoallv_dense<char>(&send_buff[0], &sendSz[0], &sendOff[0],
        &recv_buff[0], &recvSz[0], &recvOff[0], comm);
  }

  // Build output data.
  {
    data_.Resize(recv_size*data_dim/sizeof(T));
    char* data=(char*)&data_[0];
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      size_t src_indx=i*data_dim;
      size_t trg_indx=recv_indx[i]*data_dim;
      for(size_t j=0;j<data_dim;j++)
        data[trg_indx+j]=recv_buff[src_indx+j];
    }
  }
  return 0;
}
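
// Usage sketch (illustrative, not part of the library): undoing the forward
// scatter from the examples above. Passing loc_size=0 on every rank yields
// the library's default block distribution of the output (since the global
// counts then differ); a nonzero loc_size restores the caller's original
// local counts:
//
//   pvfmm::par::ScatterReverse(coord, sidx, MPI_COMM_WORLD, 0);
//   // coord is back in its pre-scatter global ordering.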
}//end namespace par
}//end namespace pvfmm