First draft about lock/wait thread management.
authorRomain Forlot <romain.forlot@iot.bzh>
Thu, 23 Feb 2017 17:52:35 +0000 (17:52 +0000)
committerRomain Forlot <romain.forlot@iot.bzh>
Thu, 23 Feb 2017 17:52:35 +0000 (17:52 +0000)
Reordering include files

Change-Id: Ia6d9ee30eb4e1df0c380c26355679fe00b373aa8
Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
12 files changed:
src/can-signals.cpp
src/can-signals.hpp
src/can-utils.cpp
src/can-utils.hpp
src/can_decode_message.cpp
src/can_decode_message.hpp
src/can_event_push.cpp
src/can_event_push.hpp
src/can_reader.cpp
src/can_reader.hpp
src/low-can-binding.cpp
src/low-can-binding.hpp

index 2cf4b03..0a6a829 100644 (file)
 
 #include "can-signals.hpp"
 
+#include <fnmatch.h>
+
+#include "low-can-binding.hpp"
+
 /**
  * @brief Dumb SIGNALS array. It is composed by CanMessageSet
  * SIGNALS[MESSAGE_SET_ID][CanSignal]
index d4ada1d..af110d0 100644 (file)
 
 #include <map>
 #include <queue>
-#include <string>
 #include <vector>
-#include <fnmatch.h>
+#include <string>
+#include <thread>
+#include <linux/can.h>
 
+#include "timer.hpp"
+#include "openxc.pb.h"
 #include "can-utils.hpp"
 
 extern "C"
 {
+       #include <afb/afb-binding.h>
        #include <afb/afb-event-itf.h>
 }
 
index ba4060e..5a845b3 100644 (file)
 
 #include "can-utils.hpp"
 
+#include <map>
+#include <vector>
+#include <cstdio>
+#include <string>
+#include <fcntl.h>
+#include <unistd.h>
+#include <net/if.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <json-c/json.h>
+#include <linux/can/raw.h>
+#include <systemd/sd-event.h>
+
+extern "C"
+{
+       #include <afb/afb-binding.h>
+}
+
 /********************************************************************************
 *
 *              CanMessage method implementation
@@ -138,6 +156,154 @@ canfd_frame can_message_t::convert_to_canfd_frame()
        return frame;
 }
 
+/********************************************************************************
+*
+*              can_bus_t method implementation
+*
+*********************************************************************************/
+
+can_bus_t::can_bus_t(int& conf_file)
+       :  conf_file_{conf_file}
+{
+}
+
+void can_bus_t::start_threads()
+{
+       th_decoding_ = std::thread(can_decode_message, std::ref(*this));
+       th_pushing_ = std::thread(can_event_push, std::ref(*this));
+}
+
+
+int can_bus_t::init_can_dev()
+{
+       std::vector<std::string> devices_name;
+       int i;
+       size_t t;
+
+       devices_name = read_conf();
+
+       if (! devices_name.empty())
+       {
+               t = devices_name.size();
+               i=0;
+
+               for(const auto& device : devices_name)
+               {
+                       can_bus_dev_t can_bus_device_handler(device);
+                       can_bus_device_handler.open();
+                       can_bus_device_handler.start_reading(std::ref(*this));
+                       i++;
+               }
+
+               NOTICE(binder_interface, "Initialized %d/%d can bus device(s)", i, t);
+               return 0;
+       }
+       ERROR(binder_interface, "init_can_dev: Error at CAN device initialization. No devices read into configuration file. Did you specify canbus JSON object ?");
+       return 1;
+}
+
+std::vector<std::string> can_bus_t::read_conf()
+{
+       std::vector<std::string> ret;
+       json_object *jo, *canbus;
+       int n, i;
+
+       FILE *fd = fdopen(conf_file_, "r");
+       if (fd)
+       {
+               std::string fd_conf_content;
+               std::fseek(fd, 0, SEEK_END);
+               fd_conf_content.resize(std::ftell(fd));
+               std::rewind(fd);
+               std::fread(&fd_conf_content[0], 1, fd_conf_content.size(), fd);
+               std::fclose(fd);
+
+               jo = json_tokener_parse(fd_conf_content.c_str());
+
+               if (jo == NULL || !json_object_object_get_ex(jo, "canbus", &canbus))
+               {
+                       ERROR(binder_interface, "Can't find canbus node in the configuration file. Please review it.");
+                       ret.clear();
+               }
+               else if (json_object_get_type(canbus) != json_type_array)
+                       ret.push_back(json_object_get_string(canbus));
+               else
+               {
+                       n = json_object_array_length(canbus);
+                       for (i = 0 ; i < n ; i++)
+                               ret.push_back(json_object_get_string(json_object_array_get_idx(canbus, i)));
+               }
+               return ret;
+       }
+       ERROR(binder_interface, "Problem at reading the conf file");
+       ret.clear();
+       return ret;
+}
+
+can_message_t can_bus_t::next_can_message()
+{
+       can_message_t can_msg;
+
+       if(!can_message_q_.empty())
+       {
+               can_msg = can_message_q_.front();
+               can_message_q_.pop();
+               DEBUG(binder_interface, "next_can_message: Here is the next can message : id %d, length %d", can_msg.get_id(), can_msg.get_length());
+               return can_msg;
+       }
+       
+       NOTICE(binder_interface, "next_can_message: End of can message queue");
+       has_can_message_ = false;
+       return can_msg;
+}
+
+void can_bus_t::push_new_can_message(const can_message_t& can_msg)
+{
+       can_message_q_.push(can_msg);
+}
+
+bool can_bus_t::has_can_message() const
+{
+       return has_can_message_;
+}
+
+openxc_VehicleMessage can_bus_t::next_vehicle_message()
+{
+       openxc_VehicleMessage v_msg;
+
+       if(! vehicle_message_q_.empty())
+       {
+               v_msg = vehicle_message_q_.front();
+               vehicle_message_q_.pop();
+               DEBUG(binder_interface, "next_vehicle_message: next vehicle message poped");
+               return v_msg;
+       }
+       
+       NOTICE(binder_interface, "next_vehicle_message: End of vehicle message queue");
+       has_vehicle_message_ = false;
+       return v_msg;
+}
+
+void can_bus_t::push_new_vehicle_message(const openxc_VehicleMessage& v_msg)
+{
+       vehicle_message_q_.push(v_msg);
+       has_vehicle_message_ = true;
+}
+
+bool can_bus_t::has_vehicle_message() const
+{
+       return has_vehicle_message_;
+}
+
+/********************************************************************************
+*
+*              This is the sd_event_add_io callback function declaration. 
+*              Its implementation can be found into low-can-binding.cpp.
+*
+*********************************************************************************/
+
+int can_frame_received(sd_event_source *s, int fd, uint32_t revents, void *userdata);
+
 /********************************************************************************
 *
 *              can_bus_dev_t method implementation
@@ -149,6 +315,22 @@ can_bus_dev_t::can_bus_dev_t(const std::string &dev_name)
 {
 }
 
+int can_bus_dev_t::event_loop_connection()
+{
+       sd_event_source *source;
+       int rc;
+
+       /* adds to the event loop */
+       rc = sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), &source, can_socket_, EPOLLIN, can_frame_received, this);
+       if (rc < 0) {
+               close();
+               ERROR(binder_interface, "Can't coonect CAN device %s to the event loop", device_name_);
+       } else {
+               NOTICE(binder_interface, "Connected to %s", device_name_);
+       }
+       return rc;
+}
+
 int can_bus_dev_t::open()
 {
        const int canfd_on = 1;
@@ -249,7 +431,6 @@ canfd_frame can_bus_dev_t::read()
 void can_bus_dev_t::start_reading(can_bus_t& can_bus)
 {
        th_reading_ = std::thread(can_reader, std::ref(*this), std::ref(can_bus));
-
        is_running_ = true;
 }
 
@@ -285,142 +466,4 @@ int can_bus_dev_t::send_can_message(can_message_t& can_msg)
                open();
        }
        return 0;
