[English | 日本語]
目的
SHM (Shared-memory based Handy-communication Manager)の目的は,異なるプロセス間でできるだけ安全かつ高速な通信を行うことである.また、学生が使用しやすいように、考慮して設計したはずである.インストール方法などは,README.mdを参照すること.
概要
フレームワークのコンテクスト
宇都宮大学計測・ロボット工学研究室では,一般にプログラムで利用されるローカルメモリに加えて,プログラム間のデータのやり取りに利用できる共有メモリを使用する. 共有メモリはローカルメモリとは異なり,開発者が確保したメモリを開放する必要がない(不用意に開放すると他のプログラムにデータが渡せなくなる)ことやポインタを利用するために初学者に対する敷居が微妙に高いこと,設計者が新しいライブラリを作成する度にそれぞれのメモリに合わせて処理を作成する必要があることなど問題点があった. 本フレームワークは,共有メモリによるデータのやり取りを隠蔽し,初学者にもわかりやすいプロセス間通信を提供する.
システム機能
メモリ管理処理の隠蔽
共有メモリの領域確保やバッファへのアクセスをクラスに隠蔽することで、容易にプロセス間通信する機能を実現した。ただし、標準では標準レイアウト型のクラスのみをサポートする。その他のクラスについては都度特殊化したPublisher/Subscriberを定義することで対応できる.詳細はサンプルを参照のこと.
ポインタレスコーディング
基本的に、ローカルメモリに確保した変数を出版者(Publisher)に渡したり、購読者(Subscriber)からのトピックを受け取るのみであり、従来のように共有メモリのポインタを意識してコーディングすることがなくなった.
ユーザ特性
開発者
開発者とは,本ライブラリなどの研究室内外のライブラリを利用して新しいプログラムを作成するものを指す.主に、学部4年生などのプログラミング初学者を対象としている.
設計者
設計者とは,本ライブラリを利用して新しいライブラリを作成し,現状のノウハウを後輩に受け継ぐものを指す.主に,修士2年生を対象としている.
定義・用語
ローカルメモリ
ローカルメモリとは,1プロセス内でアクセスできる仮想記憶領域を指す.通常のプログラミング時に利用される記憶領域であり,利用後にちゃんと開放しないと将来的に大変な(プログラムがしばらく正常に動いていたのに,急に動かなくなる)ことになる領域である.
共有メモリ
共有メモリとは,プロセス間で共通して利用可能な記憶領域を指す.特殊な手段によって確保される記憶領域であり,様々な実装方法があるが、今回はPOSIXのファイルマップドメモリを採用している.これは、共有メモリに格納するデータをファイルとして扱う方式であり,Linuxでは/dev/shmの直下に確保したメモリ領域が確認できる.
標準レイアウト型
クラスまたは構造体にC言語にはない仮想関数のような特定のC++言語の機能が含まれておらず,すべてのメンバーに同じアクセス制御が含まれている場合、それは標準レイアウト型である.memcpyが可能で,Cプログラムで使用できるようにレイアウトが明確に定義されている.標準レイアウト型は,ユーザー定義された特殊なメンバー関数を持つことができる.さらに,標準レイアウト型には,次のような特性がある.
- 仮想関数または仮想基底クラスがない
- すべての非静的データ メンバーに同じアクセス制御が含まれている
- クラス型のすべての非静的メンバーが標準レイアウトである
- すべての基底クラスが標準レイアウトである
- 最初の非静的データ メンバーと同じ型の基底クラスがない
- 次のいずれかの条件を満たしている.
- 最派生クラスに非静的データ メンバーがなく、非静的データ メンバーの基底クラスが 1 つしかない
- 非静的データ メンバーを含む基底クラスがない
アーキテクチャ設計
システム全体構成
graph TB
subgraph "プロセス A"
PA[アプリケーション A]
PubA[Publisher A]
end
subgraph "プロセス B"
PB[アプリケーション B]
SubB[Subscriber B]
end
subgraph "プロセス C"
PC[アプリケーション C]
SubC[Subscriber C]
end
subgraph "共有メモリ領域"
SM[共有メモリセグメント]
RB[リングバッファ]
Meta[メタデータ]
end
PA --> PubA
PubA --> SM
SM --> RB
RB --> SubB
RB --> SubC
SubB --> PB
SubC --> PC
SM --> Meta
レイヤ構成
graph TB
subgraph "アプリケーション層"
APP[ユーザアプリケーション]
end
subgraph "SHM API層"
PUB["Publisher"]
SUB["Subscriber"]
end
subgraph "共有メモリ管理層"
SHM[SharedMemory]
POSIX[SharedMemoryPosix]
RB[RingBuffer]
end
subgraph "OS層"
KERNEL["Linux Kernel"]
SHMFS["/dev/shm ファイルシステム"]
end
APP --> PUB
APP --> SUB
PUB --> SHM
SUB --> SHM
SHM --> POSIX
POSIX --> RB
POSIX --> KERNEL
KERNEL --> SHMFS
詳細設計
クラス階層構造
classDiagram
class SharedMemory {
<<abstract>>
#int shm_fd
#int shm_oflag
#PERM shm_perm
#size_t shm_size
#unsigned char* shm_ptr
+SharedMemory(int oflag, PERM perm)
+getSize() size_t
+getPtr() unsigned char*
+connect(size_t size)* bool
+disconnect()* int
+isDisconnected()* bool
}
class SharedMemoryPosix {
-string shm_name
+SharedMemoryPosix(string name, int oflag, PERM perm)
+connect(size_t size) bool
+disconnect() int
+isDisconnected() bool
}
class RingBuffer {
-unsigned char* memory_ptr
-pthread_mutex_t* mutex
-pthread_cond_t* condition
-size_t* element_size
-size_t* buf_num
-atomic_uint64_t* timestamp_list
-unsigned char* data_list
-uint64_t timestamp_us
-uint64_t data_expiry_time_us
+RingBuffer(unsigned char* first_ptr, size_t size, int buffer_num)
+getSize(size_t element_size, int buffer_num)$ size_t
+getTimestamp_us() uint64_t
+setTimestamp_us(uint64_t input_time_us, int buffer_num)
+getNewestBufferNum() int
+getOldestBufferNum() int
+allocateBuffer(int buffer_num) bool
+getElementSize() size_t
+getDataList() unsigned char*
+signal()
+waitFor(uint64_t timeout_usec) bool
+isUpdated() bool
+setDataExpiryTime_us(uint64_t time_us)
}
class PublisherT {
-string shm_name
-int shm_buf_num
-PERM shm_perm
-unique_ptr_SharedMemory shared_memory
-unique_ptr_RingBuffer ring_buffer
-size_t data_size
+Publisher(string name, int buffer_num, PERM perm)
+publish(const T& data)
}
class SubscriberT {
-string shm_name
-unique_ptr_SharedMemory shared_memory
-unique_ptr_RingBuffer ring_buffer
-int current_reading_buffer
-uint64_t data_expiry_time_us
+Subscriber(string name)
+subscribe(bool* state) T
+waitFor(uint64_t timeout_usec) bool
+setDataExpiryTime_us(uint64_t time_us)
}
SharedMemory <|-- SharedMemoryPosix
PublisherT *-- SharedMemory
PublisherT *-- RingBuffer
SubscriberT *-- SharedMemory
SubscriberT *-- RingBuffer
共有メモリレイアウト
graph TB
subgraph "共有メモリセグメント"
subgraph "メタデータ領域"
MUTEX[pthread_mutex_t]
COND[pthread_cond_t]
ESIZE[element_size]
BUFNUM[buffer_num]
end
subgraph "タイムスタンプ領域"
TS0["timestamp[0]"]
TS1["timestamp[1]"]
TS2["timestamp[2]"]
TSN["timestamp[n-1]"]
end
subgraph "データ領域"
DATA0["data_buffer[0]"]
DATA1["data_buffer[1]"]
DATA2["data_buffer[2]"]
DATAN["data_buffer[n-1]"]
end
end
MUTEX --> COND
COND --> ESIZE
ESIZE --> BUFNUM
BUFNUM --> TS0
TS0 --> TS1
TS1 --> TS2
TS2 --> TSN
TSN --> DATA0
DATA0 --> DATA1
DATA1 --> DATA2
DATA2 --> DATAN
データフロー
Publish処理フロー
sequenceDiagram
participant App as アプリケーション
participant Pub as Publisher
participant RB as RingBuffer
participant SM as SharedMemory
App->>+Pub: publish(data)
Pub->>+RB: getOldestBufferNum()
RB-->>-Pub: buffer_index
loop 最大10回リトライ
Pub->>+RB: allocateBuffer(buffer_index)
alt バッファ確保成功
RB-->>-Pub: true
else バッファ確保失敗
RB-->>Pub: false
Note over Pub: 1ms待機
Pub->>RB: getOldestBufferNum()
RB-->>Pub: 新しいbuffer_index
end
end
Pub->>SM: データをバッファにコピー
Pub->>RB: setTimestamp_us(current_time, buffer_index)
Pub->>RB: signal()
Note over RB: 待機中のSubscriberに通知
Pub-->>-App: 処理完了
Subscribe処理フロー
sequenceDiagram
participant App as アプリケーション
participant Sub as Subscriber
participant RB as RingBuffer
participant SM as SharedMemory
App->>+Sub: subscribe(&is_success)
alt 共有メモリが切断されている場合
Sub->>+SM: connect()
SM-->>-Sub: 接続結果
alt 接続失敗
Sub-->>App: (default_value, false)
end
Sub->>RB: 新しいRingBufferインスタンス作成
end
Sub->>+RB: getNewestBufferNum()
RB-->>-Sub: buffer_index
alt 有効なバッファが見つからない
Sub-->>App: (前回の値, false)
else 有効なバッファが見つかった
Sub->>SM: バッファからデータをコピー
Sub-->>-App: (data, true)
end
waitFor処理フロー
sequenceDiagram
participant App as アプリケーション
participant Sub as Subscriber
participant RB as RingBuffer
App->>+Sub: waitFor(timeout_usec)
alt 共有メモリが切断されている場合
Sub->>Sub: 再接続処理
alt 再接続失敗
Sub-->>App: false
end
end
Sub->>+RB: waitFor(timeout_usec)
Note over RB: pthread_cond_timedwait で待機
alt タイムアウト前にシグナル受信
RB-->>-Sub: true
Sub-->>-App: true
else タイムアウト
RB-->>Sub: false
Sub-->>App: false
end
通信プロトコル
リングバッファアルゴリズム
バッファ選択アルゴリズム
flowchart TD
Start([開始]) --> GetOldest[最古のタイムスタンプを持つバッファを特定]
GetOldest --> TryAlloc{バッファ確保を試行}
TryAlloc -->|成功| WriteData[データ書き込み]
TryAlloc -->|失敗| CheckRetry{リトライ回数 < 10?}
CheckRetry -->|Yes| Wait[1ms待機]
Wait --> GetOldest
CheckRetry -->|No| Error[エラー: バッファ確保失敗]
WriteData --> UpdateTime[タイムスタンプ更新]
UpdateTime --> Signal[条件変数でシグナル送信]
Signal --> End([終了])
Error --> End
データ読み取りアルゴリズム
flowchart TD
Start([開始]) --> CheckConn{共有メモリ接続済み?}
CheckConn -->|No| Reconnect[再接続試行]
Reconnect --> ConnSuccess{接続成功?}
ConnSuccess -->|No| ReturnFail[失敗を返す]
ConnSuccess -->|Yes| GetNewest
CheckConn -->|Yes| GetNewest[最新のタイムスタンプを持つバッファを特定]
GetNewest --> ValidBuffer{有効なバッファ?}
ValidBuffer -->|No| ReturnOld[前回値と失敗フラグを返す]
ValidBuffer -->|Yes| CheckExpiry{データが有効期限内?}
CheckExpiry -->|No| ReturnOld
CheckExpiry -->|Yes| ReadData[データ読み取り]
ReadData --> ReturnSuccess[データと成功フラグを返す]
ReturnFail --> End([終了])
ReturnOld --> End
ReturnSuccess --> End
同期機構
Mutex とCondition Variable
stateDiagram-v2
[*] --> Unlocked : 初期状態
state Publisher {
Unlocked --> Locked : pthread_mutex_lock
Locked --> Writing : バッファ確保成功
Writing --> Unlocked : pthread_mutex_unlock + pthread_cond_signal
Locked --> Unlocked : バッファ確保失敗 + pthread_mutex_unlock
}
state Subscriber {
Unlocked --> Waiting : waitFor() 呼び出し
Waiting --> Unlocked : タイムアウト
Waiting --> Processing : シグナル受信
Processing --> Unlocked : データ読み取り完了
}
パフォーマンス特性
メモリ使用量
共有メモリセグメントのサイズは以下の式で計算される:
total_size = metadata_size + timestamp_array_size + data_array_size
metadata_size = sizeof(pthread_mutex_t) + sizeof(pthread_cond_t) +
sizeof(size_t) + sizeof(size_t)
timestamp_array_size = sizeof(uint64_t) * buffer_num
data_array_size = element_size * buffer_num
レイテンシ特性
graph LR
subgraph "レイテンシ構成要素"
A[アプリケーション処理] --> B[Publisher処理]
B --> C[Mutex獲得]
C --> D[メモリコピー]
D --> E[タイムスタンプ更新]
E --> F[Signal送信]
F --> G[Subscriber処理]
G --> H[アプリケーション処理]
end
subgraph "典型的な時間"
T1[アプリ: ~1μs]
T2[Pub: ~2μs]
T3[Mutex: ~0.5μs]
T4[Copy: ~0.1μs]
T5[Time: ~0.1μs]
T6[Signal: ~0.5μs]
T7[Sub: ~2μs]
T8[アプリ: ~1μs]
end
セキュリティ考慮事項
アクセス権限
graph TB
subgraph "POSIX権限モデル"
Owner[所有者]
Group[グループ]
Others[その他]
end
subgraph "権限種別"
Read["読み取り: S_IRUSR/S_IRGRP/S_IROTH"]
Write["書き込み: S_IWUSR/S_IWGRP/S_IWOTH"]
end
subgraph "デフォルト設定"
Default["DEFAULT_PERM = 0666
(全ユーザ読み書き可能)"]
end
Owner --> Read
Owner --> Write
Group --> Read
Group --> Write
Others --> Read
Others --> Write
Default -.-> Owner
Default -.-> Group
Default -.-> Others
データ整合性
sequenceDiagram
participant P1 as Publisher 1
participant P2 as Publisher 2
participant Mutex as Mutex
participant Buffer as SharedBuffer
participant S as Subscriber
Note over P1,S: 複数Publisherからの同時書き込み
P1->>+Mutex: lock()
P2->>Mutex: lock() (ブロック)
Mutex-->>-P1: 獲得成功
P1->>Buffer: データ書き込み
P1->>Buffer: タイムスタンプ更新
P1->>+Mutex: unlock() + signal()
Mutex-->>-P2: 獲得成功
P2->>Buffer: データ書き込み
P2->>Buffer: タイムスタンプ更新
P2->>Mutex: unlock() + signal()
S->>Buffer: 最新データ読み取り
Note over S: P2のデータを取得
エラーハンドリング
エラー分類と対処
flowchart TD
Error([エラー発生]) --> CheckType{エラー種別}
CheckType -->|初期化エラー| InitError[初期化エラー]
CheckType -->|通信エラー| CommError[通信エラー]
CheckType -->|メモリエラー| MemError[メモリエラー]
CheckType -->|タイムアウト| TimeoutError[タイムアウトエラー]
InitError --> InitActions["・名前の確認
・権限の確認
・POD型の確認"]
CommError --> CommActions["・共有メモリ再接続
・Publisher側確認
・プロセス生存確認"]
MemError --> MemActions["・メモリ不足確認
・セグメント削除
・システム再起動"]
TimeoutError --> TimeoutActions["・タイムアウト値調整
・Publisher頻度確認
・システム負荷確認"]
InitActions --> LogError[エラーログ出力]
CommActions --> LogError
MemActions --> LogError
TimeoutActions --> LogError
LogError --> Recovery{回復可能?}
Recovery -->|Yes| Retry[リトライ処理]
Recovery -->|No| Abort[処理中断]
Retry --> Success{成功?}
Success -->|Yes| End([正常終了])
Success -->|No| Recovery
Abort --> End
Python バインディング設計
Boost.Python ラッパー構造
classDiagram
class PublisherBool {
+PublisherBool(string name, bool arg, int buffer_num)
+_publish(bool data)
}
class PublisherInt {
+PublisherInt(string name, int arg, int buffer_num)
+_publish(int data)
}
class PublisherFloat {
+PublisherFloat(string name, float arg, int buffer_num)
+_publish(float data)
}
class SubscriberBool {
+SubscriberBool(string name, bool arg)
+_subscribe() (bool,bool)
}
class SubscriberInt {
+SubscriberInt(string name, int arg)
+_subscribe() (int,bool)
}
class SubscriberFloat {
+SubscriberFloat(string name, float arg)
+_subscribe() (float,bool)
}
class Publisher {
<<C++ Template>>
}
class Subscriber {
<<C++ Template>>
}
Publisher <|-- PublisherBool
Publisher <|-- PublisherInt
Publisher <|-- PublisherFloat
Subscriber <|-- SubscriberBool
Subscriber <|-- SubscriberInt
Subscriber <|-- SubscriberFloat
Python/C++ データ変換
sequenceDiagram
participant Py as Python App
participant Boost as Boost.Python
participant Wrapper as C++ Wrapper
participant Core as SHM Core
Note over Py,Core: Publish処理
Py->>+Boost: pub.publish(data)
Boost->>+Wrapper: _publish(converted_data)
Note over Boost: Python型 → C++型変換
Wrapper->>+Core: publish(data)
Core-->>-Wrapper: void
Wrapper-->>-Boost: void
Boost-->>-Py: None
Note over Py,Core: Subscribe処理
Py->>+Boost: data, success = sub.subscribe()
Boost->>+Wrapper: _subscribe()
Wrapper->>+Core: subscribe(&is_success)
Core-->>-Wrapper: result_data
Wrapper-->>-Boost: make_tuple(result_data, is_success)
Note over Boost: C++型 → Python型変換
Boost-->>-Py: (data, success)
拡張性考慮
新しいデータ型の追加
flowchart TD
Start([新しい型Tを追加]) --> CheckPOD{POD型?}
CheckPOD -->|Yes| UseTemplate[既存テンプレートを使用]
CheckPOD -->|No| Specialize[テンプレート特殊化]
UseTemplate --> InstantiateC["C++でPublisherT,
SubscriberTをインスタンス化"]
Specialize --> CustomImpl["カスタム実装
・シリアライゼーション
・デシリアライゼーション"]
CustomImpl --> InstantiateC
InstantiateC --> PythonNeeded{Python対応必要?}
PythonNeeded -->|Yes| CreateWrapper["Boost.Pythonラッパー作成
・PublisherT
・SubscriberT"]
PythonNeeded -->|No| TestC[C++テスト実装]
CreateWrapper --> UpdateModule[BOOST_PYTHON_MODULEに追加]
UpdateModule --> TestPy[Pythonテスト実装]
TestPy --> TestC
TestC --> Document[ドキュメント更新]
Document --> End([完了])
参照
man shm_overview
Posix共有メモリの概要が記載されているURLを以下に示す. https://linuxjm.osdn.jp/html/LDP_man-pages/man7/shm_overview.7.html
関連技術仕様
- POSIX.1-2001 共有メモリオブジェクト
- POSIX.1-2001 pthread mutexとcondition variables
- C++11 標準レイアウト型
- Boost.Python 1.75+