Improve mutex lock logic.
authorRomain Forlot <romain.forlot@iot.bzh>
Fri, 24 Feb 2017 14:14:48 +0000 (14:14 +0000)
committerRomain Forlot <romain.forlot@iot.bzh>
Fri, 24 Feb 2017 14:14:48 +0000 (14:14 +0000)
- Use of bracket instead of unlock method
- Change some mutex lock scope.
- Added subscribed_signals map object mutex to
manipulate it safely.

Change-Id: I770c0b5701db6b1151511f7360ec31ae6dcc1de9
Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
src/can-signals.hpp
src/can-utils.cpp
src/can-utils.hpp
src/can_decode_message.cpp
src/can_event_push.cpp
src/can_event_push.hpp
src/can_reader.cpp
src/low-can-binding.cpp
src/low-can-binding.hpp

index af110d0..fb77df5 100644 (file)
@@ -23,6 +23,8 @@
 #include <string>
 #include <thread>
 #include <linux/can.h>
+#include <mutex>
+#include <condition_variable>
 
 #include "timer.hpp"
 #include "openxc.pb.h"
@@ -46,6 +48,13 @@ extern "C"
  */
 static std::map<std::string, struct afb_event> subscribed_signals;
 
+/**
+* @brief Mutex allowing safe manipulation on subscribed_signals map.
+* @desc To ensure that the map object isn't modified when we read it, you
+*  have to set this mutex before use subscribed_signals map object.
+*/
+extern std::mutex subscribed_signals_mutex;
+
 /** Public: Return the currently active CAN configuration. */
 CanMessageSet* getActiveMessageSet();
 
index c6f2d57..0d52bbe 100644 (file)
@@ -162,16 +162,33 @@ canfd_frame can_message_t::convert_to_canfd_frame()
 *********************************************************************************/
 
 can_bus_t::can_bus_t(int& conf_file)
-       :  conf_file_{conf_file}
+       : conf_file_{conf_file}
 {
 }
 
 void can_bus_t::start_threads()
 {
        th_decoding_ = std::thread(can_decode_message, std::ref(*this));
+       is_decoding_ = true;
        th_pushing_ = std::thread(can_event_push, std::ref(*this));
+       is_pushing_ = true;
 }
 