-}
-/********************************************************************************
-*
-*              can_bus_t method implementation
-*
-*********************************************************************************/
-
-can_bus_t::can_bus_t(int& conf_file)
-       :  conf_file_{conf_file}
-{
-}
-
-void can_bus_t::start_threads()
-{
-       th_decoding_ = std::thread(can_decode_message, std::ref(*this));
-       th_pushing_ = std::thread(can_event_push, std::ref(*this));
-}
-
-
-int can_bus_t::init_can_dev()
-{
-       std::vector<std::string> devices_name;
-       int i;
-       size_t t;
-
-       devices_name = read_conf();
-
-       if (! devices_name.empty())
-       {
-               t = devices_name.size();
-               i=0;
-
-               for(const auto& device : devices_name)
-               {
-                       can_bus_dev_t can_bus_device_handler(device);
-                       can_bus_device_handler.open();
-                       can_bus_device_handler.start_reading(std::ref(*this));
-                       i++;
-               }
-
-               NOTICE(binder_interface, "Initialized %d/%d can bus device(s)", i, t);
-               return 0;
-       }
-       ERROR(binder_interface, "init_can_dev: Error at CAN device initialization.");
-       return 1;
-}
-
-std::vector<std::string> can_bus_t::read_conf()
-{
-       std::vector<std::string> ret;
-       json_object *jo, *canbus;
-       int n, i;
-
-       FILE *fd = fdopen(conf_file_, "r");
-       if (fd)
-       {
-               std::string fd_conf_content;
-               std::fseek(fd, 0, SEEK_END);
-               fd_conf_content.resize(std::ftell(fd));
-               std::rewind(fd);
-               std::fread(&fd_conf_content[0], 1, fd_conf_content.size(), fd);
-               std::fclose(fd);
-
-               jo = json_tokener_parse(fd_conf_content.c_str());
-
-               if (jo == NULL || !json_object_object_get_ex(jo, "canbus", &canbus))
-               {
-                       ERROR(binder_interface, "Can't find canbus node in the configuration file. Please review it.");
-                       ret.clear();
-               }
-               else if (json_object_get_type(canbus) != json_type_array)
-                       ret.push_back(json_object_get_string(canbus));
-               else
-               {
-                       n = json_object_array_length(canbus);
-                       for (i = 0 ; i < n ; i++)
-                               ret.push_back(json_object_get_string(json_object_array_get_idx(canbus, i)));
-               }
-               return ret;
-       }
-       ERROR(binder_interface, "Problem at reading the conf file");
-       ret.clear();
-       return ret;
-}
-
-can_message_t can_bus_t::next_can_message()
-{
-       can_message_t can_msg;
-
-       if(!can_message_q_.empty())
-       {
-               can_msg = can_message_q_.front();
-               can_message_q_.pop();
-               DEBUG(binder_interface, "next_can_message: Here is the next can message : id %d, length %d", can_msg.get_id(), can_msg.get_length());
-               return can_msg;
-       }
-       
-       NOTICE(binder_interface, "next_can_message: End of can message queue");
-       has_can_message_ = false;
-       return can_msg;
-}
-
-void can_bus_t::push_new_can_message(const can_message_t& can_msg)
-{
-       can_message_q_.push(can_msg);
-}
-
-bool can_bus_t::has_can_message() const
-{
-       return has_can_message_;
-}
-
-openxc_VehicleMessage can_bus_t::next_vehicle_message()
-{
-       openxc_VehicleMessage v_msg;
-
-       if(! vehicle_message_q_.empty())
-       {
-               v_msg = vehicle_message_q_.front();
-               vehicle_message_q_.pop();
-               DEBUG(binder_interface, "next_vehicle_message: next vehicle message poped");
-               return v_msg;
-       }
-       
-       NOTICE(binder_interface, "next_vehicle_message: End of vehicle message queue");
-       has_vehicle_message_ = false;
-       return v_msg;
-}
-
-void can_bus_t::push_new_vehicle_message(const openxc_VehicleMessage& v_msg)
-{
-       vehicle_message_q_.push(v_msg);
-       has_vehicle_message_ = true;
-}
-
-bool can_bus_t::has_vehicle_message() const
-{
-       return has_vehicle_message_;
 }
