2 #include <systemd/sd-event.h>
4 #include "application.hpp"
5 #include "../utils/signals.hpp"
6 #include "low-can-hat.hpp"
7 #include "../can/message/message.hpp"
8 #include "../can/can-bus.hpp"
12 ///******************************************************************************
14 /// SystemD event loop Callbacks
16 ///*******************************************************************************/
18 static void push_n_notify(std::shared_ptr<message_t> m)
20 can_bus_t& cbm = application_t::instance().get_can_bus_manager();
22 std::lock_guard<std::mutex> can_message_lock(cbm.get_can_message_mutex());
23 cbm.push_new_can_message(m);
25 cbm.get_new_can_message_cv().notify_one();
28 void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription, uint32_t pid, std::map<int, std::shared_ptr<low_can_subscription_t> >& s)
30 bool is_permanent_recurring_request = false;
32 if( ! can_subscription->get_diagnostic_message().empty() && can_subscription->get_diagnostic_message(pid) != nullptr)
34 DiagnosticRequest diag_req = can_subscription->get_diagnostic_message(pid)->build_diagnostic_request();
35 active_diagnostic_request_t* adr = application_t::instance().get_diagnostic_manager().find_recurring_request(diag_req);
38 is_permanent_recurring_request = adr->get_permanent();
40 if(! is_permanent_recurring_request)
41 application_t::instance().get_diagnostic_manager().cleanup_request(adr, true);
45 if(! is_permanent_recurring_request)
46 on_no_clients(can_subscription, s);
50 void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription, std::map<int, std::shared_ptr<low_can_subscription_t> >& s)
52 auto it = s.find(can_subscription->get_index());
56 int read_message(sd_event_source *event_source, int fd, uint32_t revents, void *userdata)
59 low_can_subscription_t* can_subscription = (low_can_subscription_t*)userdata;
62 if ((revents & EPOLLIN) != 0)
64 utils::signals_manager_t& sm = utils::signals_manager_t::instance();
65 std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
66 if(can_subscription->get_index() != -1)
68 std::shared_ptr<utils::socketcan_t> s = can_subscription->get_socket();
71 std::shared_ptr<message_t> message = s->read_message();
73 // Sure we got a valid CAN message ?
74 if (message->get_id() &&
75 message->get_length() &&
76 ! (message->get_flags() & INVALID_FLAG) )
77 push_n_notify(message);
82 // check if error or hangup
83 if ((revents & (EPOLLERR|EPOLLRDHUP|EPOLLHUP)) != 0)
85 sd_event_source_unref(event_source);
86 can_subscription->get_socket()->close();