- Use of bracket instead of unlock method
- Change some mutex lock scope.
- Added subscribed_signals map object mutex to
manipulate it safely.
Change-Id: I770c0b5701db6b1151511f7360ec31ae6dcc1de9
Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
#include <string>
#include <thread>
#include <linux/can.h>
#include <string>
#include <thread>
#include <linux/can.h>
+#include <mutex>
+#include <condition_variable>
#include "timer.hpp"
#include "openxc.pb.h"
#include "timer.hpp"
#include "openxc.pb.h"
*/
static std::map<std::string, struct afb_event> subscribed_signals;
*/
static std::map<std::string, struct afb_event> subscribed_signals;
+/**
+* @brief Mutex allowing safe manipulation on subscribed_signals map.
+* @desc To ensure that the map object isn't modified when we read it, you
+* have to set this mutex before use subscribed_signals map object.
+*/
+extern std::mutex subscribed_signals_mutex;
+
/** Public: Return the currently active CAN configuration. */
CanMessageSet* getActiveMessageSet();
/** Public: Return the currently active CAN configuration. */
CanMessageSet* getActiveMessageSet();
*********************************************************************************/
can_bus_t::can_bus_t(int& conf_file)
*********************************************************************************/
can_bus_t::can_bus_t(int& conf_file)
- : conf_file_{conf_file}
+ : conf_file_{conf_file}
{
}
void can_bus_t::start_threads()
{
th_decoding_ = std::thread(can_decode_message, std::ref(*this));
{
}
void can_bus_t::start_threads()
{
th_decoding_ = std::thread(can_decode_message, std::ref(*this));
th_pushing_ = std::thread(can_event_push, std::ref(*this));
th_pushing_ = std::thread(can_event_push, std::ref(*this));
+void can_bus_t::stop_threads()
+{
+ is_decoding_ = false;
+ is_pushing_ = false;
+}
+
+bool can_bus_t::is_decoding()
+{
+ return is_decoding_;
+}
+
+bool can_bus_t::is_pushing()
+{
+ return is_pushing_;
+}
int can_bus_t::init_can_dev()
{
int can_bus_t::init_can_dev()
{
int conf_file_; /*!< conf_file_ - configuration file handle used to initialize can_bus_dev_t objects.*/
std::thread th_decoding_; /*!< thread that'll handle decoding a can frame */
int conf_file_; /*!< conf_file_ - configuration file handle used to initialize can_bus_dev_t objects.*/
std::thread th_decoding_; /*!< thread that'll handle decoding a can frame */
+ bool is_decoding_; /*!< boolean member controling thread while loop*/
std::thread th_pushing_; /*!< thread that'll handle pushing decoded can frame to subscribers */
std::thread th_pushing_; /*!< thread that'll handle pushing decoded can frame to subscribers */
+ bool is_pushing_; /*!< boolean member controling thread while loop*/
bool has_can_message_; /*!< boolean members that control whether or not there is can_message into the queue */
std::queue <can_message_t> can_message_q_; /*!< queue that'll store can_message_t to decoded */
bool has_can_message_; /*!< boolean members that control whether or not there is can_message into the queue */
std::queue <can_message_t> can_message_q_; /*!< queue that'll store can_message_t to decoded */
+ /**
+ * @brief Will stop all threads holded by can_bus_t object
+ * which are decoding and pushing threads.
+ */
+ void stop_threads();
+
+ /**
+ * @brief Telling if the decoding thread is running.
+ * This is the boolean value on which the while loop
+ * take its condition. Set it to false will stop the
+ * according thread.
+ *
+ * @return true if decoding thread is running, false if not.
+ */
+ bool is_decoding();
+
+ /**
+ * @brief Telling if the pushing thread is running
+ * This is the boolean value on which the while loop
+ * take its condition. Set it to false will stop the
+ * according thread.
+ *
+ * @return true if pushing thread is running, false if not.
+ */
+ bool is_pushing();
+
/**
* @brief Return first can_message_t on the queue
*
/**
* @brief Return first can_message_t on the queue
*
/**
* @brief Telling if the reading thread is running
/**
* @brief Telling if the reading thread is running
+ * This is the boolean value on which the while loop
+ * take its condition. Set it to false will stop the
+ * according thread.
- * @return true if read is running, false if not.
+ * @return true if reading thread is running, false if not.
std::vector <CanSignal> signals;
std::vector <CanSignal>::iterator signals_i;
openxc_VehicleMessage vehicle_message;
std::vector <CanSignal> signals;
std::vector <CanSignal>::iterator signals_i;
openxc_VehicleMessage vehicle_message;
- openxc_DynamicField search_key, ret;
+ openxc_DynamicField search_key, decoded_message;
bool send = true;
decoder_t decoder;
bool send = true;
decoder_t decoder;
- while(can_bus.has_can_message())
+ while(can_bus.is_decoding())
- std::unique_lock<std::mutex> can_message_lock(can_message_mutex);
- new_can_message.wait(can_message_lock);
+ {
+ std::unique_lock<std::mutex> can_message_lock(can_message_mutex);
+ new_can_message.wait(can_message_lock);
can_message = can_bus.next_can_message();
can_message = can_bus.next_can_message();
- can_message_mutex.unlock();
- std::lock_guard<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
- /* First we have to found which CanSignal is */
- search_key = build_DynamicField((double)can_message.get_id());
- signals = find_can_signals(search_key);
-
- /* Decoding the message ! Don't kill the messenger ! */
- for(const auto& sig : signals)
- {
+ /* First we have to found which CanSignal it is */
+ search_key = build_DynamicField((double)can_message.get_id());
+ signals = find_can_signals(search_key);
+
+ /* Decoding the message ! Don't kill the messenger ! */
+ for(const auto& sig : signals)
+ {
+ {
+ std::lock_guard<std::mutex> subscribed_signals_lock(subscribed_signals_mutex);
std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
const auto& it_event = subscribed_signals.find(sig.genericName);
if(it_event != subscribed_signals.end() &&
afb_event_is_valid(it_event->second))
{
std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
const auto& it_event = subscribed_signals.find(sig.genericName);
if(it_event != subscribed_signals.end() &&
afb_event_is_valid(it_event->second))
{
- ret = decoder.decodeSignal(sig, can_message, getSignals(), &send);
-
- openxc_SimpleMessage s_message = build_SimpleMessage(sig.genericName, ret);
+ decoded_message = decoder.decodeSignal(sig, can_message, getSignals(), &send);
+ openxc_SimpleMessage s_message = build_SimpleMessage(sig.genericName, decoded_message);
vehicle_message = build_VehicleMessage_with_SimpleMessage(openxc_DynamicField_Type::openxc_DynamicField_Type_NUM, s_message);
vehicle_message = build_VehicleMessage_with_SimpleMessage(openxc_DynamicField_Type::openxc_DynamicField_Type_NUM, s_message);
+
+ std::lock_guard<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
can_bus.push_new_vehicle_message(vehicle_message);
}
can_bus.push_new_vehicle_message(vehicle_message);
}
+ new_decoded_can_message.notify_one();
- decoded_can_message_mutex.unlock();
- new_decoded_can_message.notify_one();
openxc_SimpleMessage s_message;
json_object* jo;
openxc_SimpleMessage s_message;
json_object* jo;
- while(can_bus.has_vehicle_message())
+ while(can_bus.is_pushing())
- std::unique_lock<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
- new_decoded_can_message.wait(decoded_can_message_lock);
+ {
+ std::unique_lock<std::mutex> decoded_can_message_lock(decoded_can_message_mutex);
+ new_decoded_can_message.wait(decoded_can_message_lock);
v_message = can_bus.next_vehicle_message();
v_message = can_bus.next_vehicle_message();
- decoded_can_message_mutex.unlock();
s_message = get_simple_message(v_message);
s_message = get_simple_message(v_message);
- std::lock_guard<std::mutex> push_signal_lock(subscribed_signals_mutex);
+ {
+ std::lock_guard<std::mutex> subscribed_signals_lock(subscribed_signals_mutex);
std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
const auto& it_event = subscribed_signals.find(s_message.name);
if(it_event != subscribed_signals.end() && afb_event_is_valid(it_event->second))
std::map<std::string, struct afb_event> subscribed_signals = get_subscribed_signals();
const auto& it_event = subscribed_signals.find(s_message.name);
if(it_event != subscribed_signals.end() && afb_event_is_valid(it_event->second))
jsonify_simple(s_message, jo);
afb_event_push(it_event->second, jo);
}
jsonify_simple(s_message, jo);
afb_event_push(it_event->second, jo);
}
- subscribed_signals_mutex.unlock();
- update_subscrided_signals.notify_one();
}
}
\ No newline at end of file
}
}
\ No newline at end of file
-
-#include <mutex>
-#include <condition_variable>
-
-std::condition_variable update_subscrided_signals;
-std::mutex subscribed_signals_mutex;
\ No newline at end of file
{
can_message.convert_from_canfd_frame(can_bus_dev.read());
{
can_message.convert_from_canfd_frame(can_bus_dev.read());
- std::lock_guard<std::mutex> can_message_lock(can_message_mutex);
+ {
+ std::lock_guard<std::mutex> can_message_lock(can_message_mutex);
can_bus.push_new_can_message(can_message);
can_bus.push_new_can_message(can_message);
- can_message_mutex.unlock();
new_can_message.notify_one();
}
}
\ No newline at end of file
new_can_message.notify_one();
}
}
\ No newline at end of file
#include "low-can-binding.hpp"
#include <queue>
#include "low-can-binding.hpp"
#include <queue>
#include <vector>
#include <thread>
#include <fcntl.h>
#include <vector>
#include <thread>
#include <fcntl.h>
- #include <afb/afb-binding.h>
#include <afb/afb-service-itf.h>
};
#include <afb/afb-service-itf.h>
};
*/
const struct afb_binding_interface *binder_interface;
*/
const struct afb_binding_interface *binder_interface;
-/*
- * CAN bus handler pointer. This is the object that will be use to
- * initialize each CAN devices specified into the configuration file
- *
- * It is used by the reading thread also because of its can_message_q_ queue
- * that store CAN messages read from the socket.
- */
-can_bus_t *can_bus_handler;
-
/********************************************************************************
*
* Subscription and unsubscription
/********************************************************************************
*
* Subscription and unsubscription
- // TODO: lock the subscribed_signals when insert/remove
+ std::lock_guard<std::mutex> subscribed_signals_lock(subscribed_signals_mutex);
auto ss_i = subscribed_signals.find(sig.genericName);
if (ss_i != subscribed_signals.end() && !afb_event_is_valid(ss_i->second))
{
auto ss_i = subscribed_signals.find(sig.genericName);
if (ss_i != subscribed_signals.end() && !afb_event_is_valid(ss_i->second))
{
fd_conf = afb_daemon_rootdir_open_locale(binder_interface->daemon, "can_bus.json", O_RDONLY, NULL);
/* Initialize the CAN bus handler */
fd_conf = afb_daemon_rootdir_open_locale(binder_interface->daemon, "can_bus.json", O_RDONLY, NULL);
/* Initialize the CAN bus handler */
- can_bus_t cbh(fd_conf);
- can_bus_handler = &cbh;
+ can_bus_t can_bus_handler(fd_conf);
- if(can_bus_handler->init_can_dev() == 0)
+ if(can_bus_handler.init_can_dev() == 0)
- can_bus_handler->start_threads();
+ can_bus_handler.start_threads();
return 0;
}
ERROR(binder_interface, "There was something wrong with CAN device Initialization. Check your config file maybe");
return 0;
}
ERROR(binder_interface, "There was something wrong with CAN device Initialization. Check your config file maybe");
-#include <mutex>
-#include <condition_variable>
+#include <cstddef>
+extern "C"
+{
+ #include <afb/afb-binding.h>
+};
extern "C" struct afb_binding_interface;
extern "C" struct afb_binding_interface;