Simpatico v1.10
AtomCollector.cpp
/*
* Simpatico - Simulation Package for Polymeric and Molecular Liquids
*
* Copyright 2010 - 2017, The Regents of the University of Minnesota
* Distributed under the terms of the GNU General Public License.
*/

#include "AtomCollector.h"
#include "Domain.h"
#include "Buffer.h"
#include <ddMd/storage/AtomStorage.h>

#include <algorithm>
#include <cassert>   // for assert(), used in nextPtr()

namespace DdMd
{

   using namespace Util;

   /*
   * Constructor.
   */
   AtomCollector::AtomCollector()
    : domainPtr_(0),
      bufferPtr_(0),
      source_(-1),
      recvBufferSize_(-1),
      recvArrayCapacity_(256),
      recvArraySize_(-1),
      recvArrayId_(-1),
      isComplete_(false)
   {  setClassName("AtomCollector"); }

   /*
   * Destructor.
   */
   AtomCollector::~AtomCollector()
   {}

   /*
   * Retain pointers to associated objects.
   *
   * Call on all domain nodes.
   */
   void AtomCollector::associate(Domain& domain, AtomStorage& storage,
                                 Buffer& buffer)
   {
      domainPtr_ = &domain;
      storagePtr_ = &storage;
      bufferPtr_ = &buffer;
   }

   /*
   * Set recvArray cache capacity.
   */
   void AtomCollector::setCapacity(int recvArrayCapacity)
   {
      if (recvArrayCapacity <= 0) {
         UTIL_THROW("Attempt to set nonpositive recvArrayCapacity");
      }
      if (recvArray_.capacity() > 0) {
         UTIL_THROW("Attempt to set recvArrayCapacity after allocation");
      }
      recvArrayCapacity_ = recvArrayCapacity;
   }

   /*
   * Setup before receiving loop - call only on master.
   */
   void AtomCollector::setup()
   {
      // Preconditions
      if (!domainPtr_) {
         UTIL_THROW("Collector not initialized");
      }
      if (!domainPtr_->isInitialized()) {
         UTIL_THROW("Domain is not initialized");
      }
      if (!domainPtr_->isMaster()) {
         UTIL_THROW("Not the master processor");
      }
      if (!bufferPtr_->isInitialized()) {
         UTIL_THROW("Buffer not allocated");
      }

      // Allocate recvArray if not done previously
      if (recvArray_.capacity() == 0) {
         if (recvArrayCapacity_ == 0) {
            UTIL_THROW("recvArrayCapacity_ not set");
         }
         recvArray_.allocate(recvArrayCapacity_);
      }

      source_ = 0;          // rank of source node
      recvBufferSize_ = 0;  // number of atoms in MPI buffer
      recvArraySize_ = 0;   // number of atoms in recvArray_
      recvArrayId_ = 0;     // id of current atom in recvArray_
      isComplete_ = false;  // not finished with current processor

      // Initialize Atom iterator on master processor.
      storagePtr_->begin(iterator_);
   }

   #ifdef UTIL_MPI
   /*
   * Return address for a new Atom, or null when all are received.
   *
   * Call this function only on the master processor.
   */
   Atom* AtomCollector::nextPtr()
   {
      // Preconditions
      if (domainPtr_ == 0) {
         UTIL_THROW("AtomCollector has not been initialized");
      }
      if (!domainPtr_->isInitialized()) {
         UTIL_THROW("Domain is not initialized");
      }
      if (!domainPtr_->isMaster()) {
         UTIL_THROW("Not the master processor");
      }

      // If still processing atoms from master processor
      Atom* ptr;
      if (source_ == 0) {
         if (iterator_.notEnd()) {
            ptr = iterator_.get();
            ++iterator_;
            return ptr;
         } else {
            recvBufferSize_ = 0;
            recvArraySize_ = 0;
            recvArrayId_ = 0;
            isComplete_ = true;
         }
      }

      // While at end of recvArray_, or while array is empty.
      while (recvArrayId_ == recvArraySize_) {

         // If receive buffer is empty
         if (recvBufferSize_ == 0) {

            // If processing of items from processor source_ is complete.
            if (isComplete_) {
               ++source_;
               recvArraySize_ = 0;
               recvArrayId_ = 0;
               isComplete_ = false;
               // If last processor is complete, return null pointer.
               if (source_ == domainPtr_->grid().size()) {
                  source_ = 0;
                  return 0;
               }
            }

            assert(recvBufferSize_ == 0);  // recv buffer is empty
            assert(!isComplete_);          // source_ is not completed

            // Send request to processor source_ .
            int message = source_;
            domainPtr_->communicator().Send(&message, 1, MPI::INT,
                                            source_, message);
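            // Note: both the message value and the tag equal source_,
            // the rank of the receiving processor; send() on that
            // processor posts a matching Recv tagged with its own rank.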

            // Receive buffer from processor source_
            bufferPtr_->recv(domainPtr_->communicator(), source_);
            isComplete_ = bufferPtr_->beginRecvBlock();
            recvBufferSize_ = bufferPtr_->recvSize();
         }

         // Unpack atoms from recv buffer into recvArray_.
         if (recvBufferSize_ > 0) {
            recvArraySize_ = 0;
            recvArrayId_ = 0;
            if (recvBufferSize_ != bufferPtr_->recvSize()) {
               UTIL_THROW("Inconsistent buffer receive counters");
            }
            while (bufferPtr_->recvSize() > 0
                   && recvArraySize_ < recvArray_.capacity())
            {
               recvArray_[recvArraySize_].unpackAtom(*bufferPtr_);
               ++recvArraySize_;
               --recvBufferSize_;
               if (recvBufferSize_ != bufferPtr_->recvSize()) {
                  UTIL_THROW("Inconsistent buffer receive counters");
               }
            }
            if (bufferPtr_->recvSize() == 0) {
               bufferPtr_->endRecvBlock();
            }
         }

      }

      // Return current item from recvArray.
      ++recvArrayId_;
      return &recvArray_[recvArrayId_ - 1];

   }

   /*
   * Send all atoms from this process.
   *
   * Call on every domain processor except the master.
   */
   void AtomCollector::send()
   {

      // Preconditions
      if (!domainPtr_->isInitialized()) {
         UTIL_THROW("Domain is not initialized");
      }
      if (domainPtr_->isMaster()) {
         UTIL_THROW("AtomCollector::send() called from master node.");
      }
      if (!bufferPtr_->isInitialized()) {
         UTIL_THROW("Buffer is not initialized");
      }

      // Initialize atom iterator
      storagePtr_->begin(iterator_);

      isComplete_ = false;
      while (!isComplete_) {

         // Receive notice from master to send atoms (blocking receive)
         int message;
         int tag = domainPtr_->communicator().Get_rank();
         domainPtr_->communicator().Recv(&message, 1, MPI::INT, 0, tag);
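         // The tag is this processor's own rank, matching the tag used
         // by the master's request in AtomCollector::nextPtr().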

         // Pack buffer with atoms
         int sendSize = 0;  // local count of atoms packed in this block
         isComplete_ = iterator_.isEnd();
         bufferPtr_->clearSendBuffer();
         bufferPtr_->beginSendBlock(Buffer::ATOM);
         while (sendSize < bufferPtr_->atomCapacity() && !isComplete_) {
            iterator_->packAtom(*bufferPtr_);
            ++sendSize;
            ++iterator_;
            isComplete_ = iterator_.isEnd();
         }
         bufferPtr_->endSendBlock(isComplete_);

         // Send buffer to master
         bufferPtr_->send(domainPtr_->communicator(), 0);

      }

   }
   #endif

}
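
Typical usage (a minimal sketch, not part of the library source): the master processor calls setup() and then polls nextPtr() until it returns null, while every other processor calls send(). Here domain, storage, and buffer are assumed to be fully initialized objects, and processAtom is a hypothetical per-atom action such as writing a configuration file entry.

   // Hypothetical driver illustrating the collection protocol.
   AtomCollector collector;
   collector.associate(domain, storage, buffer);
   if (domain.isMaster()) {
      collector.setup();
      Atom* atomPtr = collector.nextPtr();
      while (atomPtr) {
         processAtom(*atomPtr);   // e.g., write position to file
         atomPtr = collector.nextPtr();
      }
   } else {
      collector.send();
   }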