8 #include "AtomDistributor.h" 11 #include <ddMd/storage/AtomStorage.h> 12 #include <ddMd/storage/ConstAtomIterator.h> 59 boundaryPtr_ = &boundary;
60 storagePtr_ = &storage;
72 UTIL_THROW(
"Attempt to set cacheCapacity after allocation");
74 cacheCapacity_ = cacheCapacity;
83 read<int>(in,
"cacheCapacity", cacheCapacity_);
89 void AtomDistributor::allocate()
93 UTIL_THROW(
"Attempt to re-allocate AtomDistributor");
95 if (domainPtr_ == 0) {
96 UTIL_THROW(
"AtomDistributor is not initialized");
107 int gridSize = domainPtr_->
grid().
size();
113 if (cacheCapacity_ <= 0) {
114 cacheCapacity_ = gridSize * sendCapacity_;
119 reservoir_.allocate(cacheCapacity_);
122 for (
int i = cacheCapacity_ - 1; i >= 0; --i) {
123 reservoir_.push(cache_[i]);
128 sendArrays_.allocate(gridSize, sendCapacity_);
130 for (
int i = 0; i < gridSize; ++i) {
132 for (
int j = 0; j < sendCapacity_; ++j) {
133 sendArrays_(i, j) = 0;
149 if (domainPtr_ == 0) {
150 UTIL_THROW(
"AtomDistributor not initialized");
156 UTIL_THROW(
"This is not the master processor");
171 if (reservoir_.size() != reservoir_.capacity()) {
172 UTIL_THROW(
"Atom reservoir not full in setup");
174 int gridSize = domainPtr_->
grid().
size();
175 for (
int i = 0; i < gridSize; ++i) {
176 if (sendSizes_[i] != 0) {
177 UTIL_THROW(
"A sendArray size is not zero in setup");
195 if (domainPtr_ == 0) {
196 UTIL_THROW(
"AtomDistributor is not initialized");
199 if (bufferPtr_ == 0) {
200 UTIL_THROW(
"AtomDistributor is not initialized");
203 if (cacheCapacity_ <= 0) {
204 UTIL_THROW(
"AtomDistributor is not allocated");
210 UTIL_THROW(
"This is not the master processor");
218 if (reservoir_.size() == 0) {
221 int rank = rankMaxSendSize_;
222 int size = sendSizes_[rank];
223 int nSend = std::min(size, sendCapacity_);
224 int begin = size - nSend;
227 for (
int i = begin; i < size; ++i) {
228 ptr = sendArrays_(rank, i);
232 reservoir_.push(*sendArrays_(rank, i));
233 sendArrays_(rank, i) = 0;
235 bool isComplete =
false;
237 nSentTotal_ += sendSizes_[rank];
238 sendSizes_[rank] = begin;
249 if (reservoir_.size() == 0) {
250 UTIL_THROW(
"Empty cache reservoir (This should not happen)");
254 newPtr_ = &reservoir_.pop();
265 if (domainPtr_ == 0) {
266 UTIL_THROW(
"AtomDistributor is not initialized");
268 if (boundaryPtr_ == 0) {
269 UTIL_THROW(
"AtomDistributor is not initialized");
274 if (cacheCapacity_ <= 0) {
275 UTIL_THROW(
"AtomDistributor is not allocated");
278 UTIL_THROW(
"This is not the master processor");
304 reservoir_.push(*newPtr_);
313 assert(sendSizes_[rank] < sendCapacity_);
314 sendArrays_(rank, sendSizes_[rank]) = newPtr_;
319 if (rank != rankMaxSendSize_) {
320 if (sendSizes_[rank] > sendSizes_[rankMaxSendSize_]) {
321 rankMaxSendSize_ = rank;
326 if (sendSizes_[rank] == sendCapacity_) {
330 for (
int i = 0; i < sendCapacity_; ++i) {
331 ptr = sendArrays_(rank, i);
335 reservoir_.push(*sendArrays_(rank, i));
336 sendArrays_(rank, i) = 0;
338 bool isComplete =
false;
340 nSentTotal_ += sendCapacity_;
341 sendSizes_[rank] = 0;
357 #ifdef DDMD_ATOM_DISTRIBUTOR_DEBUG 360 int gridSize = domainPtr_->
grid().
size();
361 for (
int i = 0; i < gridSize; ++i) {
362 sendSizeSum += sendSizes_[i];
364 if (sendSizeSum + reservoir_.size() != cacheCapacity_) {
365 UTIL_THROW(
"Error: Inconsistent cache atom count");
383 if (domainPtr_ == 0) {
384 UTIL_THROW(
"AtomDistributor is not initialized");
386 if (boundaryPtr_ == 0) {
387 UTIL_THROW(
"AtomDistributor is not initialized");
393 UTIL_THROW(
"This is not the master processor");
399 int gridSize = domainPtr_->
grid().
size();
401 bool isComplete =
true;
402 for (i = 1; i < gridSize; ++i) {
406 for (j = 0; j < sendSizes_[i]; ++j) {
407 ptr = sendArrays_(i, j);
411 reservoir_.push(*sendArrays_(i, j));
414 nSentTotal_ += sendSizes_[i];
421 if (i < gridSize - 1) {
426 for (
int j = 0; j < sendCapacity_; ++j) {
427 sendArrays_(i, j) = 0;
438 if (reservoir_.size() != reservoir_.capacity()) {
439 UTIL_THROW(
"atomReservoir not empty after final send");
441 if (nCachedTotal_ != nSentTotal_) {
442 UTIL_THROW(
"Number cached atoms != number sent");
445 UTIL_THROW(
"Number atoms received != number sent + nAtom on master");
461 const int source = 0;
462 bool isComplete =
false;
465 if (domainPtr_ == 0) {
466 UTIL_THROW(
"AtomDistributor is not initialized");
473 UTIL_THROW(
"AtomDistributor::receive() called on master processor");
476 while (!isComplete) {
483 while (bufferPtr_->
recvSize() > 0) {
515 if (nAtomTotal != nSentTotal_ + storagePtr_->
nAtom()) {
516 UTIL_THROW(
"nAtomTotal != nAtom after distribution");
525 storagePtr_->
begin(atomIter);
526 for ( ; atomIter.
notEnd(); ++atomIter) {
528 coordinate = atomIter->position()[i];
529 if (coordinate < domainPtr_->domainBound(i, 0)) {
533 UTIL_THROW(
"coordinate >= domainBound(i, 1)");
virtual void readParameters(std::istream &in)
Read cacheCapacity.
bool beginRecvBlock()
Begin to receive a block from the recv buffer.
AtomDistributor()
Constructor.
Atom * newAtomPtr()
Returns pointer an address available for a new Atom.
void setCapacity(int cacheCapacity)
Set capacity of the send cache on the master node.
const int Dimension
Dimensionality of space.
void shiftGen(Vector &r) const
Shift generalized Vector r to its primary image.
void allocate(int capacity)
Allocate memory on the heap.
Atom * newAtomPtr()
Returns pointer an address available for a new Atom.
void clear()
Reset integer members to initial null values.
int nAtomTotal() const
Get total number of atoms on all processors.
~AtomDistributor()
Destructor.
bool isInitialized() const
Has this Domain been initialized by calling readParam?
An orthorhombic periodic unit cell.
void packAtom(Buffer &buffer)
Pack an Atom into a send buffer, for exchange of ownership.
Vector & position()
Get position Vector by reference.
int ownerRank(const Vector &position) const
Return rank of the processor whose domain contains a position.
void beginSendBlock(int sendType)
Initialize a data block.
A point particle in an MD simulation.
Parallel domain decomposition (DD) MD simulation.
void unpackAtom(Buffer &buffer)
Unpack an atom from a recv buffer and receive ownership.
void addNewAtom()
Finalize addition of the most recent new atom.
int nAtom() const
Return number of local atoms on this procesor (excluding ghosts)
MPI::Intracomm & communicator() const
Return Cartesian communicator by reference.
int size() const
Get total number of grid points.
int gridRank() const
Get rank of this processor in the processor grid.
#define UTIL_THROW(msg)
Macro for throwing an Exception, reporting function, file and line number.
int addAtom()
Process the active atom for sending.
void associate(Domain &domain, Boundary &boundary, AtomStorage &storage, Buffer &buffer)
Set pointers to associated objects.
Const iterator for all atoms owned by an AtomStorage.
Utility classes for scientific computation.
void receive()
Receive all atoms sent by master processor.
int recvSize() const
Number of unread items left in current recv block.
A container for all the atoms and ghost atoms on this processor.
void send(MPI::Intracomm &comm, int dest)
Send a complete buffer.
const Grid & grid() const
Return processor Grid by const reference.
void computeNAtomTotal(MPI::Intracomm &communicator)
Compute the total number of local atoms on all processors.
bool isValid() const
Return true if the container is valid, or throw an Exception.
bool isMaster() const
Is this the master processor (gridRank == 0) ?
void setup()
Initialization before the loop over atoms on master processor.
Buffer for interprocessor communication.
Decomposition of the system into domains associated with processors.
void recv(MPI::Intracomm &comm, int source)
Receive a buffer.
bool notEnd() const
Is the current pointer not at the end of the array?
int validate()
Validate distribution of atoms after completion.
double domainBound(int i, int j) const
Get one boundary of the domain owned by this processor.
bool isCartesian() const
Are atom coordinates Cartesian (true) or generalized (false)?
void endSendBlock(bool isComplete=true)
Finalize a block in the send buffer.
void setClassName(const char *className)
Set class name string.
bool isInitialized() const
Has this Buffer been initialized?
int capacity() const
Return allocated size.
void allocate(int capacity)
Allocate the underlying C array.
void clearSendBuffer()
Clear the send buffer.
void endRecvBlock()
Finish processing a block in the recv buffer.
int atomCapacity() const
Maximum number of atoms for which space is available.
void begin(AtomIterator &iterator)
Set iterator to beginning of the set of atoms.
void send()
Send all atoms that have not be sent previously.
void unsetNAtomTotal()
Unset value of NAtomTotal (mark as unknown).