50 : _parent_socket(context, ZMQ_PAIR),
51 _child_socket(std::make_unique<zmq::socket_t>(context, ZMQ_PAIR)),
52 _exception_state(std::make_shared<SharedExceptionState>()),
55 std::string address = bind_to_unique_address();
56 _child_socket->connect(address);
62 _parent_socket = std::move(other._parent_socket);
63 _child_socket = std::move(other._child_socket);
64 _exception_state = std::move(other._exception_state);
65 _started = other._started;
66 _stopped = other._stopped;
67 _timeout_on_destructor = other._timeout_on_destructor;
69 other._started =
true;
70 other._stopped =
true;
77 stop(_timeout_on_destructor);
84 throw std::runtime_error(
"Actor already started");
87 std::thread thread([&func, &
socket = _child_socket, &exception_state = _exception_state]() {
88 actor_t::execute(func, std::move(
socket), exception_state);
97 if (signal && signal->is_success()) {
102 _parent_socket.close();
103 std::exception_ptr saved_ex;
105 std::lock_guard<std::mutex> lock(_exception_state->exception_mutex);
106 saved_ex = _exception_state->saved_exception;
109 std::rethrow_exception(saved_ex);
111 throw std::runtime_error(
"Actor initialization failed");
114 _parent_socket.close();
115 throw std::runtime_error(
"Failed to receive initialization signal");
119 if (!_started || _stopped) {
124 auto const result_send =
send_retry_on_eintr(_parent_socket, msg_send, zmq::send_flags::dontwait);
127 _parent_socket.close();
131 auto const confined_timeout = std::clamp(timeout, std::chrono::milliseconds{std::numeric_limits<int>::min()},
132 std::chrono::milliseconds{std::numeric_limits<int>::max()});
133 int timeout_ms = (confined_timeout.count() < 0) ? -1 :
static_cast<int>(confined_timeout.count());
134 auto const start_time = std::chrono::steady_clock::now();
136 zmq::message_t msg_recv;
138 _parent_socket.set(zmq::sockopt::rcvtimeo, timeout_ms);
139 auto const result_recv =
recv_retry_on_eintr(_parent_socket, msg_recv, zmq::recv_flags::none);
142 _parent_socket.close();
148 if (confined_timeout.count() >= 0) {
149 auto const time_left = std::chrono::ceil<std::chrono::milliseconds>(
150 confined_timeout - (std::chrono::steady_clock::now() - start_time));
151 timeout_ms = std::max(
static_cast<int>(time_left.count()), 0);
156 _parent_socket.close();
160void actor_t::execute(
actor_fn_t func, std::unique_ptr<zmq::socket_t> socket,
161 std::shared_ptr<SharedExceptionState> exception_state)
noexcept {
163 auto const success = func(*socket);
170 std::lock_guard<std::mutex> lock(exception_state->exception_mutex);
171 exception_state->saved_exception = std::current_exception();
185std::string actor_t::bind_to_unique_address() {
186 std::random_device rd;
187 std::mt19937 gen(rd());
188 std::uniform_int_distribution<> dis(0, 999999);
189 std::string base_address =
"inproc://zmqzext-actor-" + std::to_string(
reinterpret_cast<uintptr_t
>(
this));
193 std::string address = base_address +
"-" + std::to_string(dis(gen));
194 _parent_socket.bind(address);
196 }
catch (
const zmq::error_t& e) {
197 if (e.num() != EADDRINUSE) {
Actor pattern implementation using ZeroMQ PAIR sockets.
std::function< bool(zmq::socket_t &)> actor_fn_t
Alias for a function type used to define actor behaviors.
Class that implements the Actor pattern using ZMQ PAIR sockets.
void start(actor_fn_t func)
Starts the actor thread with the provided function.
bool stop(std::chrono::milliseconds timeout=std::chrono::milliseconds{-1})
Stops the actor thread.
~actor_t() noexcept
Destroys the actor_t object.
zmq::socket_t & socket() noexcept
Gets the parent socket for external communication.
actor_t & operator=(actor_t const &)=delete
Copy assignment operator is deleted.
actor_t(zmq::context_t &context)
Constructs a new actor_t object.
Class representing signals for ZMQ communication.
static zmq::message_t create_success()
Create a success signal message.
static std::optional< signal_t > check_signal(const zmq::message_t &msg) noexcept
Check if a zmq::message_t contains a valid signal.
static zmq::message_t create_stop()
Create a stop signal message.
static zmq::message_t create_failure()
Create a failure signal message.
Helper utilities for robust ZMQ message sending and receiving.
CZZE_EXPORT zmq::recv_buffer_result_t recv_retry_on_eintr(T &socket, zmq::mutable_buffer const &buf, zmq::recv_flags flags=zmq::recv_flags::none)
Receive data into a buffer with automatic retry on EINTR.
CZZE_EXPORT zmq::send_result_t send_retry_on_eintr(T &socket, zmq::const_buffer const &buf, zmq::send_flags flags=zmq::send_flags::none)
Send a message buffer with automatic retry on EINTR.
Signal definitions and utilities for actor inter-thread communication.