one event source by socket added to systemd event loop.
[apps/low-level-can-service.git] / CAN-binder / low-can-binding / binding / low-can-cb.cpp
index 414fa91..d298779 100644 (file)
@@ -52,14 +52,40 @@ void on_no_clients(std::string message)
        }
 }
 
-int read(sd_event_source *s, int fd, uint32_t revents, void *userdata)
+int read_can_signal(sd_event_source *s, int fd, uint32_t revents, void *userdata)
 {
        can_signal_t* sig= (can_signal_t*)userdata;
-       if(sig->read_socket() != can_message_format_t::ERROR)
-               return 0;
-       return -1;
+       sig->read_socket();
+
+       /* check if error or hangup */
+       if ((revents & (EPOLLERR|EPOLLRDHUP|EPOLLHUP)) != 0)
+       {
+               sd_event_source_unref(s);
+               sig->get_socket().close();
+               sig->create_rx_filter();
+       }
+       return 0;
 }
 
+int read_diagnostic_message(sd_event_source *s, int fd, uint32_t revents, void *userdata)
+{
+       diagnostic_manager_t& diag_m = configuration_t::instance().get_diagnostic_manager();
+       diag_m.read_socket();
+
+       /* check if error or hangup */
+       if ((revents & (EPOLLERR|EPOLLRDHUP|EPOLLHUP)) != 0)
+       {
+               sd_event_source_unref(s);
+               diag_m.get_socket().close();
+               diag_m.cleanup_active_requests(true);
+               ERROR(binder_interface, "%s: Error on diagnostic manager socket, cancelling active requests.", __FUNCTION__);
+               return -1;
+       }
+
+       return 0;
+}
+
+
 ///******************************************************************************
 ///
 ///            Subscription and unsubscription
@@ -154,10 +180,9 @@ static int subscribe_unsubscribe_signals(struct afb_req request, bool subscribe,
                // poll a PID for nothing.
                if(sig->get_supported() && subscribe)
                {
-                               float frequency = sig->get_frequency();
-                               subscribe = diag_m.add_recurring_request(
-                                       diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), (float)frequency);
-                                       //TODO: Adding callback requesting ignition status:     diag_req, sig.c_str(), false, diagnostic_message_t::decode_obd2_response, diagnostic_message_t::check_ignition_status, frequency);
+                       float frequency = sig->get_frequency();
+                       diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), (float)frequency);
+                               //TODO: Adding callback requesting ignition status:     diag_req, sig.c_str(), false, diagnostic_message_t::decode_obd2_response, diagnostic_message_t::check_ignition_status, frequency);
                }
                else
                {
@@ -182,14 +207,34 @@ static int subscribe_unsubscribe_signals(struct afb_req request, bool subscribe,
                        return -1;
                }
                struct sd_event_source* e_source;
-               sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), &e_source, sig->get_socket().socket(), EPOLLIN, read, sig);
+               sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), &e_source, sig->get_socket().socket(), EPOLLIN, read_can_signal, sig.get());
                rets++;
                DEBUG(binder_interface, "%s: signal: %s subscribed", __FUNCTION__, sig->get_name().c_str());
        }
        return rets;
 }
 
-static const std::vector<std::string> parse_args_from_request(struct afb_req request, bool subscribe)
+static int process_args(struct afb_req request, const std::vector<std::string>& args, bool subscribe)
+{
+       struct utils::signals_found sf;
+       int ok = 0, total = 0;
+
+       for(const auto& sig: args)
+       {
+               openxc_DynamicField search_key = build_DynamicField(sig);
+               sf = utils::signals_manager_t::instance().find_signals(search_key);
+               total = (int)sf.can_signals.size() + (int)sf.diagnostic_messages.size();
+
+               if (sf.can_signals.empty() && sf.diagnostic_messages.empty())
+                       NOTICE(binder_interface, "%s: No signal(s) found for %s.", __FUNCTION__, sig.c_str());
+               else
+                       ok = subscribe_unsubscribe_signals(request, subscribe, sf);
+       }
+       NOTICE(binder_interface, "%s: Subscribed/unsubscribe correctly to %d/%d signal(s).", __FUNCTION__, ok, total);
+       return ok;
+}
+
+static const std::vector<std::string> parse_args_from_request(struct afb_req request)
 {
        int i, n;
        std::vector<std::string> ret;
@@ -220,33 +265,25 @@ static const std::vector<std::string> parse_args_from_request(struct afb_req req
 
 void subscribe(struct afb_req request)
 {
-       std::vector<std::string> args;
-       struct utils::signals_found sf;
-       int ok = 0, total = 0;
        bool subscribe = true;
 
-       args = parse_args_from_request(request, subscribe);
-
-       for(const auto& sig: args)
-       {
-               openxc_DynamicField search_key = build_DynamicField(sig);
-               sf = utils::signals_manager_t::instance().find_signals(search_key);
-               total = (int)sf.can_signals.size() + (int)sf.diagnostic_messages.size();
+       const std::vector<std::string> args = parse_args_from_request(request);
 
-               if (sf.can_signals.empty() && sf.diagnostic_messages.empty())
-                       NOTICE(binder_interface, "%s: No signal(s) found for %s.", __FUNCTION__, sig.c_str());
-               else
-                       ok = subscribe_unsubscribe_signals(request, subscribe, sf);
-       }
-
-       NOTICE(binder_interface, "%s: Subscribed/unsubscribe correctly to %d/%d signal(s).", __FUNCTION__, ok, total);
-       if (ok > 0)
+       if (process_args(request, args, subscribe) > 0)
                afb_req_success(request, NULL, NULL);
        else
                afb_req_fail(request, "error", NULL);
 }
 
-       void unsubscribe(struct afb_req request)
+void unsubscribe(struct afb_req request)
 {
-       parse_args_from_request(request, false);
+       std::vector<std::string> args;
+       bool subscribe = false;
+       
+       args = parse_args_from_request(request);
+
+       if (process_args(request, args, subscribe) > 0)
+               afb_req_success(request, NULL, NULL);
+       else
+               afb_req_fail(request, "error", NULL);
 }
\ No newline at end of file