SHM
Shared-memory based Handy-communication Manager
shm_service.hpp
Go to the documentation of this file.
1 
13 #ifndef __SHM_SERVICE_LIB_H__
14 #define __SHM_SERVICE_LIB_H__
15 
16 #include <string>
17 #include <thread>
18 #include <memory>
19 #include "shm_base.hpp"
20 
21 namespace irlab
22 {
23 
24 namespace shm
25 {
26 
27 // ****************************************************************************
33 // ****************************************************************************
34 template <class Req, class Res>
36 {
37 public:
38  ServiceServer(std::string name, Res (*input_func)(Req request), PERM perm = DEFAULT_PERM);
39  ~ServiceServer();
40 
41 private:
42  void initializeExclusiveAccess();
43  void loop();
44  static void called_loop(ServiceServer& ref)
45  {
46  ref.loop();
47  }
48 
49  Res (*func)(Req request);
50  pthread_t thread;
51  volatile bool shutdown_requested;
52 
53  std::string shm_name;
54  PERM shm_perm;
55  SharedMemory *shared_memory;
56 
57  uint8_t *memory_ptr;
58 
59  pthread_mutex_t *request_mutex;
60  pthread_cond_t *request_condition;
61  uint64_t *request_timestamp_usec;
62  Req *request_ptr;
63  pthread_mutex_t *response_mutex;
64  pthread_cond_t *response_condition;
65  uint64_t *response_timestamp_usec;
66  Res *response_ptr;
67 
68  uint64_t current_request_timestamp_usec;
69 };
70 
71 // ****************************************************************************
76 // ****************************************************************************
77 template <class Req, class Res>
79 {
80 public:
81  ServiceClient(std::string name);
82  ~ServiceClient();
83 
84  bool call(Req request, Res *response);
85  bool call(Req request, Res *response, unsigned long timeout_usec);
86 
87 private:
88  std::string shm_name;
89  SharedMemory *shared_memory;
90 
91  uint8_t *memory_ptr;
92 
93  pthread_mutex_t *request_mutex;
94  pthread_cond_t *request_condition;
95  uint64_t *request_timestamp_usec;
96  Req *request_ptr;
97  pthread_mutex_t *response_mutex;
98  pthread_cond_t *response_condition;
99  uint64_t *response_timestamp_usec;
100  Res *response_ptr;
101 
102  uint64_t current_response_timestamp_usec;
103 };
104 
105 // ****************************************************************************
106 // 関数定義
107 // (テンプレートクラス内の関数の定義はコンパイル時に実体化するのでヘッダに書く)
108 // ****************************************************************************
109 template <class Req, class Res>
110 ServiceServer<Req, Res>::ServiceServer(std::string name, Res (*input_func)(Req request), PERM perm)
111 : func(input_func)
112 , shutdown_requested(false)
113 , shm_name(name)
114 , shm_perm(perm)
115 , shared_memory(nullptr)
116 , memory_ptr(nullptr)
117 {
118  if (!std::is_standard_layout<Req>::value || !std::is_standard_layout<Res>::value)
119  {
120  throw std::runtime_error("shm::ServiceServer: Be setted not POD class!");
121  }
122 
123  shared_memory = new SharedMemoryPosix(shm_name, O_RDWR|O_CREAT, shm_perm);
124  shared_memory->connect( (sizeof(pthread_mutex_t)+sizeof(pthread_cond_t)+sizeof(uint64_t)) * 2 + sizeof(Req) + sizeof(Res));
125  if (shared_memory->isDisconnected())
126  {
127  throw std::runtime_error("shm::Publisher: Cannot get memory!");
128  }
129 
130  uint8_t *data_ptr = shared_memory->getPtr();
131  memory_ptr = data_ptr;
132  request_mutex = reinterpret_cast<pthread_mutex_t *>(data_ptr);
133  data_ptr += sizeof(pthread_mutex_t);
134  request_condition = reinterpret_cast<pthread_cond_t *>(data_ptr);
135  data_ptr += sizeof(pthread_cond_t);
136  request_timestamp_usec = reinterpret_cast<uint64_t *>(data_ptr);
137  data_ptr += sizeof(uint64_t);
138  request_ptr = reinterpret_cast<Req *>(data_ptr);
139  data_ptr += sizeof(Req);
140  response_mutex = reinterpret_cast<pthread_mutex_t *>(data_ptr);
141  data_ptr += sizeof(pthread_mutex_t);
142  response_condition = reinterpret_cast<pthread_cond_t *>(data_ptr);
143  data_ptr += sizeof(pthread_cond_t);
144  response_timestamp_usec = reinterpret_cast<uint64_t *>(data_ptr);
145  data_ptr += sizeof(uint64_t);
146  response_ptr = reinterpret_cast<Res *>(data_ptr);
147 
148  initializeExclusiveAccess();
149 
150  struct timespec ts;
151  clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
152  *request_timestamp_usec = ((uint64_t) ts.tv_sec * 1000000L) + ((uint64_t) ts.tv_nsec / 1000L);
153  *response_timestamp_usec = *request_timestamp_usec;
154  current_request_timestamp_usec = *request_timestamp_usec;
155 
156  pthread_create(&thread, NULL, reinterpret_cast<void* (*)(void*)>(&ServiceServer<Req, Res>::called_loop), this);
157 }
158 
159 template <class Req, class Res>
160 ServiceServer<Req, Res>::~ServiceServer()
161 {
162  // Request graceful shutdown
163  shutdown_requested = true;
164 
165  // Wake up the thread
166  pthread_cond_broadcast(request_condition);
167 
168  // Wait for thread to finish gracefully, then force if needed
169  pthread_cancel(thread);
170  pthread_join(thread, nullptr);
171 
172  shared_memory->disconnect();
173  if (shared_memory != nullptr)
174  {
175  delete shared_memory;
176  }
177 }
178 
179 template <class Req, class Res>
180 void
181 ServiceServer<Req, Res>::initializeExclusiveAccess()
182 {
183  pthread_condattr_t request_cond_attr;
184  pthread_condattr_init(&request_cond_attr);
185  pthread_condattr_setpshared(&request_cond_attr, PTHREAD_PROCESS_SHARED);
186  pthread_cond_init(request_condition, &request_cond_attr);
187  pthread_condattr_destroy(&request_cond_attr);
188 
189  pthread_mutexattr_t request_m_attr;
190  pthread_mutexattr_init(&request_m_attr);
191  pthread_mutexattr_setpshared(&request_m_attr, PTHREAD_PROCESS_SHARED);
192  pthread_mutex_init(request_mutex, &request_m_attr);
193  pthread_mutexattr_destroy(&request_m_attr);
194 
195  pthread_condattr_t response_cond_attr;
196  pthread_condattr_init(&response_cond_attr);
197  pthread_condattr_setpshared(&response_cond_attr, PTHREAD_PROCESS_SHARED);
198  pthread_cond_init(response_condition, &response_cond_attr);
199  pthread_condattr_destroy(&response_cond_attr);
200 
201  pthread_mutexattr_t response_m_attr;
202  pthread_mutexattr_init(&response_m_attr);
203  pthread_mutexattr_setpshared(&response_m_attr, PTHREAD_PROCESS_SHARED);
204  pthread_mutex_init(response_mutex, &response_m_attr);
205  pthread_mutexattr_destroy(&response_m_attr);
206 }
207 
208 template <class Req, class Res>
209 void
210 ServiceServer<Req, Res>::loop()
211 {
212  // Pre-allocate objects to avoid repeated allocation/deallocation
213  std::unique_ptr<Req> current_request_ptr = std::make_unique<Req>();
214  std::unique_ptr<Res> result_ptr = std::make_unique<Res>();
215 
216  while (!shutdown_requested)
217  {
218  // Fix race condition: Check timestamp inside mutex
219  pthread_mutex_lock(request_mutex);
220  while (current_request_timestamp_usec >= *request_timestamp_usec && !shutdown_requested)
221  {
222  // Wait on the condvar while holding the mutex
223  pthread_cond_wait(request_condition, request_mutex);
224  }
225 
226  // Check for shutdown request
227  if (shutdown_requested)
228  {
229  pthread_mutex_unlock(request_mutex);
230  break;
231  }
232 
233  // Update current timestamp and copy request data while holding mutex
234  current_request_timestamp_usec = *request_timestamp_usec;
235  *current_request_ptr = *request_ptr;
236  pthread_mutex_unlock(request_mutex);
237 
238  // Process request outside of mutex to avoid blocking other clients
239  *result_ptr = func(*current_request_ptr);
240 
241  // Check again for shutdown before responding
242  if (shutdown_requested)
243  {
244  break;
245  }
246 
247  // Update response under mutex protection
248  pthread_mutex_lock(response_mutex);
249  *response_ptr = *result_ptr;
250  struct timespec ts;
251  clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
252  *response_timestamp_usec = ((uint64_t) ts.tv_sec * 1000000L) + ((uint64_t) ts.tv_nsec / 1000L);
253  pthread_mutex_unlock(response_mutex);
254 
255  pthread_cond_broadcast(response_condition);
256  }
257 }
258 
259 
260 template <class Req, class Res>
261 ServiceClient<Req, Res>::ServiceClient(std::string name)
262 : shm_name(name)
263 , shared_memory(nullptr)
264 {
265  if (!std::is_standard_layout<Req>::value || !std::is_standard_layout<Res>::value)
266  {
267  throw std::runtime_error("shm::ServiceClient: Be setted not POD class!");
268  }
269  shared_memory = new SharedMemoryPosix(shm_name, O_RDWR, static_cast<PERM>(0));
270 
271  struct timespec ts;
272  clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
273  current_response_timestamp_usec = ((uint64_t) ts.tv_sec * 1000000L) + ((uint64_t) ts.tv_nsec / 1000L);
274 }
275 
276 template <class Req, class Res>
277 ServiceClient<Req, Res>::~ServiceClient()
278 {
279  if (shared_memory != nullptr)
280  {
281  delete shared_memory;
282  }
283 }
284 
285 template <class Req, class Res>
286 bool
287 ServiceClient<Req, Res>::call(Req request, Res *response)
288 {
289  // Default timeout of 5 seconds
290  return call(request, response, 5000000);
291 }
292 
293 template <class Req, class Res>
294 bool
295 ServiceClient<Req, Res>::call(Req request, Res *response, unsigned long timeout_usec)
296 {
297  // Check the service shared memory existence.
298  if (shared_memory->isDisconnected())
299  {
300  shared_memory->connect();
301  if (shared_memory->isDisconnected())
302  {
303  return false;
304  }
305  uint8_t *data_ptr = shared_memory->getPtr();
306  memory_ptr = data_ptr;
307  request_mutex = reinterpret_cast<pthread_mutex_t *>(data_ptr);
308  data_ptr += sizeof(pthread_mutex_t);
309  request_condition = reinterpret_cast<pthread_cond_t *>(data_ptr);
310  data_ptr += sizeof(pthread_cond_t);
311  request_timestamp_usec = reinterpret_cast<uint64_t *>(data_ptr);
312  data_ptr += sizeof(uint64_t);
313  request_ptr = reinterpret_cast<Req *>(data_ptr);
314  data_ptr += sizeof(Req);
315  response_mutex = reinterpret_cast<pthread_mutex_t *>(data_ptr);
316  data_ptr += sizeof(pthread_mutex_t);
317  response_condition = reinterpret_cast<pthread_cond_t *>(data_ptr);
318  data_ptr += sizeof(pthread_cond_t);
319  response_timestamp_usec = reinterpret_cast<uint64_t *>(data_ptr);
320  data_ptr += sizeof(uint64_t);
321  response_ptr = reinterpret_cast<Res *>(data_ptr);
322  }
323 
324  // Set request to shared memory
325  *request_ptr = request;
326  struct timespec ts;
327  clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
328  *request_timestamp_usec = ((uint64_t) ts.tv_sec * 1000000L) + ((uint64_t) ts.tv_nsec / 1000L);
329 
330  pthread_cond_broadcast(request_condition);
331 
332  // Simple timeout implementation using loop with small delays
333  uint64_t start_time = *request_timestamp_usec;
334  uint64_t end_time = start_time + timeout_usec;
335 
336  while (current_response_timestamp_usec >= *response_timestamp_usec)
337  {
338  // Check timeout
339  clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
340  uint64_t current_time = ((uint64_t) ts.tv_sec * 1000000L) + ((uint64_t) ts.tv_nsec / 1000L);
341  if (current_time > end_time)
342  {
343  return false; // Timeout
344  }
345 
346  // Wait on the condvar with short timeout
347  pthread_mutex_lock(response_mutex);
348  struct timespec wait_time;
349  wait_time.tv_sec = 0;
350  wait_time.tv_nsec = 10000000; // 10ms
351  pthread_cond_timedwait(response_condition, response_mutex, &wait_time);
352  pthread_mutex_unlock(response_mutex);
353  }
354  current_response_timestamp_usec = *response_timestamp_usec;
355 
356  // Get response from shared memory
357  *response = *response_ptr;
358 
359  return true;
360 }
361 
362 }
363 
364 }
365 
366 #endif //__SHM_SERVICE_LIB_H__
共有メモリ経由でサーバーにリクエストを送信し、レスポンスを受け取るクライアントを表現するクラス
Definition: shm_service.hpp:79
共有メモリで受信したリクエストからレスポンスを返すサーバーを表現するクラス
Definition: shm_service.hpp:36
Class that abstracts the method of accessing shared memory.
Definition: shm_base.hpp:86
Basic class definitions for accessing shared memory, ring buffers, etc. The notation is compliant with R...