
/**
  @file parUtils.txx
  @brief Definitions of the templated functions in the par module.
  @author Rahul S. Sampath, rahul.sampath@gmail.com
  @author Hari Sundar, hsundar@gmail.com
  @author Shravan Veerapaneni, shravan@seas.upenn.edu
  @author Santi Swaroop Adavani, santis@gmail.com
 */
#include <cmath>
#include <cassert>
#include <cstring>
#include <cstdlib>
#include <iostream>
#include <algorithm>

#include <dtypes.h>
#include <ompUtils.h>
#include <mem_mgr.hpp>
#include <matrix.hpp>

namespace pvfmm{
namespace par{
template <typename T>
int Mpi_Alltoallv_sparse(T* sendbuf, int* sendcnts, int* sdispls,
    T* recvbuf, int* recvcnts, int* rdispls, const MPI_Comm& comm) {
#ifndef ALLTOALLV_FIX
  return MPI_Alltoallv(sendbuf, sendcnts, sdispls, par::Mpi_datatype<T>::value(),
      recvbuf, recvcnts, rdispls, par::Mpi_datatype<T>::value(), comm);
#else
  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);

  // Count the non-empty messages (sends and recvs, excluding self).
  int commCnt = 0;
  #pragma omp parallel for reduction(+:commCnt)
  for(int i = 0; i < rank; i++) {
    if(sendcnts[i] > 0) {
      commCnt++;
    }
    if(recvcnts[i] > 0) {
      commCnt++;
    }
  }
  #pragma omp parallel for reduction(+:commCnt)
  for(int i = (rank+1); i < npes; i++) {
    if(sendcnts[i] > 0) {
      commCnt++;
    }
    if(recvcnts[i] > 0) {
      commCnt++;
    }
  }

  MPI_Request* requests = mem::aligned_new<MPI_Request>(commCnt);
  assert(requests || !commCnt);
  MPI_Status* statuses = mem::aligned_new<MPI_Status>(commCnt);
  assert(statuses || !commCnt);
  commCnt = 0;

  // First place all recv requests. Do not recv from self.
  for(int i = 0; i < rank; i++) {
    if(recvcnts[i] > 0) {
      MPI_Irecv(&(recvbuf[rdispls[i]]), recvcnts[i], par::Mpi_datatype<T>::value(), i, 1,
          comm, &(requests[commCnt]));
      commCnt++;
    }
  }
  for(int i = (rank + 1); i < npes; i++) {
    if(recvcnts[i] > 0) {
      MPI_Irecv(&(recvbuf[rdispls[i]]), recvcnts[i], par::Mpi_datatype<T>::value(), i, 1,
          comm, &(requests[commCnt]));
      commCnt++;
    }
  }

  // Next send the messages. Do not send to self.
  for(int i = 0; i < rank; i++) {
    if(sendcnts[i] > 0) {
      MPI_Issend(&(sendbuf[sdispls[i]]), sendcnts[i], par::Mpi_datatype<T>::value(), i, 1,
          comm, &(requests[commCnt]));
      commCnt++;
    }
  }
  for(int i = (rank + 1); i < npes; i++) {
    if(sendcnts[i] > 0) {
      MPI_Issend(&(sendbuf[sdispls[i]]), sendcnts[i], par::Mpi_datatype<T>::value(), i, 1,
          comm, &(requests[commCnt]));
      commCnt++;
    }
  }

  // Now copy the local portion.
  #pragma omp parallel for
  for(int i = 0; i < sendcnts[rank]; i++) {
    recvbuf[rdispls[rank] + i] = sendbuf[sdispls[rank] + i];
  }

  if(commCnt) MPI_Waitall(commCnt, requests, statuses);
  mem::aligned_delete(requests);
  mem::aligned_delete(statuses);
  return 0;
#endif
}
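
/**
 Usage sketch (illustrative only, not part of the library): the counts and
 displacements follow the standard MPI_Alltoallv conventions, so a typical
 call site looks like the hypothetical snippet below.
 @code
 MPI_Comm comm = MPI_COMM_WORLD;
 int npes; MPI_Comm_size(comm, &npes);
 std::vector<int> scnt(npes), sdsp(npes,0), rcnt(npes), rdsp(npes,0);
 // ... fill scnt with per-destination element counts, then:
 MPI_Alltoall(&scnt[0], 1, MPI_INT, &rcnt[0], 1, MPI_INT, comm);
 for(int i=1;i<npes;i++) sdsp[i]=sdsp[i-1]+scnt[i-1];
 for(int i=1;i<npes;i++) rdsp[i]=rdsp[i-1]+rcnt[i-1];
 std::vector<double> sbuf(sdsp[npes-1]+scnt[npes-1]);
 std::vector<double> rbuf(rdsp[npes-1]+rcnt[npes-1]);
 // ... pack sbuf, then exchange:
 par::Mpi_Alltoallv_sparse(&sbuf[0], &scnt[0], &sdsp[0],
                           &rbuf[0], &rcnt[0], &rdsp[0], comm);
 @endcode
*/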
template <typename T>
int Mpi_Alltoallv_dense(T* sbuff_, int* s_cnt_, int* sdisp_,
    T* rbuff_, int* r_cnt_, int* rdisp_, const MPI_Comm& comm){
#ifndef ALLTOALLV_FIX
  return MPI_Alltoallv(sbuff_, s_cnt_, sdisp_, par::Mpi_datatype<T>::value(),
      rbuff_, r_cnt_, rdisp_, par::Mpi_datatype<T>::value(), comm);
#else
  int np, pid;
  MPI_Comm_size(comm,&np);
  MPI_Comm_rank(comm,&pid);
  int range[2]={0,np-1};
  int split_id, partner;

  // Pack each outgoing block as [size, source pid, payload].
  std::vector<int> s_cnt(np);
  #pragma omp parallel for
  for(int i=0;i<np;i++){
    s_cnt[i]=s_cnt_[i]*sizeof(T)+2*sizeof(int);
  }
  std::vector<int> sdisp(np); sdisp[0]=0;
  omp_par::scan(&s_cnt[0],&sdisp[0],np);
  char* sbuff=mem::aligned_new<char>(sdisp[np-1]+s_cnt[np-1]);
  #pragma omp parallel for
  for(int i=0;i<np;i++){
    ((int*)&sbuff[sdisp[i]])[0]=s_cnt[i];
    ((int*)&sbuff[sdisp[i]])[1]=pid;
    memcpy(&sbuff[sdisp[i]]+2*sizeof(int),&sbuff_[sdisp_[i]],s_cnt[i]-2*sizeof(int));
  }

  // Recursive halving: in each iteration, exchange with the mirror rank in
  // the other half of the current range, then recurse on this rank's half.
  while(range[0]<range[1]){
    split_id=(range[0]+range[1])/2;

    int new_range[2]={(pid<=split_id?range[0]:split_id+1),
                      (pid<=split_id?split_id:range[1]  )};
    int cmp_range[2]={(pid> split_id?range[0]:split_id+1),
                      (pid> split_id?split_id:range[1]  )};
    int new_np=new_range[1]-new_range[0]+1;
    int cmp_np=cmp_range[1]-cmp_range[0]+1;

    partner=pid+cmp_range[0]-new_range[0];
    if(partner>range[1]) partner=range[1];
    assert(partner>=range[0]);
    bool extra_partner=( (range[1]-range[0])%2==0 &&
                          range[1]          ==pid );

    //Communication.
    {
      int* s_lengths=&s_cnt[cmp_range[0]-range[0]];
      std::vector<int> s_len_ext(cmp_np,0);
      std::vector<int> r_cnt    (new_np,0);
      std::vector<int> r_cnt_ext(new_np,0);
      MPI_Status status;

      //Exchange send sizes.
      MPI_Sendrecv                  (&s_lengths[0],cmp_np,MPI_INT, partner,0, &r_cnt    [0],new_np,MPI_INT, partner, 0,comm,&status);
      if(extra_partner) MPI_Sendrecv(&s_len_ext[0],cmp_np,MPI_INT,split_id,0, &r_cnt_ext[0],new_np,MPI_INT,split_id, 0,comm,&status);

      //Allocate receive buffer.
      std::vector<int> rdisp    (new_np,0);
      std::vector<int> rdisp_ext(new_np,0);
      omp_par::scan(&r_cnt    [0],&rdisp    [0],new_np);
      omp_par::scan(&r_cnt_ext[0],&rdisp_ext[0],new_np);
      int rbuff_size    =rdisp    [new_np-1]+r_cnt    [new_np-1];
      int rbuff_size_ext=rdisp_ext[new_np-1]+r_cnt_ext[new_np-1];
      char* rbuff   =                mem::aligned_new<char>(rbuff_size    );
      char* rbuffext=(extra_partner? mem::aligned_new<char>(rbuff_size_ext): NULL);

      //Sendrecv data.
      {
        int * s_cnt_tmp=&s_cnt[cmp_range[0]-range[0]];
        int * sdisp_tmp=&sdisp[cmp_range[0]-range[0]];
        char* sbuff_tmp=&sbuff[sdisp_tmp[0]];
        int sbuff_size=sdisp_tmp[cmp_np-1]+s_cnt_tmp[cmp_np-1]-sdisp_tmp[0];
        MPI_Sendrecv                  (sbuff_tmp,sbuff_size,MPI_BYTE, partner,0, &rbuff   [0],rbuff_size    ,MPI_BYTE, partner, 0,comm,&status);
        if(extra_partner) MPI_Sendrecv(     NULL,         0,MPI_BYTE,split_id,0, &rbuffext[0],rbuff_size_ext,MPI_BYTE,split_id, 0,comm,&status);
      }

      //Rearrange received data.
      {
        //assert(!extra_partner);
        int * s_cnt_old=&s_cnt[new_range[0]-range[0]];
        int * sdisp_old=&sdisp[new_range[0]-range[0]];
        std::vector<int> s_cnt_new(&s_cnt_old[0],&s_cnt_old[new_np]);
        std::vector<int> sdisp_new(new_np,0);
        #pragma omp parallel for
        for(int i=0;i<new_np;i++){
          s_cnt_new[i]+=r_cnt[i]+r_cnt_ext[i];
        }
        omp_par::scan(&s_cnt_new[0],&sdisp_new[0],new_np);

        //Copy data to sbuff_new.
        char* sbuff_new=mem::aligned_new<char>(sdisp_new[new_np-1]+s_cnt_new[new_np-1]);
        #pragma omp parallel for
        for(int i=0;i<new_np;i++){
          memcpy(&sbuff_new[sdisp_new[i]                      ],&sbuff   [sdisp_old[i]],s_cnt_old[i]);
          memcpy(&sbuff_new[sdisp_new[i]+s_cnt_old[i]         ],&rbuff   [rdisp    [i]],r_cnt    [i]);
          memcpy(&sbuff_new[sdisp_new[i]+s_cnt_old[i]+r_cnt[i]],&rbuffext[rdisp_ext[i]],r_cnt_ext[i]);
        }

        //Free memory.
        if(sbuff   !=NULL) mem::aligned_delete(sbuff   );
        if(rbuff   !=NULL) mem::aligned_delete(rbuff   );
        if(rbuffext!=NULL) mem::aligned_delete(rbuffext);

        //Substitute data for next iteration.
        s_cnt=s_cnt_new;
        sdisp=sdisp_new;
        sbuff=sbuff_new;
      }
    }

    range[0]=new_range[0];
    range[1]=new_range[1];
  }

  //Copy data to rbuff_.
  std::vector<char*> buff_ptr(np);
  char* tmp_ptr=sbuff;
  for(int i=0;i<np;i++){
    int& blk_size=((int*)tmp_ptr)[0];
    buff_ptr[i]=tmp_ptr;
    tmp_ptr+=blk_size;
  }
  #pragma omp parallel for
  for(int i=0;i<np;i++){
    int& blk_size=((int*)buff_ptr[i])[0];
    int& src_pid =((int*)buff_ptr[i])[1];
    assert(blk_size-2*sizeof(int)<=r_cnt_[src_pid]*sizeof(T));
    memcpy(&rbuff_[rdisp_[src_pid]],buff_ptr[i]+2*sizeof(int),blk_size-2*sizeof(int));
  }

  //Free memory.
  if(sbuff!=NULL) mem::aligned_delete(sbuff);
  return 0;
#endif
}
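
/**
 Note on the ALLTOALLV_FIX variant of Mpi_Alltoallv_dense above: instead of
 np point-to-point messages per process, the exchange proceeds by recursive
 halving of the rank range [0,np-1]. In each round a rank pairs with its
 mirror in the other half, ships every packet destined for that half in one
 message, and merges the packets it receives into its own pending send
 buffer; when one half has a rank more than the other, the last rank of the
 range serves as the extra partner. For example (illustrative), with np=4:
   round 1: 0<->2 and 1<->3 (halves {0,1} and {2,3} swap their cross traffic)
   round 2: 0<->1 and 2<->3
 Every packet thus reaches its destination in ceil(log2(np)) rounds, at the
 cost of each payload being forwarded O(log np) times; the (size, source pid)
 header packed with each block lets the final loop unpack blocks in any order.
*/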
template<typename T>
int partitionW(Vector<T>& nodeList, long long* wts, const MPI_Comm& comm){
  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);
  long long npesLong = npes;

  long long nlSize = nodeList.Dim();
  long long off1= 0, off2= 0, localWt= 0, totalWt = 0;

  // First construct arrays of wts.
  Vector<long long> wts_(nlSize);
  if(wts == NULL) { // If no weights are given, assign unit weight to each node.
    wts=&wts_[0];
    #pragma omp parallel for
    for (long long i = 0; i < nlSize; i++){
      wts[i] = 1;
    }
  }
  #pragma omp parallel for reduction(+:localWt)
  for (long long i = 0; i < nlSize; i++){
    localWt+=wts[i];
  }

  // Compute the total weight of the problem ...
  MPI_Allreduce(&localWt, &totalWt, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
  MPI_Scan(&localWt, &off2, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
  off1=off2-localWt;

  // Perform a local scan on the weights first ...
  Vector<long long> lscn(nlSize);
  if(nlSize) {
    lscn[0]=off1;
    omp_par::scan(&wts[0],&lscn[0],nlSize);
  }

  Vector<int> int_buff(npesLong*4);
  Vector<int> sendSz (npesLong,&int_buff[0]+npesLong*0,false);
  Vector<int> recvSz (npesLong,&int_buff[0]+npesLong*1,false);
  Vector<int> sendOff(npesLong,&int_buff[0]+npesLong*2,false);
  Vector<int> recvOff(npesLong,&int_buff[0]+npesLong*3,false);

  // Compute the partition offsets and sizes so that All2Allv can be performed.
  // Initialize ...
  #pragma omp parallel for
  for (size_t i = 0; i < npesLong; i++) {
    sendSz[i] = 0;
  }

  // The heart of the algorithm: each process determines the range of target
  // processes its elements map to, and counts the elements per target by
  // searching the scanned weights.
  if(nlSize>0 && totalWt>0) {
    long long pid1=( off1   *npesLong)/totalWt;
    long long pid2=((off2+1)*npesLong)/totalWt+1;
    assert((totalWt*pid2)/npesLong>=off2);
    pid1=(pid1<       0?       0:pid1);
    pid2=(pid2>npesLong?npesLong:pid2);
    #pragma omp parallel for
    for(int i=pid1;i<pid2;i++){
      long long wt1=(totalWt*(i  ))/npesLong;
      long long wt2=(totalWt*(i+1))/npesLong;
      long long start = std::lower_bound(&lscn[0], &lscn[0]+nlSize, wt1, std::less<long long>())-&lscn[0];
      long long end   = std::lower_bound(&lscn[0], &lscn[0]+nlSize, wt2, std::less<long long>())-&lscn[0];
      if(i==         0) start=0     ;
      if(i==npesLong-1) end  =nlSize;
      sendSz[i]=end-start;
    }
  }else sendSz[0]=nlSize;

  // Communicate with the other processes how many elements each will be
  // sending, and learn how many to receive from whom.
  MPI_Alltoall(&sendSz[0], 1, par::Mpi_datatype<int>::value(),
               &recvSz[0], 1, par::Mpi_datatype<int>::value(), comm);

  // Compute offsets ...
  sendOff[0] = 0; omp_par::scan(&sendSz[0],&sendOff[0],npesLong);
  recvOff[0] = 0; omp_par::scan(&recvSz[0],&recvOff[0],npesLong);

  // New value of nlSize, i.e. the number of local nodes.
  long long nn = recvSz[npesLong-1] + recvOff[npesLong-1];

  // Allocate memory for the new arrays ...
  Vector<T> newNodes(nn);

  // Perform All2All ...
  par::Mpi_Alltoallv_sparse<T>(&nodeList[0], &sendSz[0], &sendOff[0],
                               &newNodes[0], &recvSz[0], &recvOff[0], comm);

  // Reset the pointer ...
  nodeList=newNodes;
  return 0;
}//end function
template<typename T>
int partitionW(std::vector<T>& nodeList, long long* wts, const MPI_Comm& comm){
  Vector<T> nodeList_=nodeList;
  int ret = par::partitionW<T>(nodeList_, wts, comm);
  nodeList.assign(&nodeList_[0],&nodeList_[0]+nodeList_.Dim());
  return ret;
}
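
/**
 Usage sketch (hypothetical names): repartition a distributed array so that
 the per-process weight sums are approximately equal. Passing wts=NULL
 assigns unit weight to every element, i.e. balances element counts.
 @code
 std::vector<double> pts; // local block of a distributed array, filled elsewhere
 par::partitionW<double>(pts, NULL, MPI_COMM_WORLD);
 // pts now holds this rank's share of a count-balanced repartition.
 @endcode
*/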
template<typename T>
int HyperQuickSort(const Vector<T>& arr_, Vector<T>& SortedElem, const MPI_Comm& comm_){ // O( ((N/p)+log(p))*(log(N/p)+log(p)) )
  // Copy communicator.
  MPI_Comm comm=comm_;

  // Get comm size and rank.
  int npes, myrank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &myrank);
  int omp_p=omp_get_max_threads();
  srand(myrank);

  // Local and global sizes. O(log p)
  long long totSize, nelem = arr_.Dim();
  //assert(nelem); // TODO: Check if this is needed.
  MPI_Allreduce(&nelem, &totSize, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);

  // Local sort.
  Vector<T> arr=arr_;
  omp_par::merge_sort(&arr[0], &arr[0]+nelem);

  // Allocate memory.
  Vector<T> nbuff;
  Vector<T> nbuff_ext;
  Vector<T> rbuff;
  Vector<T> rbuff_ext;

  bool free_comm=false; // Flag to free comm.

  // Binary split and merge in each iteration.
  while(npes>1 && totSize>0){ // O(log p) iterations.

    // Determine splitters. O( log(N/p) + log(p) )
    T split_key;
    long long totSize_new;
    //while(true)
    {
      // Take random splitters. O( 1 ) -- Let p * splt_count = glb_splt_count = const = 100~1000
      int splt_count=(1000*nelem)/totSize;
      if(npes>1000) splt_count=(((float)rand()/(float)RAND_MAX)*totSize<(1000*nelem)?1:0);
      if(splt_count>nelem) splt_count=nelem;
      std::vector<T> splitters(splt_count);
      for(int i=0;i<splt_count;i++)
        splitters[i]=arr[rand()%nelem];

      // Gather all splitters. O( log(p) )
      int glb_splt_count;
      std::vector<int> glb_splt_cnts(npes);
      std::vector<int> glb_splt_disp(npes,0);
      MPI_Allgather(&splt_count      , 1, par::Mpi_datatype<int>::value(),
                    &glb_splt_cnts[0], 1, par::Mpi_datatype<int>::value(), comm);
      omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);
      glb_splt_count=glb_splt_cnts[npes-1]+glb_splt_disp[npes-1];
      std::vector<T> glb_splitters(glb_splt_count);
      MPI_Allgatherv(&    splitters[0], splt_count, par::Mpi_datatype<T>::value(),
                     &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
                     par::Mpi_datatype<T>::value(), comm);

      // Determine split key. O( log(N/p) + log(p) )
      std::vector<long long> disp(glb_splt_count,0);
      if(nelem>0){
        #pragma omp parallel for
        for(int i=0;i<glb_splt_count;i++){
          disp[i]=std::lower_bound(&arr[0], &arr[0]+nelem, glb_splitters[i])-&arr[0];
        }
      }
      std::vector<long long> glb_disp(glb_splt_count,0);
      MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);

      // Pick the splitter whose global rank is closest to the median.
      long long* split_disp=&glb_disp[0];
      for(int i=0;i<glb_splt_count;i++)
        if(llabs(glb_disp[i]-totSize/2)<llabs(*split_disp-totSize/2)) split_disp=&glb_disp[i];
      split_key=glb_splitters[split_disp-&glb_disp[0]];

      totSize_new=(myrank<=(npes-1)/2?*split_disp:totSize-*split_disp);
      //double err=(((double)*split_disp)/(totSize/2))-1.0;
      //if(pvfmm::fabs<double>(err)<0.01 || npes<=16) break;
      //else if(!myrank) std::cout<<err<<'\n';
    }

    // Split problem into two. O( N/p )
    int split_id=(npes-1)/2;
    {
      int new_p0=(myrank<=split_id?0:split_id+1);
      int cmp_p0=(myrank> split_id?0:split_id+1);
      int partner = myrank+cmp_p0-new_p0;
      if(partner>=npes) partner=npes-1;
      assert(partner>=0);
      bool extra_partner=( npes%2==1 && npes-1==myrank );

      // Exchange send sizes.
      char *sbuff, *lbuff;
      int rsize=0, ssize=0, lsize=0;
      int ext_rsize=0, ext_ssize=0;
      size_t split_indx=(nelem>0?std::lower_bound(&arr[0], &arr[0]+nelem, split_key)-&arr[0]:0);
      ssize=       (myrank> split_id? split_indx: nelem-split_indx)*sizeof(T);
      sbuff=(char*)(myrank> split_id? &arr[0]   : &arr[0]+split_indx);
      lsize=       (myrank<=split_id? split_indx: nelem-split_indx)*sizeof(T);
      lbuff=(char*)(myrank<=split_id? &arr[0]   : &arr[0]+split_indx);
      MPI_Status status;
      MPI_Sendrecv                  (&    ssize,1,MPI_INT, partner,0, &    rsize,1,MPI_INT, partner, 0,comm,&status);
      if(extra_partner) MPI_Sendrecv(&ext_ssize,1,MPI_INT,split_id,0, &ext_rsize,1,MPI_INT,split_id, 0,comm,&status);

      // Exchange data.
      rbuff    .Resize(    rsize/sizeof(T));
      rbuff_ext.Resize(ext_rsize/sizeof(T));
      MPI_Sendrecv                  (sbuff,ssize,MPI_BYTE, partner,0, &rbuff    [0],    rsize,MPI_BYTE, partner, 0,comm,&status);
      if(extra_partner) MPI_Sendrecv( NULL,    0,MPI_BYTE,split_id,0, &rbuff_ext[0],ext_rsize,MPI_BYTE,split_id, 0,comm,&status);

      // Merge the local half with the received data.
      int nbuff_size=lsize+rsize+ext_rsize;
      nbuff.Resize(nbuff_size/sizeof(T));
      omp_par::merge<T*>((T*)lbuff, (T*)(lbuff+lsize), &rbuff[0], &rbuff[0]+(rsize/sizeof(T)), &nbuff[0], omp_p, std::less<T>());
      if(ext_rsize>0 && nbuff.Dim()>0){
        nbuff_ext.Resize(nbuff_size/sizeof(T));
        omp_par::merge<T*>(&nbuff[0], &nbuff[0]+((lsize+rsize)/sizeof(T)), &rbuff_ext[0], &rbuff_ext[0]+(ext_rsize/sizeof(T)), &nbuff_ext[0], omp_p, std::less<T>());
        nbuff.Swap(nbuff_ext);
        nbuff_ext.Resize(0);
      }

      // Copy new data.
      totSize=totSize_new;
      nelem = nbuff_size/sizeof(T);
      arr.Swap(nbuff);
      nbuff.Resize(0);
    }

    {// Split comm. O( log(p) ) ??
      MPI_Comm scomm;
      MPI_Comm_split(comm, myrank<=split_id, myrank, &scomm);
      if(free_comm) MPI_Comm_free(&comm);
      comm=scomm; free_comm=true;
      npes  =(myrank<=split_id? split_id+1: npes-split_id-1);
      myrank=(myrank<=split_id? myrank    : myrank-split_id-1);
    }
  }
  if(free_comm) MPI_Comm_free(&comm);

  SortedElem.Resize(nelem);
  memcpy(&SortedElem[0], &arr[0], nelem*sizeof(T));

  par::partitionW<T>(SortedElem, NULL, comm_);
  return 0;
}//end function
template<typename T>
int HyperQuickSort(const std::vector<T>& arr_, std::vector<T>& SortedElem_, const MPI_Comm& comm_){
  Vector<T> SortedElem;
  const Vector<T> arr(arr_.size(),(T*)&arr_[0],false);
  int ret = HyperQuickSort(arr, SortedElem, comm_);
  SortedElem_.assign(&SortedElem[0],&SortedElem[0]+SortedElem.Dim());
  return ret;
}
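
/**
 Usage sketch (hypothetical names): a distributed sort. On return, each
 rank holds a contiguous, locally sorted slice, and concatenating the
 slices in rank order yields the sorted union of all inputs (the trailing
 partitionW call balances the slice sizes).
 @code
 std::vector<long> keys;  // arbitrary unsorted local keys, filled elsewhere
 std::vector<long> sorted;
 par::HyperQuickSort(keys, sorted, MPI_COMM_WORLD);
 @endcode
*/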
template<typename T>
int SortScatterIndex(const Vector<T>& key, Vector<size_t>& scatter_index, const MPI_Comm& comm, const T* split_key_){
  typedef SortPair<T,size_t> Pair_t;
  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);
  long long npesLong = npes;

  Vector<Pair_t> parray(key.Dim());
  { // Build global index.
    long long glb_dsp=0;
    long long loc_size=key.Dim();
    MPI_Scan(&loc_size, &glb_dsp, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    glb_dsp-=loc_size;
    #pragma omp parallel for
    for(size_t i=0;i<loc_size;i++){
      parray[i].key=key[i];
      parray[i].data=glb_dsp+i;
    }
  }

  Vector<Pair_t> psorted;
  { // Allocate memory
    //if(buff.Dim()>0){
    //  psorted.ReInit(buff.Dim()/sizeof(Pair_t), (Pair_t*)&buff[0], false);
    //}
  }
  HyperQuickSort(parray, psorted, comm);

  if(split_key_!=NULL){ // Partition data
    Vector<T> split_key(npesLong);
    MPI_Allgather((void*)split_key_, 1, par::Mpi_datatype<T>::value(),
                  &split_key[0]    , 1, par::Mpi_datatype<T>::value(), comm);

    Vector<int> int_buff(npesLong*4);
    Vector<int> sendSz (npesLong,&int_buff[0]+npesLong*0,false);
    Vector<int> recvSz (npesLong,&int_buff[0]+npesLong*1,false);
    Vector<int> sendOff(npesLong,&int_buff[0]+npesLong*2,false);
    Vector<int> recvOff(npesLong,&int_buff[0]+npesLong*3,false);
    long long nlSize = psorted.Dim();

    // Compute the partition offsets and sizes so that All2Allv can be performed.
    // Initialize ...
    #pragma omp parallel for
    for (size_t i = 0; i < npesLong; i++) {
      sendSz[i] = 0;
    }

    // The heart of the algorithm: for each target process, count the sorted
    // pairs that fall between its splitter keys.
    if(nlSize>0) {
      // Determine processor range.
      long long pid1=std::lower_bound(&split_key[0], &split_key[0]+npesLong, psorted[       0].key)-&split_key[0]-1;
      long long pid2=std::upper_bound(&split_key[0], &split_key[0]+npesLong, psorted[nlSize-1].key)-&split_key[0]+1;
      pid1=(pid1<       0?       0:pid1);
      pid2=(pid2>npesLong?npesLong:pid2);
      #pragma omp parallel for
      for(int i=pid1;i<pid2;i++){
        Pair_t p1; p1.key=split_key[i];
        Pair_t p2; p2.key=split_key[i+1<npesLong?i+1:i];
        long long start = std::lower_bound(&psorted[0], &psorted[0]+nlSize, p1, std::less<Pair_t>())-&psorted[0];
        long long end   = std::lower_bound(&psorted[0], &psorted[0]+nlSize, p2, std::less<Pair_t>())-&psorted[0];
        if(i==         0) start=0     ;
        if(i==npesLong-1) end  =nlSize;
        sendSz[i]=end-start;
      }
    }

    // Communicate with the other processes how many elements each will be
    // sending, and learn how many to receive from whom.
    MPI_Alltoall(&sendSz[0], 1, par::Mpi_datatype<int>::value(),
                 &recvSz[0], 1, par::Mpi_datatype<int>::value(), comm);

    // Compute offsets ...
    sendOff[0] = 0; omp_par::scan(&sendSz[0],&sendOff[0],npesLong);
    recvOff[0] = 0; omp_par::scan(&recvSz[0],&recvOff[0],npesLong);

    // New value of nlSize, i.e. the number of local nodes.
    long long nn = recvSz[npesLong-1] + recvOff[npesLong-1];

    // Allocate memory for the new arrays ...
    Vector<Pair_t> newNodes(nn);

    // Perform All2All ...
    par::Mpi_Alltoallv_sparse<Pair_t>(&psorted[0] , &sendSz[0], &sendOff[0],
                                      &newNodes[0], &recvSz[0], &recvOff[0], comm);

    // Reset the pointer ...
    psorted.Swap(newNodes);
  }

  scatter_index.Resize(psorted.Dim());
  #pragma omp parallel for
  for(size_t i=0;i<psorted.Dim();i++){
    scatter_index[i]=psorted[i].data;
  }
  return 0;
}
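
/**
 Note (reading of the code above): on return, scatter_index[i] is the
 original global index of the element that belongs at local position i of
 the globally sorted order. The index vector can therefore be reused with
 ScatterForward to move any companion data into sorted order, and with
 ScatterReverse to undo that movement.
*/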
template<typename T>
int ScatterForward(Vector<T>& data_, const Vector<size_t>& scatter_index, const MPI_Comm& comm){
  typedef SortPair<size_t,size_t> Pair_t;
  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);
  long long npesLong = npes;

  // Determine the data size per scatter-index entry (in bytes).
  size_t data_dim=0;
  long long send_size=0;
  long long recv_size=0;
  {
    recv_size=scatter_index.Dim();
    long long glb_size[2]={0,0};
    long long loc_size[2]={(long long)(data_.Dim()*sizeof(T)), recv_size};
    MPI_Allreduce(&loc_size, &glb_size, 2, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    if(glb_size[0]==0 || glb_size[1]==0) return 0; //Nothing to be done.
    data_dim=glb_size[0]/glb_size[1];
    assert(glb_size[0]==data_dim*glb_size[1]);
    send_size=(data_.Dim()*sizeof(T))/data_dim;
  }

  Vector<char> recv_buff(recv_size*data_dim);
  Vector<char> send_buff(send_size*data_dim);

  // Global scan of data size.
  Vector<long long> glb_scan(npesLong);
  {
    long long glb_rank=0;
    MPI_Scan(&send_size, &glb_rank, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    glb_rank-=send_size;
    MPI_Allgather(&glb_rank   , 1, par::Mpi_datatype<long long>::value(),
                  &glb_scan[0], 1, par::Mpi_datatype<long long>::value(), comm);
  }

  // Sort scatter_index.
  Vector<Pair_t> psorted(recv_size);
  {
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      psorted[i].key=scatter_index[i];
      psorted[i].data=i;
    }
    omp_par::merge_sort(&psorted[0], &psorted[0]+recv_size);
  }

  // Exchange send, recv indices.
  Vector<size_t> recv_indx(recv_size);
  Vector<size_t> send_indx(send_size);
  Vector<int> sendSz (npesLong);
  Vector<int> sendOff(npesLong);
  Vector<int> recvSz (npesLong);
  Vector<int> recvOff(npesLong);
  {
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      recv_indx[i]=psorted[i].key;
    }
    #pragma omp parallel for
    for(size_t i=0;i<npesLong;i++){
      size_t start=              std::lower_bound(&recv_indx[0], &recv_indx[0]+recv_size, glb_scan[i  ])-&recv_indx[0];
      size_t end  =(i+1<npesLong?std::lower_bound(&recv_indx[0], &recv_indx[0]+recv_size, glb_scan[i+1])-&recv_indx[0]:recv_size);
      recvSz [i]=end-start;
      recvOff[i]=start;
    }
    MPI_Alltoall(&recvSz[0], 1, par::Mpi_datatype<int>::value(),
                 &sendSz[0], 1, par::Mpi_datatype<int>::value(), comm);
    sendOff[0] = 0; omp_par::scan(&sendSz[0],&sendOff[0],npesLong);
    assert(sendOff[npesLong-1]+sendSz[npesLong-1]==send_size);
    par::Mpi_Alltoallv_dense<size_t>(&recv_indx[0], &recvSz[0], &recvOff[0],
                                     &send_indx[0], &sendSz[0], &sendOff[0], comm);
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      assert(send_indx[i]>=glb_scan[rank]);
      send_indx[i]-=glb_scan[rank];
      assert(send_indx[i]<send_size);
    }
  }

  // Prepare send buffer.
  {
    char* data=(char*)&data_[0];
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      size_t src_indx=send_indx[i]*data_dim;
      size_t trg_indx=i*data_dim;
      for(size_t j=0;j<data_dim;j++)
        send_buff[trg_indx+j]=data[src_indx+j];
    }
  }

  // All2Allv
  {
    #pragma omp parallel for
    for(size_t i=0;i<npesLong;i++){
      sendSz [i]*=data_dim;
      sendOff[i]*=data_dim;
      recvSz [i]*=data_dim;
      recvOff[i]*=data_dim;
    }
    par::Mpi_Alltoallv_dense<char>(&send_buff[0], &sendSz[0], &sendOff[0],
                                   &recv_buff[0], &recvSz[0], &recvOff[0], comm);
  }

  // Build output data.
  {
    data_.Resize(recv_size*data_dim/sizeof(T));
    char* data=(char*)&data_[0];
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      size_t src_indx=i*data_dim;
      size_t trg_indx=psorted[i].data*data_dim;
      for(size_t j=0;j<data_dim;j++)
        data[trg_indx+j]=recv_buff[src_indx+j];
    }
  }
  return 0;
}
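
/**
 Usage sketch (hypothetical names, assuming the default NULL splitter
 argument declared in the header): the usual pattern is to compute the
 scatter index from a key vector once, then forward any number of payload
 vectors aligned with those keys, one entry per key.
 @code
 MPI_Comm comm = MPI_COMM_WORLD;
 pvfmm::Vector<size_t> keys;  // sort keys, filled elsewhere
 pvfmm::Vector<double> vals;  // payload, one entry per key
 pvfmm::Vector<size_t> sidx;
 par::SortScatterIndex(keys, sidx, comm);  // destination of each element
 par::ScatterForward (keys, sidx, comm);   // keys arrive in sorted order
 par::ScatterForward (vals, sidx, comm);   // vals follow the same permutation
 @endcode
*/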
template<typename T>
int ScatterReverse(Vector<T>& data_, const Vector<size_t>& scatter_index_, const MPI_Comm& comm, size_t loc_size){
  typedef SortPair<size_t,size_t> Pair_t;
  int npes, rank;
  MPI_Comm_size(comm, &npes);
  MPI_Comm_rank(comm, &rank);
  long long npesLong = npes;

  // Determine the data size per scatter-index entry (in bytes) and the
  // number of elements to receive locally.
  size_t data_dim=0;
  long long send_size=0;
  long long recv_size=0;
  {
    recv_size=loc_size;
    long long glb_size[3]={0,0,0};
    long long loc_size[3]={data_.Dim()*sizeof(T), scatter_index_.Dim(), recv_size};
    MPI_Allreduce(&loc_size, &glb_size, 3, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    if(glb_size[0]==0 || glb_size[1]==0) return 0; //Nothing to be done.
    assert(glb_size[0]%glb_size[1]==0);
    data_dim=glb_size[0]/glb_size[1];
    assert(loc_size[0]%data_dim==0);
    send_size=loc_size[0]/data_dim;
    if(glb_size[0]!=glb_size[2]*data_dim){
      // loc_size was not specified (or is inconsistent); fall back to a
      // uniform target distribution.
      recv_size=(((rank+1)*(glb_size[0]/data_dim))/npesLong)-
                (( rank   *(glb_size[0]/data_dim))/npesLong);
    }
  }

  // If the data and the scatter index are distributed differently,
  // repartition the scatter index to match the data.
  Vector<size_t> scatter_index;
  {
    long long glb_rank[2]={0,0};
    long long glb_size[2]={0,0};
    long long loc_size[2]={data_.Dim()*sizeof(T)/data_dim, scatter_index_.Dim()};
    MPI_Scan(&loc_size, &glb_rank, 2, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    MPI_Allreduce(&loc_size, &glb_size, 2, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    assert(glb_size[0]==glb_size[1]);
    glb_rank[0]-=loc_size[0];
    glb_rank[1]-=loc_size[1];
    Matrix<long long> glb_scan(2,npesLong+1);
    MPI_Allgather(&glb_rank[0], 1, par::Mpi_datatype<long long>::value(),
                  glb_scan[0] , 1, par::Mpi_datatype<long long>::value(), comm);
    MPI_Allgather(&glb_rank[1], 1, par::Mpi_datatype<long long>::value(),
                  glb_scan[1] , 1, par::Mpi_datatype<long long>::value(), comm);
    glb_scan[0][npesLong]=glb_size[0];
    glb_scan[1][npesLong]=glb_size[1];
    if(loc_size[0]!=loc_size[1] || glb_rank[0]!=glb_rank[1]){ // Repartition scatter_index
      scatter_index.ReInit(loc_size[0]);

      Vector<int> send_dsp(npesLong+1);
      Vector<int> recv_dsp(npesLong+1);
      #pragma omp parallel for
      for(size_t i=0;i<=npesLong;i++){
        send_dsp[i]=std::min(std::max(glb_scan[0][i],glb_rank[1]),glb_rank[1]+loc_size[1])-glb_rank[1];
        recv_dsp[i]=std::min(std::max(glb_scan[1][i],glb_rank[0]),glb_rank[0]+loc_size[0])-glb_rank[0];
      }

      size_t commCnt=0;
      Vector<int> send_cnt(npesLong+0);
      Vector<int> recv_cnt(npesLong+0);
      #pragma omp parallel for reduction(+:commCnt)
      for(size_t i=0;i<npesLong;i++){
        send_cnt[i]=send_dsp[i+1]-send_dsp[i];
        recv_cnt[i]=recv_dsp[i+1]-recv_dsp[i];
        if(send_cnt[i] && i!=rank) commCnt++;
        if(recv_cnt[i] && i!=rank) commCnt++;
      }

      pvfmm::Vector<MPI_Request> requests(commCnt);
      pvfmm::Vector<MPI_Status > statuses(commCnt);
      commCnt=0;
      for(int i=0;i<npesLong;i++){ // Post all receives.
        if(recv_cnt[i] && i!=rank){
          MPI_Irecv(&scatter_index[0]+recv_dsp[i], recv_cnt[i], par::Mpi_datatype<size_t>::value(), i, 1,
              comm, &requests[commCnt]);
          commCnt++;
        }
      }
      for(int i=0;i<npesLong;i++){ // Send data.
        if(send_cnt[i] && i!=rank){
          MPI_Issend(&scatter_index_[0]+send_dsp[i], send_cnt[i], par::Mpi_datatype<size_t>::value(), i, 1,
              comm, &requests[commCnt]);
          commCnt++;
        }
      }

      // Copy the local portion.
      assert(send_cnt[rank]==recv_cnt[rank]);
      if(send_cnt[rank]){
        memcpy(&scatter_index[0]+recv_dsp[rank], &scatter_index_[0]+send_dsp[rank], send_cnt[rank]*sizeof(size_t));
      }
      if(commCnt) MPI_Waitall(commCnt, &requests[0], &statuses[0]);
    }else{
      scatter_index.ReInit(scatter_index_.Dim(), &scatter_index_[0],false);
    }
  }

  Vector<char> recv_buff(recv_size*data_dim);
  Vector<char> send_buff(send_size*data_dim);

  // Global data size.
  Vector<long long> glb_scan(npesLong);
  {
    long long glb_rank=0;
    MPI_Scan(&recv_size, &glb_rank, 1, par::Mpi_datatype<long long>::value(), par::Mpi_datatype<long long>::sum(), comm);
    glb_rank-=recv_size;
    MPI_Allgather(&glb_rank   , 1, par::Mpi_datatype<long long>::value(),
                  &glb_scan[0], 1, par::Mpi_datatype<long long>::value(), comm);
  }

  // Sort scatter_index.
  Vector<Pair_t> psorted(send_size);
  {
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      psorted[i].key=scatter_index[i];
      psorted[i].data=i;
    }
    omp_par::merge_sort(&psorted[0], &psorted[0]+send_size);
  }

  // Exchange send, recv indices.
  Vector<size_t> recv_indx(recv_size);
  Vector<size_t> send_indx(send_size);
  Vector<int> sendSz (npesLong);
  Vector<int> sendOff(npesLong);
  Vector<int> recvSz (npesLong);
  Vector<int> recvOff(npesLong);
  {
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      send_indx[i]=psorted[i].key;
    }
    #pragma omp parallel for
    for(size_t i=0;i<npesLong;i++){
      size_t start=              std::lower_bound(&send_indx[0], &send_indx[0]+send_size, glb_scan[i  ])-&send_indx[0];
      size_t end  =(i+1<npesLong?std::lower_bound(&send_indx[0], &send_indx[0]+send_size, glb_scan[i+1])-&send_indx[0]:send_size);
      sendSz [i]=end-start;
      sendOff[i]=start;
    }
    MPI_Alltoall(&sendSz[0], 1, par::Mpi_datatype<int>::value(),
                 &recvSz[0], 1, par::Mpi_datatype<int>::value(), comm);
    recvOff[0] = 0; omp_par::scan(&recvSz[0],&recvOff[0],npesLong);
    assert(recvOff[npesLong-1]+recvSz[npesLong-1]==recv_size);
    par::Mpi_Alltoallv_dense<size_t>(&send_indx[0], &sendSz[0], &sendOff[0],
                                     &recv_indx[0], &recvSz[0], &recvOff[0], comm);
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      assert(recv_indx[i]>=glb_scan[rank]);
      recv_indx[i]-=glb_scan[rank];
      assert(recv_indx[i]<recv_size);
    }
  }

  // Prepare send buffer.
  {
    char* data=(char*)&data_[0];
    #pragma omp parallel for
    for(size_t i=0;i<send_size;i++){
      size_t src_indx=psorted[i].data*data_dim;
      size_t trg_indx=i*data_dim;
      for(size_t j=0;j<data_dim;j++)
        send_buff[trg_indx+j]=data[src_indx+j];
    }
  }

  // All2Allv
  {
    #pragma omp parallel for
    for(size_t i=0;i<npesLong;i++){
      sendSz [i]*=data_dim;
      sendOff[i]*=data_dim;
      recvSz [i]*=data_dim;
      recvOff[i]*=data_dim;
    }
    par::Mpi_Alltoallv_dense<char>(&send_buff[0], &sendSz[0], &sendOff[0],
                                   &recv_buff[0], &recvSz[0], &recvOff[0], comm);
  }

  // Build output data.
  {
    data_.Resize(recv_size*data_dim/sizeof(T));
    char* data=(char*)&data_[0];
    #pragma omp parallel for
    for(size_t i=0;i<recv_size;i++){
      size_t src_indx=i*data_dim;
      size_t trg_indx=recv_indx[i]*data_dim;
      for(size_t j=0;j<data_dim;j++)
        data[trg_indx+j]=recv_buff[src_indx+j];
    }
  }
  return 0;
}
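
/**
 Usage sketch (hypothetical names, assuming one element of T per
 scatter-index entry): ScatterReverse undoes a ScatterForward performed
 with the same scatter index, restoring the original local sizes when
 loc_size is set to the pre-scatter element count.
 @code
 MPI_Comm comm = MPI_COMM_WORLD;
 size_t orig_size = vals.Dim();          // element count before forwarding
 par::ScatterForward(vals, sidx, comm);  // to globally sorted order
 // ... operate on vals in sorted order ...
 par::ScatterReverse(vals, sidx, comm, orig_size); // back to original order
 @endcode
*/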
}//end namespace par
}//end namespace pvfmm