Simpatico  v1.10
GroupCollector.tpp
#ifndef DDMD_GROUP_COLLECTOR_TPP
#define DDMD_GROUP_COLLECTOR_TPP

/*
* Simpatico - Simulation Package for Polymeric and Molecular Liquids
*
* Copyright 2010 - 2017, The Regents of the University of Minnesota
* Distributed under the terms of the GNU General Public License.
*/

#include "GroupCollector.h"
#include "Domain.h"
#include "Buffer.h"
#include <ddMd/storage/GroupStorage.tpp>

namespace DdMd
{

   using namespace Util;

   /*
   * Constructor.
   */
   template <int N>
   GroupCollector<N>::GroupCollector()
    : domainPtr_(0),
      storagePtr_(0),
      bufferPtr_(0),
      source_(-1),
      recvArrayCapacity_(256),
      recvBufferSize_(-1),
      recvArraySize_(-1),
      recvArrayId_(-1),
      isComplete_(false)
   {  setClassName("GroupCollector"); }

   /*
   * Destructor.
   */
   template <int N>
   GroupCollector<N>::~GroupCollector()
   {}

   /*
   * Retain pointers to associated objects.
   */
   template <int N>
   void GroupCollector<N>::associate(Domain& domain, GroupStorage<N>& storage,
                                     Buffer& buffer)
   {
      domainPtr_ = &domain;
      storagePtr_ = &storage;
      bufferPtr_ = &buffer;
   }

   /*
   * Set recvArray capacity (only needed on master).
   */
   template <int N>
   void GroupCollector<N>::setCapacity(int recvArrayCapacity)
   {
      if (recvArrayCapacity <= 0) {
         UTIL_THROW("Attempt to set nonpositive recvArrayCapacity");
      }
      if (recvArray_.capacity() > 0) {
         UTIL_THROW("Attempt to set recvArrayCapacity after allocation");
      }
      recvArrayCapacity_ = recvArrayCapacity;
   }

   /*
   * Setup on master processor just before loop over groups.
   */
   template <int N>
   void GroupCollector<N>::setup()
   {
      // Preconditions
      if (!domainPtr_) {
         UTIL_THROW("Collector not initialized: No associated domain");
      }
      if (!domainPtr_->isInitialized()) {
         UTIL_THROW("Domain is not initialized");
      }
      if (!domainPtr_->isMaster()) {
         UTIL_THROW("Not the master processor");
      }
      if (!bufferPtr_->isInitialized()) {
         UTIL_THROW("Buffer not allocated");
      }

      // Allocate recvArray if necessary
      if (recvArray_.capacity() == 0) {
         if (recvArrayCapacity_ == 0) {
            UTIL_THROW("recvArrayCapacity_ not set");
         }
         recvArray_.allocate(recvArrayCapacity_);
      }

      source_ = 0;          // rank of source node
      recvBufferSize_ = 0;  // number of groups in MPI buffer
      recvArraySize_ = 0;   // number of groups in recvArray_
      recvArrayId_ = 0;     // id of current group in recvArray_
      isComplete_ = false;  // not finished with current processor

      // Initialize Group iterator on master processor.
      storagePtr_->begin(iterator_);
   }
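
   /*
   * Illustrative setup sequence (a sketch, not part of this file): before
   * setup() is called on the master processor, the collector must be
   * associated with a Domain, a GroupStorage<N>, and a Buffer, and the
   * receive cache size may optionally be tuned. The object names below
   * (domain, bondStorage, buffer, collector) are hypothetical.
   *
   *    GroupCollector<2> collector;
   *    collector.associate(domain, bondStorage, buffer);
   *    collector.setCapacity(1024);    // optional, default is 256
   *    if (domain.isMaster()) {
   *       collector.setup();           // master processor only
   *    }
   */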

#ifdef UTIL_MPI
   /*
   * Return a pointer to the next available Group, or null.
   */
   template <int N>
   Group<N>* GroupCollector<N>::nextPtr()
   {
      // Preconditions
      if (domainPtr_ == 0) {
         UTIL_THROW("GroupCollector has not been initialized");
      }
      if (!domainPtr_->isInitialized()) {
         UTIL_THROW("Domain is not initialized");
      }
      if (!domainPtr_->isMaster()) {
         UTIL_THROW("Not the master processor");
      }
      if (recvArray_.capacity() <= 0) {
         UTIL_THROW("Cache not allocated");
      }

      // If master processor
      Group<N>* groupPtr;
      Atom* atomPtr;
      if (source_ == 0) {
         while (!isComplete_) {
            if (iterator_.notEnd()) {
               groupPtr = iterator_.get();
               ++iterator_;
               atomPtr = groupPtr->atomPtr(0);
               if (atomPtr) {
                  if (!atomPtr->isGhost()) {
                     return groupPtr;
                  }
               }
            } else {
               recvBufferSize_ = 0;
               recvArraySize_ = 0;
               recvArrayId_ = 0;
               isComplete_ = true;
            }
         }
      }

      // While at end of recvArray_, or while array is empty.
      while (recvArrayId_ == recvArraySize_) {

         // If receive buffer is empty
         if (recvBufferSize_ == 0) {

            // If processing of items from processor source_ is complete.
            if (isComplete_) {
               ++source_;
               recvArraySize_ = 0;
               recvArrayId_ = 0;
               isComplete_ = false;
               // If last processor is complete, return null pointer.
               if (source_ == domainPtr_->grid().size()) {
                  source_ = 0;
                  return 0;
               }
            }

            // Send request to processor source_
            int message = source_;
            domainPtr_->communicator().Send(&message, 1, MPI::INT,
                                            source_, message);

            // Receive buffer from processor source_
            bufferPtr_->recv(domainPtr_->communicator(), source_);
            isComplete_ = bufferPtr_->beginRecvBlock();
            recvBufferSize_ = bufferPtr_->recvSize();
         }

         // Unpack groups from buffer into recvArray_.
         if (recvBufferSize_ > 0) {
            recvArraySize_ = 0;
            recvArrayId_ = 0;
            while (bufferPtr_->recvSize() > 0
                   && recvArraySize_ < recvArray_.capacity())
            {
               recvArray_[recvArraySize_].unpack(*bufferPtr_);
               ++recvArraySize_;
               --recvBufferSize_;
               if (recvBufferSize_ != bufferPtr_->recvSize()) {
                  UTIL_THROW("Inconsistent buffer receive counters");
               }
            }
         }

      }

      // Return current item from recvArray.
      ++recvArrayId_;
      return &recvArray_[recvArrayId_ - 1];

   }
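
   /*
   * Usage sketch for the master-side collection loop (a sketch, not part
   * of this file). After setup(), the master repeatedly calls nextPtr()
   * until it returns null; each call transparently requests and unpacks
   * buffers of groups from the other processors as needed. The names
   * collector and out are hypothetical.
   *
   *    collector.setup();
   *    Group<2>* groupPtr = collector.nextPtr();
   *    while (groupPtr) {
   *       out << groupPtr->id() << "\n";   // e.g., record each group id
   *       groupPtr = collector.nextPtr();
   *    }
   */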

   /*
   * Send all groups from this processor to the master.
   *
   * Call on every processor except the master.
   */
   template <int N>
   void GroupCollector<N>::send()
   {

      // Preconditions
      if (!domainPtr_) {
         UTIL_THROW("Collector not initialized: null domainPtr_");
      }
      if (!domainPtr_->isInitialized()) {
         UTIL_THROW("Domain is not initialized");
      }
      if (domainPtr_->isMaster()) {
         UTIL_THROW("GroupCollector<N>::send() called from master node.");
      }
      if (!storagePtr_) {
         UTIL_THROW("Collector not initialized: Null storagePtr_");
      }
      if (storagePtr_->capacity() <= 0) {
         UTIL_THROW("GroupStorage not initialized");
      }
      if (!bufferPtr_) {
         UTIL_THROW("Collector not initialized: Null bufferPtr_");
      }
      if (!bufferPtr_->isInitialized()) {
         UTIL_THROW("Buffer not allocated");
      }

      Atom* atomPtr = 0;
      int message;
      int tag;
      int bufferCapacity = bufferPtr_->groupCapacity<N>();

      // Initialize group iterator
      storagePtr_->begin(iterator_);

      isComplete_ = false;
      while (!isComplete_) {

         // Receive notice from master to send groups (blocking receive)
         tag = domainPtr_->communicator().Get_rank();
         domainPtr_->communicator().Recv(&message, 1, MPI::INT, 0, tag);

         // Pack buffer with groups (sendSize counts groups packed so far)
         int sendSize = 0;
         isComplete_ = iterator_.isEnd();
         bufferPtr_->clearSendBuffer();
         bufferPtr_->beginSendBlock(Buffer::GROUP2 + N - 2);
         while (sendSize < bufferCapacity && !isComplete_) {
            // Get pointer to first atom in Group.
            // Send group only if this is a local atom.
            atomPtr = iterator_->atomPtr(0);
            if (atomPtr) {
               if (!atomPtr->isGhost()) {
                  iterator_->pack(*bufferPtr_);
                  ++sendSize;
               }
            }
            ++iterator_;
            isComplete_ = iterator_.isEnd();
         }
         bufferPtr_->endSendBlock(isComplete_);

         // Send buffer to master
         bufferPtr_->send(domainPtr_->communicator(), 0);

      }

   }
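
   /*
   * Usage sketch for the worker side (a sketch, not part of this file).
   * Every processor other than the master calls send(), which blocks in
   * Recv() until the master requests this rank's groups via nextPtr(),
   * then packs and sends them in one or more buffers. The two sides are
   * typically paired as below; domain and collector are hypothetical names.
   *
   *    if (domain.isMaster()) {
   *       // master: setup() + nextPtr() loop (see the sketch after nextPtr())
   *    } else {
   *       collector.send();   // blocks until the master requests this rank
   *    }
   */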
#endif

}
#endif // ifndef DDMD_GROUP_COLLECTOR_TPP