/*
* Simpatico v1.10 - GroupDistributor.tpp
*/
1 #ifndef DDMD_GROUP_DISTRIBUTOR_TPP
2 #define DDMD_GROUP_DISTRIBUTOR_TPP
3 
4 /*
5 * Simpatico - Simulation Package for Polymeric and Molecular Liquids
6 *
7 * Copyright 2010 - 2017, The Regents of the University of Minnesota
8 * Distributed under the terms of the GNU General Public License.
9 */
10 
11 #include "GroupDistributor.h"
12 #include "Domain.h"
13 #include <ddMd/storage/AtomStorage.h>
14 #include <ddMd/storage/GroupStorage.tpp>
15 
16 #include <algorithm>
17 
18 namespace DdMd
19 {
20 
21  using namespace Util;
22 
23  /*
24  * Constructor.
25  */
26  template <int N>
28  : cache_(),
29  newPtr_(0),
30  domainPtr_(0),
31  atomStoragePtr_(0),
32  groupStoragePtr_(0),
33  bufferPtr_(0),
34  sendType_(Buffer::NONE),
35  nAtomRecv_(0),
36  nSentTotal_(0),
37  cacheCapacity_(1024),
38  cacheSize_(0)
39  { setClassName("GroupDistributor"); }
40 
41  /*
42  * Destructor.
43  */
44  template <int N>
46  {}
47 
48  /*
49  * Retain pointers to associated objects.
50  */
51  template <int N>
53  AtomStorage& atomStorage,
54  GroupStorage<N>& groupStorage,
55  Buffer& buffer)
56  {
57  domainPtr_ = &domain;
58  atomStoragePtr_ = &atomStorage;
59  groupStoragePtr_ = &groupStorage;
60  bufferPtr_ = &buffer;
61  }
62 
63  /*
64  * Set cache capacity.
65  */
66  template <int N>
67  void GroupDistributor<N>::setCapacity(int cacheCapacity)
68  {
69  if (cacheCapacity <= 0) {
70  UTIL_THROW("Attempt to set nonpositive cacheCapacity");
71  }
72  if (cache_.capacity() > 0) {
73  UTIL_THROW("Attempt to set cacheCapacity after allocation");
74  }
75  cacheCapacity_ = cacheCapacity;
76  }
77 
78  /*
79  * Read cacheCapacity and allocate all required memory.
80  */
81  template <int N>
82  void GroupDistributor<N>::readParameters(std::istream& in)
83  { read<int>(in, "cacheCapacity", cacheCapacity_); }
84 
85  /*
86  * Setup master before distribution. Call only on master.
87  */
88  template <int N>
90  {
91  // Allocate cache if necessary
92  if (cache_.capacity() == 0) {
93  if (cacheCapacity_ == 0) {
94  UTIL_THROW("cachCapacity_ not set");
95  }
96  cache_.allocate(cacheCapacity_);
97  }
98 
99  // Setup state of master before loop
100  bufferPtr_->clearSendBuffer();
101  bufferPtr_->beginSendBlock(Buffer::GROUP2 + N - 2);
102  nAtomRecv_ = 0;
103  newPtr_ = 0;
104  }
105 
106  /*
107  * Returns address for a new local Group. Call only on master.
108  */
109  template <int N>
111  {
112  // Preconditions
113  if (atomStoragePtr_ == 0) {
114  UTIL_THROW("GroupDistributor is not initialized");
115  }
116  if (groupStoragePtr_ == 0) {
117  UTIL_THROW("GroupDistributor is not initialized");
118  }
119  if (domainPtr_->gridRank() != 0) {
120  UTIL_THROW("GroupDistributor::add called on slave node");
121  }
122  if (cache_.capacity() <= 0) {
123  UTIL_THROW("GroupDistributor cache is not allocated");
124  }
125  if (newPtr_ != 0) {
126  UTIL_THROW("A newPtr_ is still active");
127  }
128 
129  #ifdef UTIL_MPI
130  // If the cache is full, broadcast it and clear it for reuse.
131  if (cacheSize_ == cacheCapacity_) {
132  bool isComplete = false;
133  int source = 0;
134  bufferPtr_->endSendBlock(isComplete);
135  bufferPtr_->bcast(domainPtr_->communicator(), source);
136  nSentTotal_ += cacheSize_;
137  bufferPtr_->clearSendBuffer();
138  bufferPtr_->beginSendBlock(Buffer::GROUP2 + N - 2);
139  cacheSize_ = 0;
140  }
141  #endif
142 
143  // Set newPtr to the next element in cache_, after last occupied.
144  newPtr_ = &cache_[cacheSize_];
145  return newPtr_;
146  }
147 
148  /*
149  * Add a Group to the list to be sent.
150  */
151  template <int N>
153  {
154  // Preconditions
155  if (atomStoragePtr_ == 0) {
156  UTIL_THROW("GroupDistributor is not initialized");
157  }
158  if (groupStoragePtr_ == 0) {
159  UTIL_THROW("GroupDistributor is not initialized");
160  }
161  if (domainPtr_ == 0) {
162  UTIL_THROW("GroupDistributor is not initialized");
163  }
164  if (!domainPtr_->isInitialized()) {
165  UTIL_THROW("Domain is not initialized");
166  }
167  if (domainPtr_->gridRank() != 0) {
168  UTIL_THROW("GroupDistributor::add called on slave node");
169  }
170  if (cache_.capacity() <= 0) {
171  UTIL_THROW("GroupDistributor cache is not allocated");
172  }
173  if (atomStoragePtr_->nGhost() != 0) {
174  UTIL_THROW("AtomStorage has ghosts");
175  }
176  if (newPtr_ == 0) {
177  UTIL_THROW("newPtr is null on entry to add()");
178  }
179 
180  // If group has at least one atom on master, add to groupStorage.
181  int nAtom = atomStoragePtr_->map().findGroupLocalAtoms(*newPtr_);
182  if (nAtom > 0) {
183  Group<N>* ptr = groupStoragePtr_->newPtr();
184  *ptr = *newPtr_;
185  groupStoragePtr_->add();
186  nAtomRecv_ += nAtom;
187  }
188  newPtr_->pack(*bufferPtr_);
189 
190  // Nullify newPtr_ to release for reuse.
191  newPtr_ = 0;
192  ++cacheSize_;
193 
194  }
195 
196  /*
197  * Send any groups that have not been sent previously.
198  *
199  * This method must be called only by the master processor.
200  */
201  template <int N>
203  {
204 
205  // Preconditions
206  if (atomStoragePtr_ == 0) {
207  UTIL_THROW("GroupDistributor is not initialized");
208  }
209  if (groupStoragePtr_ == 0) {
210  UTIL_THROW("GroupDistributor is not initialized");
211  }
212  if (domainPtr_ == 0) {
213  UTIL_THROW("GroupDistributor is not initialized");
214  }
215  if (bufferPtr_ == 0) {
216  UTIL_THROW("GroupDistributor is not initialized");
217  }
218  if (cacheCapacity_ <= 0) {
219  UTIL_THROW("GroupDistributor is not allocated");
220  }
221  if (domainPtr_->gridRank() != 0) {
222  UTIL_THROW("GroupDistributor::send called on slave node");
223  }
224  if (newPtr_ != 0) {
225  UTIL_THROW("A newPtr_ is still active");
226  }
227 
228  #ifdef UTIL_MPI
229  bool isComplete = true;
230  int source = 0;
231  bufferPtr_->endSendBlock(isComplete);
232  bufferPtr_->bcast(domainPtr_->communicator(), source);
233  nSentTotal_ += cacheSize_;
234  bufferPtr_->clearSendBuffer();
235 
236  validate();
237  groupStoragePtr_->unsetNTotal();
238  groupStoragePtr_->computeNTotal(domainPtr_->communicator());
239  if (groupStoragePtr_->nTotal() != nSentTotal_) {
240  UTIL_THROW("Number of groups not equal number sent");
241  }
242  groupStoragePtr_->isValid(*atomStoragePtr_, domainPtr_->communicator(),
243  false);
244  #else
245  groupStoragePtr_->unsetNTotal();
246  groupStoragePtr_->computeNTotal();
247  if (groupStoragePtr_->nTotal() != nSentTotal_) {
248  UTIL_THROW("Number of groups not equal number sent");
249  }
250  groupStoragePtr_->isValid(*atomStoragePtr_, false);
251  #endif
252 
253  cacheSize_ = 0;
254  newPtr_ = 0;
255  nSentTotal_ = 0;
256  nAtomRecv_ = 0;
257  }
258 
259  /*
260  * Receive all atoms sent by the master processor.
261  *
262  * Called by all processors except the master.
263  */
264  template <int N>
266  {
267  #ifdef UTIL_MPI
268  // Preconditions
269  if (atomStoragePtr_ == 0) {
270  UTIL_THROW("GroupDistributor is not initialized");
271  }
272  if (groupStoragePtr_ == 0) {
273  UTIL_THROW("GroupDistributor is not initialized");
274  }
275  if (domainPtr_ == 0) {
276  UTIL_THROW("GroupDistributor is not initialized");
277  }
278  if (bufferPtr_ == 0) {
279  UTIL_THROW("GroupDistributor is not initialized");
280  }
281  if (domainPtr_->gridRank() == 0) {
282  UTIL_THROW("GroupDistributor::receive called on master node");
283  }
284  if (atomStoragePtr_->nGhost() != 0) {
285  UTIL_THROW("Error: AtomStorage has ghosts");
286  }
287 
288  Group<N>* ptr;
289  int nAtom;
290  bool isComplete = false;
291  const int source = 0;
292 
293  nAtomRecv_ = 0;
294  while (!isComplete) {
295 
296  // Receive broadcast
297  bufferPtr_->bcast(domainPtr_->communicator(), source);
298 
299  // Unpack the buffer, set pointers to atoms
300  isComplete = bufferPtr_->beginRecvBlock();
301  while (bufferPtr_->recvSize() > 0) {
302  ptr = groupStoragePtr_->newPtr();
303  ptr->unpack(*bufferPtr_);
304  nAtom = atomStoragePtr_->map().findGroupLocalAtoms(*ptr);
305  if (nAtom > 0) {
306  groupStoragePtr_->add();
307  nAtomRecv_ += nAtom;
308  } else if (nAtom == 0) {
309  groupStoragePtr_->returnPtr();
310  } else {
311  UTIL_THROW("Invalid return value from findGroupLocalAtoms");
312  }
313  }
314 
315  }
316 
317  // Validate Data
318  validate(); // Check number of local atoms in groups
319  groupStoragePtr_->unsetNTotal();
320  groupStoragePtr_->computeNTotal(domainPtr_->communicator());
321  groupStoragePtr_->isValid(*atomStoragePtr_, domainPtr_->communicator(),
322  false);
323 
324  nAtomRecv_ = 0;
325  #endif
326  }
327 
328  #ifdef UTIL_MPI
329 
332  template <int N>
334  {
335  int nAtomRecvTot;
336  const int source = 0;
337  domainPtr_->communicator()
338  .Reduce(&nAtomRecv_, &nAtomRecvTot, 1, MPI::INT, MPI::SUM, source);
339  if (domainPtr_->gridRank() == 0) {
340  if (nAtomRecvTot != nSentTotal_*N) {
341  Log::file() << "nSentTotal_*N = " << nSentTotal_*N << std::endl;
342  Log::file() << "nAtomRecvTot = " << nAtomRecvTot << std::endl;
343  UTIL_THROW("Discrepancy in number of local atoms in groups");
344  }
345  }
346  }
347  #endif
348 
349 }
350 #endif
bool beginRecvBlock()
Begin to receive a block from the recv buffer.
Definition: Buffer.cpp:271
void add()
Add a group to the cache for sending, send if necessary.
bool isInitialized() const
Has this Domain been initialized by calling readParam?
Definition: Domain.h:349
void unpack(Buffer &buffer)
Unpack a Group from the recv buffer.
void beginSendBlock(int sendType)
Initialize a data block.
Definition: Buffer.cpp:223
void associate(Domain &domain, AtomStorage &atomStorage, GroupStorage< N > &groupStorage, Buffer &buffer)
Create required associations with related objects.
Parallel domain decomposition (DD) MD simulation.
~GroupDistributor()
Destructor.
MPI::Intracomm & communicator() const
Return Cartesian communicator by reference.
Definition: Domain.h:257
virtual void readParameters(std::istream &in)
Read cacheCapacity, allocate memory and initialize object.
int gridRank() const
Get rank of this processor in the processor grid.
Definition: Domain.h:304
#define UTIL_THROW(msg)
Macro for throwing an Exception, reporting function, file and line number.
Definition: global.h:51
int nGhost() const
Return current number of ghost atoms on this processor.
Utility classes for scientific computation.
Definition: accumulators.mod:1
const AtomMap & map() const
Return the AtomMap by const reference.
Group< N > * newPtr()
Returns a pointer to an address available for a new Group<N>.
A container for all the Group<N> objects on this processor.
int recvSize() const
Number of unread items left in current recv block.
Definition: Buffer.cpp:495
void send()
Send all groups that have not been sent previously.
A container for all the atoms and ghost atoms on this processor.
int findGroupLocalAtoms(Group< N > &group) const
Set handles to local atoms in a Group<N> object.
Definition: AtomMap.h:257
GroupDistributor()
Constructor.
Buffer for interprocessor communication.
Definition: Buffer.h:217
Decomposition of the system into domains associated with processors.
Definition: Domain.h:31
static std::ostream & file()
Get log ostream by reference.
Definition: Log.cpp:57
void setup()
Initialize Buffer for sending.
void receive()
Receive all atoms sent by master processor.
void endSendBlock(bool isComplete=true)
Finalize a block in the send buffer.
Definition: Buffer.cpp:247
void setClassName(const char *className)
Set class name string.
void bcast(MPI::Intracomm &comm, int source)
Broadcast a buffer.
Definition: Buffer.cpp:417
A group of covalently interacting atoms.
void setCapacity(int cacheCapacity)
Set cacheCapacity, allocate memory and initialize object.
void clearSendBuffer()
Clear the send buffer.
Definition: Buffer.cpp:212
Class template for distributing Group<N> objects among processors.