2 * Copyright (C) 2015, 2018 "IoT.bzh"
3 * Author "Romain Forlot" <romain.forlot@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <sys/socket.h>
20 #include <json-c/json.h>
21 #include <linux/can/raw.h>
28 #include "can-bus.hpp"
30 #include "signals.hpp"
31 #include "can-decoder.hpp"
32 #include "../binding/application.hpp"
33 #include "../utils/signals.hpp"
34 #include "../utils/openxc-utils.hpp"
36 /// @brief Class destructor
38 /// @param[in] conf_file - Stop threads and unlock them to correctly finish them
39 /// even without any activity on the CAN bus.
40 can_bus_t::~can_bus_t()
43 new_can_message_cv_.notify_one();
46 /// @brief Class constructor
47 can_bus_t::can_bus_t()
50 /// @brief Fills the CAN device map member with value from device
52 /// @param[in] mapping configuration section.
53 void can_bus_t::set_can_devices(json_object *mapping)
57 AFB_ERROR("Can't initialize CAN buses with this empty mapping.");
61 struct json_object_iterator it = json_object_iter_begin(mapping);
62 struct json_object_iterator end = json_object_iter_end(mapping);
63 while (! json_object_iter_equal(&it, &end)) {
64 can_devices_mapping_.push_back(std::make_pair(
65 json_object_iter_peek_name(&it),
66 json_object_get_string(json_object_iter_peek_value(&it))
68 json_object_iter_next(&it);
72 /// @brief Take a decoded message to determine if its value complies with the desired
75 /// @param[in] vehicle_message - The decoded message to be analyzed.
76 /// @param[in] can_subscription - the subscription which will be notified depending
77 /// on its filtering values. Filtering values are stored in the event_filtermember.
79 /// @return True if the value is compliant with event filter values, false if not...
80 bool can_bus_t::apply_filter(const openxc_VehicleMessage& vehicle_message, std::shared_ptr<low_can_subscription_t> can_subscription)
83 if(is_valid(vehicle_message))
85 float min = can_subscription->get_min();
86 float max = can_subscription->get_max();
87 double value = get_numerical_from_DynamicField(vehicle_message);
88 send = (value < min || value > max) ? false : true;
93 /// @brief Will make the decoding operation on a classic CAN message. It will not
94 /// handle CAN commands nor diagnostic messages that have their own method to get
97 /// It will add to the vehicle_message queue the decoded message and tell the event push
98 /// thread to process it.
100 /// @param[in] can_message - a single CAN message from the CAN socket read, to be decode.
102 /// @return How many signals has been decoded.
103 void can_bus_t::process_signals(std::shared_ptr<message_t> message, map_subscription& s)
105 int subscription_id = message->get_sub_id();
106 openxc_DynamicField decoded_message;
107 openxc_VehicleMessage vehicle_message;
109 if( s.find(subscription_id) != s.end() && afb_event_is_valid(s[subscription_id]->get_event()))
112 // First we have to found which signal_t it is
113 std::shared_ptr<low_can_subscription_t> subscription = s[subscription_id];
114 openxc_SimpleMessage s_message;
117 if(subscription->get_message_definition() != nullptr)
119 openxc_DynamicField dynamicField_tmp;
120 json_object *signal_json_tmp;
121 decoded_message = build_DynamicField_json(json_object_new_array());
122 for(std::shared_ptr<signal_t> sig : subscription->get_message_definition()->get_signals())
124 signal_json_tmp = json_object_new_object();
125 dynamicField_tmp = decoder_t::translate_signal(*sig, message, &send);
126 json_object_object_add(signal_json_tmp,"name", json_object_new_string(sig->get_name().c_str()));
127 jsonify_DynamicField(dynamicField_tmp,signal_json_tmp);
128 if(sig != nullptr && sig->get_unit() != "")
129 json_object_object_add(signal_json_tmp, "unit", json_object_new_string(sig->get_unit().c_str()));
130 json_object_array_add(decoded_message.json_value,signal_json_tmp);
135 decoded_message = decoder_t::translate_signal(*subscription->get_signal(), message, &send);
138 s_message = build_SimpleMessage(subscription->get_name(), decoded_message);
139 vehicle_message = build_VehicleMessage(s_message, message->get_timestamp());
141 if(send && apply_filter(vehicle_message, subscription))
143 std::lock_guard<std::mutex> decoded_can_message_lock(decoded_can_message_mutex_);
144 push_new_vehicle_message(subscription_id, vehicle_message);
145 AFB_DEBUG("%s CAN signals processed.", subscription->get_name().c_str());
150 /// @brief Will make the decoding operation on a diagnostic CAN message.Then it find the subscribed signal
151 /// corresponding and will add the vehicle_message to the queue of event to pushed before notifying
152 /// the event push thread to process it.
154 /// @param[in] manager - the diagnostic manager object that handle diagnostic communication
155 /// @param[in] can_message - a single CAN message from the CAN socket read, to be decode.
157 /// @return How many signals has been decoded.
158 void can_bus_t::process_diagnostic_signals(diagnostic_manager_t& manager, std::shared_ptr<message_t> message, map_subscription& s)
160 int subscription_id = message->get_sub_id();
162 openxc_VehicleMessage vehicle_message = manager.find_and_decode_adr(message);
163 if (message->get_timestamp())
164 vehicle_message.timestamp = message->get_timestamp();
165 if( (vehicle_message.has_simple_message && vehicle_message.simple_message.has_name) &&
166 s.find(subscription_id) != s.end() && afb_event_is_valid(s[subscription_id]->get_event()))
168 if (apply_filter(vehicle_message, s[subscription_id]))
170 std::lock_guard<std::mutex> decoded_can_message_lock(decoded_can_message_mutex_);
171 push_new_vehicle_message(subscription_id, vehicle_message);
172 AFB_DEBUG("%s CAN signals processed.", s[subscription_id]->get_name().c_str());
177 /// @brief thread to decoding raw CAN messages.
179 /// Depending on the nature of message, if arbitration ID matches ID for a diagnostic response
180 /// then decoding a diagnostic message else use classic CAN signals decoding functions.
182 /// It will take from the can_message_q_ queue the next can message to process then it search
183 /// about signal subscribed if there is a valid afb_event for it. We only decode signal for which a
184 /// subscription has been made. Can message will be decoded using translate_signal that will pass it to the
185 /// corresponding decoding function if there is one assigned for that signal. If not, it will be the default
186 /// noopDecoder function that will operate on it.
188 /// TODO: make diagnostic messages parsing optionnal.
189 void can_bus_t::can_decode_message()
191 utils::signals_manager_t& sm = utils::signals_manager_t::instance();
195 std::unique_lock<std::mutex> can_message_lock(can_message_mutex_);
196 new_can_message_cv_.wait(can_message_lock);
197 while(!can_message_q_.empty())
199 std::shared_ptr<message_t> message = next_can_message();
200 can_message_lock.unlock();
203 std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
204 map_subscription& s = sm.get_subscribed_signals();
205 if(application_t::instance().get_diagnostic_manager().is_diagnostic_response(message))
206 process_diagnostic_signals(application_t::instance().get_diagnostic_manager(), message, s);
208 process_signals(message, s);
210 can_message_lock.lock();
212 new_decoded_can_message_.notify_one();
213 can_message_lock.unlock();
217 /// @brief thread to push events to suscribers. It will read subscribed_signals map to look
218 /// which are events that has to be pushed.
219 void can_bus_t::can_event_push()
222 utils::signals_manager_t& sm = utils::signals_manager_t::instance();
226 std::unique_lock<std::mutex> decoded_can_message_lock(decoded_can_message_mutex_);
227 new_decoded_can_message_.wait(decoded_can_message_lock);
228 while(!vehicle_message_q_.empty())
230 std::pair<int, openxc_VehicleMessage> v_message = next_vehicle_message();
231 decoded_can_message_lock.unlock();
233 std::lock_guard<std::mutex> subscribed_signals_lock(sm.get_subscribed_signals_mutex());
234 map_subscription& s = sm.get_subscribed_signals();
235 if(s.find(v_message.first) != s.end() && afb_event_is_valid(s[v_message.first]->get_event()))
237 jo = json_object_new_object();
238 jsonify_vehicle(v_message.second, s[v_message.first]->get_signal(), jo);
239 if(afb_event_push(s[v_message.first]->get_event(), jo) == 0)
241 if(v_message.second.has_diagnostic_response)
242 on_no_clients(s[v_message.first], v_message.second.diagnostic_response.pid, s);
244 on_no_clients(s[v_message.first], s);
248 decoded_can_message_lock.lock();
250 decoded_can_message_lock.unlock();
254 /// @brief Will initialize threads that will decode
255 /// and push subscribed events.
256 void can_bus_t::start_threads()
259 th_decoding_ = std::thread(&can_bus_t::can_decode_message, this);
260 th_decoding_.detach();
263 th_pushing_ = std::thread(&can_bus_t::can_event_push, this);
264 th_pushing_.detach();
267 /// @brief Will stop all threads holded by can_bus_t object
268 /// which are decoding and pushing then will wait that's
269 /// they'll finish their job.
270 void can_bus_t::stop_threads()
272 is_decoding_ = false;
276 /// @brief return new_can_message_cv_ member
278 /// @return return new_can_message_cv_ member
279 std::condition_variable& can_bus_t::get_new_can_message_cv()
281 return new_can_message_cv_;
284 /// @brief return can_message_mutex_ member
286 /// @return return can_message_mutex_ member
287 std::mutex& can_bus_t::get_can_message_mutex()
289 return can_message_mutex_;
292 /// @brief Return first can_message_t on the queue
294 /// @return a can_message_t
295 std::shared_ptr<message_t> can_bus_t::next_can_message()
297 std::shared_ptr<message_t> msg;
299 if(!can_message_q_.empty())
301 msg = can_message_q_.front();
302 can_message_q_.pop();
303 std::string debug = msg->get_debug_message();
304 AFB_DEBUG(debug.c_str());
311 /// @brief Push a message_t into the queue
313 /// @param[in] msg - the const reference message_t object to push into the queue
314 void can_bus_t::push_new_can_message(std::shared_ptr<message_t> msg)
316 can_message_q_.push(msg);
319 /// @brief Return first openxc_VehicleMessage on the queue
321 /// @return a openxc_VehicleMessage containing a decoded can message
322 std::pair<int, openxc_VehicleMessage> can_bus_t::next_vehicle_message()
324 std::pair<int, openxc_VehicleMessage> v_msg;
326 if(! vehicle_message_q_.empty())
328 v_msg = vehicle_message_q_.front();
329 vehicle_message_q_.pop();
330 AFB_DEBUG("next vehicle message poped");
337 /// @brief Push a openxc_VehicleMessage into the queue
339 /// @param[in] v_msg - const reference openxc_VehicleMessage object to push into the queue
340 void can_bus_t::push_new_vehicle_message(int subscription_id, const openxc_VehicleMessage& v_msg)
342 vehicle_message_q_.push(std::make_pair(subscription_id, v_msg));
345 /// @brief Return the CAN device index from the map
346 /// map are sorted so index depend upon alphabetical sorting.
347 int can_bus_t::get_can_device_index(const std::string& bus_name) const
350 for(const auto& d: can_devices_mapping_)
352 if(d.first == bus_name)
359 /// @brief Return CAN device name from a logical CAN device name gotten from
360 /// the signals.json description file which comes from a CAN databases file in
362 const std::string can_bus_t::get_can_device_name(const std::string& id_name) const
364 std::string ret = "";
365 for(const auto& d: can_devices_mapping_)
367 if(d.first == id_name)