\ No newline at end of file
index 218452d..e4a3d34 100644 (file)
 
 #pragma once
 
-#include <map>
 #include <queue>
-#include <vector>
-#include <cstdio>
-#include <string>
-#include <fcntl.h>
-#include <unistd.h>
-#include <net/if.h>
 #include <thread>
-#include <sys/ioctl.h>
 #include <linux/can.h>
-#include <sys/socket.h>
-#include <json-c/json.h>
-#include <linux/can/raw.h>
 
 #include "timer.hpp"
 #include "openxc.pb.h"
 #include "low-can-binding.hpp"
 
-extern "C"
-{
-       #include <afb/afb-binding.h>
-}
-
 // TODO actual max is 32 but dropped to 24 for memory considerations
 #define MAX_ACCEPTANCE_FILTERS 24
 // TODO this takes up a ton of memory
@@ -341,13 +325,34 @@ class can_bus_dev_t {
                 */
                can_bus_dev_t(const std::string& dev_name);
 
+               /**
+                * @brief Connect to the application framework event loop and adding
+                *  a watch on the socket that will wakeup reading thread that will
+                * read the socket and fill can_bus_t object queue.
+                *
+                * @return 0 if success, anything else if not.
+                */
+               int event_loop_connection();
+
                /**
                 * @brief Open the can socket and returning it 
                 *
                 * @return 
                 */
                int open();
+               
+               /**
+                * @brief Open the can socket and returning it 
+                *
+                * @return 
+                */
                int close();
+               
+               /**
+                * @brief Telling if the reading thread is running
+                *
+                * @return true if read is running, false if not.
+                */
                bool is_running();
                
                /**
index 8fadbdb..8d81f8b 100644 (file)
 
 #include "can_decode_message.hpp"
 
+#include "can-utils.hpp"
+#include "openxc-utils.hpp"
+#include "can-signals.hpp"
+#include "can-decoder.hpp"
+
+#include "can_reader.hpp"
+
 void can_decode_message(can_bus_t &can_bus)
 {
        can_message_t can_message;
@@ -31,28 +38,34 @@ void can_decode_message(can_bus_t &can_bus)
 
        while(can_bus.has_can_message())
        {
-               can_message = can_bus.next_can_message();
-
-               /* First we have to found which CanSignal is */
-               search_key = build_DynamicField((double)can_message.get_id());
-               signals = find_can_signals(search_key);
-               
-               /* Decoding the message ! Don't kill the messenger ! */
-               for(const auto& sig : signals)
-               {       
-                       std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
-                       const auto& it_event = subscribed_signals.find(sig.genericName);
+               std::unique_lock<std::mutex> can_message_lock(can_message_mutex);
+               new_can_message.wait(can_message_lock);
+                       can_message = can_bus.next_can_message();
+               can_message_mutex.unlock();
+
+               std::lock_guard<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
+                       /* First we have to found which CanSignal is */
+                       search_key = build_DynamicField((double)can_message.get_id());
+                       signals = find_can_signals(search_key);
                        
-                       if(it_event != subscribed_signals.end() &&
-                               afb_event_is_valid(it_event->second))
-                       {
-                               ret = decoder.decodeSignal(sig, can_message, getSignals(), &send);
+                       /* Decoding the message ! Don't kill the messenger ! */
+                       for(const auto& sig : signals)
+                       {       
+                               std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
+                               const auto& it_event = subscribed_signals.find(sig.genericName);
+                               
+                               if(it_event != subscribed_signals.end() &&
+                                       afb_event_is_valid(it_event->second))
+                               {
+                                       ret = decoder.decodeSignal(sig, can_message, getSignals(), &send);
 
-                               openxc_SimpleMessage s_message = build_SimpleMessage(sig.genericName, ret);
+                                       openxc_SimpleMessage s_message = build_SimpleMessage(sig.genericName, ret);
 
-                               vehicle_message = build_VehicleMessage_with_SimpleMessage(openxc_DynamicField_Type::openxc_DynamicField_Type_NUM, s_message);
-                               can_bus.push_new_vehicle_message(vehicle_message);
+                                       vehicle_message = build_VehicleMessage_with_SimpleMessage(openxc_DynamicField_Type::openxc_DynamicField_Type_NUM, s_message);
+                                       can_bus.push_new_vehicle_message(vehicle_message);
+                               }
                        }
-               }
+               decoded_can_message_mutex.unlock();
+               new_decoded_can_message.notify_one();
        }
 }
