Add possibility to subscribe to a recurring request permanently 41/13741/2
authorJonathan Aillet <jonathan.aillet@iot.bzh>
Mon, 9 Apr 2018 07:25:47 +0000 (09:25 +0200)
committerJonathan Aillet <jonathan.aillet@iot.bzh>
Tue, 17 Apr 2018 09:42:37 +0000 (11:42 +0200)
Add possibility to subscribe to a recurring request that won't be deleted
when no subscriber is detected.
For now, this functionnality is implemented for internal use only.

Bug-AGL: SPEC-1347

Change-Id: I48f6f647677596ba7920c4348d5406ea7bf1081b
Signed-off-by: Jonathan Aillet <jonathan.aillet@iot.bzh>
low-can-binding/binding/low-can-cb.cpp
low-can-binding/diagnostic/active-diagnostic-request.cpp
low-can-binding/diagnostic/active-diagnostic-request.hpp
low-can-binding/diagnostic/diagnostic-manager.cpp
low-can-binding/diagnostic/diagnostic-manager.hpp

index 16e31d1..d8365da 100644 (file)
 
 void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription, uint32_t pid, std::map<int, std::shared_ptr<low_can_subscription_t> >& s)
 {
+       bool is_permanent_recurring_request = false;
+
        if( ! can_subscription->get_diagnostic_message().empty() && can_subscription->get_diagnostic_message(pid) != nullptr)
        {
                DiagnosticRequest diag_req = can_subscription->get_diagnostic_message(pid)->build_diagnostic_request();
                active_diagnostic_request_t* adr = application_t::instance().get_diagnostic_manager().find_recurring_request(diag_req);
                if( adr != nullptr)
-                       application_t::instance().get_diagnostic_manager().cleanup_request(adr, true);
+               {
+                       is_permanent_recurring_request = adr->get_permanent();
+
+                       if(! is_permanent_recurring_request)
+                               application_t::instance().get_diagnostic_manager().cleanup_request(adr, true);
+               }
        }
 
-       on_no_clients(can_subscription, s);
+       if(! is_permanent_recurring_request)
+               on_no_clients(can_subscription, s);
 }
 
 void on_no_clients(std::shared_ptr<low_can_subscription_t> can_subscription, std::map<int, std::shared_ptr<low_can_subscription_t> >& s)
