[English | 日本語]
🎯 このガイドで学べること
- Action通信の深い理解: 非同期処理パターンの設計思想から実装詳細まで
- 長時間処理の管理: 進捗監視、キャンセル機能、エラーハンドリング
- 高度なワークフロー設計: 並列処理、タスクチェーン、状態管理
- 実践的な応用例: ファイル処理、機械学習、大容量データ変換
🧠 Action通信の深い理解
🏗️ アーキテクチャ解説
┌─────────────────────────────────────────────────────────────┐
│ 共有メモリ空間 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ アクションキュー │ │
│ │ [Goal 0][Goal 1][Goal 2]...[Goal N-1] │ │
│ │ ↑ ↑ │ │
│ │ 実行位置 追加位置 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ フィードバックキュー │ │
│ │ [FB 0][FB 1][FB 2]...[FB N-1] │ │
│ │ ↑ ↑ │ │
│ │ 読取位置 書込位置 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 結果キュー │ │
│ │ [Result 0][Result 1]...[Result N-1] │ │
│ │ ↑ ↑ │ │
│ │ 読取位置 書込位置 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ アクション状態管理: │
│ - ゴールID (一意識別子) │
│ - 実行状態 (待機/実行中/完了/キャンセル/エラー) │
│ - 進捗情報 (完了率、残り時間、処理詳細) │
│ - キャンセルフラグ (非同期キャンセル要求) │
└─────────────────────────────────────────────────────────────┘
Multiple Clients ← [shared memory] → Single Server
│ │
ゴール送信 アクション実行エンジン
フィードバック受信 │
結果受信 長時間処理
キャンセル要求 進捗報告
│ 結果生成
非同期監視
⚡ なぜ長時間処理に最適なのか?
1. 非同期実行とフィードバック
ActionClient<GoalType, ResultType, FeedbackType> client("action_server");
uint64_t goal_id = client.sendGoal(goal_data);
while (true) {
FeedbackType feedback;
if (client.getFeedback(goal_id, feedback)) {
std::cout << "進捗: " << feedback.progress_percent << "%" << std::endl;
}
if (client.isComplete(goal_id)) {
ResultType result = client.getResult(goal_id);
break;
}
if (user_cancel_request) {
client.cancelGoal(goal_id);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
2. 状態管理と進捗監視
enum class ActionStatus {
PENDING,
ACTIVE,
PREEMPTED,
SUCCEEDED,
ABORTED,
REJECTED,
RECALLED
};
struct ActionState {
uint64_t goal_id;
ActionStatus status;
uint64_t start_time;
uint64_t last_feedback_time;
float progress_percent;
uint32_t estimated_duration_ms;
char status_text[256];
};
class ActionStateTracker {
std::unordered_map<uint64_t, ActionState> active_actions_;
std::mutex state_mutex_;
public:
void updateProgress(uint64_t goal_id, float progress, const std::string& text) {
std::lock_guard<std::mutex> lock(state_mutex_);
auto& state = active_actions_[goal_id];
state.progress_percent = progress;
state.last_feedback_time = getCurrentTimeUs();
strncpy(state.status_text, text.c_str(), 255);
if (progress > 0.0f) {
uint64_t elapsed = state.last_feedback_time - state.start_time;
state.estimated_duration_ms = (elapsed / progress) * (100.0f - progress) / 1000;
}
}
};
3. 高度なワークフロー管理
class ParallelActionManager {
std::vector<ActionClient<GoalType, ResultType, FeedbackType>> clients_;
std::vector<uint64_t> active_goals_;
public:
void executeParallelActions(const std::vector<GoalType>& goals) {
active_goals_.clear();
for (size_t i = 0; i < goals.size(); ++i) {
uint64_t goal_id = clients_[i].sendGoal(goals[i]);
active_goals_.push_back(goal_id);
}
while (!active_goals_.empty()) {
for (auto it = active_goals_.begin(); it != active_goals_.end();) {
size_t client_index = it - active_goals_.begin();
if (clients_[client_index].isComplete(*it)) {
auto result = clients_[client_index].getResult(*it);
processResult(result);
it = active_goals_.erase(it);
} else {
FeedbackType feedback;
if (clients_[client_index].getFeedback(*it, feedback)) {
updateProgressDisplay(client_index, feedback);
}
++it;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
};
🚀 基本的な使い方
1. 簡単なファイル処理アクション
#include <iostream>
#include <thread>
#include <chrono>
#include <fstream>
using namespace irlab::shm;
struct FileProcessGoal {
char input_path[512];
char output_path[512];
char operation[64];
float quality;
bool preserve_metadata;
};
struct FileProcessResult {
bool success;
uint64_t input_size;
uint64_t output_size;
uint32_t processing_time_ms;
float compression_ratio;
char error_message[256];
char final_path[512];
};
struct FileProcessFeedback {
float progress_percent;
uint64_t bytes_processed;
uint64_t bytes_remaining;
uint32_t estimated_time_remaining_ms;
char current_stage[128];
float current_speed_mbps;
};
class FileProcessingServer {
private:
ActionServer<FileProcessGoal, FileProcessResult, FileProcessFeedback> server_;
std::thread processing_thread_;
std::atomic<bool> running_;
public:
FileProcessingServer() : server_("file_processor"), running_(false) {}
void start() {
running_ = true;
processing_thread_ = std::thread(&FileProcessingServer::processLoop, this);
std::cout << "ファイル処理サーバーを開始しました" << std::endl;
}
void stop() {
running_ = false;
if (processing_thread_.joinable()) {
processing_thread_.join();
}
std::cout << "ファイル処理サーバーを停止しました" << std::endl;
}
private:
void processLoop() {
while (running_) {
if (server_.hasGoal()) {
auto start_time = std::chrono::high_resolution_clock::now();
uint64_t goal_id = server_.acceptGoal();
FileProcessGoal goal = server_.getGoal(goal_id);
std::cout << "ファイル処理開始: " << goal.input_path
<< " -> " << goal.output_path << std::endl;
std::thread worker([this, goal_id, goal, start_time]() {
processFile(goal_id, goal, start_time);
});
worker.detach();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
void processFile(uint64_t goal_id, const FileProcessGoal& goal,
std::chrono::high_resolution_clock::time_point start_time) {
FileProcessResult result;
result.success = false;
result.input_size = 0;
result.output_size = 0;
result.compression_ratio = 1.0f;
try {
std::ifstream input(goal.input_path, std::ios::binary | std::ios::ate);
if (!input.is_open()) {
strncpy(result.error_message, "入力ファイルを開けません", 255);
server_.setAborted(goal_id, result);
return;
}
result.input_size = input.tellg();
input.seekg(0, std::ios::beg);
std::ofstream output(goal.output_path, std::ios::binary);
if (!output.is_open()) {
strncpy(result.error_message, "出力ファイルを作成できません", 255);
server_.setAborted(goal_id, result);
return;
}
const size_t BUFFER_SIZE = 64 * 1024;
std::vector<char> buffer(BUFFER_SIZE);
uint64_t total_processed = 0;
FileProcessFeedback feedback;
while (input.read(buffer.data(), BUFFER_SIZE) || input.gcount() > 0) {
if (server_.isPreemptRequested(goal_id)) {
result.success = false;
strncpy(result.error_message, "処理がキャンセルされました", 255);
server_.setPreempted(goal_id, result);
input.close();
output.close();
return;
}
size_t bytes_read = input.gcount();
total_processed += bytes_read;
processData(buffer.data(), bytes_read, goal.operation, goal.quality);
output.write(buffer.data(), bytes_read);
feedback.progress_percent = (float)total_processed / result.input_size * 100.0f;
feedback.bytes_processed = total_processed;
feedback.bytes_remaining = result.input_size - total_processed;
auto current_time = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
current_time - start_time);
if (elapsed.count() > 0 && total_processed > 0) {
feedback.current_speed_mbps = (float)total_processed / elapsed.count() / 1024.0f;
if (feedback.progress_percent > 0) {
feedback.estimated_time_remaining_ms =
elapsed.count() * (100.0f - feedback.progress_percent) / feedback.progress_percent;
}
}
snprintf(feedback.current_stage, 127, "%s処理中", goal.operation);
server_.publishFeedback(goal_id, feedback);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
input.close();
output.close();
std::ifstream output_check(goal.output_path, std::ios::binary | std::ios::ate);
result.output_size = output_check.tellg();
output_check.close();
result.compression_ratio = (float)result.output_size / result.input_size;
auto end_time = std::chrono::high_resolution_clock::now();
result.processing_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - start_time).count();
strncpy(result.final_path, goal.output_path, 511);
result.success = true;
server_.setSucceeded(goal_id, result);
std::cout << "ファイル処理完了: " << result.processing_time_ms << "ms, "
<< "圧縮率: " << result.compression_ratio << std::endl;
} catch (const std::exception& e) {
strncpy(result.error_message, e.what(), 255);
server_.setAborted(goal_id, result);
std::cout << "ファイル処理エラー: " << e.what() << std::endl;
}
}
void processData(char* data, size_t size, const char* operation, float quality) {
if (strcmp(operation, "encrypt") == 0) {
for (size_t i = 0; i < size; ++i) {
data[i] ^= 0x55;
}
} else if (strcmp(operation, "compress") == 0) {
auto delay = std::chrono::microseconds((int)(100 * quality));
std::this_thread::sleep_for(delay);
}
}
};
class FileProcessingClient {
private:
ActionClient<FileProcessGoal, FileProcessResult, FileProcessFeedback> client_;
public:
FileProcessingClient() : client_("file_processor") {}
bool processFile(const std::string& input_path, const std::string& output_path,
const std::string& operation, float quality = 0.8f) {
FileProcessGoal goal;
strncpy(goal.input_path, input_path.c_str(), 511);
strncpy(goal.output_path, output_path.c_str(), 511);
strncpy(goal.operation, operation.c_str(), 63);
goal.quality = quality;
goal.preserve_metadata = true;
std::cout << "ファイル処理要求送信..." << std::endl;
uint64_t goal_id = client_.sendGoal(goal);
while (!client_.isComplete(goal_id)) {
FileProcessFeedback feedback;
if (client_.getFeedback(goal_id, feedback)) {
std::cout << "\r進捗: " << std::fixed << std::setprecision(1)
<< feedback.progress_percent << "% ("
<< feedback.current_stage << ", "
<< feedback.current_speed_mbps << " MB/s, "
<< "残り: " << feedback.estimated_time_remaining_ms / 1000 << "s)"
<< std::flush;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
std::cout << std::endl;
if (client_.getState(goal_id) == ActionStatus::SUCCEEDED) {
FileProcessResult result = client_.getResult(goal_id);
std::cout << "処理完了!" << std::endl;
std::cout << " 入力サイズ: " << result.input_size << " bytes" << std::endl;
std::cout << " 出力サイズ: " << result.output_size << " bytes" << std::endl;
std::cout << " 処理時間: " << result.processing_time_ms << " ms" << std::endl;
std::cout << " 圧縮率: " << result.compression_ratio << std::endl;
return true;
} else {
FileProcessResult result = client_.getResult(goal_id);
std::cout << "処理失敗: " << result.error_message << std::endl;
return false;
}
}
void cancelProcessing(uint64_t goal_id) {
client_.cancelGoal(goal_id);
std::cout << "処理キャンセル要求を送信しました" << std::endl;
}
};
int main() {
try {
FileProcessingServer server;
server.start();
FileProcessingClient client;
bool success = client.processFile(
"/tmp/input.txt",
"/tmp/output.enc",
"encrypt",
0.9f
);
if (success) {
std::cout << "ファイル処理が成功しました" << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(2));
server.stop();
} catch (const std::exception& e) {
std::cerr << "エラー: " << e.what() << std::endl;
return 1;
}
return 0;
}
2. 機械学習訓練アクション
struct MLTrainingGoal {
char dataset_path[512];
char model_type[64];
char output_model_path[512];
float learning_rate;
int epochs;
int batch_size;
float validation_split;
bool use_gpu;
bool early_stopping;
int max_training_time_minutes;
};
struct MLTrainingResult {
bool success;
float final_accuracy;
float final_loss;
int epochs_completed;
uint32_t training_time_ms;
uint64_t model_size_bytes;
char model_path[512];
char error_details[512];
float validation_accuracy;
float validation_loss;
float overfitting_score;
char best_checkpoint_path[512];
};
struct MLTrainingFeedback {
int current_epoch;
int total_epochs;
float current_accuracy;
float current_loss;
float validation_accuracy;
float validation_loss;
float epoch_progress_percent;
float overall_progress_percent;
uint32_t epoch_time_ms;
uint32_t estimated_remaining_ms;
float samples_per_second;
char current_phase[64];
bool is_improving;
};
class MLTrainingServer {
private:
ActionServer<MLTrainingGoal, MLTrainingResult, MLTrainingFeedback> server_;
std::thread training_thread_;
std::atomic<bool> running_;
public:
MLTrainingServer() : server_("ml_trainer"), running_(false) {}
void start() {
running_ = true;
training_thread_ = std::thread(&MLTrainingServer::trainingLoop, this);
std::cout << "機械学習訓練サーバーを開始しました" << std::endl;
}
void stop() {
running_ = false;
if (training_thread_.joinable()) {
training_thread_.join();
}
std::cout << "機械学習訓練サーバーを停止しました" << std::endl;
}
private:
void trainingLoop() {
while (running_) {
if (server_.hasGoal()) {
uint64_t goal_id = server_.acceptGoal();
MLTrainingGoal goal = server_.getGoal(goal_id);
std::cout << "訓練開始: " << goal.model_type
<< ", データセット: " << goal.dataset_path << std::endl;
std::thread worker([this, goal_id, goal]() {
trainModel(goal_id, goal);
});
worker.detach();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
void trainModel(uint64_t goal_id, const MLTrainingGoal& goal) {
auto start_time = std::chrono::high_resolution_clock::now();
MLTrainingResult result;
result.success = false;
result.epochs_completed = 0;
result.final_accuracy = 0.0f;
result.final_loss = std::numeric_limits<float>::max();
try {
if (!loadDataset(goal.dataset_path)) {
strncpy(result.error_details, "データセットの読み込みに失敗", 511);
server_.setAborted(goal_id, result);
return;
}
MLModel model = initializeModel(goal.model_type, goal);
MLTrainingFeedback feedback;
float best_accuracy = 0.0f;
int epochs_without_improvement = 0;
for (int epoch = 0; epoch < goal.epochs; ++epoch) {
if (server_.isPreemptRequested(goal_id)) {
strncpy(result.error_details, "訓練がキャンセルされました", 511);
server_.setPreempted(goal_id, result);
return;
}
auto epoch_start = std::chrono::high_resolution_clock::now();
feedback.current_epoch = epoch + 1;
feedback.total_epochs = goal.epochs;
strcpy(feedback.current_phase, "training");
float epoch_loss = 0.0f;
float epoch_accuracy = 0.0f;
int total_batches = getDatasetSize() / goal.batch_size;
for (int batch = 0; batch < total_batches; ++batch) {
auto batch_data = getBatch(batch, goal.batch_size);
auto predictions = model.forward(batch_data.inputs);
float batch_loss = calculateLoss(predictions, batch_data.targets);
epoch_loss += batch_loss;
model.backward(batch_loss, goal.learning_rate);
float batch_accuracy = calculateAccuracy(predictions, batch_data.targets);
epoch_accuracy += batch_accuracy;
feedback.epoch_progress_percent = (float)(batch + 1) / total_batches * 100.0f;
feedback.overall_progress_percent =
((float)epoch + feedback.epoch_progress_percent / 100.0f) / goal.epochs * 100.0f;
auto current_time = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
current_time - epoch_start);
feedback.samples_per_second = (float)(batch + 1) * goal.batch_size /
(elapsed.count() / 1000.0f);
server_.publishFeedback(goal_id, feedback);
if (batch % 10 == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
epoch_loss /= total_batches;
epoch_accuracy /= total_batches;
strcpy(feedback.current_phase, "validation");
server_.publishFeedback(goal_id, feedback);
auto validation_result = validateModel(model, goal.validation_split);
feedback.current_accuracy = epoch_accuracy;
feedback.current_loss = epoch_loss;
feedback.validation_accuracy = validation_result.accuracy;
feedback.validation_loss = validation_result.loss;
auto epoch_end = std::chrono::high_resolution_clock::now();
feedback.epoch_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
epoch_end - epoch_start).count();
if (epoch > 0) {
feedback.estimated_remaining_ms =
feedback.epoch_time_ms * (goal.epochs - epoch - 1);
}
feedback.is_improving = validation_result.accuracy > best_accuracy;
if (feedback.is_improving) {
best_accuracy = validation_result.accuracy;
epochs_without_improvement = 0;
saveCheckpoint(model, goal.output_model_path, epoch);
} else {
epochs_without_improvement++;
}
server_.publishFeedback(goal_id, feedback);
if (goal.early_stopping && epochs_without_improvement >= 10) {
std::cout << "早期停止: " << epochs_without_improvement
<< " エポック改善なし" << std::endl;
break;
}
result.epochs_completed = epoch + 1;
result.final_accuracy = validation_result.accuracy;
result.final_loss = validation_result.loss;
std::cout << "エポック " << (epoch + 1) << "/" << goal.epochs
<< " - 損失: " << epoch_loss << ", 精度: " << epoch_accuracy
<< ", 検証精度: " << validation_result.accuracy << std::endl;
}
auto end_time = std::chrono::high_resolution_clock::now();
result.training_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - start_time).count();
saveModel(model, goal.output_model_path);
result.model_size_bytes = getFileSize(goal.output_model_path);
strncpy(result.model_path, goal.output_model_path, 511);
result.validation_accuracy = feedback.validation_accuracy;
result.validation_loss = feedback.validation_loss;
result.overfitting_score = calculateOverfittingScore(
feedback.current_accuracy, feedback.validation_accuracy);
result.success = true;
server_.setSucceeded(goal_id, result);
std::cout << "訓練完了: " << result.training_time_ms / 1000.0f << "秒, "
<< "最終精度: " << result.final_accuracy << std::endl;
} catch (const std::exception& e) {
strncpy(result.error_details, e.what(), 511);
server_.setAborted(goal_id, result);
std::cout << "訓練エラー: " << e.what() << std::endl;
}
}
bool loadDataset(const std::string& path) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return true;
}
struct MLModel {
void forward(const std::vector<float>& inputs) { }
void backward(float loss, float learning_rate) { }
};
MLModel initializeModel(const std::string& type, const MLTrainingGoal& goal) {
return MLModel{};
}
struct ValidationResult {
float accuracy;
float loss;
};
ValidationResult validateModel(const MLModel& model, float validation_split) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ValidationResult result;
result.accuracy = 0.7f + (rand() % 300) / 1000.0f;
result.loss = 1.0f - result.accuracy + (rand() % 100) / 1000.0f;
return result;
}
void saveCheckpoint(const MLModel& model, const std::string& path, int epoch) {
std::string checkpoint_path = path + "_epoch_" + std::to_string(epoch);
}
void saveModel(const MLModel& model, const std::string& path) {
}
size_t getFileSize(const std::string& path) {
return 1024 * 1024;
}
float calculateOverfittingScore(float train_acc, float val_acc) {
return std::max(0.0f, train_acc - val_acc);
}
};
🚀 実践的な使用例
1. 大容量データ変換パイプライン
#include <vector>
#include <queue>
#include <thread>
#include <future>
struct DataPipelineGoal {
char input_directory[512];
char output_directory[512];
char pipeline_config[1024];
int max_parallel_jobs;
int batch_size;
float quality_threshold;
bool skip_existing;
char file_pattern[128];
uint64_t min_file_size;
uint64_t max_file_size;
};
struct DataPipelineResult {
bool success;
uint32_t total_files;
uint32_t processed_files;
uint32_t failed_files;
uint32_t skipped_files;
uint64_t total_input_size;
uint64_t total_output_size;
uint32_t total_processing_time_ms;
float average_processing_speed_mbps;
float overall_compression_ratio;
char summary_report[2048];
char error_log_path[512];
};
struct DataPipelineFeedback {
uint32_t current_file_index;
uint32_t total_files;
char current_file_name[256];
float overall_progress_percent;
uint32_t files_completed;
uint32_t files_failed;
float current_file_progress;
char current_stage[128];
float current_speed_mbps;
uint32_t estimated_remaining_ms;
uint32_t active_workers;
float average_quality_score;
uint32_t quality_failures;
};
class DataPipelineServer {
private:
ActionServer<DataPipelineGoal, DataPipelineResult, DataPipelineFeedback> server_;
std::thread pipeline_thread_;
std::atomic<bool> running_;
public:
DataPipelineServer() : server_("data_pipeline"), running_(false) {}
void start() {
running_ = true;
pipeline_thread_ = std::thread(&DataPipelineServer::pipelineLoop, this);
std::cout << "データパイプラインサーバーを開始しました" << std::endl;
}
void stop() {
running_ = false;
if (pipeline_thread_.joinable()) {
pipeline_thread_.join();
}
std::cout << "データパイプラインサーバーを停止しました" << std::endl;
}
private:
void pipelineLoop() {
while (running_) {
if (server_.hasGoal()) {
uint64_t goal_id = server_.acceptGoal();
DataPipelineGoal goal = server_.getGoal(goal_id);
std::cout << "パイプライン開始: " << goal.input_directory
<< " -> " << goal.output_directory << std::endl;
std::thread worker([this, goal_id, goal]() {
executePipeline(goal_id, goal);
});
worker.detach();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
void executePipeline(uint64_t goal_id, const DataPipelineGoal& goal) {
auto start_time = std::chrono::high_resolution_clock::now();
DataPipelineResult result;
result.success = false;
result.total_files = 0;
result.processed_files = 0;
result.failed_files = 0;
result.skipped_files = 0;
try {
std::vector<std::string> file_list = scanDirectory(goal.input_directory, goal);
result.total_files = file_list.size();
if (file_list.empty()) {
strncpy(result.summary_report, "処理対象ファイルが見つかりませんでした", 2047);
server_.setSucceeded(goal_id, result);
return;
}
std::vector<std::thread> workers;
std::queue<std::string> file_queue;
std::mutex queue_mutex;
std::atomic<uint32_t> completed_files{0};
std::atomic<uint32_t> failed_files{0};
std::atomic<uint64_t> total_input_bytes{0};
std::atomic<uint64_t> total_output_bytes{0};
for (const auto& file : file_list) {
file_queue.push(file);
}
DataPipelineFeedback feedback;
feedback.total_files = result.total_files;
feedback.active_workers = std::min((uint32_t)goal.max_parallel_jobs, result.total_files);
for (int i = 0; i < goal.max_parallel_jobs && !file_queue.empty(); ++i) {
workers.emplace_back([&, i]() {
workerFunction(goal_id, goal, file_queue, queue_mutex,
completed_files, failed_files,
total_input_bytes, total_output_bytes, feedback);
});
}
while (completed_files + failed_files < result.total_files) {
if (server_.isPreemptRequested(goal_id)) {
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
result.processed_files = completed_files;
result.failed_files = failed_files;
strncpy(result.summary_report, "処理がキャンセルされました", 2047);
server_.setPreempted(goal_id, result);
return;
}
feedback.files_completed = completed_files;
feedback.files_failed = failed_files;
feedback.overall_progress_percent =
(float)(completed_files + failed_files) / result.total_files * 100.0f;
auto current_time = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
current_time - start_time);
if (elapsed.count() > 0 && completed_files > 0) {
feedback.current_speed_mbps =
(float)total_input_bytes / elapsed.count() / 1024.0f;
if (feedback.overall_progress_percent > 0) {
feedback.estimated_remaining_ms =
elapsed.count() * (100.0f - feedback.overall_progress_percent) /
feedback.overall_progress_percent;
}
}
server_.publishFeedback(goal_id, feedback);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
result.processed_files = completed_files;
result.failed_files = failed_files;
result.total_input_size = total_input_bytes;
result.total_output_size = total_output_bytes;
auto end_time = std::chrono::high_resolution_clock::now();
result.total_processing_time_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - start_time).count();
if (result.total_processing_time_ms > 0) {
result.average_processing_speed_mbps =
(float)result.total_input_size / result.total_processing_time_ms / 1024.0f;
}
if (result.total_input_size > 0) {
result.overall_compression_ratio =
(float)result.total_output_size / result.total_input_size;
}
generateSummaryReport(result, goal);
result.success = (result.failed_files == 0);
server_.setSucceeded(goal_id, result);
std::cout << "パイプライン完了: " << result.processed_files << "/"
<< result.total_files << " ファイル処理" << std::endl;
} catch (const std::exception& e) {
snprintf(result.summary_report, 2047, "パイプラインエラー: %s", e.what());
server_.setAborted(goal_id, result);
std::cout << "パイプラインエラー: " << e.what() << std::endl;
}
}
void workerFunction(uint64_t goal_id, const DataPipelineGoal& goal,
std::queue<std::string>& file_queue, std::mutex& queue_mutex,
std::atomic<uint32_t>& completed_files, std::atomic<uint32_t>& failed_files,
std::atomic<uint64_t>& total_input_bytes, std::atomic<uint64_t>& total_output_bytes,
DataPipelineFeedback& feedback) {
while (true) {
std::string current_file;
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (file_queue.empty()) {
break;
}
current_file = file_queue.front();
file_queue.pop();
}
try {
auto process_result = processFile(current_file, goal);
if (process_result.success) {
completed_files++;
total_input_bytes += process_result.input_size;
total_output_bytes += process_result.output_size;
} else {
failed_files++;
}
{
std::lock_guard<std::mutex> lock(queue_mutex);
strncpy(feedback.current_file_name,
extractFileName(current_file).c_str(), 255);
feedback.current_file_index = completed_files + failed_files;
}
} catch (const std::exception& e) {
failed_files++;
std::cout << "ファイル処理エラー (" << current_file << "): "
<< e.what() << std::endl;
}
}
}
struct FileProcessResult {
bool success;
uint64_t input_size;
uint64_t output_size;
uint32_t processing_time_ms;
float quality_score;
};
FileProcessResult processFile(const std::string& file_path,
const DataPipelineGoal& goal) {
FileProcessResult result;
result.success = false;
result.input_size = 0;
result.output_size = 0;
result.quality_score = 0.0f;
auto start = std::chrono::high_resolution_clock::now();
result.input_size = getFileSize(file_path);
std::string output_path = generateOutputPath(file_path, goal.output_directory);
std::this_thread::sleep_for(std::chrono::milliseconds(50 + rand() % 200));
result.quality_score = 0.8f + (rand() % 200) / 1000.0f;
if (result.quality_score >= goal.quality_threshold) {
result.output_size = result.input_size * 0.7f;
result.success = true;
}
auto end = std::chrono::high_resolution_clock::now();
result.processing_time_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
return result;
}
std::vector<std::string> scanDirectory(const std::string& directory,
const DataPipelineGoal& goal) {
std::vector<std::string> files;
for (int i = 0; i < 100; ++i) {
files.push_back(directory + "/file_" + std::to_string(i) + ".dat");
}
return files;
}
void generateSummaryReport(DataPipelineResult& result, const DataPipelineGoal& goal) {
snprintf(result.summary_report, 2047,
"パイプライン処理完了\n"
"総ファイル数: %u\n"
"成功: %u\n"
"失敗: %u\n"
"スキップ: %u\n"
"処理時間: %.2f秒\n"
"平均速度: %.2f MB/s\n"
"圧縮率: %.2f\n",
result.total_files,
result.processed_files,
result.failed_files,
result.skipped_files,
result.total_processing_time_ms / 1000.0f,
result.average_processing_speed_mbps,
result.overall_compression_ratio);
}
};
🛠️ パフォーマンス最適化テクニック
1. 状態管理の最適化
#include <unordered_map>
#include <memory>
#include <atomic>
class OptimizedActionStateManager {
private:
struct ActionState {
std::atomic<ActionStatus> status;
std::atomic<float> progress;
std::atomic<uint64_t> last_update_time;
std::string status_text;
std::mutex text_mutex;
};
std::unordered_map<uint64_t, std::unique_ptr<ActionState>> states_;
std::shared_mutex states_mutex_;
std::vector<std::function<void(uint64_t, ActionStatus)>> status_callbacks_;
std::mutex callbacks_mutex_;
public:
uint64_t createAction(uint64_t goal_id) {
std::unique_lock<std::shared_mutex> lock(states_mutex_);
auto state = std::make_unique<ActionState>();
state->status.store(ActionStatus::PENDING);
state->progress.store(0.0f);
state->last_update_time.store(getCurrentTimeUs());
states_[goal_id] = std::move(state);
notifyStatusChange(goal_id, ActionStatus::PENDING);
return goal_id;
}
void updateProgress(uint64_t goal_id, float progress, const std::string& text = "") {
std::shared_lock<std::shared_mutex> lock(states_mutex_);
auto it = states_.find(goal_id);
if (it != states_.end()) {
it->second->progress.store(progress);
it->second->last_update_time.store(getCurrentTimeUs());
if (!text.empty()) {
std::lock_guard<std::mutex> text_lock(it->second->text_mutex);
it->second->status_text = text;
}
}
}
void setStatus(uint64_t goal_id, ActionStatus status) {
std::shared_lock<std::shared_mutex> lock(states_mutex_);
auto it = states_.find(goal_id);
if (it != states_.end()) {
ActionStatus old_status = it->second->status.exchange(status);
if (old_status != status) {
notifyStatusChange(goal_id, status);
}
}
}
ActionStatus getStatus(uint64_t goal_id) const {
std::shared_lock<std::shared_mutex> lock(states_mutex_);
auto it = states_.find(goal_id);
return (it != states_.end()) ? it->second->status.load() : ActionStatus::PENDING;
}
float getProgress(uint64_t goal_id) const {
std::shared_lock<std::shared_mutex> lock(states_mutex_);
auto it = states_.find(goal_id);
return (it != states_.end()) ? it->second->progress.load() : 0.0f;
}
void addStatusCallback(std::function<void(uint64_t, ActionStatus)> callback) {
std::lock_guard<std::mutex> lock(callbacks_mutex_);
status_callbacks_.push_back(callback);
}
void cleanupExpiredStates(uint64_t expiry_time_us) {
std::unique_lock<std::shared_mutex> lock(states_mutex_);
auto current_time = getCurrentTimeUs();
for (auto it = states_.begin(); it != states_.end();) {
if (current_time - it->second->last_update_time.load() > expiry_time_us) {
it = states_.erase(it);
} else {
++it;
}
}
}
private:
void notifyStatusChange(uint64_t goal_id, ActionStatus status) {
std::lock_guard<std::mutex> lock(callbacks_mutex_);
for (const auto& callback : status_callbacks_) {
try {
callback(goal_id, status);
} catch (const std::exception& e) {
std::cerr << "Status callback error: " << e.what() << std::endl;
}
}
}
uint64_t getCurrentTimeUs() const {
return std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}
};
2. 高性能フィードバック配信
class BatchedFeedbackSystem {
private:
struct FeedbackBatch {
std::vector<uint64_t> goal_ids;
std::vector<std::string> feedback_data;
uint64_t timestamp;
};
std::queue<FeedbackBatch> pending_batches_;
std::mutex batch_mutex_;
std::condition_variable batch_cv_;
std::thread delivery_thread_;
std::atomic<bool> running_;
size_t max_batch_size_;
uint32_t batch_timeout_ms_;
public:
BatchedFeedbackSystem(size_t max_batch_size = 100, uint32_t batch_timeout_ms = 50)
: max_batch_size_(max_batch_size), batch_timeout_ms_(batch_timeout_ms), running_(false) {}
void start() {
running_ = true;
delivery_thread_ = std::thread(&BatchedFeedbackSystem::deliveryLoop, this);
}
void stop() {
running_ = false;
batch_cv_.notify_all();
if (delivery_thread_.joinable()) {
delivery_thread_.join();
}
}
void addFeedback(uint64_t goal_id, const std::string& feedback) {
std::lock_guard<std::mutex> lock(batch_mutex_);
if (pending_batches_.empty() ||
pending_batches_.back().goal_ids.size() >= max_batch_size_) {
FeedbackBatch new_batch;
new_batch.timestamp = getCurrentTimeMs();
pending_batches_.push(new_batch);
}
auto& current_batch = pending_batches_.back();
current_batch.goal_ids.push_back(goal_id);
current_batch.feedback_data.push_back(feedback);
if (current_batch.goal_ids.size() >= max_batch_size_) {
batch_cv_.notify_one();
}
}
private:
void deliveryLoop() {
while (running_) {
std::unique_lock<std::mutex> lock(batch_mutex_);
batch_cv_.wait_for(lock, std::chrono::milliseconds(batch_timeout_ms_),
[this] { return !pending_batches_.empty() || !running_; });
if (!running_) break;
std::vector<FeedbackBatch> batches_to_deliver;
uint64_t current_time = getCurrentTimeMs();
while (!pending_batches_.empty()) {
auto& batch = pending_batches_.front();
if (batch.goal_ids.size() >= max_batch_size_ ||
(current_time - batch.timestamp) >= batch_timeout_ms_) {
batches_to_deliver.push_back(std::move(batch));
pending_batches_.pop();
} else {
break;
}
}
lock.unlock();
for (const auto& batch : batches_to_deliver) {
deliverBatch(batch);
}
}
}
void deliverBatch(const FeedbackBatch& batch) {
try {
std::cout << "フィードバックバッチ配信: " << batch.goal_ids.size()
<< " items" << std::endl;
} catch (const std::exception& e) {
std::cerr << "Feedback delivery error: " << e.what() << std::endl;
}
}
uint64_t getCurrentTimeMs() const {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}
};
📊 パフォーマンス測定とベンチマーク
詳細ベンチマーク
#include <chrono>
#include <algorithm>
#include <numeric>
#include <iomanip>
class ActionPerformanceBenchmark {
private:
struct BenchmarkResult {
std::vector<double> goal_submission_times;
std::vector<double> feedback_latencies;
std::vector<double> completion_times;
std::vector<double> throughput_measurements;
};
BenchmarkResult results_;
public:
void runActionBenchmark() {
std::cout << "=== Action通信ベンチマーク開始 ===" << std::endl;
measureGoalSubmissionLatency();
measureFeedbackLatency();
measureLongRunningActionThroughput();
measureConcurrentActionProcessing();
measureMemoryEfficiency();
analyzeResults();
}
private:
void measureGoalSubmissionLatency() {
std::cout << "ゴール送信レイテンシ測定中..." << std::endl;
ActionServer<int, int, float> server("latency_test");
ActionClient<int, int, float> client("latency_test");
std::thread server_thread([&server]() {
for (int i = 0; i < 1000; ++i) {
if (server.hasGoal()) {
uint64_t goal_id = server.acceptGoal();
int goal = server.getGoal(goal_id);
server.setSucceeded(goal_id, goal * 2);
}
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
});
results_.goal_submission_times.clear();
results_.goal_submission_times.reserve(1000);
for (int i = 0; i < 1000; ++i) {
auto start = std::chrono::high_resolution_clock::now();
uint64_t goal_id = client.sendGoal(i);
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end - start).count();
results_.goal_submission_times.push_back(duration);
while (!client.isComplete(goal_id)) {
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
}
server_thread.join();
}
void measureFeedbackLatency() {
std::cout << "フィードバックレイテンシ測定中..." << std::endl;
ActionServer<int, int, float> server("feedback_test");
ActionClient<int, int, float> client("feedback_test");
std::atomic<bool> measurement_complete{false};
results_.feedback_latencies.clear();
std::thread server_thread([&]() {
while (!measurement_complete) {
if (server.hasGoal()) {
uint64_t goal_id = server.acceptGoal();
for (int i = 0; i < 100; ++i) {
auto start = std::chrono::high_resolution_clock::now();
server.publishFeedback(goal_id, (float)i);
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end - start).count();
{
static std::mutex latency_mutex;
std::lock_guard<std::mutex> lock(latency_mutex);
results_.feedback_latencies.push_back(duration);
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
server.setSucceeded(goal_id, 100);
measurement_complete = true;
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
});
uint64_t goal_id = client.sendGoal(1);
while (!client.isComplete(goal_id)) {
float feedback;
if (client.getFeedback(goal_id, feedback)) {
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
server_thread.join();
}
void measureLongRunningActionThroughput() {
std::cout << "長時間アクションスループット測定中..." << std::endl;
ActionServer<int, int, float> server("throughput_test");
ActionClient<int, int, float> client("throughput_test");
const int NUM_ACTIONS = 50;
const int ACTION_DURATION_MS = 200;
std::atomic<int> completed_actions{0};
std::thread server_thread([&]() {
while (completed_actions < NUM_ACTIONS) {
if (server.hasGoal()) {
uint64_t goal_id = server.acceptGoal();
int goal = server.getGoal(goal_id);
std::thread([&, goal_id, goal]() {
for (int i = 0; i < 10; ++i) {
server.publishFeedback(goal_id, (float)i * 10.0f);
std::this_thread::sleep_for(
std::chrono::milliseconds(ACTION_DURATION_MS / 10));
}
server.setSucceeded(goal_id, goal);
completed_actions++;
}).detach();
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
});
auto start_time = std::chrono::high_resolution_clock::now();
std::vector<uint64_t> goal_ids;
for (int i = 0; i < NUM_ACTIONS; ++i) {
goal_ids.push_back(client.sendGoal(i));
}
for (uint64_t goal_id : goal_ids) {
while (!client.isComplete(goal_id)) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
auto end_time = std::chrono::high_resolution_clock::now();
auto total_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - start_time);
double throughput = (double)NUM_ACTIONS / total_duration.count() * 1000.0;
results_.throughput_measurements.push_back(throughput);
server_thread.join();
std::cout << "スループット: " << throughput << " actions/sec" << std::endl;
}
void measureConcurrentActionProcessing() {
std::cout << "並列アクション処理性能測定中..." << std::endl;
ActionServer<int, int, float> server("concurrent_test");
std::vector<int> client_counts = {1, 2, 4, 8, 16};
for (int client_count : client_counts) {
auto start = std::chrono::high_resolution_clock::now();
std::atomic<int> completed{0};
std::vector<std::thread> client_threads;
std::thread server_thread([&]() {
while (completed < client_count * 10) {
if (server.hasGoal()) {
uint64_t goal_id = server.acceptGoal();
int goal = server.getGoal(goal_id);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
server.setSucceeded(goal_id, goal);
completed++;
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
});
for (int i = 0; i < client_count; ++i) {
client_threads.emplace_back([i]() {
ActionClient<int, int, float> client("concurrent_test");
for (int j = 0; j < 10; ++j) {
uint64_t goal_id = client.sendGoal(i * 10 + j);
while (!client.isComplete(goal_id)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
});
}
for (auto& thread : client_threads) {
thread.join();
}
server_thread.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
end - start).count();
double concurrent_throughput = (double)(client_count * 10) / duration * 1000.0;
std::cout << "同時クライアント数 " << client_count << ": "
<< concurrent_throughput << " actions/sec" << std::endl;
}
}
void measureMemoryEfficiency() {
std::cout << "メモリ効率測定中..." << std::endl;
ActionServer<std::vector<char>, std::vector<char>, float> server("memory_test");
ActionClient<std::vector<char>, std::vector<char>, float> client("memory_test");
std::vector<size_t> data_sizes = {1024, 10240, 102400, 1048576};
for (size_t size : data_sizes) {
std::vector<char> large_data(size, 'X');
auto start = std::chrono::high_resolution_clock::now();
uint64_t goal_id = client.sendGoal(large_data);
std::thread([&]() {
if (server.hasGoal()) {
uint64_t goal_id = server.acceptGoal();
auto goal = server.getGoal(goal_id);
server.setSucceeded(goal_id, goal);
}
}).detach();
while (!client.isComplete(goal_id)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end - start).count();
double bandwidth = (double)size / duration;
std::cout << "データサイズ " << size << " bytes: "
<< bandwidth << " MB/s" << std::endl;
}
}
void analyzeResults() {
std::cout << "\n=== ベンチマーク結果解析 ===" << std::endl;
if (!results_.goal_submission_times.empty()) {
analyzeLatencyVector(results_.goal_submission_times, "ゴール送信レイテンシ");
}
if (!results_.feedback_latencies.empty()) {
analyzeLatencyVector(results_.feedback_latencies, "フィードバックレイテンシ");
}
if (!results_.throughput_measurements.empty()) {
double avg_throughput = std::accumulate(
results_.throughput_measurements.begin(),
results_.throughput_measurements.end(), 0.0) /
results_.throughput_measurements.size();
std::cout << "平均スループット: " << std::fixed << std::setprecision(2)
<< avg_throughput << " actions/sec" << std::endl;
}
}
void analyzeLatencyVector(const std::vector<double>& latencies, const std::string& name) {
auto sorted_latencies = latencies;
std::sort(sorted_latencies.begin(), sorted_latencies.end());
double mean = std::accumulate(latencies.begin(), latencies.end(), 0.0) / latencies.size();
double min_val = sorted_latencies.front();
double max_val = sorted_latencies.back();
double p50 = sorted_latencies[sorted_latencies.size() * 0.5];
double p95 = sorted_latencies[sorted_latencies.size() * 0.95];
double p99 = sorted_latencies[sorted_latencies.size() * 0.99];
std::cout << name << " (μs):" << std::endl;
std::cout << std::fixed << std::setprecision(2);
std::cout << " 平均: " << mean << std::endl;
std::cout << " 最小: " << min_val << std::endl;
std::cout << " 最大: " << max_val << std::endl;
std::cout << " 50%ile: " << p50 << std::endl;
std::cout << " 95%ile: " << p95 << std::endl;
std::cout << " 99%ile: " << p99 << std::endl;
if (p99 < 1000.0) {
std::cout << " 🏆 優秀: 99%のレイテンシが1ms以下" << std::endl;
} else if (p99 < 10000.0) {
std::cout << " 👍 良好: 99%のレイテンシが10ms以下" << std::endl;
} else {
std::cout << " ⚠️ 要改善: レイテンシが高めです" << std::endl;
}
}
};
int main() {
try {
ActionPerformanceBenchmark benchmark;
benchmark.runActionBenchmark();
} catch (const std::exception& e) {
std::cerr << "ベンチマークエラー: " << e.what() << std::endl;
return 1;
}
return 0;
}
❓ よくある質問
Q1. アクションが完了しない場合はどうすればよいですか?
A: 以下をチェックしてください:
- サーバーが正しく動作し、ゴールを受け付けているか
- サーバー側で例外やデッドロックが発生していないか
- アクション内でキャンセル確認を定期的に行っているか
- タイムアウト設定が適切か
Q2. フィードバックが届かない場合の原因は?
A: 考えられる原因:
- フィードバック送信間隔が短すぎる(バッファオーバーフロー)
- 共有メモリのサイズ不足
- クライアント側でフィードバック読み取りが遅い
- ネットワーク遅延(リモート使用時)
Q3. 長時間アクションの途中でプロセスがクラッシュした場合は?
A: Action通信では以下の対策があります:
- チェックポイント保存: 定期的に進捗を永続化
- 再開機能: 中断点からの復旧機能
- 状態監視: 別プロセスでアクション状態を監視
- タイムアウト設定: 無応答状態の自動検出
Q4. 複数のアクションを並列実行する際の注意点は?
A: 以下に注意してください:
- リソース競合: ファイルやデバイスの排他制御
- メモリ使用量: 各アクションのメモリ消費量を考慮
- CPU負荷: 適切なワーカー数の設定
- 優先度制御: 重要なアクションの優先実行
🔧 トラブルシューティング
よくある問題と解決法
#include <iostream>
void diagnose_action_communication() {
using namespace irlab::shm;
std::cout << "=== Action通信診断 ===" << std::endl;
try {
ActionServer<int, int, float> server("diagnostic_action");
std::cout << "✅ サーバー作成成功" << std::endl;
ActionClient<int, int, float> client("diagnostic_action");
std::cout << "✅ クライアント作成成功" << std::endl;
std::thread server_thread([&server]() {
int test_count = 0;
while (test_count < 3) {
if (server.hasGoal()) {
uint64_t goal_id = server.acceptGoal();
int goal = server.getGoal(goal_id);
std::cout << "サーバー: ゴール受信 " << goal << std::endl;
for (int i = 0; i < 5; ++i) {
server.publishFeedback(goal_id, (float)i * 20.0f);
std::cout << "サーバー: フィードバック送信 " << (i * 20) << "%" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
server.setSucceeded(goal_id, goal * 2);
std::cout << "サーバー: 結果送信 " << (goal * 2) << std::endl;
test_count++;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
});
std::this_thread::sleep_for(std::chrono::milliseconds(100));
uint64_t goal_id = client.sendGoal(21);
std::cout << "クライアント: ゴール送信 21" << std::endl;
bool feedback_received = false;
while (!client.isComplete(goal_id)) {
float feedback;
if (client.getFeedback(goal_id, feedback)) {
std::cout << "クライアント: フィードバック受信 " << feedback << "%" << std::endl;
feedback_received = true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
if (client.getState(goal_id) == ActionStatus::SUCCEEDED) {
int result = client.getResult(goal_id);
std::cout << "クライアント: 結果受信 " << result << std::endl;
if (result == 42 && feedback_received) {
std::cout << "✅ 通信テスト成功" << std::endl;
} else {
std::cout << "❌ 通信テスト失敗: 期待値42、実際値" << result
<< ", フィードバック受信: " << (feedback_received ? "○" : "×") << std::endl;
}
} else {
std::cout << "❌ 通信テスト失敗: アクションが失敗" << std::endl;
}
server_thread.join();
} catch (const std::exception& e) {
std::cout << "❌ 診断エラー: " << e.what() << std::endl;
std::cout << "以下を確認してください:" << std::endl;
std::cout << " - 共有メモリの権限設定" << std::endl;
std::cout << " - 他のプロセスがアクションを使用中でないか" << std::endl;
std::cout << " - システムリソースの不足" << std::endl;
std::cout << " - メモリサイズの設定" << std::endl;
}
}
class RobustActionClient {
private:
ActionClient<GoalType, ResultType, FeedbackType> client_;
std::string action_name_;
public:
RobustActionClient(const std::string& action_name)
: client_(action_name), action_name_(action_name) {}
bool executeGoalWithRetry(const GoalType& goal, ResultType& result,
int max_retries = 3, int timeout_ms = 30000) {
for (int retry = 0; retry < max_retries; ++retry) {
try {
std::cout << "ゴール実行 (試行 " << (retry + 1) << "/" << max_retries << ")" << std::endl;
uint64_t goal_id = client_.sendGoal(goal);
auto start_time = std::chrono::high_resolution_clock::now();
while (!client_.isComplete(goal_id)) {
auto current_time = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
current_time - start_time);
if (elapsed.count() > timeout_ms) {
std::cout << "タイムアウト: ゴールをキャンセルします..." << std::endl;
client_.cancelGoal(goal_id);
break;
}
FeedbackType feedback;
if (client_.getFeedback(goal_id, feedback)) {
displayFeedback(feedback);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
ActionStatus status = client_.getState(goal_id);
if (status == ActionStatus::SUCCEEDED) {
result = client_.getResult(goal_id);
std::cout << "成功: ゴール完了" << std::endl;
return true;
} else {
std::cout << "失敗: " << actionStatusToString(status) << std::endl;
if (retry < max_retries - 1) {
std::cout << "再試行します..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
} catch (const std::exception& e) {
std::cout << "エラー: " << e.what() << std::endl;
if (retry < max_retries - 1) {
std::cout << "再試行します..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
} else {
std::cout << "最大再試行回数に達しました" << std::endl;
throw;
}
}
}
return false;
}
private:
void displayFeedback(const FeedbackType& feedback) {
std::cout << "進捗更新..." << std::endl;
}
std::string actionStatusToString(ActionStatus status) {
switch (status) {
case ActionStatus::PENDING: return "待機中";
case ActionStatus::ACTIVE: return "実行中";
case ActionStatus::PREEMPTED: return "中断";
case ActionStatus::SUCCEEDED: return "成功";
case ActionStatus::ABORTED: return "異常終了";
case ActionStatus::REJECTED: return "拒否";
case ActionStatus::RECALLED: return "取り消し";
default: return "不明";
}
}
};
📚 次のステップ
Action通信をマスターしたら、以下の高度なトピックに挑戦してみましょう:
- 📡 Pub/Sub通信 - 高速ブロードキャスト通信
- 🤝 Service通信 - 確実な要求応答通信
- 🐍 Python連携 - PythonでAction通信
📄 ライセンス情報
本ドキュメントで紹介しているサンプルコードは、shared-memory-based-handy-communication-manager プロジェクトの一部として Apache License 2.0 の下で提供されています。
- ✅ 商用利用可能: サンプルコードを商業プロジェクトで自由に使用
- ✅ 改変可能: ニーズに合わせてコードを修正・拡張
- ✅ 再配布可能: ライセンス表示を保持して再配布
詳細はLICENSEファイルをご確認ください。
この完全ガイドで、Action通信の真の力を引き出し、次世代の高度な非同期処理システムを構築しましょう! 🚀✨