index 118dac0..73ea2ef 100644 (file)
@@ -18,9 +18,8 @@
 
 #pragma once
 
-#include "can-utils.hpp"
-#include "openxc-utils.hpp"
-#include "can-signals.hpp"
-#include "can-decoder.hpp"
+#include <mutex>
+#include <condition_variable>
 
-void can_decode_message(can_bus_t &can_bus);
\ No newline at end of file
+extern std::condition_variable new_decoded_can_message;
+extern std::mutex decoded_can_message_mutex;
\ No newline at end of file
index 2842c63..bfe8c80 100644 (file)
 
 #include "can_event_push.hpp"
 
+#include "can-utils.hpp"
+#include "can-signals.hpp"
+#include "openxc-utils.hpp"
+
+#include "can_decode_message.hpp"
+
 void can_event_push(can_bus_t& can_bus)
 {
        openxc_VehicleMessage v_message;
@@ -26,15 +32,23 @@ void can_event_push(can_bus_t& can_bus)
        
        while(can_bus.has_vehicle_message())
        {
-               v_message = can_bus.next_vehicle_message();
+               std::unique_lock<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
+               new_decoded_can_message.wait(decoded_can_message_lock);
+                       v_message = can_bus.next_vehicle_message();
+               decoded_can_message_mutex.unlock();
+
                s_message = get_simple_message(v_message);
-               std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
-               const auto& it_event = subscribed_signals.find(s_message.name);
-               if(it_event != subscribed_signals.end() && afb_event_is_valid(it_event->second))
-               {
-                       jo = json_object_new_object();
-                       jsonify_simple(s_message, jo);
-                       afb_event_push(it_event->second, jo);
-               }
+
+               std::lock_guard<std::mutex> push_signal_lock(subscribed_signals_mutex);
+                       std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
+                       const auto& it_event = subscribed_signals.find(s_message.name);
+                       if(it_event != subscribed_signals.end() && afb_event_is_valid(it_event->second))
+                       {
+                               jo = json_object_new_object();
+                               jsonify_simple(s_message, jo);
+                               afb_event_push(it_event->second, jo);
+                       }
+               subscribed_signals_mutex.unlock();
+               update_subscrided_signals.notify_one();
        }
 }
