Remove socket read management by systemd event
authorRomain Forlot <romain.forlot@iot.bzh>
Fri, 24 Feb 2017 11:10:02 +0000 (11:10 +0000)
committerRomain Forlot <romain.forlot@iot.bzh>
Fri, 24 Feb 2017 11:12:55 +0000 (11:12 +0000)
loop and use classic thread with a blocking
read

Change-Id: Iac5428009b57c727bb32bd4893bc3fe282ba35c7
Signed-off-by: Romain Forlot <romain.forlot@iot.bzh>
src/can-utils.cpp
src/can-utils.hpp
src/can_reader.cpp
src/low-can-binding-new.cpp
src/low-can-binding.cpp
src/low-can-binding.hpp

index 8a91bd1..c6f2d57 100644 (file)
@@ -28,7 +28,6 @@
 #include <sys/socket.h>
 #include <json-c/json.h>
 #include <linux/can/raw.h>
-#include <systemd/sd-event.h>
 
 extern "C"
 {
@@ -187,7 +186,6 @@ int can_bus_t::init_can_dev()
                t = devices_name.size();
                i=0;
 
-               std::lock_guard<std::mutex> can_frame_lock(can_frame_mutex);
                for(const auto& device : devices_name)
                {
                        can_bus_dev_t can_bus_device_handler(device);
@@ -197,7 +195,6 @@ int can_bus_t::init_can_dev()
                                ERROR(binder_interface, "Can't open device %s", device);
                        can_bus_device_handler.start_reading(std::ref(*this));
                }
-               can_frame_mutex.unlock();
 
                NOTICE(binder_interface, "Initialized %d/%d can bus device(s)", i, t);
                return 0;
@@ -299,15 +296,6 @@ bool can_bus_t::has_vehicle_message() const
        return has_vehicle_message_;
 }
 
-/********************************************************************************
-*
-*              This is the sd_event_add_io callback function declaration. 
-*              Its implementation can be found into low-can-binding.cpp.
-*
-*********************************************************************************/
-
-int can_frame_received(sd_event_source *s, int fd, uint32_t revents, void *userdata);
-
 /********************************************************************************
 *
 *              can_bus_dev_t method implementation
@@ -319,22 +307,6 @@ can_bus_dev_t::can_bus_dev_t(const std::string &dev_name)
 {
 }
 
-int can_bus_dev_t::event_loop_connection()
-{
-       sd_event_source *source;
-       int rc;
-
-       /* adds to the event loop */
-       rc = sd_event_add_io(afb_daemon_get_event_loop(binder_interface->daemon), &source, can_socket_, EPOLLIN, can_frame_received, this);
-       if (rc < 0) {
-               close();
-               ERROR(binder_interface, "Can't coonect CAN device %s to the event loop", device_name_);
-       } else {
-               NOTICE(binder_interface, "Connected to %s", device_name_);
-       }
-       return rc;
-}
-
 int can_bus_dev_t::open()
 {
        const int canfd_on = 1;
@@ -374,14 +346,9 @@ int can_bus_dev_t::open()
 
                        /* And bind it to txAddress */
                        if (::bind(can_socket_, (struct sockaddr *)&txAddress_, sizeof(txAddress_)) < 0)
-                       {
                                ERROR(binder_interface, "Bind failed");
-                       }
                        else
-                       {
-                               ::fcntl(can_socket_, F_SETFL, O_NONBLOCK);
                                return 0;
-                       }
                }
                close();
        }
