🎯 What You'll Learn in This Guide
- Deep Understanding of Service Communication: From design philosophy to implementation details of request-response patterns
- Reliable Data Exchange: Timeout handling, error management, and retry mechanisms
- High-Performance Server Design: Concurrent processing, load balancing, and memory efficiency
- Practical Applications: Database operations, computation services, and configuration management
🧠 Deep Understanding of Service Communication
🏗️ Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│                     Shared Memory Space                     │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                     Request Queue                     │  │
│  │  [Req 0][Req 1][Req 2]...[Req N-1]                    │  │
│  │     ↑                        ↑                        │  │
│  │Process Pos                Add Pos                     │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                    Response Queue                     │  │
│  │  [Res 0][Res 1][Res 2]...[Res N-1]                    │  │
│  │     ↑                        ↑                        │  │
│  │ Read Pos                 Write Pos                    │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│  Request-Response Mapping:                                  │
│  - Request ID (unique identifier)                           │
│  - Timestamp (timeout management)                           │
│  - Process State (waiting/processing/completed)             │
│  - Error Code (detailed failure information)                │
└─────────────────────────────────────────────────────────────┘

Multiple Clients  ↔  [shared memory]  ↔  Single Server
       ↓                                       ↓
  Send Request                         Processing Engine
 Receive Response                              ↓
       ↓                              Parallel Processing
Timeout Management                     Result Generation
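The two queues in the diagram can be read as fixed-size ring buffers, each with a consumer position and a producer position. The following is a minimal single-process sketch of that idea only. `RingQueue`, `push`, and `pop` are illustrative names, not the library's types, and a real shared-memory queue would also need inter-process synchronization.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Hypothetical fixed-capacity ring buffer (not the library's implementation).
template <typename T, std::size_t N>
class RingQueue {
    std::array<T, N> slots_{};
    std::size_t process_pos_ = 0;  // next slot to read ("Process Pos")
    std::size_t add_pos_ = 0;      // next slot to write ("Add Pos")
    std::size_t count_ = 0;        // number of occupied slots

public:
    // Returns false when the queue is full; nothing is overwritten.
    bool push(const T& value) {
        if (count_ == N) return false;
        slots_[add_pos_] = value;
        add_pos_ = (add_pos_ + 1) % N;  // wrap around at the end
        ++count_;
        return true;
    }

    // Returns false when the queue is empty; `out` is left untouched.
    bool pop(T& out) {
        if (count_ == 0) return false;
        out = slots_[process_pos_];
        process_pos_ = (process_pos_ + 1) % N;
        --count_;
        return true;
    }

    std::size_t size() const { return count_; }
};
```

Tracking an explicit element count keeps the full and empty cases distinct without sacrificing a slot, at the cost of one extra field.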
⚡ Why Is It Reliable and Fast?
1. Both Synchronous and Asynchronous Support
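// Synchronous call: send, then block until a response arrives or the timeout expires.
ServiceClient<int, int> client("calc_service");
client.sendRequest(42);
if (client.waitForResponse(5000000)) {  // timeout in microseconds (5 s)
    int result = client.getResponse();
    std::cout << "Result: " << result << std::endl;
}

// Asynchronous call: send without blocking, then poll for the response later.
client.sendRequestAsync(42);
if (client.checkResponse()) {
    int result = client.getResponse();
    std::cout << "Result: " << result << std::endl;
}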
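With the asynchronous pattern, a single `checkResponse()` call right after sending will usually find nothing yet, so callers typically poll with a deadline. The helper below is a hedged sketch of such a loop. `pollUntil` is a hypothetical name and takes any predicate, so it does not depend on the library's API.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: poll `ready` until it returns true or `timeout` elapses.
// Returns true if `ready` succeeded before the deadline.
inline bool pollUntil(const std::function<bool()>& ready,
                      std::chrono::microseconds timeout,
                      std::chrono::microseconds interval) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (true) {
        if (ready()) return true;
        if (std::chrono::steady_clock::now() >= deadline) return false;
        std::this_thread::sleep_for(interval);  // yield between polls
    }
}
```

Assuming `checkResponse()` behaves as in the snippet above, a call might look like `pollUntil([&] { return client.checkResponse(); }, std::chrono::seconds(5), std::chrono::microseconds(100))`, leaving the caller free to do other work between polls if it drives the loop itself.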
2. Reliable Request-Response Mapping
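// Metadata attached to every request.
struct RequestHeader {
    uint64_t request_id;     // unique identifier
    uint64_t timestamp_us;   // send time, in microseconds
    uint32_t timeout_ms;     // timeout, in milliseconds
    uint32_t retry_count;
    uint32_t priority;
    RequestStatus status;    // waiting / processing / completed
};

// Tracks in-flight requests by ID. RequestInfo, ResponseInfo, RequestStatus,
// and generateUniqueId() are assumed to be defined elsewhere.
class RequestTracker {
    std::unordered_map<uint64_t, RequestInfo> pending_requests_;
    std::mutex requests_mutex_;

public:
    // Registers a request and returns the ID used to look up its response.
    uint64_t addRequest(const RequestInfo& info) {
        std::lock_guard<std::mutex> lock(requests_mutex_);
        uint64_t id = generateUniqueId();
        pending_requests_[id] = info;
        return id;
    }

    // Returns true and hands over the response once the request has completed;
    // the entry is removed so each response is delivered exactly once.
    bool checkResponse(uint64_t request_id, ResponseInfo& response) {
        std::lock_guard<std::mutex> lock(requests_mutex_);
        auto it = pending_requests_.find(request_id);
        if (it != pending_requests_.end() && it->second.status == COMPLETED) {
            response = it->second.response;
            pending_requests_.erase(it);
            return true;
        }
        return false;
    }
};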
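The tracker relies on a `generateUniqueId()` helper and on the header's timestamp and timeout fields, neither of which is spelled out here. One possible sketch, under the assumption that IDs only need to be unique within a single process, is an atomic counter plus a small expiry check. `isExpired` is a hypothetical name, and IDs shared across several client processes would need a counter that lives in shared memory instead.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical ID source: a process-wide atomic counter starting at 1.
inline uint64_t generateUniqueId() {
    static std::atomic<uint64_t> next_id{1};
    return next_id.fetch_add(1, std::memory_order_relaxed);
}

// True when a request sent at `timestamp_us` with a `timeout_ms` budget
// has run out of time at `now_us`. All times are on the same clock.
inline bool isExpired(uint64_t timestamp_us, uint32_t timeout_ms, uint64_t now_us) {
    if (now_us < timestamp_us) return false;  // clock mismatch: treat as not expired
    return now_us - timestamp_us >= static_cast<uint64_t>(timeout_ms) * 1000;
}
```

Note the unit conversion: the header stores the send time in microseconds but the timeout in milliseconds, so the timeout is multiplied by 1000 before comparing.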
Basic Usage Examples
1. Simple Calculator Service
Server Side:
#include <chrono>
#include <iostream>
#include <thread>
// Also include the library header that declares irlab::shm::ServiceServer.

using namespace irlab::shm;

int main() {
    ServiceServer<int, int> calc_server("calculator");
    std::cout << "🖥️ Calculator service started!" << std::endl;

    while (true) {
        if (calc_server.hasRequest()) {
            int input = calc_server.getRequest();
            std::cout << "Processing request: " << input << std::endl;

            int result = input * input;  // the "service": square the input
            calc_server.sendResponse(result);
            std::cout << "Sent result: " << result << std::endl;
        }
        // Brief sleep so the polling loop does not spin at 100% CPU.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return 0;  // not reached
}
Client Side:
#include <chrono>
#include <iostream>
#include <thread>
// Also include the library header that declares irlab::shm::ServiceClient.

using namespace irlab::shm;

int main() {
    ServiceClient<int, int> calc_client("calculator");
    std::cout << "🔢 Calculator client started!" << std::endl;

    for (int i = 1; i <= 10; ++i) {
        std::cout << "Sending request: " << i << std::endl;
        calc_client.sendRequest(i);

        // Wait up to 5 s (the timeout is given in microseconds).
        if (calc_client.waitForResponse(5000000)) {
            int result = calc_client.getResponse();
            std::cout << "✅ " << i << "² = " << result << std::endl;
        } else {
            std::cout << "❌ Timeout for request: " << i << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    return 0;
}
2. String Processing Service
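#include <algorithm>
#include <cctype>
#include <chrono>
#include <cstdio>
#include <iostream>
#include <string>
#include <thread>

struct StringRequest {
    char text[256];
    int operation;  // 0 = uppercase, 1 = lowercase, 2 = reverse
};

struct StringResponse {
    char result[256];
    bool success;
    int error_code;  // 0 = OK, 1 = invalid operation, 2 = processing error
};

class StringProcessorServer {
private:
    ServiceServer<StringRequest, StringResponse> server_;

public:
    StringProcessorServer() : server_("string_processor") {}

    void run() {
        std::cout << "String Processor Service started!" << std::endl;
        while (true) {
            if (server_.hasRequest()) {
                StringRequest req = server_.getRequest();
                StringResponse resp = processString(req);
                server_.sendResponse(resp);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

private:
    StringResponse processString(const StringRequest& req) {
        StringResponse resp{};
        resp.success = true;
        resp.error_code = 0;
        // Bound the read in case the client did not NUL-terminate the buffer.
        std::string text(req.text, std::find(req.text, req.text + sizeof(req.text), '\0'));
        try {
            switch (req.operation) {
                case 0:
                    // Go through unsigned char: passing a negative char to toupper is undefined.
                    std::transform(text.begin(), text.end(), text.begin(),
                                   [](unsigned char c) { return static_cast<char>(std::toupper(c)); });
                    break;
                case 1:
                    std::transform(text.begin(), text.end(), text.begin(),
                                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
                    break;
                case 2:
                    std::reverse(text.begin(), text.end());
                    break;
                default:
                    resp.success = false;
                    resp.error_code = 1;
                    std::snprintf(resp.result, sizeof(resp.result), "%s", "Invalid operation");
                    return resp;
            }
            // snprintf truncates instead of overflowing the fixed-size buffer.
            std::snprintf(resp.result, sizeof(resp.result), "%s", text.c_str());
        } catch (const std::exception& e) {
            resp.success = false;
            resp.error_code = 2;
            std::snprintf(resp.result, sizeof(resp.result), "%s", e.what());
        }
        return resp;
    }
};

void useStringProcessor() {
    ServiceClient<StringRequest, StringResponse> client("string_processor");

    StringRequest req{};  // zero-initialize so the text buffer is NUL-terminated
    std::snprintf(req.text, sizeof(req.text), "%s", "Hello World");
    req.operation = 0;    // uppercase

    client.sendRequest(req);
    if (client.waitForResponse(3000000)) {  // 3 s, in microseconds
        StringResponse resp = client.getResponse();
        if (resp.success) {
            std::cout << "Result: " << resp.result << std::endl;
        } else {
            std::cout << "Error: " << resp.result << " (code: " << resp.error_code << ")" << std::endl;
        }
    } else {
        std::cout << "Timeout waiting for string_processor" << std::endl;
    }
}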
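Because the request and response structs carry fixed 256-byte `char` arrays, every copy into them has to be bounded and NUL-terminated. A small helper can centralize that. The sketch below uses a hypothetical name, `copyBounded`, and reports truncation so the caller can decide whether to treat it as an error.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical helper: copy `src` into a fixed char array, always NUL-terminated.
// Returns false when `src` had to be truncated to fit.
template <std::size_t N>
bool copyBounded(char (&dest)[N], const std::string& src) {
    static_assert(N > 0, "destination must hold at least the terminator");
    const std::size_t n = src.size() < N - 1 ? src.size() : N - 1;
    std::memcpy(dest, src.data(), n);  // copy at most N - 1 bytes
    dest[n] = '\0';
    return n == src.size();
}
```

Taking the destination as a reference to an array lets the compiler deduce the buffer size, so the caller cannot pass a mismatched length.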
🔧 Advanced Features and Error Handling
3. Robust Service with Timeout and Retry
#include <chrono>
#include <iostream>
#include <string>
#include <thread>

class RobustServiceClient {
private:
    ServiceClient<int, int> client_;
    static constexpr int MAX_RETRIES = 3;
    static constexpr int TIMEOUT_US = 2000000;  // 2 s per attempt

public:
    RobustServiceClient(const std::string& service_name) : client_(service_name) {}

    // Sends `input` and waits for the reply, retrying on timeout.
    // Returns true and fills `output` on success.
    bool callService(int input, int& output) {
        for (int retry = 0; retry < MAX_RETRIES; ++retry) {
            std::cout << "Attempt " << (retry + 1) << "/" << MAX_RETRIES << std::endl;
            client_.sendRequest(input);

            if (client_.waitForResponse(TIMEOUT_US)) {
                output = client_.getResponse();
                std::cout << "✅ Service call successful" << std::endl;
                return true;
            } else {
                std::cout << "⏰ Timeout on attempt " << (retry + 1) << std::endl;
                // Pause before retrying, except after the final attempt.
                if (retry < MAX_RETRIES - 1) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
                }
            }
        }
        std::cout << "❌ Service call failed after " << MAX_RETRIES << " attempts" << std::endl;
        return false;
    }
};

int main() {
    RobustServiceClient client("robust_calc");
    int result = 0;
    if (client.callService(42, result)) {
        std::cout << "Final result: " << result << std::endl;
    } else {
        std::cout << "Service unavailable" << std::endl;
    }
    return 0;
}
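The retry loop above pauses a fixed 100 ms between attempts. When a server is overloaded, a growing delay is often gentler. The sketch below swaps in exponential backoff, which is a different policy from the one in the example. `retryWithBackoff` is a hypothetical, library-independent helper that accepts any callable returning `bool`.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: call `attempt` up to `max_attempts` times, doubling the
// delay after each failure. Returns the 1-based attempt number that succeeded,
// or 0 if every attempt failed.
inline int retryWithBackoff(const std::function<bool()>& attempt,
                            int max_attempts,
                            std::chrono::milliseconds initial_delay) {
    auto delay = initial_delay;
    for (int n = 1; n <= max_attempts; ++n) {
        if (attempt()) return n;
        if (n < max_attempts) {  // no pause after the final attempt
            std::this_thread::sleep_for(delay);
            delay *= 2;
        }
    }
    return 0;
}
```

The attempt callable would wrap one send-and-wait cycle, for example a lambda that calls `sendRequest` followed by `waitForResponse` and returns its result.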
4. High-Performance Concurrent Server
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class ConcurrentCalculatorServer {
private:
    ServiceServer<int, int> server_;
    std::atomic<bool> running_;
    std::vector<std::thread> worker_threads_;
    std::queue<std::pair<int, std::promise<int>>> task_queue_;
    std::mutex queue_mutex_;
    std::condition_variable queue_cv_;

public:
    ConcurrentCalculatorServer(int num_workers = 4)
        : server_("concurrent_calc"), running_(true) {
        for (int i = 0; i < num_workers; ++i) {
            worker_threads_.emplace_back([this, i]() { workerLoop(i); });
        }
        std::cout << "Concurrent server started with " << num_workers << " workers" << std::endl;
    }

    ~ConcurrentCalculatorServer() {
        {
            // Flip the flag under the lock so a waiting worker cannot miss the wake-up.
            std::lock_guard<std::mutex> lock(queue_mutex_);
            running_ = false;
        }
        queue_cv_.notify_all();
        for (auto& thread : worker_threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

    // Dispatch loop. Note: it waits for each result before fetching the next
    // request, so requests are still answered one at a time and in order;
    // the pool only moves the computation off the dispatch thread.
    void run() {
        while (running_) {
            if (server_.hasRequest()) {
                int request = server_.getRequest();
                std::promise<int> result_promise;
                auto result_future = result_promise.get_future();
                {
                    std::lock_guard<std::mutex> lock(queue_mutex_);
                    task_queue_.emplace(request, std::move(result_promise));
                }
                queue_cv_.notify_one();

                int result = result_future.get();
                server_.sendResponse(result);
            }
            std::this_thread::sleep_for(std::chrono::microseconds(100));
        }
    }

private:
    void workerLoop(int worker_id) {
        while (running_) {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            queue_cv_.wait(lock, [this]() { return !task_queue_.empty() || !running_; });
            if (!running_) break;

            auto task = std::move(task_queue_.front());
            task_queue_.pop();
            lock.unlock();  // compute without holding the queue lock

            int input = task.first;
            int result = performComplexCalculation(input, worker_id);
            task.second.set_value(result);
        }
    }

    int performComplexCalculation(int input, int worker_id) {
        // Simulated workload.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::cout << "Worker " << worker_id << " processed: " << input << std::endl;
        return input * input + input;
    }
};
Performance Measurement and Optimization
5. Latency Benchmarking
#include <algorithm>
#include <chrono>
#include <iostream>
#include <numeric>   // std::accumulate
#include <string>
#include <thread>
#include <vector>

class ServiceBenchmark {
private:
    ServiceClient<int, int> client_;
    std::vector<double> latencies_;  // round-trip times in microseconds

public:
    ServiceBenchmark(const std::string& service_name) : client_(service_name) {}

    void runBenchmark(int num_requests = 1000) {
        std::cout << "Starting benchmark with " << num_requests << " requests..." << std::endl;
        latencies_.clear();
        latencies_.reserve(num_requests);

        for (int i = 0; i < num_requests; ++i) {
            // steady_clock is monotonic, which makes it the safer choice for intervals.
            auto start = std::chrono::steady_clock::now();
            client_.sendRequest(i);

            if (client_.waitForResponse(1000000)) {  // 1 s, in microseconds
                auto end = std::chrono::steady_clock::now();
                auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
                latencies_.push_back(static_cast<double>(duration.count()));
                client_.getResponse();
            } else {
                std::cout << "❌ Timeout on request " << i << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::microseconds(100));
        }
        printStatistics();
    }

private:
    void printStatistics() {
        if (latencies_.empty()) {
            std::cout << "No successful requests" << std::endl;
            return;
        }
        std::sort(latencies_.begin(), latencies_.end());

        double avg = std::accumulate(latencies_.begin(), latencies_.end(), 0.0) / latencies_.size();
        double median = latencies_[latencies_.size() / 2];
        double min_latency = latencies_.front();
        double max_latency = latencies_.back();
        double p95 = latencies_[static_cast<size_t>(latencies_.size() * 0.95)];
        double p99 = latencies_[static_cast<size_t>(latencies_.size() * 0.99)];

        std::cout << "\nBenchmark Results:" << std::endl;
        std::cout << "Successful requests: " << latencies_.size() << std::endl;
        std::cout << "Average latency: " << avg << " μs" << std::endl;
        std::cout << "Median latency: " << median << " μs" << std::endl;
        std::cout << "Min latency: " << min_latency << " μs" << std::endl;
        std::cout << "Max latency: " << max_latency << " μs" << std::endl;
        std::cout << "95th percentile: " << p95 << " μs" << std::endl;
        std::cout << "99th percentile: " << p99 << " μs" << std::endl;
        // Estimate for one client issuing requests back to back (ignores the pause above).
        std::cout << "Throughput: " << (1000000.0 / avg) << " requests/second" << std::endl;
    }
};
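The benchmark reads percentiles by indexing directly into the sorted vector. If a precisely defined percentile is wanted, the nearest-rank method is one common choice: for N samples, the p-th percentile is the value at rank ceil(p/100 × N). The helper below is a hedged sketch of that definition. `percentile` is a hypothetical free function, not part of the benchmark class above.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Hypothetical helper: nearest-rank percentile of a non-empty sample set.
// `p` must be in (0, 100]. The vector is taken by value, so the caller's
// ordering is left untouched.
inline double percentile(std::vector<double> samples, double p) {
    assert(!samples.empty() && p > 0.0 && p <= 100.0);
    std::sort(samples.begin(), samples.end());
    const std::size_t rank =
        static_cast<std::size_t>(std::ceil(p / 100.0 * samples.size()));
    return samples[rank - 1];  // ranks are 1-based
}
```

With very few samples the high percentiles collapse onto the maximum, which is worth keeping in mind when comparing short benchmark runs.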
🛠️ Real-World Applications
6. Database Service Proxy
#include <chrono>
#include <cstdio>
#include <exception>
#include <thread>

struct DatabaseQuery {
    char sql[512];
    int query_type;   // 0 = select, 1 = insert, anything else = generic operation
    int timeout_ms;
};

struct DatabaseResult {
    char data[1024];
    int row_count;
    bool success;
    int error_code;
    char error_message[256];
};

class DatabaseServiceProxy {
private:
    ServiceServer<DatabaseQuery, DatabaseResult> server_;

public:
    DatabaseServiceProxy() : server_("database_service") {}

    void run() {
        while (true) {
            if (server_.hasRequest()) {
                DatabaseQuery query = server_.getRequest();
                DatabaseResult result = executeQuery(query);
                server_.sendResponse(result);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

private:
    // Simulates a database call; a real proxy would run query.sql here.
    DatabaseResult executeQuery(const DatabaseQuery& query) {
        DatabaseResult result{};  // zero-initialize both text buffers
        result.success = true;
        result.error_code = 0;
        result.row_count = 0;
        try {
            // Simulated processing time: one tenth of the caller's timeout.
            std::this_thread::sleep_for(std::chrono::milliseconds(query.timeout_ms / 10));
            switch (query.query_type) {
                case 0:
                    std::snprintf(result.data, sizeof(result.data), "%s",
                                  "[{\"id\":1,\"name\":\"test\"}]");
                    result.row_count = 1;
                    break;
                case 1:
                    std::snprintf(result.data, sizeof(result.data), "%s", "INSERT successful");
                    result.row_count = 1;
                    break;
                default:
                    std::snprintf(result.data, sizeof(result.data), "%s", "Operation completed");
                    result.row_count = 0;
            }
        } catch (const std::exception& e) {
            result.success = false;
            result.error_code = 500;
            // Bounded copy: e.what() may be longer than the buffer.
            std::snprintf(result.error_message, sizeof(result.error_message), "%s", e.what());
        }
        return result;
    }
};
🎯 Best Practices and Guidelines
Design Principles
- Keep Request/Response Data Small
// Good: a small, fixed-size payload
struct CalculationRequest {
    double value;
    int operation;
};
// Avoid: a 1 MiB payload is expensive to move through the queue
struct HugeRequest {
    char large_buffer[1024*1024];
};
- Use Appropriate Timeouts
// All values are in microseconds, matching waitForResponse().
const int QUICK_OPERATION_TIMEOUT = 100000;     // 100 ms
const int NORMAL_OPERATION_TIMEOUT = 1000000;   // 1 s
const int LONG_OPERATION_TIMEOUT = 10000000;    // 10 s
- Implement Proper Error Handling
enum class ServiceError {
    SUCCESS = 0,
    TIMEOUT = 1,
    INVALID_REQUEST = 2,
    PROCESSING_ERROR = 3,
    SERVICE_UNAVAILABLE = 4
};
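The "keep payloads small" principle can be checked at compile time. The sketch below assumes that payloads are copied byte for byte through shared memory, which is why the examples in this guide use fixed `char` arrays rather than `std::string`. `IsShmPayload` and its 4 KiB ceiling are hypothetical and chosen purely for illustration, not taken from the library.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>

// Hypothetical compile-time guard for shared-memory payload types.
// The 4 KiB default ceiling is an arbitrary illustrative limit.
template <typename T, std::size_t MaxBytes = 4096>
struct IsShmPayload
    : std::integral_constant<bool,
          std::is_trivially_copyable<T>::value && (sizeof(T) <= MaxBytes)> {};

struct CalculationRequest {
    double value;
    int operation;
};

struct HugeRequest {
    char large_buffer[1024 * 1024];
};

// The small struct passes; the 1 MiB struct is rejected by the size check.
static_assert(IsShmPayload<CalculationRequest>::value, "small payload is accepted");
static_assert(!IsShmPayload<HugeRequest>::value, "1 MiB payload exceeds the limit");
```

Placing a `static_assert` like this next to each request and response type turns an accidental `std::string` member or an oversized buffer into a build error instead of a runtime surprise.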
Performance Tips
- Minimize Memory Allocations
- Use Async Pattern for High Throughput
- Implement Connection Pooling for Multiple Services
- Monitor Service Health and Performance
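For the monitoring tip, even a pair of counters on the client side gives a useful health signal. The following is a minimal sketch with hypothetical names (`ServiceStats`, `recordSuccess`, `recordTimeout`). It is not thread-safe and not part of the library; a caller would update it after each `waitForResponse` result.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-client health counters (single-threaded use only).
struct ServiceStats {
    uint64_t successes = 0;
    uint64_t timeouts = 0;
    double total_latency_us = 0.0;

    void recordSuccess(double latency_us) {
        ++successes;
        total_latency_us += latency_us;
    }
    void recordTimeout() { ++timeouts; }

    // Fraction of calls that timed out; 0 when nothing has been recorded.
    double timeoutRate() const {
        const uint64_t total = successes + timeouts;
        return total == 0 ? 0.0 : static_cast<double>(timeouts) / total;
    }

    // Mean latency over successful calls only, in microseconds.
    double meanLatencyUs() const {
        return successes == 0 ? 0.0 : total_latency_us / successes;
    }
};
```

A rising timeout rate with a stable mean latency tends to point at a stalled or missing server, whereas both rising together suggests the server is overloaded.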
Troubleshooting Common Issues
Common Problems and Solutions
- Service Server Not Responding
  - Check that the server process is running
  - Verify that the service name matches exactly
  - Ensure the client has permission to access the shared memory
- Client Timeout Issues
  - Increase the timeout value
  - Check the server's processing time
  - Implement a retry mechanism
- Memory Issues
  - Monitor shared memory usage
  - Implement proper cleanup
  - Use appropriate buffer sizes
Next Steps
Congratulations! You've mastered reliable request-response communication! Your distributed applications now have rock-solid service communication! ✨