@@ -106,8 +114,9 @@ static int make_subscription_unsubscription(struct afb_req request,
                                                                                        std::map<int, std::shared_ptr<low_can_subscription_t> >& s,
                                                                                        bool subscribe)
 {
-       /* Make the subscription or unsubscription to the event */
-       if (((subscribe ? afb_req_subscribe : afb_req_unsubscribe)(request, s[can_subscription->get_index()]->get_event())) < 0)
+       /* Make the subscription or unsubscription to the event (if request contents are not null) */
+       if(request.itf && request.closure &&
+          ((subscribe ? afb_req_subscribe : afb_req_unsubscribe)(request, s[can_subscription->get_index()]->get_event())) < 0)
        {
                AFB_ERROR("Operation goes wrong for signal: %s", can_subscription->get_name().c_str());
                return -1;
@@ -179,7 +188,8 @@ static int subscribe_unsubscribe_diagnostic_messages(struct afb_req request,
                                                                                                        bool subscribe,
                                                                                                        std::vector<std::shared_ptr<diagnostic_message_t> > diagnostic_messages,
                                                                                                        struct event_filter_t& event_filter,
-                                                                                                       std::map<int, std::shared_ptr<low_can_subscription_t> >& s)
+                                                                                                       std::map<int, std::shared_ptr<low_can_subscription_t> >& s,
+                                                                                                       bool perm_rec_diag_req)
 {
        int rets = 0;
        application_t& app = application_t::instance();
@@ -198,10 +208,9 @@ static int subscribe_unsubscribe_diagnostic_messages(struct afb_req request,
                // If the requested diagnostic message isn't supported by the car then unsubcribe it
                // no matter what we want, worse case will be a fail unsubscription but at least we don't
                // poll a PID for nothing.
-               //TODO: Adding callback requesting ignition status:     diag_req, sig.c_str(), false, diagnostic_message_t::decode_obd2_response, diagnostic_message_t::check_ignition_status, frequency);
                if(sig->get_supported() && subscribe)
                {
-                       diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), event_filter.frequency);
+                       diag_m.add_recurring_request(diag_req, sig->get_name().c_str(), false, sig->get_decoder(), sig->get_callback(), event_filter.frequency, perm_rec_diag_req);
                        if(can_subscription->create_rx_filter(sig) < 0)
                                {return -1;}
                        AFB_DEBUG("Signal: %s subscribed", sig->get_name().c_str());
@@ -286,7 +295,7 @@ static int subscribe_unsubscribe_signals(struct afb_req request,
        std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
        std::map<int, std::shared_ptr<low_can_subscription_t> >& s = sm.get_subscribed_signals();
 
-       rets += subscribe_unsubscribe_diagnostic_messages(request, subscribe, signals.diagnostic_messages, event_filter, s);
+       rets += subscribe_unsubscribe_diagnostic_messages(request, subscribe, signals.diagnostic_messages, event_filter, s, false);
        rets += subscribe_unsubscribe_can_signals(request, subscribe, signals.can_signals, event_filter, s);
 
        return rets;
index 71f5980..a242147 100644 (file)
@@ -42,6 +42,7 @@ active_diagnostic_request_t& active_diagnostic_request_t::operator=(const active
                decoder_ = adr.decoder_;
                callback_ = adr.callback_;
                recurring_ = adr.recurring_;
+               permanent_ = adr.permanent_;
                wait_for_multiple_responses_ = adr.wait_for_multiple_responses_;
                frequency_clock_ = adr.frequency_clock_;
                timeout_clock_ = adr.timeout_clock_;
@@ -59,6 +60,7 @@ active_diagnostic_request_t::active_diagnostic_request_t()
          decoder_{nullptr},
          callback_{nullptr},
          recurring_{false},
+         permanent_{false},
          wait_for_multiple_responses_{false},
          frequency_clock_{frequency_clock_t()},
          timeout_clock_{frequency_clock_t()},
@@ -70,7 +72,8 @@ active_diagnostic_request_t::active_diagnostic_request_t(const std::string& bus,
                bool wait_for_multiple_responses,
                const DiagnosticResponseDecoder decoder,
                const DiagnosticResponseCallback callback,
-               float frequencyHz)
+               float frequencyHz,
+               bool permanent)
        : bus_{bus},
          id_{id},
          handle_{nullptr},
@@ -78,6 +81,7 @@ active_diagnostic_request_t::active_diagnostic_request_t(const std::string& bus,
          decoder_{decoder},
          callback_{callback},
          recurring_{frequencyHz ? true : false},
+         permanent_{permanent},
          wait_for_multiple_responses_{wait_for_multiple_responses},
          frequency_clock_{frequency_clock_t(frequencyHz)},
          timeout_clock_{frequency_clock_t(10)},
@@ -133,6 +137,11 @@ bool active_diagnostic_request_t::get_recurring() const
        return recurring_;
 }
 
+bool active_diagnostic_request_t::get_permanent() const
+{
+       return permanent_;
+}
+
 frequency_clock_t& active_diagnostic_request_t::get_frequency_clock()
 {
        return frequency_clock_;
index 892feb5..8e5333b 100644 (file)
@@ -76,6 +76,7 @@ private:
                                                                                  ///< response is received for this request.
        bool recurring_; ///< bool recurring_ - If true, this is a recurring request and it will remain as active until explicitly cancelled.
                                         ///< The frequencyClock attribute controls how often a recurring request is made.
+       bool permanent_; ///< bool permanent_ - If true, this a permanent recurring request and will remain as active indefinitely (can't be cancelled).
        bool wait_for_multiple_responses_; ///< wait_for_multiple_responses_ - False by default, when any response is received for a request
                                                                           ///< it will be removed from the active list. If true, the request will remain active until the timeout
                                                                           ///< clock expires, to allow it to receive multiple response (e.g. to a functional broadcast request).
@@ -93,7 +94,7 @@ public:
        active_diagnostic_request_t(const std::string& bus, uint32_t id,
                const std::string& name, bool wait_for_multiple_responses,
                const DiagnosticResponseDecoder decoder,
-               const DiagnosticResponseCallback callback, float frequencyHz);
+               const DiagnosticResponseCallback callback, float frequencyHz, bool permanent);
        ~active_diagnostic_request_t();
 
        uint32_t get_id() const;
@@ -104,6 +105,7 @@ public:
        DiagnosticResponseDecoder& get_decoder();
        DiagnosticResponseCallback& get_callback();
        bool get_recurring() const;
+       bool get_permanent() const;
        frequency_clock_t& get_frequency_clock();
        frequency_clock_t& get_timeout_clock();
        utils::socketcan_bcm_t& get_socket();
index 86ff25b..2f22b14 100644 (file)
@@ -293,7 +293,7 @@ active_diagnostic_request_t* diagnostic_manager_t::add_request(DiagnosticRequest
        if (non_recurring_requests_.size() <= MAX_SIMULTANEOUS_DIAG_REQUESTS)
        {
                active_diagnostic_request_t* entry = new active_diagnostic_request_t(bus_, request->arbitration_id, name,
-                               wait_for_multiple_responses, decoder, callback, 0);
+                               wait_for_multiple_responses, decoder, callback, 0, false);
                entry->set_handle(shims_, request);
 
                char request_string[128] = {0};
@@ -359,7 +359,7 @@ bool diagnostic_manager_t::validate_optional_request_attributes(float frequencyH
 /// was too much already running requests, or if the frequency was too high.
 active_diagnostic_request_t* diagnostic_manager_t::add_recurring_request(DiagnosticRequest* request, const char* name,
                bool wait_for_multiple_responses, const DiagnosticResponseDecoder decoder,
-               const DiagnosticResponseCallback callback, float frequencyHz)
+               const DiagnosticResponseCallback callback, float frequencyHz, bool permanent)
 {
        active_diagnostic_request_t* entry = nullptr;
 
@@ -373,7 +373,7 @@ active_diagnostic_request_t* diagnostic_manager_t::add_recurring_request(Diagnos
                if(recurring_requests_.size() <= MAX_SIMULTANEOUS_DIAG_REQUESTS)
                {
                        entry = new active_diagnostic_request_t(bus_, request->arbitration_id, name,
-                                       wait_for_multiple_responses, decoder, callback, frequencyHz);
+                                       wait_for_multiple_responses, decoder, callback, frequencyHz, permanent);
                        recurring_requests_.push_back(entry);
 
                        entry->set_handle(shims_, request);
index 1124ff9..75d08e2 100644 (file)
@@ -80,7 +80,7 @@ public:
        bool validate_optional_request_attributes(float frequencyHz);
        active_diagnostic_request_t* add_recurring_request(DiagnosticRequest* request, const char* name,
                bool waitForMultipleResponses, const DiagnosticResponseDecoder decoder,
-               const DiagnosticResponseCallback callback, float frequencyHz);
+               const DiagnosticResponseCallback callback, float frequencyHz, bool permanent);
 
        // Decoding part
        openxc_VehicleMessage relay_diagnostic_response(active_diagnostic_request_t* adr, const DiagnosticResponse& response, const uint64_t timestamp);