SHM
Shared-memory based Handy-communication Manager
shm_pub_sub.hpp
Go to the documentation of this file.
1 
15 #ifndef __SHM_PS_LIB_H__
16 #define __SHM_PS_LIB_H__
17 
18 #include <iostream>
19 #include <limits>
20 #include <string>
21 #include <regex>
22 #include <stdexcept>
23 #include <mutex>
24 extern "C" {
25 #include <sys/mman.h>
26 #include <sys/stat.h>
27 #include <fcntl.h>
28 #include <sys/time.h>
29 #include <pthread.h>
30 #include <unistd.h>
31 #include <sys/types.h>
32 #include <sys/ipc.h>
33 #include <sys/shm.h>
34 }
35 #include "shm_base.hpp"
36 
37 namespace irlab
38 {
39 
40 namespace shm
41 {
42 
43 // ****************************************************************************
57 // ****************************************************************************
58 template <typename T>
59 class Publisher
60 {
61 public:
62  Publisher(std::string name = "", int buffer_num = 3, PERM perm = DEFAULT_PERM);
63  ~Publisher() = default;
64 
65  // コピーは禁止
66  Publisher(const Publisher&) = delete;
67  Publisher& operator=(const Publisher&) = delete;
68 
69  // ムーブコンストラクタ:ポインタを奪い、元を nullptr に
70  Publisher(Publisher&& other) noexcept = default;
71 
72  void publish(const T& data);
73 
74 private:
75  std::string shm_name;
76  int shm_buf_num;
77  PERM shm_perm;
78  std::unique_ptr<SharedMemory> shared_memory;
79  std::unique_ptr<RingBuffer> ring_buffer;
80 
81  size_t data_size;
82 };
83 
84 // ****************************************************************************
92 // ****************************************************************************
93 template <typename T>
95 {
96 public:
97  Subscriber(std::string name = "");
98  ~Subscriber() = default;
99 
100  // コピーは禁止
101  Subscriber(const Subscriber&) = delete;
102  Subscriber& operator=(const Subscriber&) = delete;
103 
104  // ムーブコンストラクタ:ポインタを奪い、元を nullptr に
105  Subscriber(Subscriber&& other) noexcept = default;
106 
107  const T subscribe(bool *state);
108  bool waitFor(uint64_t timeout_usec);
109  void setDataExpiryTime_us(uint64_t time_us);
110 
111 private:
112  std::string shm_name;
113  std::unique_ptr<SharedMemory> shared_memory;
114  std::unique_ptr<RingBuffer> ring_buffer;
115  int current_reading_buffer;
116  uint64_t data_expiry_time_us;
117 };
118 
119 // ****************************************************************************
120 // Function Definitions
121 // 関数定義
122 // (テンプレートクラス内の関数の定義はコンパイル時に実体化するのでヘッダに書く)
123 // ****************************************************************************
124 
137 template <typename T>
138 Publisher<T>::Publisher(std::string name, int buffer_num, PERM perm)
139 : shm_name(name)
140 , shm_buf_num(buffer_num)
141 , shm_perm(perm)
142 , shared_memory(nullptr)
143 , ring_buffer(nullptr)
144 , data_size(sizeof(T))
145 {
146  if (!std::is_standard_layout<T>::value)
147  {
148  throw std::runtime_error("shm::Publisher: Be setted not POD class!");
149  }
150 
151  if (name.empty())
152  {
153  throw std::runtime_error("shm::Publisher: Please set name!");
154  }
155 
156  try {
157  shared_memory = std::make_unique<SharedMemoryPosix>(shm_name, O_RDWR | O_CREAT, shm_perm);
158  } catch (const std::runtime_error& e) {
159  throw std::runtime_error("shm::Publisher: " + std::string(e.what()));
160  }
161  shared_memory->connect(RingBuffer::getSize(sizeof(T), shm_buf_num));
162 
163  if (shared_memory->isDisconnected())
164  {
165  throw std::runtime_error("shm::Publisher: Cannot get memory!");
166  }
167 
168  ring_buffer = std::make_unique<RingBuffer>(shared_memory->getPtr(), sizeof(T), shm_buf_num);
169 }
170 
171 
181 template <typename T>
182 void
183 Publisher<T>::publish(const T& data)
184 {
185  int oldest_buffer = ring_buffer->getOldestBufferNum();
186  for (size_t i = 0; i < 10; i++)
187  {
188  if (ring_buffer->allocateBuffer(oldest_buffer))
189  {
190  break;
191  }
192  usleep(1000); // Wait for 1ms
193  oldest_buffer = ring_buffer->getOldestBufferNum();
194  }
195 
196  (reinterpret_cast<T *>(ring_buffer->getDataList()))[oldest_buffer] = data;
197 
198  struct timespec t;
199  clock_gettime(CLOCK_MONOTONIC_RAW, &t);
200  ring_buffer->setTimestamp_us(((uint64_t) t.tv_sec * 1000000L) + ((uint64_t) t.tv_nsec / 1000L), oldest_buffer);
201 
202  ring_buffer->signal();
203 }
204 
205 
214 template <typename T>
215 Subscriber<T>::Subscriber(std::string name)
216 : shm_name(name)
217 , shared_memory(nullptr)
218 , ring_buffer(nullptr)
219 , current_reading_buffer(0)
220 , data_expiry_time_us(2000000)
221 {
222  if (!std::is_standard_layout<T>::value)
223  {
224  throw std::runtime_error("shm::Subscriber: Be setted not POD class!");
225  }
226 
227  if (name.empty())
228  {
229  throw std::runtime_error("shm::Subscriber: Please set name!");
230  }
231 
232  try {
233  shared_memory = std::make_unique<SharedMemoryPosix>(shm_name, O_RDWR, static_cast<PERM>(0));
234  } catch (const std::runtime_error& e) {
235  throw std::runtime_error("shm::Subscriber: " + std::string(e.what()));
236  }
237 }
238 
239 
249 template <typename T>
250 const T
251 Subscriber<T>::subscribe(bool *is_success)
252 {
253  if (shared_memory->isDisconnected())
254  {
255  if (ring_buffer != nullptr)
256  {
257  ring_buffer.reset();
258  }
259  shared_memory->connect();
260  if (shared_memory->isDisconnected())
261  {
262  *is_success = false;
263  return T();
264  }
265  try {
266  if (shared_memory->getPtr() == nullptr) {
267  *is_success = false;
268  return T();
269  }
270  ring_buffer = std::make_unique<RingBuffer>(shared_memory->getPtr());
271  } catch (const std::bad_alloc& e)
272  {
273  *is_success = false;
274  return T();
275  }
276  ring_buffer->setDataExpiryTime_us(data_expiry_time_us);
277  }
278  int newest_buffer = ring_buffer->getNewestBufferNum();
279  if (newest_buffer < 0)
280  {
281  *is_success = false;
282  return (reinterpret_cast<T*>(ring_buffer->getDataList()))[current_reading_buffer];
283  }
284  *is_success = true;
285  current_reading_buffer = newest_buffer;
286  return (reinterpret_cast<T*>(ring_buffer->getDataList()))[current_reading_buffer];
287 }
288 
289 
290 template <typename T>
291 bool
292 Subscriber<T>::waitFor(uint64_t timeout_usec)
293 {
294  if (shared_memory->isDisconnected())
295  {
296  if (ring_buffer != nullptr)
297  {
298  ring_buffer.reset();
299  }
300  shared_memory->connect();
301  if (shared_memory->isDisconnected())
302  {
303  return false;
304  }
305  ring_buffer = std::make_unique<RingBuffer>(shared_memory->getPtr());
306  ring_buffer->setDataExpiryTime_us(data_expiry_time_us);
307  }
308 
309  return ring_buffer->waitFor(timeout_usec);
310 }
311 
312 
313 template <typename T>
314 void
315 Subscriber<T>::setDataExpiryTime_us(uint64_t time_us)
316 {
317  data_expiry_time_us = time_us;
318  if (ring_buffer != nullptr)
319  {
320  ring_buffer->setDataExpiryTime_us(data_expiry_time_us);
321  }
322 }
323 
324 
325 }
326 
327 }
328 
329 #endif /* __SHM_PS_LIB_H__ */
Class representing a publisher that outputs topics to shared memory This class is used to output the ...
Definition: shm_pub_sub.hpp:60
void publish(const T &data)
Publish a topic None Writes the topic to the buffer with the oldest timestamp and updates the timesta...
Publisher(std::string name="", int buffer_num=3, PERM perm=DEFAULT_PERM)
Constructor Shared-memory name Number of Buffers Permission information None Create shared memory obje...
Class representing a subscriber that retrieves topics from shared memory This class is used to load a...
Definition: shm_pub_sub.hpp:95
const T subscribe(bool *state)
Subscribe a topic Const reference to the loaded topic. The topic with the most recent timestamp is lo...
Subscriber(std::string name="")
Constructor Shared-memory name None Access to shared memory.
Basic class definitions for accessing shared memory, ring buffers, etc. The notation is compliant with R...