45 _socket_handlers.emplace(socket, fn);
53 auto const timer_id = generate_unique_timer_id();
54 auto const next_occurence = now() + timeout;
55 _timer_handlers.push_back(timer_t{timer_id, timeout, occurences, next_occurence, fn,
false});
61 auto const socket_handler_it = _socket_handlers.find(socket);
62 if (socket_handler_it == _socket_handlers.end()) {
65 _socket_handlers.erase(socket_handler_it);
69 auto timer_it = std::find_if(_timer_handlers.begin(), _timer_handlers.end(),
70 [timer_id](timer_t
const& timer) { return timer.id == timer_id; });
71 if (timer_it == _timer_handlers.end()) {
74 timer_it->removed =
true;
78 std::chrono::milliseconds interruptCheckInterval ) {
80 _interruptCheckInterval = interruptCheckInterval;
81 auto should_continue =
true;
82 while (should_continue) {
84 if (_poller.
size() == 0 && _timer_handlers.size() == 0) {
87 auto const initial_time = now();
88 auto const next_timeout = find_next_timeout(initial_time);
89 auto sockets_ready = _poller.
wait_all(next_timeout);
93 auto const current_time = now();
94 for (
auto timer_it = _timer_handlers.begin(); timer_it != _timer_handlers.end();) {
95 if (timer_it->removed ==
false && current_time >= timer_it->next_occurence) {
96 should_continue = timer_it->handler(*
this, timer_it->id);
97 if (!should_continue) {
100 if (timer_it->occurences > 0 && --timer_it->occurences == 0) {
101 timer_it = _timer_handlers.erase(timer_it);
104 timer_it->next_occurence += timer_it->timeout;
109 if (!should_continue) {
112 for (
auto& socket : sockets_ready) {
113 auto const socket_handler_it = _socket_handlers.find(socket);
114 if (socket_handler_it != _socket_handlers.end()) {
115 should_continue = socket_handler_it->second(*
this, socket_handler_it->first);
116 if (!should_continue) {
124loop_t::time_point_t loop_t::now() {
return std::chrono::steady_clock::now(); }
126loop_t::time_milliseconds_t loop_t::find_next_timeout(time_point_t
const& actual_time) {
127 auto const next_expiring_timer_it =
128 std::min_element(_timer_handlers.cbegin(), _timer_handlers.cend(),
129 [](timer_t
const& a, timer_t
const& b) { return a.next_occurence < b.next_occurence; });
130 if (next_expiring_timer_it == _timer_handlers.cend()) {
131 if (_interruptCheckInterval > time_milliseconds_t{0}) {
132 return _interruptCheckInterval;
134 return time_milliseconds_t{-1};
136 auto time_left = next_expiring_timer_it->next_occurence - actual_time;
137 if (_interruptCheckInterval > time_milliseconds_t{0} && time_left > _interruptCheckInterval) {
138 return _interruptCheckInterval;
140 return std::max(time_milliseconds_t{0}, std::chrono::ceil<time_milliseconds_t>(time_left));
143void loop_t::removeFlagedTimers() {
144 _timer_handlers.remove_if([](timer_t
const& timer) {
return timer.removed; });
147timer_id_t loop_t::generate_unique_timer_id() {
149 if (_last_timer_id == 0) {
150 _timer_id_has_overflowed =
true;
151 timer_id = ++_last_timer_id;
153 if (_timer_id_has_overflowed) {
154 while (std::any_of(_timer_handlers.begin(), _timer_handlers.end(),
155 [timer_id](
auto const& t) { return t.id == timer_id; })) {
156 timer_id = ++_last_timer_id;
157 if (_last_timer_id == 0) {
158 throw std::runtime_error(
"Unable to generate unique timer ID: all IDs are in use.");
void remove(zmq::socket_ref socket)
Unregister a socket from the event loop.
void run(bool interruptible=true, std::chrono::milliseconds interruptCheckInterval=std::chrono::milliseconds{-1})
Run the event loop.
void remove_timer(timer_id_t timer_id)
Unregister a timer from the event loop.
timer_id_t add_timer(std::chrono::milliseconds timeout, std::size_t occurences, fn_timer_handler_t fn)
Register a timer with an expiration handler.
void add(zmq::socket_ref socket, fn_socket_handler_t fn)
Register a socket with an I/O handler.
void add(zmq::socket_ref socket)
Add a socket to the polling set.
void set_interruptible(bool interruptible) noexcept
Set whether polling should be interruptible.
std::vector< zmq::socket_ref > wait_all(std::chrono::milliseconds timeout=std::chrono::milliseconds{-1})
Wait for at least one socket to become ready for receiving and return all sockets that are ready.
std::size_t size() const noexcept
Get the number of sockets in the polling set.
void remove(zmq::socket_ref socket)
Remove a socket from the polling set.
bool terminated() const noexcept
Check if the poller has been terminated during the last wait operation.
Event loop for managing sockets and timers.
std::function< bool(loop_t &, timer_id_t)> fn_timer_handler_t
Timer event handler callback type.
std::function< bool(loop_t &, zmq::socket_ref)> fn_socket_handler_t
Socket event handler callback type.
std::size_t timer_id_t
Unique identifier for timer instances.