SHM
Shared-memory based Handy-communication Manager
ring_buffer.cpp
1 #include <shm_base.hpp>
2 #include <condition_variable>
3 #include <limits>
4 
5 namespace irlab
6 {
7 
8 namespace shm
9 {
10 
11 size_t
12 RingBuffer::getSize(size_t element_size, int buffer_num)
13 {
14  return (sizeof(size_t) + sizeof(std::mutex) + sizeof(std::condition_variable) + sizeof(size_t) + (sizeof(std::atomic<uint64_t>)+element_size)*buffer_num);
15 }
16 
21 RingBuffer::RingBuffer(unsigned char* first_ptr, size_t size, int buffer_num)
22 : memory_ptr(first_ptr)
23 , timestamp_us(0)
24 , data_expiry_time_us(2000000)
25 {
26  unsigned char* temp_ptr = memory_ptr;
27  mutex = reinterpret_cast<pthread_mutex_t *>(temp_ptr);
28  temp_ptr += sizeof(pthread_mutex_t);
29  condition = reinterpret_cast<pthread_cond_t *>(temp_ptr);
30  temp_ptr += sizeof(pthread_cond_t);
31  element_size = reinterpret_cast<size_t *>(temp_ptr);
32  if (size != 0)
33  {
34  *element_size = size;
35  }
36  temp_ptr += sizeof(size_t);
37  buf_num = reinterpret_cast<size_t *>(temp_ptr);
38  if (buffer_num != 0)
39  {
40  *buf_num = buffer_num;
41  }
42  temp_ptr += sizeof(size_t);
43  timestamp_list = reinterpret_cast<std::atomic<uint64_t> *>(temp_ptr);
44  temp_ptr += sizeof(std::atomic<uint64_t>) * *buf_num;
45  data_list = temp_ptr;
46 
47  if (buffer_num != 0)
48  {
49  initializeExclusiveAccess();
50  }
51 }
52 
53 RingBuffer::~RingBuffer()
54 {
55 }
56 
57 
58 void
59 RingBuffer::initializeExclusiveAccess()
60 {
61  pthread_condattr_t cond_attr;
62  pthread_condattr_init(&cond_attr);
63  pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
64  pthread_cond_init(condition, &cond_attr);
65  pthread_condattr_destroy(&cond_attr);
66 
67  pthread_mutexattr_t m_attr;
68  pthread_mutexattr_init(&m_attr);
69  pthread_mutexattr_setpshared(&m_attr, PTHREAD_PROCESS_SHARED);
70  pthread_mutex_init(mutex, &m_attr);
71  pthread_mutexattr_destroy(&m_attr);
72 }
73 
74 
75 size_t
76 RingBuffer::getElementSize() const
77 {
78  return *element_size;
79 }
80 
81 unsigned char*
82 RingBuffer::getDataList()
83 {
84  return data_list;
85 }
86 
91 const uint64_t
93 {
94  return timestamp_us;
95 }
96 
97 
102 void
103 RingBuffer::setTimestamp_us(uint64_t input_time_us, int buffer_num)
104 {
105  timestamp_list[buffer_num] = input_time_us;
106 }
107 
108 
109 int
110 RingBuffer::getNewestBufferNum()
111 {
112  uint64_t timestamp_buf = 0;
113  size_t newest_buffer = -1;
114  bool found_valid_timestamp = false;
115 
116  for (size_t i = 0; i < *buf_num; i++)
117  {
118  if (timestamp_list[i] != std::numeric_limits<uint64_t>::max() && timestamp_list[i] > 0 && timestamp_list[i] >= timestamp_buf)
119  {
120  timestamp_buf = timestamp_list[i];
121  newest_buffer = i;
122  found_valid_timestamp = true;
123  }
124  }
125 
126  // If no valid timestamp found, return -1
127  if (!found_valid_timestamp)
128  {
129  return -1;
130  }
131 
132  timestamp_us = timestamp_buf;
133 
134  struct timespec ts;
135  clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
136  uint64_t current_time_us = ((uint64_t) ts.tv_sec * 1000000L) + ((uint64_t) ts.tv_nsec / 1000L);
137  if (current_time_us - timestamp_us < data_expiry_time_us)
138  {
139  return newest_buffer;
140  }
141  return -1;
142 }
143 
144 
145 int
146 RingBuffer::getOldestBufferNum()
147 {
148  if (timestamp_us == 0)
149  {
150  timestamp_us = UINT64_MAX;
151  }
152  uint64_t timestamp_buf = timestamp_list[0];
153  size_t oldest_buffer = 0;
154  for (size_t i = 0; i < *buf_num; i++)
155  {
156  if (timestamp_list[i] < timestamp_buf)
157  {
158  timestamp_buf = timestamp_list[i];
159  oldest_buffer = i;
160  }
161  }
162 
163  timestamp_us = timestamp_buf;
164  return oldest_buffer;
165 }
166 
167 
168 bool
169 RingBuffer::allocateBuffer(int buffer_num)
170 {
171  if (buffer_num < 0 || buffer_num >= *buf_num)
172  {
173  return false;
174  }
175  uint64_t temp_buffer_timestamp = timestamp_list[buffer_num].load(std::memory_order_acquire);
176  if (temp_buffer_timestamp == std::numeric_limits<uint64_t>::max())
177  {
178  // The buffer is already allocated
179  return false;
180  }
181  return timestamp_list[buffer_num].compare_exchange_weak(temp_buffer_timestamp,
182  std::numeric_limits<uint64_t>::max(),
183  std::memory_order_relaxed);
184 }
185 
186 void
187 RingBuffer::signal()
188 {
189  pthread_cond_broadcast(condition);
190 }
191 
197 bool
198 RingBuffer::waitFor(uint64_t timeout_usec)
199 {
200  struct timespec ts;
201  long sec = static_cast<long>(timeout_usec / 1000000);
202  long mod_sec = static_cast<long>(timeout_usec % 1000000);
203  clock_gettime(CLOCK_REALTIME, &ts);
204  ts.tv_sec += sec;
205  ts.tv_nsec+= mod_sec * 1000;
206  if (1000000000 <= ts.tv_nsec)
207  {
208  ts.tv_nsec -= 1000000000;
209  ts.tv_sec += 1;
210  }
211 
212  while (!isUpdated())
213  {
214  // Wait on the condvar
215  pthread_mutex_lock(mutex);
216  int ret = pthread_cond_timedwait(condition, mutex, &ts);
217  pthread_mutex_unlock(mutex);
218  if (ret == ETIMEDOUT)
219  {
220  return false;
221  }
222  }
223 
224  return true;
225 }
226 
227 
233 bool
235 {
236  for (size_t i = 0; i < *buf_num; i++)
237  {
238  if (timestamp_us < timestamp_list[i])
239  {
240  return true;
241  }
242  }
243  return false;
244 }
245 
246 
247 void
248 RingBuffer::setDataExpiryTime_us(uint64_t time_us)
249 {
250  data_expiry_time_us = time_us;
251 }
252 
253 
254 }
255 
256 }
257 
const uint64_t getTimestamp_us() const
タイムスタンプ取得
Definition: ring_buffer.cpp:92
bool isUpdated() const
共有メモリの更新確認
RingBuffer(unsigned char *first_ptr, size_t size=0, int buffer_num=0)
コンストラクタ
Definition: ring_buffer.cpp:21
void setTimestamp_us(uint64_t input_time_us, int buffer_num)
タイムスタンプ取得
bool waitFor(uint64_t timeout_usec)
トピックの更新待ち
Basic class definitions for accessing shared memory, ring buffers, etc. The notation is complianted R...