2 #include <condition_variable>
12 RingBuffer::getSize(
size_t element_size,
int buffer_num)
14 return (
sizeof(
size_t) +
sizeof(std::mutex) +
sizeof(std::condition_variable) +
sizeof(
size_t) + (
sizeof(std::atomic<uint64_t>)+element_size)*buffer_num);
22 : memory_ptr(first_ptr)
24 , data_expiry_time_us(2000000)
26 unsigned char* temp_ptr = memory_ptr;
27 mutex =
reinterpret_cast<pthread_mutex_t *
>(temp_ptr);
28 temp_ptr +=
sizeof(pthread_mutex_t);
29 condition =
reinterpret_cast<pthread_cond_t *
>(temp_ptr);
30 temp_ptr +=
sizeof(pthread_cond_t);
31 element_size =
reinterpret_cast<size_t *
>(temp_ptr);
36 temp_ptr +=
sizeof(size_t);
37 buf_num =
reinterpret_cast<size_t *
>(temp_ptr);
40 *buf_num = buffer_num;
42 temp_ptr +=
sizeof(size_t);
43 timestamp_list =
reinterpret_cast<std::atomic<uint64_t> *
>(temp_ptr);
44 temp_ptr +=
sizeof(std::atomic<uint64_t>) * *buf_num;
49 initializeExclusiveAccess();
53 RingBuffer::~RingBuffer()
59 RingBuffer::initializeExclusiveAccess()
61 pthread_condattr_t cond_attr;
62 pthread_condattr_init(&cond_attr);
63 pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
64 pthread_cond_init(condition, &cond_attr);
65 pthread_condattr_destroy(&cond_attr);
67 pthread_mutexattr_t m_attr;
68 pthread_mutexattr_init(&m_attr);
69 pthread_mutexattr_setpshared(&m_attr, PTHREAD_PROCESS_SHARED);
70 pthread_mutex_init(mutex, &m_attr);
71 pthread_mutexattr_destroy(&m_attr);
76 RingBuffer::getElementSize()
const
82 RingBuffer::getDataList()
105 timestamp_list[buffer_num] = input_time_us;
110 RingBuffer::getNewestBufferNum()
112 uint64_t timestamp_buf = 0;
113 size_t newest_buffer = -1;
114 bool found_valid_timestamp =
false;
116 for (
size_t i = 0; i < *buf_num; i++)
118 if (timestamp_list[i] != std::numeric_limits<uint64_t>::max() && timestamp_list[i] > 0 && timestamp_list[i] >= timestamp_buf)
120 timestamp_buf = timestamp_list[i];
122 found_valid_timestamp =
true;
127 if (!found_valid_timestamp)
132 timestamp_us = timestamp_buf;
135 clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
136 uint64_t current_time_us = ((uint64_t) ts.tv_sec * 1000000L) + ((uint64_t) ts.tv_nsec / 1000L);
137 if (current_time_us - timestamp_us < data_expiry_time_us)
139 return newest_buffer;
146 RingBuffer::getOldestBufferNum()
148 if (timestamp_us == 0)
150 timestamp_us = UINT64_MAX;
152 uint64_t timestamp_buf = timestamp_list[0];
153 size_t oldest_buffer = 0;
154 for (
size_t i = 0; i < *buf_num; i++)
156 if (timestamp_list[i] < timestamp_buf)
158 timestamp_buf = timestamp_list[i];
163 timestamp_us = timestamp_buf;
164 return oldest_buffer;
169 RingBuffer::allocateBuffer(
int buffer_num)
171 if (buffer_num < 0 || buffer_num >= *buf_num)
175 uint64_t temp_buffer_timestamp = timestamp_list[buffer_num].load(std::memory_order_acquire);
176 if (temp_buffer_timestamp == std::numeric_limits<uint64_t>::max())
181 return timestamp_list[buffer_num].compare_exchange_weak(temp_buffer_timestamp,
182 std::numeric_limits<uint64_t>::max(),
183 std::memory_order_relaxed);
189 pthread_cond_broadcast(condition);
201 long sec =
static_cast<long>(timeout_usec / 1000000);
202 long mod_sec =
static_cast<long>(timeout_usec % 1000000);
203 clock_gettime(CLOCK_REALTIME, &ts);
205 ts.tv_nsec+= mod_sec * 1000;
206 if (1000000000 <= ts.tv_nsec)
208 ts.tv_nsec -= 1000000000;
215 pthread_mutex_lock(mutex);
216 int ret = pthread_cond_timedwait(condition, mutex, &ts);
217 pthread_mutex_unlock(mutex);
218 if (ret == ETIMEDOUT)
236 for (
size_t i = 0; i < *buf_num; i++)
238 if (timestamp_us < timestamp_list[i])
248 RingBuffer::setDataExpiryTime_us(uint64_t time_us)
250 data_expiry_time_us = time_us;
const uint64_t getTimestamp_us() const
タイムスタンプ取得
bool isUpdated() const
共有メモリの更新確認
RingBuffer(unsigned char *first_ptr, size_t size=0, int buffer_num=0)
コンストラクタ
void setTimestamp_us(uint64_t input_time_us, int buffer_num)
タイムスタンプ取得
bool waitFor(uint64_t timeout_usec)
トピックの更新待ち
Basic class definitions for accessing shared memory, ring buffers, etc. The notation is complianted R...