index e4a3d34..11e19f3 100644 (file)
@@ -325,15 +325,6 @@ class can_bus_dev_t {
                 */
                can_bus_dev_t(const std::string& dev_name);
 
-               /**
-                * @brief Connect to the application framework event loop and adding
-                *  a watch on the socket that will wakeup reading thread that will
-                * read the socket and fill can_bus_t object queue.
-                *
-                * @return 0 if success, anything else if not.
-                */
-               int event_loop_connection();
-
                /**
                 * @brief Open the can socket and returning it 
                 *
@@ -521,8 +512,7 @@ void pre_initialize(can_bus_dev_t* bus, bool writable, can_bus_dev_t* buses, con
 
 /**
  * @fn void post_initialize(can_bus_dev_t* bus, bool writable, can_bus_dev_t* buses, const int busCount);
- * @brief Post-initialize actions made after CAN bus initialization and before the
- * event loop connection.
+ * @brief Post-initialize actions made after CAN bus initialization
  *
  * @param[in] bus - A CanBus struct defining the bus's metadata
  * @param[in] writable - configure the controller in a writable mode. If false, it will be
index e2f6277..018c7cb 100644 (file)
@@ -27,13 +27,7 @@ void can_reader(can_bus_dev_t &can_bus_dev, can_bus_t& can_bus)
 
        while(can_bus_dev.is_running())
        {
-               /* Declare and take lock ownership of can_frame_mutex.
-                * then waiting notification for a new can frame arrival
-                */
-               std::unique_lock<std::mutex> can_frame_lock(can_frame_mutex);
-               new_can_frame.wait(can_frame_lock);
-                       can_message.convert_from_canfd_frame(can_bus_dev.read());
-               can_frame_mutex.unlock();
+               can_message.convert_from_canfd_frame(can_bus_dev.read());
 
                std::lock_guard<std::mutex> can_message_lock(can_message_mutex);
                        can_bus.push_new_can_message(can_message);
index 3686c6b..13571f2 100644 (file)
 *              Subscription and unsubscription
 *
 *********************************************************************************/
+static int subscribe_unsubscribe_signal(struct afb_req request, const std::string& sig)
+{
+       int ret;
+
+       // TODO: lock the subscribed_signals when insert/remove
+       const auto& ss_i = subscribed_signals.find(sig);
+       if (ss_i != subscribed_signals.end())
+       {
+               if(!afb_event_is_valid(ss_i->second))
+               {
+                       if(!subscribe)
+                       {
+                               NOTICE(binder_interface, "Event isn't valid, it can't be unsubscribed.");
+                               ret = 1;
+                       }
+                       else
+                       {
+                               ss_i->second = afb_daemon_make_event(binder_interface->daemon, ss_i->first.c_str());
+                               if (!afb_event_is_valid(ss_i->second)) 
+                               {
+                                       ERROR(binder_interface, "Can't create an event, something goes wrong.");
+                                       ret = 0;
+                               }
+                       }
+               }
+       }
+       else
+       {
+               subscribed_signals[sig] = afb_daemon_make_event(binder_interface->daemon, sig);
+               if (!afb_event_is_valid(ss_i->second)) 
+               {
+                       ERROR(binder_interface, "Can't create an event, something goes wrong.");
+                       ret = 0;
+               }
+       }
+
+       if (((subscribe ? afb_req_subscribe : afb_req_unsubscribe)(request, subscribed_signals[sig])) < 0)
+       {
+               ERROR(binder_interface, "Operation goes wrong for signal: %s", sig);
+               ret = 0;
+       }
+       else
+               ret = 1;
+       
+       return ret;
+}
+
+static int subscribe_signals(struct afb_req request, const std::vector<std::string>& signals)
+{
+       int ret = 0;
+
+       for(const auto& signal_i : signals)
+       {
+               ret = subscribe_signal(request, subscribe, signal_i);
+               if(ret == 0)
+                       return ret;
+       }
+       return ret;
+}
 
 std::vector<std::string> get_name(struct afb_req request)
 {
index ea8803f..2151790 100644 (file)
@@ -52,34 +52,6 @@ const struct afb_binding_interface *binder_interface;
  */
 can_bus_t *can_bus_handler;
 
-/********************************************************************************
-*
-*              Event management
-*
-*********************************************************************************/
-int can_frame_received(sd_event_source *s, int fd, uint32_t revents, void *userdata)
-{
-       can_bus_dev_t *can_bus_dev = (can_bus_dev_t*)userdata;
-
-       /* Notify reading thread that there is something to read */
-       if ((revents & EPOLLIN) != 0) {
-               new_can_frame.notify_one();
-       }
-
-       /* check if error or hangup and reopen the socket and event_loop. 
-        * socket is protected by a mutex */
-       if ((revents & (EPOLLERR|EPOLLRDHUP|EPOLLHUP)) != 0)
-       {
-               std::lock_guard<std::mutex> can_frame_lock(can_frame_mutex);
-               sd_event_source_unref(s);
-               can_bus_dev->close();
-               can_bus_dev->open();
-               can_bus_dev->start_reading(*can_bus_handler);
-               can_bus_dev->event_loop_connection();
-       }
-
-       return 0;
-}
 /********************************************************************************
 *
 *              Subscription and unsubscription
index 11df1f4..b2b544f 100644 (file)
@@ -23,7 +23,4 @@
 
 extern "C" struct afb_binding_interface;
 
-extern const struct afb_binding_interface *binder_interface;
-
-extern std::condition_variable new_can_frame;
-extern std::mutex can_frame_mutex;
\ No newline at end of file
+extern const struct afb_binding_interface *binder_interface;
\ No newline at end of file