CppZmqZoltanExt 0.0.1
Loading...
Searching...
No Matches
loop.cpp
Go to the documentation of this file.
1/*
2MIT License
3
4Copyright (c) 2025 Luan Young
5
6Permission is hereby granted, free of charge, to any person obtaining a copy
7of this software and associated documentation files (the "Software"), to deal
8in the Software without restriction, including without limitation the rights
9to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10copies of the Software, and to permit persons to whom the Software is
11furnished to do so, subject to the following conditions:
12
13The above copyright notice and this permission notice shall be included in all
14copies or substantial portions of the Software.
15
16THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22SOFTWARE.
23*/
37
38#include <algorithm>
39
40namespace zmqzext {
41
42void loop_t::add(zmq::socket_ref socket, fn_socket_handler_t fn) {
43 _poller.add(socket);
44 try {
45 _socket_handlers.emplace(socket, fn);
46 } catch (...) {
47 _poller.remove(socket);
48 throw;
49 }
50}
51
52timer_id_t loop_t::add_timer(std::chrono::milliseconds timeout, std::size_t occurences, fn_timer_handler_t fn) {
53 auto const timer_id = generate_unique_timer_id();
54 auto const next_occurence = now() + timeout;
55 _timer_handlers.push_back(timer_t{timer_id, timeout, occurences, next_occurence, fn, false});
56 return timer_id;
57}
58
59void loop_t::remove(zmq::socket_ref socket) {
60 _poller.remove(socket);
61 auto const socket_handler_it = _socket_handlers.find(socket);
62 if (socket_handler_it == _socket_handlers.end()) {
63 return;
64 }
65 _socket_handlers.erase(socket_handler_it);
66}
67
69 auto timer_it = std::find_if(_timer_handlers.begin(), _timer_handlers.end(),
70 [timer_id](timer_t const& timer) { return timer.id == timer_id; });
71 if (timer_it == _timer_handlers.end()) {
72 return;
73 }
74 timer_it->removed = true;
75}
76
77void loop_t::run(bool interruptible /* = true*/,
78 std::chrono::milliseconds interruptCheckInterval /* = std::chrono::milliseconds{-1}*/) {
79 _poller.set_interruptible(interruptible);
80 _interruptCheckInterval = interruptCheckInterval;
81 auto should_continue = true;
82 while (should_continue) {
83 removeFlagedTimers();
84 if (_poller.size() == 0 && _timer_handlers.size() == 0) {
85 return;
86 }
87 auto const initial_time = now();
88 auto const next_timeout = find_next_timeout(initial_time);
89 auto sockets_ready = _poller.wait_all(next_timeout);
90 if (_poller.terminated()) {
91 return;
92 }
93 auto const current_time = now();
94 for (auto timer_it = _timer_handlers.begin(); timer_it != _timer_handlers.end();) {
95 if (timer_it->removed == false && current_time >= timer_it->next_occurence) {
96 should_continue = timer_it->handler(*this, timer_it->id);
97 if (!should_continue) {
98 break;
99 }
100 if (timer_it->occurences > 0 && --timer_it->occurences == 0) {
101 timer_it = _timer_handlers.erase(timer_it);
102 continue;
103 } else {
104 timer_it->next_occurence += timer_it->timeout;
105 }
106 }
107 ++timer_it;
108 }
109 if (!should_continue) {
110 break;
111 }
112 for (auto& socket : sockets_ready) {
113 auto const socket_handler_it = _socket_handlers.find(socket);
114 if (socket_handler_it != _socket_handlers.end()) {
115 should_continue = socket_handler_it->second(*this, socket_handler_it->first);
116 if (!should_continue) {
117 break;
118 }
119 }
120 }
121 }
122}
123
124loop_t::time_point_t loop_t::now() { return std::chrono::steady_clock::now(); }
125
126loop_t::time_milliseconds_t loop_t::find_next_timeout(time_point_t const& actual_time) {
127 auto const next_expiring_timer_it =
128 std::min_element(_timer_handlers.cbegin(), _timer_handlers.cend(),
129 [](timer_t const& a, timer_t const& b) { return a.next_occurence < b.next_occurence; });
130 if (next_expiring_timer_it == _timer_handlers.cend()) {
131 if (_interruptCheckInterval > time_milliseconds_t{0}) {
132 return _interruptCheckInterval;
133 }
134 return time_milliseconds_t{-1};
135 }
136 auto time_left = next_expiring_timer_it->next_occurence - actual_time;
137 if (_interruptCheckInterval > time_milliseconds_t{0} && time_left > _interruptCheckInterval) {
138 return _interruptCheckInterval;
139 }
140 return std::max(time_milliseconds_t{0}, std::chrono::ceil<time_milliseconds_t>(time_left));
141}
142
143void loop_t::removeFlagedTimers() {
144 _timer_handlers.remove_if([](timer_t const& timer) { return timer.removed; });
145}
146
147timer_id_t loop_t::generate_unique_timer_id() {
148 timer_id_t timer_id = ++_last_timer_id;
149 if (_last_timer_id == 0) {
150 _timer_id_has_overflowed = true;
151 timer_id = ++_last_timer_id;
152 }
153 if (_timer_id_has_overflowed) {
154 while (std::any_of(_timer_handlers.begin(), _timer_handlers.end(),
155 [timer_id](auto const& t) { return t.id == timer_id; })) {
156 timer_id = ++_last_timer_id;
157 if (_last_timer_id == 0) {
158 throw std::runtime_error("Unable to generate unique timer ID: all IDs are in use.");
159 }
160 }
161 }
162 return timer_id;
163}
164
165} // namespace zmqzext
void remove(zmq::socket_ref socket)
Unregister a socket from the event loop.
Definition: loop.cpp:59
void run(bool interruptible=true, std::chrono::milliseconds interruptCheckInterval=std::chrono::milliseconds{-1})
Run the event loop.
Definition: loop.cpp:77
void remove_timer(timer_id_t timer_id)
Unregister a timer from the event loop.
Definition: loop.cpp:68
timer_id_t add_timer(std::chrono::milliseconds timeout, std::size_t occurences, fn_timer_handler_t fn)
Register a timer with an expiration handler.
Definition: loop.cpp:52
void add(zmq::socket_ref socket, fn_socket_handler_t fn)
Register a socket with an I/O handler.
Definition: loop.cpp:42
void add(zmq::socket_ref socket)
Add a socket to the polling set.
Definition: poller.cpp:45
void set_interruptible(bool interruptible) noexcept
Set whether polling should be interruptible.
Definition: poller.h:142
std::vector< zmq::socket_ref > wait_all(std::chrono::milliseconds timeout=std::chrono::milliseconds{-1})
Wait for at least one socket to become ready for receiving and return all ready.
Definition: poller.cpp:101
std::size_t size() const noexcept
Get the number of sockets in the polling set.
Definition: poller.h:157
void remove(zmq::socket_ref socket)
Remove a socket from the polling set.
Definition: poller.cpp:57
bool terminated() const noexcept
Check if the poller has been terminated during the last wait operation.
Definition: poller.h:174
Event loop for managing sockets and timers.
std::function< bool(loop_t &, timer_id_t)> fn_timer_handler_t
Timer event handler callback type.
Definition: loop.h:104
std::function< bool(loop_t &, zmq::socket_ref)> fn_socket_handler_t
Socket event handler callback type.
Definition: loop.h:91
std::size_t timer_id_t
Unique identifier for timer instances.
Definition: loop.h:78