Simpatico  v1.10
AtomDistributor.cpp
1 /*
2 * Simpatico - Simulation Package for Polymeric and Molecular Liquids
3 *
4 * Copyright 2010 - 2017, The Regents of the University of Minnesota
5 * Distributed under the terms of the GNU General Public License.
6 */
7 
8 #include "AtomDistributor.h"
9 #include "Domain.h"
10 #include "Buffer.h"
11 #include <ddMd/storage/AtomStorage.h>
12 #include <ddMd/storage/ConstAtomIterator.h>
13 
14 #include <algorithm>
15 
16 //#define DDMD_ATOM_DISTRIBUTOR_DEBUG
17 
18 namespace DdMd
19 {
20 
21  using namespace Util;
22 
23  /*
24  * Constructor.
25  */
27  #ifdef UTIL_MPI
28  sendArrays_(),
29  sendSizes_(),
30  #endif
31  boundaryPtr_(0),
32  domainPtr_(0),
33  storagePtr_(0),
34  #ifdef UTIL_MPI
35  bufferPtr_(0),
36  #endif
37  newPtr_(0),
38  cacheCapacity_(1024),
39  sendCapacity_(0),
40  rankMaxSendSize_(0),
41  nCachedTotal_(0),
42  nSentTotal_(0),
43  isAllocated_(false)
44  { setClassName("AtomDistributor"); }
45 
46  /*
47  * Destructor.
48  */
50  {}
51 
52  /*
53  * Retain pointers to associated objects (call on all domain processors).
54  */
55  void AtomDistributor::associate(Domain& domain, Boundary& boundary,
56  AtomStorage& storage, Buffer& buffer)
57  {
58  domainPtr_ = &domain;
59  boundaryPtr_ = &boundary;
60  storagePtr_ = &storage;
61  #ifdef UTIL_MPI
62  bufferPtr_ = &buffer;
63  #endif
64  }
65 
66  /*
67  * Set cache capacity.
68  */
69  void AtomDistributor::setCapacity(int cacheCapacity)
70  {
71  if (cache_.capacity() > 0) {
72  UTIL_THROW("Attempt to set cacheCapacity after allocation");
73  }
74  cacheCapacity_ = cacheCapacity;
75  }
76 
77  /*
78  * Read cacheCapacity.
79  */
80  void AtomDistributor::readParameters(std::istream& in)
81  {
82  // Read parameter file block
83  read<int>(in, "cacheCapacity", cacheCapacity_);
84  }
85 
86  /*
87  * Allocate and initialize memory on master processor (private).
88  */
89  void AtomDistributor::allocate()
90  {
91  // Preconditions
92  if (isAllocated_) {
93  UTIL_THROW("Attempt to re-allocate AtomDistributor");
94  }
95  if (domainPtr_ == 0) {
96  UTIL_THROW("AtomDistributor is not initialized");
97  }
98  if (!domainPtr_->isInitialized()) {
99  UTIL_THROW("Domain is not initialized");
100  }
101  #ifdef UTIL_MPI
102  if (!bufferPtr_->isInitialized()) {
103  UTIL_THROW("Buffer is not initialized");
104  }
105  #endif
106 
107  int gridSize = domainPtr_->grid().size();
108  #ifdef UTIL_MPI
109  sendCapacity_ = bufferPtr_->atomCapacity();
110  #endif
111 
112  // Default cacheCapacity_ = (# processors)*(max atoms per send)
113  if (cacheCapacity_ <= 0) {
114  cacheCapacity_ = gridSize * sendCapacity_;
115  }
116 
117  // Allocate atom cache and reservoir on the master processor.
118  cache_.allocate(cacheCapacity_);
119  reservoir_.allocate(cacheCapacity_);
120 
121  // Push all atoms onto the reservoir stack, in reverse order.
122  for (int i = cacheCapacity_ - 1; i >= 0; --i) {
123  reservoir_.push(cache_[i]);
124  }
125 
126  #ifdef UTIL_MPI
127  // Allocate memory for sendArrays_ matrix, and nullify all elements.
128  sendArrays_.allocate(gridSize, sendCapacity_);
129  sendSizes_.allocate(gridSize);
130  for (int i = 0; i < gridSize; ++i) {
131  sendSizes_[i] = 0;
132  for (int j = 0; j < sendCapacity_; ++j) {
133  sendArrays_(i, j) = 0;
134  }
135  }
136  #endif
137 
138  // Mark this as allocated.
139  isAllocated_ = true;
140  }
141 
142  #ifdef UTIL_MPI
143  /*
144  * Initialize the send buffer.
145  */
147  {
148  // Preconditions
149  if (domainPtr_ == 0) {
150  UTIL_THROW("AtomDistributor not initialized");
151  }
152  if (!domainPtr_->isInitialized()) {
153  UTIL_THROW("Domain is not initialized");
154  }
155  if (domainPtr_->gridRank() != 0) {
156  UTIL_THROW("This is not the master processor");
157  }
158  #ifdef UTIL_MPI
159  if (!bufferPtr_->isInitialized()) {
160  UTIL_THROW("Buffer is not initialized");
161  }
162  #endif
163 
164  // Allocate if needed (on first call).
165  // Note: isAllocated_ is set true by the allocate() function.
166  if (!isAllocated_) {
167  allocate();
168  }
169 
170  // Check post-allocation conditions
171  if (reservoir_.size() != reservoir_.capacity()) {
172  UTIL_THROW("Atom reservoir not full in setup");
173  }
174  int gridSize = domainPtr_->grid().size();
175  for (int i = 0; i < gridSize; ++i) {
176  if (sendSizes_[i] != 0) {
177  UTIL_THROW("A sendArray size is not zero in setup");
178  }
179  }
180 
181  // Clear buffer and counters
182  bufferPtr_->clearSendBuffer();
183  bufferPtr_->beginSendBlock(Buffer::ATOM);
184  nCachedTotal_ = 0;
185  nSentTotal_ = 0;
186  }
187  #endif
188 
189  /*
190  * Returns address for a new local Atom.
191  */
193  {
194  // Preconditions
195  if (domainPtr_ == 0) {
196  UTIL_THROW("AtomDistributor is not initialized");
197  }
198  #ifdef UTIL_MPI
199  if (bufferPtr_ == 0) {
200  UTIL_THROW("AtomDistributor is not initialized");
201  }
202  #endif
203  if (cacheCapacity_ <= 0) {
204  UTIL_THROW("AtomDistributor is not allocated");
205  }
206  if (!domainPtr_->isInitialized() != 0) {
207  UTIL_THROW("Domain is not initialized");
208  }
209  if (domainPtr_->gridRank() != 0) {
210  UTIL_THROW("This is not the master processor");
211  }
212  if (newPtr_ != 0) {
213  UTIL_THROW("A newPtr_ is still active");
214  }
215 
216  #ifdef UTIL_MPI
217  // If reservoir is empty, send buffer to processor with maximum sendSize_
218  if (reservoir_.size() == 0) {
219 
220  Atom* ptr;
221  int rank = rankMaxSendSize_;
222  int size = sendSizes_[rank];
223  int nSend = std::min(size, sendCapacity_);
224  int begin = size - nSend;
225 
226  // Pack atoms into buffer, and return pointers for reuse.
227  for (int i = begin; i < size; ++i) {
228  ptr = sendArrays_(rank, i);
229  ptr->packAtom(*bufferPtr_);
230 
231  // Return pointer to the reservoir and remove it from sendArrays_.
232  reservoir_.push(*sendArrays_(rank, i));
233  sendArrays_(rank, i) = 0;
234  }
235  bool isComplete = false;
236  bufferPtr_->endSendBlock(isComplete);
237  nSentTotal_ += sendSizes_[rank];
238  sendSizes_[rank] = begin;
239 
240  // Send the buffer
241  bufferPtr_->send(domainPtr_->communicator(), rank);
242 
243  // Reinitialize the buffer for reuse.
244  bufferPtr_->clearSendBuffer();
245  bufferPtr_->beginSendBlock(Buffer::ATOM);
246  }
247  #endif
248 
249  if (reservoir_.size() == 0) {
250  UTIL_THROW("Empty cache reservoir (This should not happen)");
251  }
252 
253  // Pop pointer to new atom from reservoir and return that pointer.
254  newPtr_ = &reservoir_.pop();
255  newPtr_->clear();
256  return newPtr_;
257  }
258 
259  /*
260  * Add an atom to the list to be sent.
261  */
263  {
264  // Preconditions
265  if (domainPtr_ == 0) {
266  UTIL_THROW("AtomDistributor is not initialized");
267  }
268  if (boundaryPtr_ == 0) {
269  UTIL_THROW("AtomDistributor is not initialized");
270  }
271  if (!domainPtr_->isInitialized() != 0) {
272  UTIL_THROW("Domain is not initialized");
273  }
274  if (cacheCapacity_ <= 0) {
275  UTIL_THROW("AtomDistributor is not allocated");
276  }
277  if (domainPtr_->gridRank() != 0) {
278  UTIL_THROW("This is not the master processor");
279  }
280  if (storagePtr_->isCartesian()) {
281  UTIL_THROW("Atom coordinates are Cartesian");
282  }
283  if (newPtr_ == 0) {
284  UTIL_THROW("No active newPtr_");
285  }
286 
287  // Shift position to lie within primary unit cell.
288  boundaryPtr_->shiftGen(newPtr_->position());
289 
290  #ifdef UTIL_MPI
291  // Identify rank of processor that owns this atom.
292  int rank = domainPtr_->ownerRank(newPtr_->position());
293  #else
294  int rank = 0;
295  #endif
296 
297  // If owned by the master, add this atom to storage.
298  // If not owned by the master, queue this atom for sending.
299  if (rank == 0) {
300 
301  Atom* ptr = storagePtr_->newAtomPtr();
302  *ptr = *newPtr_;
303  storagePtr_->addNewAtom();
304  reservoir_.push(*newPtr_);
305 
306  // Note: Atom is returned to reservoir in a dirty state.
307  // Atoms must thus be cleared when popped from reservoir.
308  }
309  #ifdef UTIL_MPI
310  else { // if rank !=0
311 
312  // Add newPtr_ to array of pointers for processor rank.
313  assert(sendSizes_[rank] < sendCapacity_);
314  sendArrays_(rank, sendSizes_[rank]) = newPtr_;
315  ++sendSizes_[rank];
316  ++nCachedTotal_;
317 
318  // Check if sendSize for this rank is the largest.
319  if (rank != rankMaxSendSize_) {
320  if (sendSizes_[rank] > sendSizes_[rankMaxSendSize_]) {
321  rankMaxSendSize_ = rank;
322  }
323  }
324 
325  // If buffer for the relevant processor is full, send it now.
326  if (sendSizes_[rank] == sendCapacity_) {
327 
328  // Pack atoms into buffer, and return pointers for reuse.
329  Atom* ptr;
330  for (int i = 0; i < sendCapacity_; ++i) {
331  ptr = sendArrays_(rank, i);
332  ptr->packAtom(*bufferPtr_);
333 
334  // Push pointer onto reservoir and remove it from sendArrays_.
335  reservoir_.push(*sendArrays_(rank, i));
336  sendArrays_(rank, i) = 0;
337  }
338  bool isComplete = false;
339  bufferPtr_->endSendBlock(isComplete);
340  nSentTotal_ += sendCapacity_;
341  sendSizes_[rank] = 0;
342 
343  // Send the buffer
344  bufferPtr_->send(domainPtr_->communicator(), rank);
345 
346  // Reinitialize the send buffer for reuse.
347  bufferPtr_->clearSendBuffer();
348  bufferPtr_->beginSendBlock(Buffer::ATOM);
349  }
350 
351  }
352  #endif
353 
354  // Nullify newPtr_ to release for reuse.
355  newPtr_ = 0;
356 
357  #ifdef DDMD_ATOM_DISTRIBUTOR_DEBUG
358  // Check allocation of cache atoms
359  int sendSizeSum = 0;
360  int gridSize = domainPtr_->grid().size();
361  for (int i = 0; i < gridSize; ++i) {
362  sendSizeSum += sendSizes_[i];
363  }
364  if (sendSizeSum + reservoir_.size() != cacheCapacity_) {
365  UTIL_THROW("Error: Inconsistent cache atom count");
366  }
367  #endif
368 
369  // Return rank of processor that owns this atom.
370  return rank;
371  }
372 
373  #ifdef UTIL_MPI
374  /*
375  * Send any atoms that have not be sent previously.
376  *
377  * Called only on the master processor.
378  */
380  {
381 
382  // Preconditions
383  if (domainPtr_ == 0) {
384  UTIL_THROW("AtomDistributor is not initialized");
385  }
386  if (boundaryPtr_ == 0) {
387  UTIL_THROW("AtomDistributor is not initialized");
388  }
389  if (!domainPtr_->isInitialized() != 0) {
390  UTIL_THROW("Domain is not initialized");
391  }
392  if (domainPtr_->gridRank() != 0) {
393  UTIL_THROW("This is not the master processor");
394  }
395  if (newPtr_ != 0) {
396  UTIL_THROW("A newPtr_ is still active");
397  }
398 
399  int gridSize = domainPtr_->grid().size();
400  int i, j;
401  bool isComplete = true;
402  for (i = 1; i < gridSize; ++i) {
403 
404  // Pack all remaining atoms for this processor
405  Atom* ptr;
406  for (j = 0; j < sendSizes_[i]; ++j) {
407  ptr = sendArrays_(i, j);
408  ptr->packAtom(*bufferPtr_);
409 
410  // Return pointer to atom to the reservoir.
411  reservoir_.push(*sendArrays_(i, j));
412  }
413  bufferPtr_->endSendBlock(isComplete);
414  nSentTotal_ += sendSizes_[i];
415 
416  // Send buffer, with isComplete flag as true.
417  bufferPtr_->send(domainPtr_->communicator(), i);
418 
419  // Clear buffer and initialize for resending.
420  bufferPtr_->clearSendBuffer();
421  if (i < gridSize - 1) {
422  bufferPtr_->beginSendBlock(Buffer::ATOM);
423  }
424 
425  // Reset send arrays to empty state.
426  for (int j = 0; j < sendCapacity_; ++j) {
427  sendArrays_(i, j) = 0;
428  }
429  sendSizes_[i] = 0;
430  }
431 
432  // Compute total number of atoms on all processors.
433  // Note: Matching call at end of AtomDistributor::receive()
434  storagePtr_->unsetNAtomTotal();
435  storagePtr_->computeNAtomTotal(domainPtr_->communicator());
436 
437  // Postconditions
438  if (reservoir_.size() != reservoir_.capacity()) {
439  UTIL_THROW("atomReservoir not empty after final send");
440  }
441  if (nCachedTotal_ != nSentTotal_) {
442  UTIL_THROW("Number cached atoms != number sent");
443  }
444  if (storagePtr_->nAtomTotal() != nSentTotal_ + storagePtr_->nAtom()) {
445  UTIL_THROW("Number atoms received != number sent + nAtom on master");
446  }
447 
448  }
449  #endif
450 
451  #ifdef UTIL_MPI
452  /*
453  * Receive all atoms sent by the master processor.
454  *
455  * Called by all domain processors except the master.
456  */
458  {
459  Atom* ptr; // Ptr to atom for storage
460  int rank; // Rank of this processor
461  const int source = 0; // Rank of source processor
462  bool isComplete = false; // Have all atoms been received?
463 
464  // Preconditions
465  if (domainPtr_ == 0) {
466  UTIL_THROW("AtomDistributor is not initialized");
467  }
468  if (!domainPtr_->isInitialized() != 0) {
469  UTIL_THROW("Domain is not initialized");
470  }
471  rank = domainPtr_->gridRank();
472  if (rank == 0) {
473  UTIL_THROW("AtomDistributor::receive() called on master processor");
474  }
475 
476  while (!isComplete) {
477 
478  // Receive a buffer
479  bufferPtr_->recv(domainPtr_->communicator(), source);
480 
481  // Unpack the buffer
482  isComplete = bufferPtr_->beginRecvBlock();
483  while (bufferPtr_->recvSize() > 0) {
484  ptr = storagePtr_->newAtomPtr();
485  ptr->unpackAtom(*bufferPtr_);
486  storagePtr_->addNewAtom();
487  if (domainPtr_->ownerRank(ptr->position()) != rank) {
488  UTIL_THROW("Error: Atom on wrong processor");
489  }
490  }
491  bufferPtr_->endRecvBlock();
492 
493  }
494 
495  // Compute total number of atoms on all processors.
496  // Note: Matching call at end of AtomDistributor::send()
497  storagePtr_->unsetNAtomTotal();
498  storagePtr_->computeNAtomTotal(domainPtr_->communicator());
499  }
500  #endif
501 
502  /*
503  * Validate distribution of atoms, return total number of atoms.
504  * Called on all processors. Correct return value only on master.
505  */
507  {
508  storagePtr_->isValid(domainPtr_->communicator());
509 
510  // Check that number of atoms = nSentTotal
511  int nAtomTotal = 0;
512  storagePtr_->computeNAtomTotal(domainPtr_->communicator());
513  if (domainPtr_->isMaster()) {
514  nAtomTotal = storagePtr_->nAtomTotal();
515  if (nAtomTotal != nSentTotal_ + storagePtr_->nAtom()) {
516  UTIL_THROW("nAtomTotal != nAtom after distribution");
517  }
518  }
519 
520  // Check that every atom is in correct processor domain.
521  // Coordinates must scaled / generalized, rather than Cartesian.
522  double coordinate;
523  ConstAtomIterator atomIter;
524  int i;
525  storagePtr_->begin(atomIter);
526  for ( ; atomIter.notEnd(); ++atomIter) {
527  for (i=0; i < Dimension; ++i) {
528  coordinate = atomIter->position()[i];
529  if (coordinate < domainPtr_->domainBound(i, 0)) {
530  UTIL_THROW("coordinate < domainBound(i, 0)");
531  }
532  if (coordinate >= domainPtr_->domainBound(i, 1)) {
533  UTIL_THROW("coordinate >= domainBound(i, 1)");
534  }
535  }
536  }
537 
538  return nAtomTotal; // Only valid on master, returns 0 otherwise.
539  }
540 
541 }
virtual void readParameters(std::istream &in)
Read cacheCapacity.
bool beginRecvBlock()
Begin to receive a block from the recv buffer.
Definition: Buffer.cpp:271
AtomDistributor()
Constructor.
Atom * newAtomPtr()
Returns pointer an address available for a new Atom.
void setCapacity(int cacheCapacity)
Set capacity of the send cache on the master node.
const int Dimension
Dimensionality of space.
Definition: Dimension.h:19
void shiftGen(Vector &r) const
Shift generalized Vector r to its primary image.
void allocate(int capacity)
Allocate memory on the heap.
Definition: AtomArray.cpp:56
Atom * newAtomPtr()
Returns pointer an address available for a new Atom.
void clear()
Reset integer members to initial null values.
int nAtomTotal() const
Get total number of atoms on all processors.
~AtomDistributor()
Destructor.
bool isInitialized() const
Has this Domain been initialized by calling readParam?
Definition: Domain.h:349
An orthorhombic periodic unit cell.
void packAtom(Buffer &buffer)
Pack an Atom into a send buffer, for exchange of ownership.
Vector & position()
Get position Vector by reference.
int ownerRank(const Vector &position) const
Return rank of the processor whose domain contains a position.
Definition: Domain.cpp:208
void beginSendBlock(int sendType)
Initialize a data block.
Definition: Buffer.cpp:223
A point particle in an MD simulation.
Parallel domain decomposition (DD) MD simulation.
void unpackAtom(Buffer &buffer)
Unpack an atom from a recv buffer and receive ownership.
void addNewAtom()
Finalize addition of the most recent new atom.
int nAtom() const
Return number of local atoms on this procesor (excluding ghosts)
MPI::Intracomm & communicator() const
Return Cartesian communicator by reference.
Definition: Domain.h:257
int size() const
Get total number of grid points.
Definition: Grid.h:166
int gridRank() const
Get rank of this processor in the processor grid.
Definition: Domain.h:304
#define UTIL_THROW(msg)
Macro for throwing an Exception, reporting function, file and line number.
Definition: global.h:51
int addAtom()
Process the active atom for sending.
void associate(Domain &domain, Boundary &boundary, AtomStorage &storage, Buffer &buffer)
Set pointers to associated objects.
Const iterator for all atoms owned by an AtomStorage.
Utility classes for scientific computation.
Definition: accumulators.mod:1
void receive()
Receive all atoms sent by master processor.
int recvSize() const
Number of unread items left in current recv block.
Definition: Buffer.cpp:495
A container for all the atoms and ghost atoms on this processor.
void send(MPI::Intracomm &comm, int dest)
Send a complete buffer.
Definition: Buffer.cpp:365
const Grid & grid() const
Return processor Grid by const reference.
Definition: Domain.h:268
void computeNAtomTotal(MPI::Intracomm &communicator)
Compute the total number of local atoms on all processors.
bool isValid() const
Return true if the container is valid, or throw an Exception.
bool isMaster() const
Is this the master processor (gridRank == 0) ?
Definition: Domain.h:313
void setup()
Initialization before the loop over atoms on master processor.
Buffer for interprocessor communication.
Definition: Buffer.h:217
Decomposition of the system into domains associated with processors.
Definition: Domain.h:31
void recv(MPI::Intracomm &comm, int source)
Receive a buffer.
Definition: Buffer.cpp:393
bool notEnd() const
Is the current pointer not at the end of the array?
int validate()
Validate distribution of atoms after completion.
double domainBound(int i, int j) const
Get one boundary of the domain owned by this processor.
Definition: Domain.cpp:190
bool isCartesian() const
Are atom coordinates Cartesian (true) or generalized (false)?
void endSendBlock(bool isComplete=true)
Finalize a block in the send buffer.
Definition: Buffer.cpp:247
void setClassName(const char *className)
Set class name string.
bool isInitialized() const
Has this Buffer been initialized?
Definition: Buffer.cpp:158
int capacity() const
Return allocated size.
Definition: Array.h:153
void allocate(int capacity)
Allocate the underlying C array.
Definition: DArray.h:191
void clearSendBuffer()
Clear the send buffer.
Definition: Buffer.cpp:212
void endRecvBlock()
Finish processing a block in the recv buffer.
Definition: Buffer.cpp:301
int atomCapacity() const
Maximum number of atoms for which space is available.
Definition: Buffer.cpp:146
void begin(AtomIterator &iterator)
Set iterator to beginning of the set of atoms.
void send()
Send all atoms that have not be sent previously.
void unsetNAtomTotal()
Unset value of NAtomTotal (mark as unknown).