\ No newline at end of file
index 0e614f1..0040144 100644 (file)
@@ -18,8 +18,8 @@
 
 #pragma once
 
-#include "can-utils.hpp"
-#include "can-signals.hpp"
-#include "openxc-utils.hpp"
+#include <mutex>
+#include <condition_variable>
 
-void can_event_push(can_bus_t& can_bus);
\ No newline at end of file
+std::condition_variable update_subscrided_signals;
+std::mutex subscribed_signals_mutex;
\ No newline at end of file
index b836570..f984c70 100644 (file)
 
 #include "can_reader.hpp"
 
+#include "low-can-binding.hpp"
+#include "can-utils.hpp"
+
 void can_reader(can_bus_dev_t &can_bus_dev, can_bus_t& can_bus)
 {
        can_message_t can_message;
 
        while(can_bus_dev.is_running())
        {
-               can_message.convert_from_canfd_frame(can_bus_dev.read());
-               can_bus.push_new_can_message(can_message);
+               std::unique_lock<std::mutex> can_frame_lock(can_frame_mutex);
+               new_can_frame.wait(can_frame_lock);
+                       can_message.convert_from_canfd_frame(can_bus_dev.read());
+               can_frame_mutex.unlock();
+
+               std::lock_guard<std::mutex> can_message_lock(can_message_mutex);
+                       can_bus.push_new_can_message(can_message);
+               can_message_mutex.unlock();
+               new_can_message.notify_one();
        }
 }
