Simpatico  v1.10
Buffer.cpp
1 /*
2 * Simpatico - Simulation Package for Polymeric and Molecular Liquids
3 *
4 * Copyright 2010 - 2017, The Regents of the University of Minnesota
5 * Distributed under the terms of the GNU General Public License.
6 */
7 
8 #include "Buffer.h"
9 #include "Domain.h"
10 #include <util/misc/Memory.h>
11 #include <ddMd/chemistry/Atom.h>
12 #include <ddMd/chemistry/Group.h>
13 #include <util/format/Int.h>
14 
15 namespace DdMd
16 {
17  using namespace Util;
18 
19  /*
20  * Constructor.
21  */
23  : ParamComposite(),
24  #ifdef UTIL_MPI
25  sendBufferBegin_(0),
26  recvBufferBegin_(0),
27  sendBufferEnd_(0),
28  recvBufferEnd_(0),
29  sendBlockBegin_(0),
30  recvBlockBegin_(0),
31  recvBlockEnd_(0),
32  sendPtr_(0),
33  recvPtr_(0),
34  bufferCapacity_(-1),
35  dataCapacity_(-1),
36  sendSize_(0),
37  recvSize_(0),
38  recvType_(NONE),
39  #endif
40  atomCapacity_(-1),
41  ghostCapacity_(-1),
42  maxSendLocal_(0),
43  isInitialized_(false)
44  { setClassName("Buffer"); }
45 
46  /*
47  * Destructor.
48  */
50  {
51  if (sendBufferBegin_) {
52  Memory::deallocate<char>(sendBufferBegin_, bufferCapacity_);
53  }
54  if (recvBufferBegin_) {
55  Memory::deallocate<char>(recvBufferBegin_, bufferCapacity_);
56  }
57  }
58 
59  /*
60  * Allocate send and recv buffers.
61  */
62  void Buffer::allocate(int atomCapacity, int ghostCapacity)
63  {
64  //Preconditions
65  if (atomCapacity < 0) {
66  UTIL_THROW("Negative atomCapacity");
67  }
68  if (ghostCapacity < 0) {
69  UTIL_THROW("Negative ghostCapacity");
70  }
71 
72  atomCapacity_ = atomCapacity;
73  ghostCapacity_ = ghostCapacity;
74 
75  #ifdef UTIL_MPI
76  // Do actual allocation
77  allocate();
78  #endif
79 
80  isInitialized_ = true;
81  }
82 
83  /*
84  * Read capacities, and allocate buffers.
85  */
86  void Buffer::readParameters(std::istream& in)
87  {
88 
89  // Read parameters
90  read<int>(in, "atomCapacity", atomCapacity_);
91  read<int>(in, "ghostCapacity", ghostCapacity_);
92 
93  //Preconditions
94  if (atomCapacity_ < 0) {
95  UTIL_THROW("Negative atomCapacity");
96  }
97  if (ghostCapacity_ < 0) {
98  UTIL_THROW("Negative ghostCapacity");
99  }
100 
101  #ifdef UTIL_MPI
102  // Do actual allocation
103  allocate();
104  #endif
105 
106  isInitialized_ = true;
107  }
108 
109  /*
110  * Load internal state from an archive.
111  */
113  {
114  // Read parameters
115  loadParameter<int>(ar, "atomCapacity", atomCapacity_);
116  loadParameter<int>(ar, "ghostCapacity", ghostCapacity_);
117 
118  // Validate data
119  if (atomCapacity_ < 0) {
120  UTIL_THROW("Negative atomCapacity");
121  }
122  if (ghostCapacity_ < 0) {
123  UTIL_THROW("Negative ghostCapacity");
124  }
125 
126  #ifdef UTIL_MPI
127  // Do actual allocation
128  allocate();
129  #endif
130 
131  isInitialized_ = true;
132  }
133 
134  /*
135  * Save internal state to an archive.
136  */
138  {
139  ar << atomCapacity_;
140  ar << ghostCapacity_;
141  }
142 
143  /*
144  * Maximum number of atoms for which space is available.
145  */
147  { return atomCapacity_; }
148 
149  /*
150  * Maximum number of ghost atoms for which space is available.
151  */
153  { return ghostCapacity_; }
154 
155  /*
156  * Has this buffer been initialized?
157  */
159  { return isInitialized_; }
160 
161  #ifdef UTIL_MPI
162  /*
163  * Allocate send and recv buffers (private method).
164  *
165  * This method uses values of atomCapacity_ and ghostCapacity_ that
166  * must have been set previously. It is called by allocate(int, int),
167  * readParameters() and loadParameters() to do the actual allocation.
168  */
169  void Buffer::allocate()
170  {
171 
172  // Preconditions
173  if (atomCapacity_ <= 0) {
174  UTIL_THROW("atomCapacity_ must be positive");
175  }
176  if (ghostCapacity_ <= 0) {
177  UTIL_THROW("ghostCapacity_ must be positive");
178  }
179  if (isAllocated()) {
180  UTIL_THROW("Buffer cannot be re-allocated");
181  }
182 
183  // Capacity in bytes send and receive buffers. This is maximum of
184  // the buffer space required by the local atoms and ghost atoms.
185  int atomDataSize = atomCapacity_*Atom::packedAtomSize();
186  int ghostDataSize = ghostCapacity_*Atom::packedGhostSize();
187  if (atomDataSize > ghostDataSize) {
188  dataCapacity_ = atomDataSize;
189  ghostCapacity_ = dataCapacity_/Atom::packedGhostSize();
190  } else {
191  dataCapacity_ = ghostDataSize;
192  atomCapacity_ = dataCapacity_/Atom::packedAtomSize();
193  }
194 
195  // Leave space for a 4 byte header
196  bufferCapacity_ += dataCapacity_ + 4 * sizeof(int);
197 
198  // Allocate memory for the send buffer
199  Memory::allocate<char>(sendBufferBegin_, bufferCapacity_);
200  sendBufferEnd_ = sendBufferBegin_ + bufferCapacity_;
201 
202  // Allocate memory for the receive buffer
203  Memory::allocate<char>(recvBufferBegin_, bufferCapacity_);
204  recvBufferEnd_ = recvBufferBegin_ + bufferCapacity_;
205 
206  recvPtr_ = recvBufferBegin_;
207  }
208 
209  /*
210  * Clear the send buffer prior to packing, and reset the sendType to NONE.
211  */
213  {
214  sendPtr_ = sendBufferBegin_;
215  sendSize_ = 0;
216  sendType_ = NONE;
217  sendBlockBegin_ = 0;
218  }
219 
220  /*
221  * Begin a new block in the send buffer, and set the sendType.
222  */
223  void Buffer::beginSendBlock(int sendType)
224  {
225  if (sendSize_ != 0) {
226  UTIL_THROW("Error: previous send block not finalized");
227  }
228 
229  // Set number of atoms currently in send block to zero.
230  sendSize_ = 0;
231 
232  // Data type to be sent.
233  sendType_ = sendType;
234 
235  // Mark beginning of block.
236  sendBlockBegin_ = sendPtr_;
237 
238  // Increment sendPtr_ to leave space for 4 integers.
239  int* sendBuffPtr = (int *)sendPtr_;
240  sendBuffPtr += 4;
241  sendPtr_ = (char *)sendBuffPtr;
242  }
243 
244  /*
245  * Finalize data block in buffer. Pack prefix data.
246  */
247  void Buffer::endSendBlock(bool isComplete)
248  {
249  int sendBytes = (int)(sendPtr_ - sendBlockBegin_);
250 
251  // Add passport to the beginning of the block:
252  // Pack sendSize_, sendBytes, sendType_ and isComplete.
253  int* sendBuffPtr = (int *)sendBlockBegin_;
254  *sendBuffPtr = sendSize_;
255  ++sendBuffPtr;
256  *sendBuffPtr = sendBytes;
257  ++sendBuffPtr;
258  *sendBuffPtr = sendType_;
259  ++sendBuffPtr;
260  *sendBuffPtr = (int) isComplete;
261 
262  // Clear variables associated with the sent block.
263  sendBlockBegin_ = 0;
264  sendSize_ = 0;
265  sendType_ = NONE;
266  }
267 
268  /*
269  * Begin receiving block. Extract prefix data.
270  */
272  {
273  // Precondition
274  if (recvSize_ != 0) {
275  UTIL_THROW("Error: Previous receive block not completely unpacked");
276  }
277 
278  // Store address of beginning of block
279  recvBlockBegin_ = recvPtr_;
280 
281  // Extract passport data
282  int* recvBuffPtr = (int *)recvPtr_;
283  recvSize_ = *recvBuffPtr;
284  int recvBytes = *(recvBuffPtr + 1);
285  recvType_ = *(recvBuffPtr + 2);
286  bool isComplete = (bool) *(recvBuffPtr + 3);
287 
288  // Calculate address of expected end of recv block
289  recvBlockEnd_ = recvBlockBegin_ + recvBytes;
290 
291  // Set recvPtr to beginning of first item to be unpacked
292  recvBuffPtr += 4;
293  recvPtr_ = (char *)recvBuffPtr;
294 
295  return isComplete;
296  }
297 
298  /*
299  * Finalize receiving block, check consistency.
300  */
302  {
303  if (recvSize_ != 0) {
304  UTIL_THROW("Error: Recv counter != 0 at end of block");
305  }
306  if (recvPtr_ != recvBlockEnd_) {
307  UTIL_THROW("Error: Inconsistent recv cursor at end of block");
308  }
309  recvBlockBegin_ = 0;
310  recvBlockEnd_ = 0;
311  recvSize_ = 0;
312  recvType_ = NONE;
313  }
314 
315  #ifdef UTIL_MPI
316  /*
317  * Send and receive buffer.
318  */
319  void Buffer::sendRecv(MPI::Intracomm& comm, int source, int dest)
320  {
321 
322  MPI::Request request[2];
323  int sendBytes = 0;
324  int myRank = comm.Get_rank();
325  int comm_size = comm.Get_size();
326 
327  // Preconditions
328  if (dest > comm_size - 1 || dest < 0) {
329  UTIL_THROW("Destination rank out of bounds");
330  }
331  if (source > comm_size - 1 || source < 0) {
332  UTIL_THROW("Source rank out of bounds");
333  }
334  if (dest == myRank) {
335  UTIL_THROW("Destination and my rank are identical");
336  }
337  if (source == myRank) {
338  UTIL_THROW("Source and my rank are identical");
339  }
340 
341  // Start nonblocking receive.
342  request[0] = comm.Irecv(recvBufferBegin_, bufferCapacity_ ,
343  MPI::CHAR, source, 5);
344 
345  // Start nonblocking send.
346  sendBytes = sendPtr_ - sendBufferBegin_;
347  request[1] = comm.Isend(sendBufferBegin_, sendBytes , MPI::CHAR, dest, 5);
348 
349  // Wait for completion of receive.
350  request[0].Wait();
351  recvPtr_ = recvBufferBegin_;
352 
353  // Wait for completion of send.
354  request[1].Wait();
355 
356  // Update statistics.
357  if (sendBytes > maxSendLocal_) {
358  maxSendLocal_ = sendBytes;
359  }
360  }
361 
362  /*
363  * Send a buffer.
364  */
365  void Buffer::send(MPI::Intracomm& comm, int dest)
366  {
367  MPI::Request request;
368  int sendBytes = 0;
369  int comm_size = comm.Get_size();
370  int myRank = comm.Get_rank();
371 
372  // Preconditions
373  if (dest > comm_size - 1 || dest < 0) {
374  UTIL_THROW("Destination rank out of bounds");
375  }
376  if (dest == myRank) {
377  UTIL_THROW("Source and destination identical");
378  }
379 
380  sendBytes = sendPtr_ - sendBufferBegin_;
381  request = comm.Isend(sendBufferBegin_, sendBytes, MPI::CHAR, dest, 5);
382  request.Wait();
383 
384  // Update statistics.
385  if (sendBytes > maxSendLocal_) {
386  maxSendLocal_ = sendBytes;
387  }
388  }
389 
390  /*
391  * Receive a buffer.
392  */
393  void Buffer::recv(MPI::Intracomm& comm, int source)
394  {
395  MPI::Request request;
396  int myRank = comm.Get_rank();
397  int comm_size = comm.Get_size();
398 
399  // Preconditons
400  if (source > comm_size - 1 || source < 0) {
401  UTIL_THROW("Source rank out of bounds");
402  }
403  if (source == myRank) {
404  UTIL_THROW("Source and destination identical");
405  }
406 
407  request = comm.Irecv(recvBufferBegin_, bufferCapacity_,
408  MPI::CHAR, source, 5);
409  request.Wait();
410  recvType_ = NONE;
411  recvPtr_ = recvBufferBegin_;
412  }
413 
414  /*
415  * Broadcast a buffer.
416  */
417  void Buffer::bcast(MPI::Intracomm& comm, int source)
418  {
419  int comm_size = comm.Get_size();
420  int myRank = comm.Get_rank();
421  if (source > comm_size - 1 || source < 0) {
422  UTIL_THROW("Source rank out of bounds");
423  }
424 
425  int sendBytes;
426  if (myRank == source) {
427  sendBytes = sendPtr_ - sendBufferBegin_;
428  comm.Bcast(&sendBytes, 1, MPI::INT, source);
429  comm.Bcast(sendBufferBegin_, sendBytes, MPI::CHAR, source);
430  sendPtr_ = sendBufferBegin_;
431  sendType_ = NONE;
432  } else {
433  comm.Bcast(&sendBytes, 1, MPI::INT, source);
434  comm.Bcast(recvBufferBegin_, sendBytes, MPI::CHAR, source);
435  recvPtr_ = recvBufferBegin_;
436  recvType_ = NONE;
437  }
438  if (sendBytes > maxSendLocal_) {
439  maxSendLocal_ = sendBytes;
440  }
441 
442  }
443  #endif
444 
445  /*
446  * Compute maximum message size among all processors.
447  */
448  #ifdef UTIL_MPI
449  void Buffer::computeStatistics(MPI::Intracomm& comm)
450  #else
452  #endif
453  {
454  #ifdef UTIL_MPI
455  int globalSendMax;
456  comm.Allreduce(&maxSendLocal_, &globalSendMax, 1, MPI::INT, MPI::MAX);
457  maxSend_.set(globalSendMax);
458  #else
459  maxSend_.set(maxSendLocal_);
460  #endif
461  }
462 
463  /*
464  * Clear any accumulated usage statistics.
465  */
467  {
468  maxSendLocal_ = 0;
469  maxSend_.unset();
470  }
471 
472  /*
473  * Output statistics.
474  */
475  void Buffer::outputStatistics(std::ostream& out)
476  {
477 
478  out << std::endl;
479  out << "Buffer" << std::endl;
480  out << "sendBytes: max, capacity "
481  << Int(maxSend_.value(), 10)
482  << Int(bufferCapacity_, 10)
483  << std::endl;
484  }
485 
486  /*
487  * Number of items packed thus far in current data send block.
488  */
489  int Buffer::sendSize() const
490  { return sendSize_; }
491 
492  /*
493  * Number of unread items left in the current receive block.
494  */
495  int Buffer::recvSize() const
496  { return recvSize_; }
497 
498  /*
499  * Has this buffer been allocated?
500  */
501  bool Buffer::isAllocated() const
502  { return (bufferCapacity_ > 0); }
503  #endif
504 
505 }
bool beginRecvBlock()
Begin to receive a block from the recv buffer.
Definition: Buffer.cpp:271
void sendRecv(MPI::Intracomm &comm, int source, int dest)
Receive from processor source and send to processor dest.
Definition: Buffer.cpp:319
int ghostCapacity() const
Maximum number of ghost atoms for which space is available.
Definition: Buffer.cpp:152
void readParameters(std::istream &in)
Read capacity and allocate buffers.
Definition: Buffer.cpp:86
void outputStatistics(std::ostream &out)
Output statistics.
Definition: Buffer.cpp:475
void set(const T &value)
Set the value and mark as set.
Definition: Setable.h:107
void beginSendBlock(int sendType)
Initialize a data block.
Definition: Buffer.cpp:223
static int packedAtomSize()
Return max size of an atom packed for exchange, in bytes.
Parallel domain decomposition (DD) MD simulation.
void allocate(int atomCapacity, int ghostCapacity)
Allocate send and recv buffers.
Definition: Buffer.cpp:62
Saving / output archive for binary ostream.
const T & value() const
Return value (if set).
Definition: Setable.h:132
#define UTIL_THROW(msg)
Macro for throwing an Exception, reporting function, file and line number.
Definition: global.h:51
virtual void save(Serializable::OArchive &ar)
Save internal state to an archive.
Definition: Buffer.cpp:137
Utility classes for scientific computation.
Definition: accumulators.mod:1
Wrapper for an int, for formatted ostream output.
Definition: Int.h:36
int recvSize() const
Number of unread items left in current recv block.
Definition: Buffer.cpp:495
void unset()
Unset the value (mark as unknown).
Definition: Setable.h:116
void send(MPI::Intracomm &comm, int dest)
Send a complete buffer.
Definition: Buffer.cpp:365
virtual void computeStatistics(MPI::Intracomm &comm)
Compute statistics (reduce from all processors).
Definition: Buffer.cpp:449
void recv(MPI::Intracomm &comm, int source)
Receive a buffer.
Definition: Buffer.cpp:393
Saving archive for binary istream.
void endSendBlock(bool isComplete=true)
Finalize a block in the send buffer.
Definition: Buffer.cpp:247
void setClassName(const char *className)
Set class name string.
bool isInitialized() const
Has this Buffer been initialized?
Definition: Buffer.cpp:158
void bcast(MPI::Intracomm &comm, int source)
Broadcast a buffer.
Definition: Buffer.cpp:417
Buffer()
Constructor.
Definition: Buffer.cpp:22
static int packedGhostSize()
Return size of ghost atom packed for communication, in bytes.
An object that can read multiple parameters from file.
void clearSendBuffer()
Clear the send buffer.
Definition: Buffer.cpp:212
void endRecvBlock()
Finish processing a block in the recv buffer.
Definition: Buffer.cpp:301
int atomCapacity() const
Maximum number of atoms for which space is available.
Definition: Buffer.cpp:146
bool isAllocated() const
Has memory been allocated for this Buffer?
Definition: Buffer.cpp:501
void clearStatistics()
Clear any accumulated usage statistics.
Definition: Buffer.cpp:466
virtual ~Buffer()
Destructor.
Definition: Buffer.cpp:49
int sendSize() const
Number of items in current send block.
Definition: Buffer.cpp:489
virtual void loadParameters(Serializable::IArchive &ar)
Load internal state from an archive.
Definition: Buffer.cpp:112