CppZmqZoltanExt 0.0.1
Loading...
Searching...
No Matches
actor.cpp
Go to the documentation of this file.
1/*
2MIT License
3
4Copyright (c) 2025 Luan Young
5
6Permission is hereby granted, free of charge, to any person obtaining a copy
7of this software and associated documentation files (the "Software"), to deal
8in the Software without restriction, including without limitation the rights
9to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10copies of the Software, and to permit persons to whom the Software is
11furnished to do so, subject to the following conditions:
12
13The above copyright notice and this permission notice shall be included in all
14copies or substantial portions of the Software.
15
16THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22SOFTWARE.
23*/
37
38#include <algorithm>
39#include <cerrno>
40#include <limits>
41#include <random>
42#include <thread>
43
46
47namespace zmqzext {
48
49actor_t::actor_t(zmq::context_t& context)
50 : _parent_socket(context, ZMQ_PAIR),
51 _child_socket(std::make_unique<zmq::socket_t>(context, ZMQ_PAIR)),
52 _exception_state(std::make_shared<SharedExceptionState>()),
53 _started(false),
54 _stopped(false) {
55 std::string address = bind_to_unique_address();
56 _child_socket->connect(address);
57}
58
59actor_t::actor_t(actor_t&& other) noexcept { *this = std::move(other); }
60
62 _parent_socket = std::move(other._parent_socket);
63 _child_socket = std::move(other._child_socket);
64 _exception_state = std::move(other._exception_state);
65 _started = other._started;
66 _stopped = other._stopped;
67 _timeout_on_destructor = other._timeout_on_destructor;
68
69 other._started = true;
70 other._stopped = true;
71
72 return *this;
73}
74
76 try {
77 stop(_timeout_on_destructor);
78 } catch (...) {
79 }
80}
81
83 if (_started) {
84 throw std::runtime_error("Actor already started");
85 }
86
87 std::thread thread([&func, &socket = _child_socket, &exception_state = _exception_state]() {
88 actor_t::execute(func, std::move(socket), exception_state);
89 });
90 thread.detach(); // Thread will run independently
91
92 _started = true;
93
94 zmq::message_t msg;
95 if (recv_retry_on_eintr(_parent_socket, msg, zmq::recv_flags::none)) { // blocking
96 auto signal = signal_t::check_signal(msg);
97 if (signal && signal->is_success()) {
98 return; // Success case
99 }
100 // Failure case - get exception if any
101 _stopped = true;
102 _parent_socket.close();
103 std::exception_ptr saved_ex;
104 {
105 std::lock_guard<std::mutex> lock(_exception_state->exception_mutex);
106 saved_ex = _exception_state->saved_exception;
107 }
108 if (saved_ex) {
109 std::rethrow_exception(saved_ex);
110 }
111 throw std::runtime_error("Actor initialization failed");
112 }
113 _stopped = true;
114 _parent_socket.close();
115 throw std::runtime_error("Failed to receive initialization signal");
116}
117
118bool actor_t::stop(std::chrono::milliseconds timeout /* = std::chrono::milliseconds{-1}*/) {
119 if (!_started || _stopped) {
120 return true;
121 }
122
123 auto msg_send = signal_t::create_stop();
124 auto const result_send = send_retry_on_eintr(_parent_socket, msg_send, zmq::send_flags::dontwait);
125 if (!result_send) {
126 _stopped = true;
127 _parent_socket.close();
128 return true;
129 }
130
131 auto const confined_timeout = std::clamp(timeout, std::chrono::milliseconds{std::numeric_limits<int>::min()},
132 std::chrono::milliseconds{std::numeric_limits<int>::max()});
133 int timeout_ms = (confined_timeout.count() < 0) ? -1 : static_cast<int>(confined_timeout.count());
134 auto const start_time = std::chrono::steady_clock::now();
135
136 zmq::message_t msg_recv;
137 while (true) {
138 _parent_socket.set(zmq::sockopt::rcvtimeo, timeout_ms);
139 auto const result_recv = recv_retry_on_eintr(_parent_socket, msg_recv, zmq::recv_flags::none); // blocking
140 if (!result_recv) {
141 _stopped = true;
142 _parent_socket.close();
143 return false;
144 }
145 if (signal_t::check_signal(msg_recv)) {
146 break;
147 }
148 if (confined_timeout.count() >= 0) {
149 auto const time_left = std::chrono::ceil<std::chrono::milliseconds>(
150 confined_timeout - (std::chrono::steady_clock::now() - start_time));
151 timeout_ms = std::max(static_cast<int>(time_left.count()), 0);
152 }
153 }
154
155 _stopped = true;
156 _parent_socket.close();
157 return true;
158}
159
160void actor_t::execute(actor_fn_t func, std::unique_ptr<zmq::socket_t> socket,
161 std::shared_ptr<SharedExceptionState> exception_state) noexcept {
162 try {
163 auto const success = func(*socket);
164
165 auto signal = success ? signal_t::create_success() : signal_t::create_failure();
166 send_retry_on_eintr(*socket, signal, zmq::send_flags::none); // blocking
167 } catch (...) {
168 // Save exception to be rethrown in start() if needed
169 {
170 std::lock_guard<std::mutex> lock(exception_state->exception_mutex);
171 exception_state->saved_exception = std::current_exception();
172 }
173
174 try {
175 auto signal = signal_t::create_failure();
176 send_retry_on_eintr(*socket, signal, zmq::send_flags::none); // blocking
177 } catch (...) {
178 // Ignore exceptions during send in exception handler
179 }
180 }
181 // Always close socket when done
182 socket->close();
183}
184
185std::string actor_t::bind_to_unique_address() {
186 std::random_device rd;
187 std::mt19937 gen(rd());
188 std::uniform_int_distribution<> dis(0, 999999);
189 std::string base_address = "inproc://zmqzext-actor-" + std::to_string(reinterpret_cast<uintptr_t>(this));
190
191 while (true) {
192 try {
193 std::string address = base_address + "-" + std::to_string(dis(gen));
194 _parent_socket.bind(address);
195 return address;
196 } catch (const zmq::error_t& e) {
197 if (e.num() != EADDRINUSE) {
198 throw; // Rethrow if it's not an address-in-use error
199 }
200 // If address is in use, loop will continue and try another random
201 // suffix
202 }
203 }
204}
205} // namespace zmqzext
Actor pattern implementation using ZeroMQ PAIR sockets.
std::function< bool(zmq::socket_t &)> actor_fn_t
Alias for a function type used to define actor behaviors.
Definition: actor.h:121
Class that implements the Actor pattern using ZMQ PAIR sockets.
Definition: actor.h:131
void start(actor_fn_t func)
Starts the actor thread with the provided function.
Definition: actor.cpp:82
bool stop(std::chrono::milliseconds timeout=std::chrono::milliseconds{-1})
Stops the actor thread.
Definition: actor.cpp:118
~actor_t() noexcept
Destroys the actor_t object.
Definition: actor.cpp:75
zmq::socket_t & socket() noexcept
Gets the parent socket for external communication.
Definition: actor.h:245
actor_t & operator=(actor_t const &)=delete
Copy assignment operator is deleted.
actor_t(zmq::context_t &context)
Constructs a new actor_t object.
Definition: actor.cpp:49
Class representing signals for ZMQ communication.
Definition: signal.h:77
static zmq::message_t create_success()
Create a success signal message.
Definition: signal.cpp:51
static std::optional< signal_t > check_signal(const zmq::message_t &msg) noexcept
Check if a zmq::message_t contains a valid signal.
Definition: signal.cpp:57
static zmq::message_t create_stop()
Create a stop signal message.
Definition: signal.cpp:55
static zmq::message_t create_failure()
Create a failure signal message.
Definition: signal.cpp:53
Helper utilities for robust ZMQ message sending and receiving.
CZZE_EXPORT zmq::recv_buffer_result_t recv_retry_on_eintr(T &socket, zmq::mutable_buffer const &buf, zmq::recv_flags flags=zmq::recv_flags::none)
Receive data into a buffer with automatic retry on EINTR.
Definition: helpers.cpp:85
CZZE_EXPORT zmq::send_result_t send_retry_on_eintr(T &socket, zmq::const_buffer const &buf, zmq::send_flags flags=zmq::send_flags::none)
Send a message buffer with automatic retry on EINTR.
Definition: helpers.cpp:43
Signal definitions and utilities for actor inter-thread communication.