\ No newline at end of file
index 714a94d..3d73103 100644 (file)
@@ -18,6 +18,8 @@
 
 #pragma once
 
-#include "can-utils.hpp"
+#include <mutex>
+#include <condition_variable>
 
-void can_reader(can_bus_dev_t& can_bus_dev, can_bus_t& can_bus);
\ No newline at end of file
+extern std::condition_variable new_can_message;
+extern std::mutex can_message_mutex;
\ No newline at end of file
index 9e34401..233f002 100644 (file)
  * limitations under the License.
  */
 
+#include "low-can-binding.hpp"
+
+#include <queue>
+#include <vector>
+#include <thread>
+#include <fcntl.h>
+#include <linux/can.h>
 #include <json-c/json.h>
 #include <systemd/sd-event.h>
 
-#include "low-can-binding.hpp"
-
+#include "timer.hpp"
+#include "openxc.pb.h"
 #include "can-utils.hpp"
 #include "can-signals.hpp"
 #include "openxc-utils.hpp"
@@ -36,12 +43,41 @@ extern "C"
  */
 const struct afb_binding_interface *binder_interface;
 
+/*
+ * CAN bus handler pointer. This is the object that will be use to
+ * initialize each CAN devices specified into the configuration file
+ *
+ * It is used by the reading thread also because of its can_message_q_ queue
+ * that store CAN messages read from the socket.
+ */
+can_bus_t *can_bus_handler;
+
 /********************************************************************************
 *
 *              Event management
 *
 *********************************************************************************/
+int can_frame_received(sd_event_source *s, int fd, uint32_t revents, void *userdata)
+{
+       can_bus_dev_t *can_bus_dev = (can_bus_dev_t*)userdata;
+       std::lock_guard<std::mutex> can_frame_lock(can_frame_mutex);
 
+       /* Notify reading thread that there is something to read */
+       if ((revents & EPOLLIN) != 0) {
+               new_can_frame.notify_one();
+       }
+
+       /* check if error or hangup */
+       if ((revents & (EPOLLERR|EPOLLRDHUP|EPOLLHUP)) != 0)
+       {
+               sd_event_source_unref(s);
+               can_bus_dev->close();
+               can_bus_dev->open();
+               can_bus_dev->start_reading(*can_bus_handler);
+       }
+
+       return 0;
+}
 /********************************************************************************
 *
 *              Subscription and unsubscription
@@ -220,14 +256,17 @@ extern "C"
                int fd_conf;
                fd_conf = afb_daemon_rootdir_open_locale(binder_interface->daemon, "can_bus.json", O_RDONLY, NULL);
 
+               /* Initialize the CAN bus handler */
+               can_bus_t cbh(fd_conf);
+               can_bus_handler = &cbh;
+
                /* Open CAN socket */
-               can_bus_t can_bus_handler(fd_conf);
-               if(can_bus_handler.init_can_dev() == 0)
+               if(can_bus_handler->init_can_dev() == 0)
                {
-                       can_bus_handler.start_threads();
+                       can_bus_handler->start_threads();
                        return 0;
                }
-
+               ERROR(binder_interface, "There was something wrong with CAN device Initialization. Check your config file maybe");
                return 1;
        }
 };
index 0f2b950..11df1f4 100644 (file)
  
 #pragma once
 
+#include <mutex>
+#include <condition_variable>
+
 extern "C" struct afb_binding_interface;
 
 extern const struct afb_binding_interface *binder_interface;
+
+extern std::condition_variable new_can_frame;
+extern std::mutex can_frame_mutex;
\ No newline at end of file