+void can_bus_t::stop_threads()
+{
+       is_decoding_ = false;
+       is_pushing_ = false;
+}
+
+bool can_bus_t::is_decoding()
+{
+       return is_decoding_;
+}
+
+bool can_bus_t::is_pushing()
+{
+       return is_pushing_;
+}
 
 int can_bus_t::init_can_dev()
 {
index 11e19f3..76dc75b 100644 (file)
@@ -221,7 +221,9 @@ class can_bus_t {
                int conf_file_; /*!< conf_file_ - configuration file handle used to initialize can_bus_dev_t objects.*/
                
                std::thread th_decoding_; /*!< thread that'll handle decoding a can frame */
+               bool is_decoding_; /*!< boolean member controling thread while loop*/
                std::thread th_pushing_; /*!<  thread that'll handle pushing decoded can frame to subscribers */
+               bool is_pushing_; /*!< boolean member controling thread while loop*/
 
                bool has_can_message_; /*!< boolean members that control whether or not there is can_message into the queue */
                std::queue <can_message_t> can_message_q_; /*!< queue that'll store can_message_t to decoded */
@@ -258,6 +260,32 @@ class can_bus_t {
                 */
                void start_threads();
 
+               /**
+                * @brief Will stop all threads holded by can_bus_t object
+                *  which are decoding and pushing threads.
+                */
+               void stop_threads();
+
+               /**
+                * @brief Telling if the decoding thread is running.
+                *  This is the boolean value on which the while loop
+                *  take its condition. Set it to false will stop the 
+                *  according thread.
+                *
+                * @return true if decoding thread is running, false if not.
+                */
+               bool is_decoding();
+
+               /**
+                * @brief Telling if the pushing thread is running
+                *  This is the boolean value on which the while loop
+                *  take its condition. Set it to false will stop the 
+                *  according thread.
+                *
+                * @return true if pushing thread is running, false if not.
+                */
+               bool is_pushing();
+
                /**
                 * @brief Return first can_message_t on the queue 
                 *
@@ -341,8 +369,11 @@ class can_bus_dev_t {
                
                /**
                 * @brief Telling if the reading thread is running
+                *  This is the boolean value on which the while loop
+                *  take its condition. Set it to false will stop the 
+                *  according thread.
                 *
-                * @return true if read is running, false if not.
+                * @return true if reading thread is running, false if not.
                 */
                bool is_running();
                
index 8d81f8b..3c1825d 100644 (file)
@@ -31,41 +31,44 @@ void can_decode_message(can_bus_t &can_bus)
        std::vector <CanSignal> signals;
        std::vector <CanSignal>::iterator signals_i;
        openxc_VehicleMessage vehicle_message;
-       openxc_DynamicField search_key, ret;
+       openxc_DynamicField search_key, decoded_message;
        bool send = true;
 
        decoder_t decoder;
 
-       while(can_bus.has_can_message())
+       while(can_bus.is_decoding())
        {
-               std::unique_lock<std::mutex> can_message_lock(can_message_mutex);
-               new_can_message.wait(can_message_lock);
+               {
+                       std::unique_lock<std::mutex> can_message_lock(can_message_mutex);
+                       new_can_message.wait(can_message_lock);
                        can_message = can_bus.next_can_message();
-               can_message_mutex.unlock();
+               }
 
-               std::lock_guard<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
-                       /* First we have to found which CanSignal is */
-                       search_key = build_DynamicField((double)can_message.get_id());
-                       signals = find_can_signals(search_key);
-                       
-                       /* Decoding the message ! Don't kill the messenger ! */
-                       for(const auto& sig : signals)
-                       {       
+               /* First we have to found which CanSignal it is */
+               search_key = build_DynamicField((double)can_message.get_id());
+               signals = find_can_signals(search_key);
+
+               /* Decoding the message ! Don't kill the messenger ! */
+               for(const auto& sig : signals)
+               {
+                       {
+                               std::lock_guard<std::mutex> subscribed_signals_lock(subscribed_signals_mutex);
                                std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
                                const auto& it_event = subscribed_signals.find(sig.genericName);
                                
                                if(it_event != subscribed_signals.end() &&
                                        afb_event_is_valid(it_event->second))
                                {
-                                       ret = decoder.decodeSignal(sig, can_message, getSignals(), &send);
-
-                                       openxc_SimpleMessage s_message = build_SimpleMessage(sig.genericName, ret);
+                                       decoded_message = decoder.decodeSignal(sig, can_message, getSignals(), &send);
 
+                                       openxc_SimpleMessage s_message = build_SimpleMessage(sig.genericName, decoded_message);
                                        vehicle_message = build_VehicleMessage_with_SimpleMessage(openxc_DynamicField_Type::openxc_DynamicField_Type_NUM, s_message);
+
+                                       std::lock_guard<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
                                        can_bus.push_new_vehicle_message(vehicle_message);
                                }
+                               new_decoded_can_message.notify_one();
                        }
-               decoded_can_message_mutex.unlock();
-               new_decoded_can_message.notify_one();
+               }
        }
 }
index bfe8c80..fc403ee 100644 (file)
@@ -30,16 +30,18 @@ void can_event_push(can_bus_t& can_bus)
        openxc_SimpleMessage s_message;
        json_object* jo;
        
-       while(can_bus.has_vehicle_message())
+       while(can_bus.is_pushing())
        {
-               std::unique_lock<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
-               new_decoded_can_message.wait(decoded_can_message_lock);
+               {
+                       std::unique_lock<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
+                       new_decoded_can_message.wait(decoded_can_message_lock);
                        v_message = can_bus.next_vehicle_message();
-               decoded_can_message_mutex.unlock();
+               }
 
                s_message = get_simple_message(v_message);
 
-               std::lock_guard<std::mutex> push_signal_lock(subscribed_signals_mutex);
+               {
+                       std::lock_guard<std::mutex> subscribed_signals_lock(subscribed_signals_mutex);
                        std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
                        const auto& it_event = subscribed_signals.find(s_message.name);
                        if(it_event != subscribed_signals.end() && afb_event_is_valid(it_event->second))
@@ -48,7 +50,6 @@ void can_event_push(can_bus_t& can_bus)
                                jsonify_simple(s_message, jo);
                                afb_event_push(it_event->second, jo);
                        }
-               subscribed_signals_mutex.unlock();
-               update_subscrided_signals.notify_one();
+               }
        }
 }
\ No newline at end of file
index 0040144..e597241 100644 (file)
@@ -17,9 +17,3 @@
  */
 
 #pragma once
-
-#include <mutex>
-#include <condition_variable>
-
-std::condition_variable update_subscrided_signals;
-std::mutex subscribed_signals_mutex;
\ No newline at end of file
index 018c7cb..7ae981b 100644 (file)
@@ -29,9 +29,10 @@ void can_reader(can_bus_dev_t &can_bus_dev, can_bus_t& can_bus)
        {
                can_message.convert_from_canfd_frame(can_bus_dev.read());
 
-               std::lock_guard<std::mutex> can_message_lock(can_message_mutex);
+               {
+                       std::lock_guard<std::mutex> can_message_lock(can_message_mutex);
                        can_bus.push_new_can_message(can_message);
-               can_message_mutex.unlock();
+               }
                new_can_message.notify_one();
        }
 }
