#include <pvfmm/ompUtils.hpp>
#include <pvfmm/vector.hpp>

namespace pvfmm {

Comm::Comm() {
#ifdef PVFMM_HAVE_MPI
  Init(MPI_COMM_SELF);
#endif
}

Comm::Comm(const Comm& c) {
#ifdef PVFMM_HAVE_MPI
  Init(c.mpi_comm_);
#endif
}

Comm Comm::Self() {
#ifdef PVFMM_HAVE_MPI
  Comm comm_self(MPI_COMM_SELF);
  return comm_self;
#else
  Comm comm_self;
  return comm_self;
#endif
}

Comm Comm::World() {
#ifdef PVFMM_HAVE_MPI
  Comm comm_world(MPI_COMM_WORLD);
  return comm_world;
#else
  Comm comm_self;
  return comm_self;
#endif
}

Comm& Comm::operator=(const Comm& c) {
#ifdef PVFMM_HAVE_MPI
  Init(c.mpi_comm_);
#endif
  return *this;
}

Comm::~Comm() {
#ifdef PVFMM_HAVE_MPI
  while (!req.empty()) {
    delete (Vector<MPI_Request>*)req.top();
    req.pop();
  }
  MPI_Comm_free(&mpi_comm_);
#endif
}

Comm Comm::Split(Integer clr) const {
#ifdef PVFMM_HAVE_MPI
  MPI_Comm new_comm;
  MPI_Comm_split(mpi_comm_, clr, mpi_rank_, &new_comm);
  Comm c(new_comm);
  MPI_Comm_free(&new_comm);
  return c;
#else
  Comm c;
  return c;
#endif
}

Integer Comm::Rank() const {
#ifdef PVFMM_HAVE_MPI
  return mpi_rank_;
#else
  return 0;
#endif
}

Integer Comm::Size() const {
#ifdef PVFMM_HAVE_MPI
  return mpi_size_;
#else
  return 1;
#endif
}

void Comm::Barrier() const {
#ifdef PVFMM_HAVE_MPI
  MPI_Barrier(mpi_comm_);
#endif
}

template <class SType> void* Comm::Isend(ConstIterator<SType> sbuf, Long scount, Integer dest, Integer tag) const {
#ifdef PVFMM_HAVE_MPI
  if (!scount) return NULL;
  Vector<MPI_Request>& request = *NewReq();
  request.ReInit(1);

  sbuf[0];  // touch the end-points so checked iterators validate the range (debug builds)
  sbuf[scount - 1];

#ifndef NDEBUG
  MPI_Issend(&sbuf[0], scount, CommDatatype<SType>::value(), dest, tag, mpi_comm_, &request[0]);
#else
  MPI_Isend(&sbuf[0], scount, CommDatatype<SType>::value(), dest, tag, mpi_comm_, &request[0]);
#endif
  return &request;
#else
  auto it = recv_req.find(tag);
  if (it == recv_req.end())
    send_req.insert(std::pair<Integer, ConstIterator<char>>(tag, (ConstIterator<char>)sbuf));
  else
    pvfmm::memcopy(it->second, (ConstIterator<char>)sbuf, scount * sizeof(SType));
  return NULL;
#endif
}

template <class RType> void* Comm::Irecv(Iterator<RType> rbuf, Long rcount, Integer source, Integer tag) const {
#ifdef PVFMM_HAVE_MPI
  if (!rcount) return NULL;
  Vector<MPI_Request>& request = *NewReq();
  request.ReInit(1);

  rbuf[0];
  rbuf[rcount - 1];

  MPI_Irecv(&rbuf[0], rcount, CommDatatype<RType>::value(), source, tag, mpi_comm_, &request[0]);
  return &request;
#else
  auto it = send_req.find(tag);
  if (it == send_req.end())
    recv_req.insert(std::pair<Integer, Iterator<char>>(tag, (Iterator<char>)rbuf));
  else
    pvfmm::memcopy((Iterator<char>)rbuf, it->second, rcount * sizeof(RType));
  return NULL;
#endif
}

void Comm::Wait(void* req_ptr) const {
#ifdef PVFMM_HAVE_MPI
  if (req_ptr == NULL) return;
  Vector<MPI_Request>& request = *(Vector<MPI_Request>*)req_ptr;
  // std::vector<MPI_Status> status(request.Dim());
  if (request.Dim()) MPI_Waitall(request.Dim(), &request[0], MPI_STATUSES_IGNORE);  //&status[0]);
  DelReq(&request);
#endif
}

template <class SType, class RType> void Comm::Allgather(ConstIterator<SType> sbuf, Long scount, Iterator<RType> rbuf, Long rcount) const {
#ifdef PVFMM_HAVE_MPI
  if (scount) {
    sbuf[0];
    sbuf[scount - 1];
  }
  if (rcount) {
    rbuf[0];
    rbuf[rcount * Size() - 1];
  }
  MPI_Allgather((scount ? &sbuf[0] : NULL), scount, CommDatatype<SType>::value(), (rcount ? &rbuf[0] : NULL), rcount, CommDatatype<RType>::value(), mpi_comm_);
#else
  pvfmm::memcopy((Iterator<char>)rbuf, (ConstIterator<char>)sbuf, scount * sizeof(SType));
#endif
}
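// Usage sketch (illustrative only, not part of the library): a ring exchange
// with the non-blocking Isend/Irecv/Wait wrappers above. Assumes a translation
// unit that includes the Comm and Vector headers; the buffer size, tag and the
// variable names are made up for the example.
//
//   pvfmm::Comm comm = pvfmm::Comm::World();
//   pvfmm::Integer next = (comm.Rank() + 1) % comm.Size();
//   pvfmm::Integer prev = (comm.Rank() + comm.Size() - 1) % comm.Size();
//   pvfmm::Vector<double> sbuf(100), rbuf(100);
//   void* rreq = comm.Irecv(rbuf.Begin(), rbuf.Dim(), prev, 7);
//   void* sreq = comm.Isend(sbuf.Begin(), sbuf.Dim(), next, 7);
//   comm.Wait(rreq);  // Wait(...) also returns the request object to the pool
//   comm.Wait(sreq);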
template <class SType, class RType> void Comm::Allgatherv(ConstIterator<SType> sbuf, Long scount, Iterator<RType> rbuf, ConstIterator<Long> rcounts, ConstIterator<Long> rdispls) const {
#ifdef PVFMM_HAVE_MPI
  Vector<int> rcounts_(mpi_size_), rdispls_(mpi_size_);
  Long rcount_sum = 0;
#pragma omp parallel for schedule(static) reduction(+ : rcount_sum)
  for (Integer i = 0; i < mpi_size_; i++) {
    rcounts_[i] = rcounts[i];
    rdispls_[i] = rdispls[i];
    rcount_sum += rcounts[i];
  }

  if (scount) {
    sbuf[0];
    sbuf[scount - 1];
  }
  if (rcount_sum) {
    rbuf[0];
    rbuf[rcount_sum - 1];
  }
  MPI_Allgatherv((scount ? &sbuf[0] : NULL), scount, CommDatatype<SType>::value(), (rcount_sum ? &rbuf[0] : NULL), &rcounts_.Begin()[0], &rdispls_.Begin()[0], CommDatatype<RType>::value(), mpi_comm_);
#else
  pvfmm::memcopy((Iterator<char>)(rbuf + rdispls[0]), (ConstIterator<char>)sbuf, scount * sizeof(SType));
#endif
}

template <class SType, class RType> void Comm::Alltoall(ConstIterator<SType> sbuf, Long scount, Iterator<RType> rbuf, Long rcount) const {
#ifdef PVFMM_HAVE_MPI
  if (scount) {
    sbuf[0];
    sbuf[scount * Size() - 1];
  }
  if (rcount) {
    rbuf[0];
    rbuf[rcount * Size() - 1];
  }
  MPI_Alltoall((scount ? &sbuf[0] : NULL), scount, CommDatatype<SType>::value(), (rcount ? &rbuf[0] : NULL), rcount, CommDatatype<RType>::value(), mpi_comm_);
#else
  pvfmm::memcopy((Iterator<char>)rbuf, (ConstIterator<char>)sbuf, scount * sizeof(SType));
#endif
}

template <class SType, class RType> void* Comm::Ialltoallv_sparse(ConstIterator<SType> sbuf, ConstIterator<Long> scounts, ConstIterator<Long> sdispls, Iterator<RType> rbuf, ConstIterator<Long> rcounts, ConstIterator<Long> rdispls, Integer tag) const {
#ifdef PVFMM_HAVE_MPI
  Integer request_count = 0;
  for (Integer i = 0; i < mpi_size_; i++) {
    if (rcounts[i]) request_count++;
    if (scounts[i]) request_count++;
  }
  if (!request_count) return NULL;
  Vector<MPI_Request>& request = *NewReq();
  request.ReInit(request_count);
  Integer request_iter = 0;

  for (Integer i = 0; i < mpi_size_; i++) {
    if (rcounts[i]) {
      rbuf[rdispls[i]];
      rbuf[rdispls[i] + rcounts[i] - 1];
      MPI_Irecv(&rbuf[rdispls[i]], rcounts[i], CommDatatype<RType>::value(), i, tag, mpi_comm_, &request[request_iter]);
      request_iter++;
    }
  }
  for (Integer i = 0; i < mpi_size_; i++) {
    if (scounts[i]) {
      sbuf[sdispls[i]];
      sbuf[sdispls[i] + scounts[i] - 1];
      MPI_Isend(&sbuf[sdispls[i]], scounts[i], CommDatatype<SType>::value(), i, tag, mpi_comm_, &request[request_iter]);
      request_iter++;
    }
  }
  return &request;
#else
  pvfmm::memcopy((Iterator<char>)(rbuf + rdispls[0]), (ConstIterator<char>)(sbuf + sdispls[0]), scounts[0] * sizeof(SType));
  return NULL;
#endif
}

template <class Type> void Comm::Alltoallv(ConstIterator<Type> sbuf, ConstIterator<Long> scounts, ConstIterator<Long> sdispls, Iterator<Type> rbuf, ConstIterator<Long> rcounts, ConstIterator<Long> rdispls) const {
#ifdef PVFMM_HAVE_MPI
  {  // Use Alltoallv_sparse if average connectivity < 64
    Long connectivity = 0, glb_connectivity = 0;
#pragma omp parallel for schedule(static) reduction(+ : connectivity)
    for (Integer i = 0; i < mpi_size_; i++) {
      if (rcounts[i]) connectivity++;
    }
    Allreduce(PVFMM_PTR2CONSTITR(Long, &connectivity, 1), PVFMM_PTR2ITR(Long, &glb_connectivity, 1), 1, CommOp::SUM);
    if (glb_connectivity < 64 * Size()) {
      void* mpi_req = Ialltoallv_sparse(sbuf, scounts, sdispls, rbuf, rcounts, rdispls, 0);
      Wait(mpi_req);
      return;
    }
  }

  {  // Use vendor MPI_Alltoallv
    //#ifndef ALLTOALLV_FIX
    Vector<int> scnt, sdsp, rcnt, rdsp;
    scnt.ReInit(mpi_size_);
    sdsp.ReInit(mpi_size_);
    rcnt.ReInit(mpi_size_);
    rdsp.ReInit(mpi_size_);
    Long stotal = 0, rtotal = 0;
#pragma omp parallel for schedule(static) reduction(+ : stotal, rtotal)
    for (Integer i = 0; i < mpi_size_; i++) {
      scnt[i] = scounts[i];
      sdsp[i] = sdispls[i];
      rcnt[i] = rcounts[i];
      rdsp[i] = rdispls[i];
      stotal += scounts[i];
      rtotal += rcounts[i];
    }

    MPI_Alltoallv((stotal ? &sbuf[0] : NULL), &scnt[0], &sdsp[0], CommDatatype<Type>::value(), (rtotal ? &rbuf[0] : NULL), &rcnt[0], &rdsp[0], CommDatatype<Type>::value(), mpi_comm_);
    return;
    //#endif
  }
  // TODO: implement hypercube scheme
#else
  pvfmm::memcopy((Iterator<char>)(rbuf + rdispls[0]), (ConstIterator<char>)(sbuf + sdispls[0]), scounts[0] * sizeof(Type));
#endif
}
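// Usage sketch (illustrative): a dense personalized exchange through the
// Alltoallv wrapper above, which switches between Ialltoallv_sparse and the
// vendor MPI_Alltoallv depending on the average connectivity. The vector names
// and the per-rank count are made up.
//
//   pvfmm::Integer np = comm.Size();
//   pvfmm::Long cnt = 4;
//   pvfmm::Vector<double> sbuf(cnt * np), rbuf(cnt * np);
//   pvfmm::Vector<pvfmm::Long> scnt(np), sdsp(np), rcnt(np), rdsp(np);
//   for (pvfmm::Integer i = 0; i < np; i++) {
//     scnt[i] = cnt; sdsp[i] = i * cnt;
//     rcnt[i] = cnt; rdsp[i] = i * cnt;
//   }
//   comm.Alltoallv(sbuf.Begin(), scnt.Begin(), sdsp.Begin(),
//                  rbuf.Begin(), rcnt.Begin(), rdsp.Begin());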
template <class Type> void Comm::Allreduce(ConstIterator<Type> sbuf, Iterator<Type> rbuf, Long count, CommOp op) const {
#ifdef PVFMM_HAVE_MPI
  if (!count) return;
  MPI_Op mpi_op;
  switch (op) {
    case CommOp::SUM:
      mpi_op = CommDatatype<Type>::sum();
      break;
    case CommOp::MIN:
      mpi_op = CommDatatype<Type>::min();
      break;
    case CommOp::MAX:
      mpi_op = CommDatatype<Type>::max();
      break;
    default:
      mpi_op = MPI_OP_NULL;
      break;
  }
  sbuf[0];
  sbuf[count - 1];
  rbuf[0];
  rbuf[count - 1];
  MPI_Allreduce(&sbuf[0], &rbuf[0], count, CommDatatype<Type>::value(), mpi_op, mpi_comm_);
#else
  pvfmm::memcopy((Iterator<char>)rbuf, (ConstIterator<char>)sbuf, count * sizeof(Type));
#endif
}

template <class Type> void Comm::Scan(ConstIterator<Type> sbuf, Iterator<Type> rbuf, int count, CommOp op) const {
#ifdef PVFMM_HAVE_MPI
  if (!count) return;
  MPI_Op mpi_op;
  switch (op) {
    case CommOp::SUM:
      mpi_op = CommDatatype<Type>::sum();
      break;
    case CommOp::MIN:
      mpi_op = CommDatatype<Type>::min();
      break;
    case CommOp::MAX:
      mpi_op = CommDatatype<Type>::max();
      break;
    default:
      mpi_op = MPI_OP_NULL;
      break;
  }
  sbuf[0];
  sbuf[count - 1];
  rbuf[0];
  rbuf[count - 1];
  MPI_Scan(&sbuf[0], &rbuf[0], count, CommDatatype<Type>::value(), mpi_op, mpi_comm_);
#else
  pvfmm::memcopy((Iterator<char>)rbuf, (ConstIterator<char>)sbuf, count * sizeof(Type));
#endif
}
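// Usage sketch (illustrative): the Allreduce/Scan pair above is typically used
// to compute a global total and this rank's exclusive prefix offset, the same
// pattern PartitionW uses below. PVFMM_PTR2CONSTITR / PVFMM_PTR2ITR wrap raw
// pointers as (Const)Iterators; `loc` is a made-up local quantity.
//
//   pvfmm::Long loc = /* local element count */ 0, tot = 0, off = 0;
//   comm.Allreduce(PVFMM_PTR2CONSTITR(pvfmm::Long, &loc, 1),
//                  PVFMM_PTR2ITR(pvfmm::Long, &tot, 1), 1, pvfmm::CommOp::SUM);
//   comm.Scan(PVFMM_PTR2CONSTITR(pvfmm::Long, &loc, 1),
//             PVFMM_PTR2ITR(pvfmm::Long, &off, 1), 1, pvfmm::CommOp::SUM);
//   off -= loc;  // inclusive scan -> exclusive prefix offset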
template <class Type> void Comm::PartitionW(Vector<Type>& nodeList, const Vector<Long>* wts_) const {
  Integer npes = Size();
  if (npes == 1) return;
  Long nlSize = nodeList.Dim();

  Vector<Long> wts;
  Long localWt = 0;
  if (wts_ == NULL) {  // Construct arrays of wts.
    wts.ReInit(nlSize);
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < nlSize; i++) {
      wts[i] = 1;
    }
    localWt = nlSize;
  } else {
    wts.ReInit(nlSize, (Iterator<Long>)wts_->Begin(), false);
#pragma omp parallel for reduction(+ : localWt)
    for (Long i = 0; i < nlSize; i++) {
      localWt += wts[i];
    }
  }

  Long off1 = 0, off2 = 0, totalWt = 0;
  {  // compute the total weight of the problem ...
    Allreduce(PVFMM_PTR2CONSTITR(Long, &localWt, 1), PVFMM_PTR2ITR(Long, &totalWt, 1), 1, CommOp::SUM);
    Scan(PVFMM_PTR2CONSTITR(Long, &localWt, 1), PVFMM_PTR2ITR(Long, &off2, 1), 1, CommOp::SUM);
    off1 = off2 - localWt;
  }

  Vector<Long> lscn;
  if (nlSize) {  // perform a local scan on the weights first ...
    lscn.ReInit(nlSize);
    lscn[0] = off1;
    omp_par::scan(wts.Begin(), lscn.Begin(), nlSize);
  }

  Vector<Long> sendSz, recvSz, sendOff, recvOff;
  sendSz.ReInit(npes);
  recvSz.ReInit(npes);
  sendOff.ReInit(npes);
  recvOff.ReInit(npes);
  sendSz.SetZero();

  if (nlSize > 0 && totalWt > 0) {  // Compute sendSz
    Long pid1 = (off1 * npes) / totalWt;
    Long pid2 = ((off2 + 1) * npes) / totalWt + 1;
    assert((totalWt * pid2) / npes >= off2);
    pid1 = (pid1 < 0 ? 0 : pid1);
    pid2 = (pid2 > npes ? npes : pid2);
#pragma omp parallel for schedule(static)
    for (Integer i = pid1; i < pid2; i++) {
      Long wt1 = (totalWt * (i)) / npes;
      Long wt2 = (totalWt * (i + 1)) / npes;
      Long start = std::lower_bound(lscn.Begin(), lscn.Begin() + nlSize, wt1, std::less<Long>()) - lscn.Begin();
      Long end = std::lower_bound(lscn.Begin(), lscn.Begin() + nlSize, wt2, std::less<Long>()) - lscn.Begin();
      if (i == 0) start = 0;
      if (i == npes - 1) end = nlSize;
      sendSz[i] = end - start;
    }
  } else {
    sendSz[0] = nlSize;
  }

  // Exchange sendSz, recvSz
  Alltoall(sendSz.Begin(), 1, recvSz.Begin(), 1);

  {  // Compute sendOff, recvOff
    sendOff[0] = 0;
    omp_par::scan(sendSz.Begin(), sendOff.Begin(), npes);
    recvOff[0] = 0;
    omp_par::scan(recvSz.Begin(), recvOff.Begin(), npes);
    assert(sendOff[npes - 1] + sendSz[npes - 1] == nlSize);
  }

  // perform All2All ...
  Vector<Type> newNodes;
  newNodes.ReInit(recvSz[npes - 1] + recvOff[npes - 1]);
  void* mpi_req = Ialltoallv_sparse(nodeList.Begin(), sendSz.Begin(), sendOff.Begin(), newNodes.Begin(), recvSz.Begin(), recvOff.Begin());
  Wait(mpi_req);

  // reset the pointer ...
  nodeList.Swap(newNodes);
}

template <class Type> void Comm::PartitionN(Vector<Type>& v, Long N) const {
  Integer rank = Rank();
  Integer np = Size();
  if (np == 1) return;

  Vector<Long> v_cnt(np), v_dsp(np + 1);
  Vector<Long> N_cnt(np), N_dsp(np + 1);
  {  // Set v_cnt, v_dsp
    v_dsp[0] = 0;
    Long cnt = v.Dim();
    Allgather(PVFMM_PTR2CONSTITR(Long, &cnt, 1), 1, v_cnt.Begin(), 1);
    omp_par::scan(v_cnt.Begin(), v_dsp.Begin(), np);
    v_dsp[np] = v_cnt[np - 1] + v_dsp[np - 1];
  }
  {  // Set N_cnt, N_dsp
    N_dsp[0] = 0;
    Long cnt = N;
    Allgather(PVFMM_PTR2CONSTITR(Long, &cnt, 1), 1, N_cnt.Begin(), 1);
    omp_par::scan(N_cnt.Begin(), N_dsp.Begin(), np);
    N_dsp[np] = N_cnt[np - 1] + N_dsp[np - 1];
  }
  {  // Adjust for dof
    Long dof = v_dsp[np] / N_dsp[np];
    assert(dof * N_dsp[np] == v_dsp[np]);
    if (dof == 0) return;

    if (dof != 1) {
#pragma omp parallel for schedule(static)
      for (Integer i = 0; i < np; i++) N_cnt[i] *= dof;
#pragma omp parallel for schedule(static)
      for (Integer i = 0; i <= np; i++) N_dsp[i] *= dof;
    }
  }

  Vector<Type> v_(N_cnt[rank]);
  {  // Set v_
    Vector<Long> scnt(np), sdsp(np);
    Vector<Long> rcnt(np), rdsp(np);
#pragma omp parallel for schedule(static)
    for (Integer i = 0; i < np; i++) {
      {  // Set scnt
        Long n0 = N_dsp[i + 0];
        Long n1 = N_dsp[i + 1];
        if (n0 < v_dsp[rank + 0]) n0 = v_dsp[rank + 0];
        if (n1 < v_dsp[rank + 0]) n1 = v_dsp[rank + 0];
        if (n0 > v_dsp[rank + 1]) n0 = v_dsp[rank + 1];
        if (n1 > v_dsp[rank + 1]) n1 = v_dsp[rank + 1];
        scnt[i] = n1 - n0;
      }
      {  // Set rcnt
        Long n0 = v_dsp[i + 0];
        Long n1 = v_dsp[i + 1];
        if (n0 < N_dsp[rank + 0]) n0 = N_dsp[rank + 0];
        if (n1 < N_dsp[rank + 0]) n1 = N_dsp[rank + 0];
        if (n0 > N_dsp[rank + 1]) n0 = N_dsp[rank + 1];
        if (n1 > N_dsp[rank + 1]) n1 = N_dsp[rank + 1];
        rcnt[i] = n1 - n0;
      }
    }
    sdsp[0] = 0;
    omp_par::scan(scnt.Begin(), sdsp.Begin(), np);
    rdsp[0] = 0;
    omp_par::scan(rcnt.Begin(), rdsp.Begin(), np);

    void* mpi_request = Ialltoallv_sparse(v.Begin(), scnt.Begin(), sdsp.Begin(), v_.Begin(), rcnt.Begin(), rdsp.Begin());
    Wait(mpi_request);
  }
  v.Swap(v_);
}

template <class Type> void Comm::PartitionS(Vector<Type>& nodeList, const Type& splitter) const {
  Integer npes = Size();
  if (npes == 1) return;

  Vector<Type> mins(npes);
  Allgather(PVFMM_PTR2CONSTITR(Type, &splitter, 1), 1, mins.Begin(), 1);

  Vector<Long> scnt(npes), sdsp(npes);
  Vector<Long> rcnt(npes), rdsp(npes);
  {  // Compute scnt, sdsp
#pragma omp parallel for schedule(static)
    for (Integer i = 0; i < npes; i++) {
      sdsp[i] = std::lower_bound(nodeList.Begin(), nodeList.Begin() + nodeList.Dim(), mins[i]) - nodeList.Begin();
    }
#pragma omp parallel for schedule(static)
    for (Integer i = 0; i < npes - 1; i++) {
      scnt[i] = sdsp[i + 1] - sdsp[i];
    }
    scnt[npes - 1] = nodeList.Dim() - sdsp[npes - 1];
  }
  {  // Compute rcnt, rdsp
    rdsp[0] = 0;
    Alltoall(scnt.Begin(), 1, rcnt.Begin(), 1);
    omp_par::scan(rcnt.Begin(), rdsp.Begin(), npes);
  }
  {  // Redistribute nodeList
    Vector<Type> nodeList_(rdsp[npes - 1] + rcnt[npes - 1]);
    void* mpi_request = Ialltoallv_sparse(nodeList.Begin(), scnt.Begin(), sdsp.Begin(), nodeList_.Begin(), rcnt.Begin(), rdsp.Begin());
    Wait(mpi_request);
    nodeList.Swap(nodeList_);
  }
}
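// Usage sketch (illustrative): re-balancing a distributed vector with the
// partitioning routines above. Without a weight vector every element has unit
// weight, so PartitionW leaves each rank with roughly the same element count;
// PartitionN requests an explicit local count. `work` is a made-up name.
//
//   pvfmm::Vector<double> work;            // locally filled, arbitrary sizes
//   comm.PartitionW(work);                 // ~equal element count per rank
//   // pvfmm::Vector<pvfmm::Long> wts(work.Dim());  // per-element weights
//   // comm.PartitionW(work, &wts);        // ~equal total weight per rank
//   // comm.PartitionN(work, 1000);        // request 1000 elements on this rank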
template <class Type> void Comm::SortScatterIndex(const Vector<Type>& key, Vector<Long>& scatter_index, const Type* split_key_) const {
  typedef SortPair<Type, Long> Pair_t;
  Integer npes = Size(), rank = Rank();

  Vector<Pair_t> parray(key.Dim());
  {  // Build global index.
    Long glb_dsp = 0;
    Long loc_size = key.Dim();
    Scan(PVFMM_PTR2CONSTITR(Long, &loc_size, 1), PVFMM_PTR2ITR(Long, &glb_dsp, 1), 1, CommOp::SUM);
    glb_dsp -= loc_size;
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < loc_size; i++) {
      parray[i].key = key[i];
      parray[i].data = glb_dsp + i;
    }
  }

  Vector<Pair_t> psorted;
  HyperQuickSort(parray, psorted);

  if (npes > 1 && split_key_ != NULL) {  // Partition data
    Vector<Type> split_key(npes);
    Allgather(PVFMM_PTR2CONSTITR(Type, split_key_, 1), 1, split_key.Begin(), 1);

    Vector<Long> sendSz(npes);
    Vector<Long> recvSz(npes);
    Vector<Long> sendOff(npes);
    Vector<Long> recvOff(npes);
    Long nlSize = psorted.Dim();
    sendSz.SetZero();

    if (nlSize > 0) {  // Compute sendSz
      // Determine processor range.
      Long pid1 = std::lower_bound(split_key.Begin(), split_key.Begin() + npes, psorted[0].key) - split_key.Begin() - 1;
      Long pid2 = std::upper_bound(split_key.Begin(), split_key.Begin() + npes, psorted[nlSize - 1].key) - split_key.Begin() + 1;
      pid1 = (pid1 < 0 ? 0 : pid1);
      pid2 = (pid2 > npes ? npes : pid2);

#pragma omp parallel for schedule(static)
      for (Integer i = pid1; i < pid2; i++) {
        Pair_t p1;
        p1.key = split_key[i];
        Pair_t p2;
        p2.key = split_key[i + 1 < npes ? i + 1 : i];
        Long start = std::lower_bound(psorted.Begin(), psorted.Begin() + nlSize, p1, std::less<Pair_t>()) - psorted.Begin();
        Long end = std::lower_bound(psorted.Begin(), psorted.Begin() + nlSize, p2, std::less<Pair_t>()) - psorted.Begin();
        if (i == 0) start = 0;
        if (i == npes - 1) end = nlSize;
        sendSz[i] = end - start;
      }
    }

    // Exchange sendSz, recvSz
    Alltoall(sendSz.Begin(), 1, recvSz.Begin(), 1);

    // compute offsets ...
    {  // Compute sendOff, recvOff
      sendOff[0] = 0;
      omp_par::scan(sendSz.Begin(), sendOff.Begin(), npes);
      recvOff[0] = 0;
      omp_par::scan(recvSz.Begin(), recvOff.Begin(), npes);
      assert(sendOff[npes - 1] + sendSz[npes - 1] == nlSize);
    }

    // perform All2All ...
    Vector<Pair_t> newNodes(recvSz[npes - 1] + recvOff[npes - 1]);
    void* mpi_req = Ialltoallv_sparse(psorted.Begin(), sendSz.Begin(), sendOff.Begin(), newNodes.Begin(), recvSz.Begin(), recvOff.Begin());
    Wait(mpi_req);

    // reset the pointer ...
    psorted.Swap(newNodes);
  }

  scatter_index.ReInit(psorted.Dim());
#pragma omp parallel for schedule(static)
  for (Long i = 0; i < psorted.Dim(); i++) {
    scatter_index[i] = psorted[i].data;
  }
}
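// Usage sketch (illustrative): SortScatterIndex computes the permutation that
// globally sorts a distributed key vector; ScatterForward (below) applies that
// permutation to any vector distributed consistently with the keys. The names
// `keys` and `scatter_idx` are made up; the default (NULL) splitter is assumed.
//
//   pvfmm::Vector<pvfmm::Long> keys;       // distributed, unsorted
//   pvfmm::Vector<pvfmm::Long> scatter_idx;
//   comm.SortScatterIndex(keys, scatter_idx);
//   comm.ScatterForward(keys, scatter_idx);  // keys are now globally sorted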
template <class Type> void Comm::ScatterForward(Vector<Type>& data_, const Vector<Long>& scatter_index) const {
  typedef SortPair<Long, Long> Pair_t;
  Integer npes = Size(), rank = Rank();

  Long data_dim = 0;
  Long send_size = 0;
  Long recv_size = 0;
  {  // Set data_dim, send_size, recv_size
    recv_size = scatter_index.Dim();
    StaticArray<Long, 2> glb_size;
    StaticArray<Long, 2> loc_size;
    loc_size[0] = data_.Dim();
    loc_size[1] = recv_size;
    Allreduce<Long>(loc_size, glb_size, 2, CommOp::SUM);
    if (glb_size[0] == 0 || glb_size[1] == 0) return;  // Nothing to be done.

    data_dim = glb_size[0] / glb_size[1];
    PVFMM_ASSERT(glb_size[0] == data_dim * glb_size[1]);
    send_size = data_.Dim() / data_dim;
  }

  if (npes == 1) {  // Scatter directly
    Vector<Type> data;
    data.ReInit(recv_size * data_dim);
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < recv_size; i++) {
      Long src_indx = scatter_index[i] * data_dim;
      Long trg_indx = i * data_dim;
      for (Long j = 0; j < data_dim; j++) data[trg_indx + j] = data_[src_indx + j];
    }
    data_.Swap(data);
    return;
  }

  Vector<Long> glb_scan;
  {  // Global scan of data size.
    glb_scan.ReInit(npes);
    Long glb_rank = 0;
    Scan(PVFMM_PTR2CONSTITR(Long, &send_size, 1), PVFMM_PTR2ITR(Long, &glb_rank, 1), 1, CommOp::SUM);
    glb_rank -= send_size;
    Allgather(PVFMM_PTR2CONSTITR(Long, &glb_rank, 1), 1, glb_scan.Begin(), 1);
  }

  Vector<Pair_t> psorted;
  {  // Sort scatter_index.
    psorted.ReInit(recv_size);
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < recv_size; i++) {
      psorted[i].key = scatter_index[i];
      psorted[i].data = i;
    }
    omp_par::merge_sort(psorted.Begin(), psorted.Begin() + recv_size);
  }

  Vector<Long> recv_indx(recv_size);
  Vector<Long> send_indx(send_size);
  Vector<Long> sendSz(npes);
  Vector<Long> sendOff(npes);
  Vector<Long> recvSz(npes);
  Vector<Long> recvOff(npes);
  {  // Exchange send, recv indices.
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < recv_size; i++) {
      recv_indx[i] = psorted[i].key;
    }
#pragma omp parallel for schedule(static)
    for (Integer i = 0; i < npes; i++) {
      Long start = std::lower_bound(recv_indx.Begin(), recv_indx.Begin() + recv_size, glb_scan[i]) - recv_indx.Begin();
      Long end = (i + 1 < npes ? std::lower_bound(recv_indx.Begin(), recv_indx.Begin() + recv_size, glb_scan[i + 1]) - recv_indx.Begin() : recv_size);
      recvSz[i] = end - start;
      recvOff[i] = start;
    }

    Alltoall(recvSz.Begin(), 1, sendSz.Begin(), 1);
    sendOff[0] = 0;
    omp_par::scan(sendSz.Begin(), sendOff.Begin(), npes);
    assert(sendOff[npes - 1] + sendSz[npes - 1] == send_size);

    Alltoallv(recv_indx.Begin(), recvSz.Begin(), recvOff.Begin(), send_indx.Begin(), sendSz.Begin(), sendOff.Begin());
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < send_size; i++) {
      assert(send_indx[i] >= glb_scan[rank]);
      send_indx[i] -= glb_scan[rank];
      assert(send_indx[i] < send_size);
    }
  }

  Vector<Type> send_buff;
  {  // Prepare send buffer
    send_buff.ReInit(send_size * data_dim);
    ConstIterator<Type> data = data_.Begin();
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < send_size; i++) {
      Long src_indx = send_indx[i] * data_dim;
      Long trg_indx = i * data_dim;
      for (Long j = 0; j < data_dim; j++) send_buff[trg_indx + j] = data[src_indx + j];
    }
  }

  Vector<Type> recv_buff;
  {  // All2Allv
    recv_buff.ReInit(recv_size * data_dim);
#pragma omp parallel for schedule(static)
    for (Integer i = 0; i < npes; i++) {
      sendSz[i] *= data_dim;
      sendOff[i] *= data_dim;
      recvSz[i] *= data_dim;
      recvOff[i] *= data_dim;
    }
    Alltoallv(send_buff.Begin(), sendSz.Begin(), sendOff.Begin(), recv_buff.Begin(), recvSz.Begin(), recvOff.Begin());
  }

  {  // Build output data.
    data_.ReInit(recv_size * data_dim);
    Iterator<Type> data = data_.Begin();
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < recv_size; i++) {
      Long src_indx = i * data_dim;
      Long trg_indx = psorted[i].data * data_dim;
      for (Long j = 0; j < data_dim; j++) data[trg_indx + j] = recv_buff[src_indx + j];
    }
  }
}
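// Usage sketch (illustrative): a forward/reverse scatter round trip. data_dim
// is inferred from the global sizes, so a vector carrying several values per
// key can be permuted with the same scatter index; ScatterReverse (below) sends
// the values back, with loc_size selecting the output distribution (here the
// original key count). Names are made up.
//
//   pvfmm::Vector<double> data;            // values per key, key ordering
//   pvfmm::Long orig_cnt = scatter_idx.Dim();
//   comm.ScatterForward(data, scatter_idx);            // into sorted-key order
//   // ... work on the sorted data ...
//   comm.ScatterReverse(data, scatter_idx, orig_cnt);  // back to original order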
template <class Type> void Comm::ScatterReverse(Vector<Type>& data_, const Vector<Long>& scatter_index_, Long loc_size_) const {
  typedef SortPair<Long, Long> Pair_t;
  Integer npes = Size(), rank = Rank();

  Long data_dim = 0;
  Long send_size = 0;
  Long recv_size = 0;
  {  // Set data_dim, send_size, recv_size
    recv_size = loc_size_;
    StaticArray<Long, 3> glb_size;
    StaticArray<Long, 3> loc_size;
    loc_size[0] = data_.Dim();
    loc_size[1] = scatter_index_.Dim();
    loc_size[2] = recv_size;
    Allreduce<Long>(loc_size, glb_size, 3, CommOp::SUM);
    if (glb_size[0] == 0 || glb_size[1] == 0) return;  // Nothing to be done.

    PVFMM_ASSERT(glb_size[0] % glb_size[1] == 0);
    data_dim = glb_size[0] / glb_size[1];

    PVFMM_ASSERT(loc_size[0] % data_dim == 0);
    send_size = loc_size[0] / data_dim;

    if (glb_size[0] != glb_size[2] * data_dim) {
      recv_size = (((rank + 1) * (glb_size[0] / data_dim)) / npes) - ((rank * (glb_size[0] / data_dim)) / npes);
    }
  }

  if (npes == 1) {  // Scatter directly
    Vector<Type> data;
    data.ReInit(recv_size * data_dim);
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < recv_size; i++) {
      Long src_indx = i * data_dim;
      Long trg_indx = scatter_index_[i] * data_dim;
      for (Long j = 0; j < data_dim; j++) data[trg_indx + j] = data_[src_indx + j];
    }
    data_.Swap(data);
    return;
  }

  Vector<Long> scatter_index;
  {
    StaticArray<Long, 2> glb_rank;
    StaticArray<Long, 2> glb_size;
    StaticArray<Long, 2> loc_size;
    loc_size[0] = data_.Dim() / data_dim;
    loc_size[1] = scatter_index_.Dim();
    Scan<Long>(loc_size, glb_rank, 2, CommOp::SUM);
    Allreduce<Long>(loc_size, glb_size, 2, CommOp::SUM);
    PVFMM_ASSERT(glb_size[0] == glb_size[1]);
    glb_rank[0] -= loc_size[0];
    glb_rank[1] -= loc_size[1];

    Vector<Long> glb_scan0(npes + 1);
    Vector<Long> glb_scan1(npes + 1);
    Allgather(glb_rank + 0, 1, glb_scan0.Begin(), 1);
    Allgather(glb_rank + 1, 1, glb_scan1.Begin(), 1);
    glb_scan0[npes] = glb_size[0];
    glb_scan1[npes] = glb_size[1];

    if (loc_size[0] != loc_size[1] || glb_rank[0] != glb_rank[1]) {  // Repartition scatter_index
      scatter_index.ReInit(loc_size[0]);

      Vector<Long> send_dsp(npes + 1);
      Vector<Long> recv_dsp(npes + 1);
#pragma omp parallel for schedule(static)
      for (Integer i = 0; i <= npes; i++) {
        send_dsp[i] = std::min(std::max(glb_scan0[i], glb_rank[1]), glb_rank[1] + loc_size[1]) - glb_rank[1];
        recv_dsp[i] = std::min(std::max(glb_scan1[i], glb_rank[0]), glb_rank[0] + loc_size[0]) - glb_rank[0];
      }

      // Long commCnt=0;
      Vector<Long> send_cnt(npes + 0);
      Vector<Long> recv_cnt(npes + 0);
#pragma omp parallel for schedule(static)  // reduction(+:commCnt)
      for (Integer i = 0; i < npes; i++) {
        send_cnt[i] = send_dsp[i + 1] - send_dsp[i];
        recv_cnt[i] = recv_dsp[i + 1] - recv_dsp[i];
        // if(send_cnt[i] && i!=rank) commCnt++;
        // if(recv_cnt[i] && i!=rank) commCnt++;
      }

      void* mpi_req = Ialltoallv_sparse(scatter_index_.Begin(), send_cnt.Begin(), send_dsp.Begin(), scatter_index.Begin(), recv_cnt.Begin(), recv_dsp.Begin(), 0);
      Wait(mpi_req);
    } else {
      scatter_index.ReInit(scatter_index_.Dim(), (Iterator<Long>)scatter_index_.Begin(), false);
    }
  }
  Vector<Long> glb_scan(npes);
  {  // Global data size.
    Long glb_rank = 0;
    Scan(PVFMM_PTR2CONSTITR(Long, &recv_size, 1), PVFMM_PTR2ITR(Long, &glb_rank, 1), 1, CommOp::SUM);
    glb_rank -= recv_size;
    Allgather(PVFMM_PTR2CONSTITR(Long, &glb_rank, 1), 1, glb_scan.Begin(), 1);
  }

  Vector<Pair_t> psorted(send_size);
  {  // Sort scatter_index.
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < send_size; i++) {
      psorted[i].key = scatter_index[i];
      psorted[i].data = i;
    }
    omp_par::merge_sort(psorted.Begin(), psorted.Begin() + send_size);
  }

  Vector<Long> recv_indx(recv_size);
  Vector<Long> send_indx(send_size);
  Vector<Long> sendSz(npes);
  Vector<Long> sendOff(npes);
  Vector<Long> recvSz(npes);
  Vector<Long> recvOff(npes);
  {  // Exchange send, recv indices.
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < send_size; i++) {
      send_indx[i] = psorted[i].key;
    }
#pragma omp parallel for schedule(static)
    for (Integer i = 0; i < npes; i++) {
      Long start = std::lower_bound(send_indx.Begin(), send_indx.Begin() + send_size, glb_scan[i]) - send_indx.Begin();
      Long end = (i + 1 < npes ? std::lower_bound(send_indx.Begin(), send_indx.Begin() + send_size, glb_scan[i + 1]) - send_indx.Begin() : send_size);
      sendSz[i] = end - start;
      sendOff[i] = start;
    }

    Alltoall(sendSz.Begin(), 1, recvSz.Begin(), 1);
    recvOff[0] = 0;
    omp_par::scan(recvSz.Begin(), recvOff.Begin(), npes);
    assert(recvOff[npes - 1] + recvSz[npes - 1] == recv_size);

    Alltoallv(send_indx.Begin(), sendSz.Begin(), sendOff.Begin(), recv_indx.Begin(), recvSz.Begin(), recvOff.Begin());
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < recv_size; i++) {
      assert(recv_indx[i] >= glb_scan[rank]);
      recv_indx[i] -= glb_scan[rank];
      assert(recv_indx[i] < recv_size);
    }
  }

  Vector<Type> send_buff;
  {  // Prepare send buffer
    send_buff.ReInit(send_size * data_dim);
    ConstIterator<Type> data = data_.Begin();
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < send_size; i++) {
      Long src_indx = psorted[i].data * data_dim;
      Long trg_indx = i * data_dim;
      for (Long j = 0; j < data_dim; j++) send_buff[trg_indx + j] = data[src_indx + j];
    }
  }

  Vector<Type> recv_buff;
  {  // All2Allv
    recv_buff.ReInit(recv_size * data_dim);
#pragma omp parallel for schedule(static)
    for (Integer i = 0; i < npes; i++) {
      sendSz[i] *= data_dim;
      sendOff[i] *= data_dim;
      recvSz[i] *= data_dim;
      recvOff[i] *= data_dim;
    }
    Alltoallv(send_buff.Begin(), sendSz.Begin(), sendOff.Begin(), recv_buff.Begin(), recvSz.Begin(), recvOff.Begin());
  }
  {  // Build output data.
    data_.ReInit(recv_size * data_dim);
    Iterator<Type> data = data_.Begin();
#pragma omp parallel for schedule(static)
    for (Long i = 0; i < recv_size; i++) {
      Long src_indx = i * data_dim;
      Long trg_indx = recv_indx[i] * data_dim;
      for (Long j = 0; j < data_dim; j++) data[trg_indx + j] = recv_buff[src_indx + j];
    }
  }
}

#ifdef PVFMM_HAVE_MPI
Vector<MPI_Request>* Comm::NewReq() const {
  if (req.empty()) req.push(new Vector<MPI_Request>);
  Vector<MPI_Request>& request = *(Vector<MPI_Request>*)req.top();
  req.pop();
  return &request;
}

void Comm::DelReq(Vector<MPI_Request>* req_ptr) const {
  if (req_ptr) req.push(req_ptr);
}

#define HS_MPIDATATYPE(CTYPE, MPITYPE)              \
  template <> class Comm::CommDatatype<CTYPE> {     \
   public:                                          \
    static MPI_Datatype value() { return MPITYPE; } \
    static MPI_Op sum() { return MPI_SUM; }         \
    static MPI_Op min() { return MPI_MIN; }         \
    static MPI_Op max() { return MPI_MAX; }         \
  }

HS_MPIDATATYPE(short, MPI_SHORT);
HS_MPIDATATYPE(int, MPI_INT);
HS_MPIDATATYPE(long, MPI_LONG);
HS_MPIDATATYPE(unsigned short, MPI_UNSIGNED_SHORT);
HS_MPIDATATYPE(unsigned int, MPI_UNSIGNED);
HS_MPIDATATYPE(unsigned long, MPI_UNSIGNED_LONG);
HS_MPIDATATYPE(float, MPI_FLOAT);
HS_MPIDATATYPE(double, MPI_DOUBLE);
HS_MPIDATATYPE(long double, MPI_LONG_DOUBLE);
HS_MPIDATATYPE(long long, MPI_LONG_LONG_INT);
HS_MPIDATATYPE(char, MPI_CHAR);
HS_MPIDATATYPE(unsigned char, MPI_UNSIGNED_CHAR);
#undef HS_MPIDATATYPE
#endif
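// The HS_MPIDATATYPE specializations above map built-in C++ types to their MPI
// equivalents; the templated wrappers in this file use them to select the
// send/receive datatype and the reduction operators, e.g. (illustrative):
//
//   CommDatatype<double>::value()  // -> MPI_DOUBLE
//   CommDatatype<long>::sum()      // -> MPI_SUM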
template <class Type> void Comm::HyperQuickSort(const Vector<Type>& arr_, Vector<Type>& SortedElem) const {  // O( ((N/p)+log(p))*(log(N/p)+log(p)) )
#ifdef PVFMM_HAVE_MPI
  Integer npes, myrank, omp_p;
  {  // Get comm size and rank.
    npes = Size();
    myrank = Rank();
    omp_p = omp_get_max_threads();
  }
  srand(myrank);

  Long totSize, nelem = arr_.Dim();
  {  // Local and global sizes. O(log p)
    assert(nelem);  // TODO: Check if this is needed.
    Allreduce(PVFMM_PTR2CONSTITR(Long, &nelem, 1), PVFMM_PTR2ITR(Long, &totSize, 1), 1, CommOp::SUM);
  }

  if (npes == 1) {  // SortedElem <--- local_sort(arr_)
    SortedElem = arr_;
    omp_par::merge_sort(SortedElem.Begin(), SortedElem.Begin() + nelem);
    return;
  }

  Vector<Type> arr;
  {  // arr <-- local_sort(arr_)
    arr = arr_;
    omp_par::merge_sort(arr.Begin(), arr.Begin() + nelem);
  }

  Vector<Type> nbuff, nbuff_ext, rbuff, rbuff_ext;  // Allocate memory.
  MPI_Comm comm = mpi_comm_;                        // Copy comm
  bool free_comm = false;                           // Flag to free comm.

  // Binary split and merge in each iteration.
  while (npes > 1 && totSize > 0) {  // O(log p) iterations.
    Type split_key;
    Long totSize_new;
    {  // Determine split_key. O( log(N/p) + log(p) )
      Integer glb_splt_count;
      Vector<Type> glb_splitters;
      {  // Take random splitters. glb_splt_count = const = 100~1000
        Integer splt_count;
        {  // Set splt_count. O( 1 ) -- Let p * splt_count = t
          splt_count = (100 * nelem) / totSize;
          if (npes > 100) splt_count = (drand48() * totSize) < (100 * nelem) ? 1 : 0;
          if (splt_count > nelem) splt_count = nelem;
        }
        Vector<Type> splitters(splt_count);
        for (Integer i = 0; i < splt_count; i++) {
          splitters[i] = arr[rand() % nelem];
        }

        Vector<Integer> glb_splt_cnts(npes), glb_splt_disp(npes);
        {  // Set glb_splt_count, glb_splt_cnts, glb_splt_disp
          MPI_Allgather(&splt_count, 1, CommDatatype<Integer>::value(), &glb_splt_cnts[0], 1, CommDatatype<Integer>::value(), comm);
          glb_splt_disp[0] = 0;
          omp_par::scan(glb_splt_cnts.Begin(), glb_splt_disp.Begin(), npes);
          glb_splt_count = glb_splt_cnts[npes - 1] + glb_splt_disp[npes - 1];
          PVFMM_ASSERT(glb_splt_count);
        }

        {  // Gather all splitters. O( log(p) )
          glb_splitters.ReInit(glb_splt_count);
          Vector<int> glb_splt_cnts_(npes), glb_splt_disp_(npes);
          for (Integer i = 0; i < npes; i++) {
            glb_splt_cnts_[i] = glb_splt_cnts[i];
            glb_splt_disp_[i] = glb_splt_disp[i];
          }
          MPI_Allgatherv((splt_count ? &splitters[0] : NULL), splt_count, CommDatatype<Type>::value(), &glb_splitters[0], &glb_splt_cnts_[0], &glb_splt_disp_[0], CommDatatype<Type>::value(), comm);
        }
      }

      // Determine split key. O( log(N/p) + log(p) )
      Vector<Long> lrank(glb_splt_count);
      {  // Compute local rank
#pragma omp parallel for schedule(static)
        for (Integer i = 0; i < glb_splt_count; i++) {
          lrank[i] = std::lower_bound(arr.Begin(), arr.Begin() + nelem, glb_splitters[i]) - arr.Begin();
        }
      }

      Vector<Long> grank(glb_splt_count);
      {  // Compute global rank
        MPI_Allreduce(&lrank[0], &grank[0], glb_splt_count, CommDatatype<Long>::value(), CommDatatype<Long>::sum(), comm);
      }

      {  // Determine split_key, totSize_new
        ConstIterator<Long> split_disp = grank.Begin();
        for (Integer i = 0; i < glb_splt_count; i++) {
          if (labs(grank[i] - totSize / 2) < labs(*split_disp - totSize / 2)) {
            split_disp = grank.Begin() + i;
          }
        }
        split_key = glb_splitters[split_disp - grank.Begin()];

        if (myrank <= (npes - 1) / 2)
          totSize_new = split_disp[0];
        else
          totSize_new = totSize - split_disp[0];

        // double err=(((double)*split_disp)/(totSize/2))-1.0;
        // if(pvfmm::fabs<double>(err)<0.01 || npes<=16) break;
        // else if(!myrank) std::cout<<err<<'\n';
      }
    }

    Integer split_id = (npes - 1) / 2;
    {  // Split problem into two. O( N/p )
      Integer new_p0 = (myrank <= split_id ? 0 : split_id + 1);
      Integer cmp_p0 = (myrank > split_id ? 0 : split_id + 1);

      Integer partner;
      {  // Set partner
        partner = myrank + cmp_p0 - new_p0;
        if (partner >= npes) partner = npes - 1;
        assert(partner >= 0);
      }
      bool extra_partner = (npes % 2 == 1 && npes - 1 == myrank);

      Long ssize = 0, lsize = 0;
      ConstIterator<Type> sbuff, lbuff;
      {  // Set ssize, lsize, sbuff, lbuff
        Long split_indx = std::lower_bound(arr.Begin(), arr.Begin() + nelem, split_key) - arr.Begin();
        ssize = (myrank > split_id ? split_indx : nelem - split_indx);
        sbuff = (myrank > split_id ? arr.Begin() : arr.Begin() + split_indx);
        lsize = (myrank <= split_id ? split_indx : nelem - split_indx);
        lbuff = (myrank <= split_id ? arr.Begin() : arr.Begin() + split_indx);
      }

      Long rsize = 0, ext_rsize = 0;
      {  // Get rsize, ext_rsize
        Long ext_ssize = 0;
        MPI_Status status;
        MPI_Sendrecv(&ssize, 1, CommDatatype<Long>::value(), partner, 0, &rsize, 1, CommDatatype<Long>::value(), partner, 0, comm, &status);
        if (extra_partner) MPI_Sendrecv(&ext_ssize, 1, CommDatatype<Long>::value(), split_id, 0, &ext_rsize, 1, CommDatatype<Long>::value(), split_id, 0, comm, &status);
      }

      {  // Exchange data.
        rbuff.ReInit(rsize);
        rbuff_ext.ReInit(ext_rsize);
        MPI_Status status;
        MPI_Sendrecv((ssize ? &sbuff[0] : NULL), ssize, CommDatatype<Type>::value(), partner, 0, (rsize ? &rbuff[0] : NULL), rsize, CommDatatype<Type>::value(), partner, 0, comm, &status);
        if (extra_partner) MPI_Sendrecv(NULL, 0, CommDatatype<Type>::value(), split_id, 0, (ext_rsize ? &rbuff_ext[0] : NULL), ext_rsize, CommDatatype<Type>::value(), split_id, 0, comm, &status);
      }

      Long nbuff_size = lsize + rsize + ext_rsize;
      {  // nbuff <-- merge(lbuff, rbuff, rbuff_ext)
        nbuff.ReInit(lsize + rsize);
        omp_par::merge<ConstIterator<Type>>(lbuff, (lbuff + lsize), rbuff.Begin(), rbuff.Begin() + rsize, nbuff.Begin(), omp_p, std::less<Type>());
        if (ext_rsize > 0 && nbuff.Dim() > 0) {
          nbuff_ext.ReInit(nbuff_size);
          omp_par::merge(nbuff.Begin(), nbuff.Begin() + (lsize + rsize), rbuff_ext.Begin(), rbuff_ext.Begin() + ext_rsize, nbuff_ext.Begin(), omp_p, std::less<Type>());
          nbuff.Swap(nbuff_ext);
          nbuff_ext.ReInit(0);
        }
      }

      // Copy new data.
      totSize = totSize_new;
      nelem = nbuff_size;
      arr.Swap(nbuff);
      nbuff.ReInit(0);
    }
    {  // Split comm. O( log(p) ) ??
      MPI_Comm scomm;
      MPI_Comm_split(comm, myrank <= split_id, myrank, &scomm);
      if (free_comm) MPI_Comm_free(&comm);
      comm = scomm;
      free_comm = true;

      npes = (myrank <= split_id ? split_id + 1 : npes - split_id - 1);
      myrank = (myrank <= split_id ? myrank : myrank - split_id - 1);
    }
  }
  if (free_comm) MPI_Comm_free(&comm);

  SortedElem = arr;
  PartitionW(SortedElem);
#else
  SortedElem = arr_;
  std::sort(SortedElem.Begin(), SortedElem.Begin() + SortedElem.Dim());
#endif
}

}  // end namespace
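// Usage sketch (illustrative): a distributed sort with the hypercube quicksort
// above. Each rank contributes a non-empty chunk (note the assert on the local
// size); the result is globally sorted and re-balanced via PartitionW. The
// names `vals` and `sorted` are made up.
//
//   pvfmm::Vector<double> vals;            // distributed, unsorted, non-empty
//   pvfmm::Vector<double> sorted;
//   comm.HyperQuickSort(vals, sorted);     // globally sorted, load-balanced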