}
}
-int read(sd_event_source *s, int fd, uint32_t revents, void *userdata)
+static void push_n_notify(const can_message_t cm)
{
- can_signal_t* sig= (can_signal_t*)userdata;
- if(sig->read_socket() != can_message_format_t::ERROR)
- return 0;
- return -1;
+ can_bus_t& cbm = configuration_t::instance().get_can_bus_manager();
+ std::lock_guard<std::mutex> can_message_lock(cbm.get_can_message_mutex());
+ { cbm.push_new_can_message(cm); }
+ cbm.get_new_can_message_cv().notify_one();
+}
+
+int read_message(sd_event_source *s, int fd, uint32_t revents, void *userdata)
+{
+ can_message_t cm;
+ can_signal_t* sig;
+ diagnostic_manager_t& diag_m = configuration_t::instance().get_diagnostic_manager();
+
+ if(userdata != nullptr)
+ {
+ sig = (can_signal_t*)userdata;
+ utils::socketcan_bcm_t s = sig->get_socket();
+ s >> cm;
+ }
+ else
+ {
+ utils::socketcan_bcm_t s = diag_m.get_socket();
+ s >> cm;
+ }
+
+ push_n_notify(cm);
+
+ /* check if error or hangup */
+ if ((revents & (EPOLLERR|EPOLLRDHUP|EPOLLHUP)) != 0)
+ {
+ sd_event_source_unref(s);
+ if(userdata != nullptr)
+ {
+ sig->get_socket().close();
+ sig->create_rx_filter();
+ NOTICE(binder_interface, "%s: Recreated RX_SETUP BCM job for signal: %s", __FUNCTION__, sig->get_name().c_str());
+ }
+ else
+ {
+ diag_m.get_socket().close();
+ diag_m.cleanup_active_requests(true);
+ ERROR(binder_interface, "%s: Error on diagnostic manager socket, cancelling active requests.", __FUNCTION__);
+ }
+ return -1;
+ }
+
+ return 0;
}
+
///******************************************************************************
///
/// Subscription and unsubscription
// poll a PID for nothing.
if(sig->get_supported() && subscribe)
{
- float frequency = sig->get_frequency();
- subscribe = diag_m.add_recurring_request(
- diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), (float)frequency);
- //TODO: Adding callback requesting ignition status: diag_req, sig.c_str(), false, diagnostic_message_t::decode_obd2_response, diagnostic_message_t::check_ignition_status, frequency);
+ float frequency = sig->get_frequency();
+ diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), (float)frequency);
+ //TODO: Adding callback requesting ignition status: diag_req, sig.c_str(), false, diagnostic_message_t::decode_obd2_response, diagnostic_message_t::check_ignition_status, frequency);
}
else
{
return -1;
}
struct sd_event_source* e_source;
- sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), &e_source, sig->get_socket().socket(), EPOLLIN, read, sig);
+ sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), &e_source, sig->get_socket().socket(), EPOLLIN, read_message, sig.get());
rets++;
DEBUG(binder_interface, "%s: signal: %s subscribed", __FUNCTION__, sig->get_name().c_str());
}
return rets;
}
-static const std::vector<std::string> parse_args_from_request(struct afb_req request, bool subscribe)
+static int process_args(struct afb_req request, const std::vector<std::string>& args, bool subscribe)
+{
+ struct utils::signals_found sf;
+ int ok = 0, total = 0;
+
+ for(const auto& sig: args)
+ {
+ openxc_DynamicField search_key = build_DynamicField(sig);
+ sf = utils::signals_manager_t::instance().find_signals(search_key);
+ total = (int)sf.can_signals.size() + (int)sf.diagnostic_messages.size();
+
+ if (sf.can_signals.empty() && sf.diagnostic_messages.empty())
+ NOTICE(binder_interface, "%s: No signal(s) found for %s.", __FUNCTION__, sig.c_str());
+ else
+ ok = subscribe_unsubscribe_signals(request, subscribe, sf);
+ }
+ NOTICE(binder_interface, "%s: Subscribed/unsubscribe correctly to %d/%d signal(s).", __FUNCTION__, ok, total);
+ return ok;
+}
+
+static const std::vector<std::string> parse_args_from_request(struct afb_req request)
{
int i, n;
std::vector<std::string> ret;
void subscribe(struct afb_req request)
{
- std::vector<std::string> args;
- struct utils::signals_found sf;
- int ok = 0, total = 0;
bool subscribe = true;
- args = parse_args_from_request(request, subscribe);
-
- for(const auto& sig: args)
- {
- openxc_DynamicField search_key = build_DynamicField(sig);
- sf = utils::signals_manager_t::instance().find_signals(search_key);
- total = (int)sf.can_signals.size() + (int)sf.diagnostic_messages.size();
-
- if (sf.can_signals.empty() && sf.diagnostic_messages.empty())
- NOTICE(binder_interface, "%s: No signal(s) found for %s.", __FUNCTION__, sig.c_str());
- else
- ok = subscribe_unsubscribe_signals(request, subscribe, sf);
- }
+ const std::vector<std::string> args = parse_args_from_request(request);
- NOTICE(binder_interface, "%s: Subscribed/unsubscribe correctly to %d/%d signal(s).", __FUNCTION__, ok, total);
- if (ok > 0)
+ if (process_args(request, args, subscribe) > 0)
afb_req_success(request, NULL, NULL);
else
afb_req_fail(request, "error", NULL);
}
- void unsubscribe(struct afb_req request)
+void unsubscribe(struct afb_req request)
{
- parse_args_from_request(request, false);
+ std::vector<std::string> args;
+ bool subscribe = false;
+
+ args = parse_args_from_request(request);
+
+ if (process_args(request, args, subscribe) > 0)
+ afb_req_success(request, NULL, NULL);
+ else
+ afb_req_fail(request, "error", NULL);
}
\ No newline at end of file