\ No newline at end of file
index 2151790..be2d373 100644 (file)
@@ -19,6 +19,7 @@
 #include "low-can-binding.hpp"
 
 #include <queue>
+#include <mutex>
 #include <vector>
 #include <thread>
 #include <fcntl.h>
@@ -34,7 +35,6 @@
 
 extern "C"
 {
-       #include <afb/afb-binding.h>
        #include <afb/afb-service-itf.h>
 };
 
@@ -43,15 +43,6 @@ extern "C"
  */
 const struct afb_binding_interface *binder_interface;
 
-/*
- * CAN bus handler pointer. This is the object that will be use to
- * initialize each CAN devices specified into the configuration file
- *
- * It is used by the reading thread also because of its can_message_q_ queue
- * that store CAN messages read from the socket.
- */
-can_bus_t *can_bus_handler;
-
 /********************************************************************************
 *
 *              Subscription and unsubscription
@@ -86,7 +77,7 @@ static int subscribe_unsubscribe_signal(struct afb_req request, bool subscribe,
 {
        int ret;
 
-       // TODO: lock the subscribed_signals when insert/remove
+       std::lock_guard<std::mutex> subscribed_signals_lock(subscribed_signals_mutex);
        auto ss_i = subscribed_signals.find(sig.genericName);
        if (ss_i != subscribed_signals.end() && !afb_event_is_valid(ss_i->second))
        {
@@ -243,13 +234,12 @@ extern "C"
                fd_conf = afb_daemon_rootdir_open_locale(binder_interface->daemon, "can_bus.json", O_RDONLY, NULL);
 
                /* Initialize the CAN bus handler */
-               can_bus_t cbh(fd_conf);
-               can_bus_handler = &cbh;
+               can_bus_t can_bus_handler(fd_conf);
 
                /* Open CAN socket */
-               if(can_bus_handler->init_can_dev() == 0)
+               if(can_bus_handler.init_can_dev() == 0)
                {
-                       can_bus_handler->start_threads();
+                       can_bus_handler.start_threads();
                        return 0;
                }
                ERROR(binder_interface, "There was something wrong with CAN device Initialization. Check your config file maybe");
index b2b544f..2c84989 100644 (file)
  
 #pragma once
 
-#include <mutex>
-#include <condition_variable>
+#include <cstddef>
+extern "C"
+{
+       #include <afb/afb-binding.h>
+};
 
 extern "C" struct afb_binding_interface;