2 # Copyright (c) 2016 Alex Bencz
3 # Copyright (c) 2019 Konsulko Group, smurray@konsulko.com
4 # Copyright (c) 2020 The Linux Foundation, jsmoeller@linuxfoundation.org
6 # Permission is hereby granted, free of charge, to any person obtaining a copy of
7 # this software and associated documentation files (the "Software"), to deal in
8 # the Software without restriction, including without limitation the rights to
9 # use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10 # of the Software, and to permit persons to whom the Software is furnished to do
11 # so, subject to the following conditions:
13 # The above copyright notice and this permission notice shall be included in all
14 # copies or substantial portions of the Software.
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 # https://github.com/abencz/python_socketcan/blob/master/python_socketcan_example.py
38 class CANSocket(object):
40 FD_FORMAT = "<IB3x64s"
42 def __init__(self, interface=None):
43 self.sock = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
44 if interface is not None:
47 def bind(self, interface):
48 self.sock.bind((interface,))
49 self.sock.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FD_FRAMES, 1)
51 def send(self, can_id, data, flags=0):
52 can_id = can_id | flags
53 can_pkt = struct.pack(self.FORMAT, can_id, len(data), data)
54 self.sock.send(can_pkt)
56 def sendfd(self, can_id, data, flags=0):
57 can_id = can_id | flags
58 datafd = data.ljust(64, b'\x00');
59 can_pkt = struct.pack(self.FD_FORMAT, can_id, len(datafd), datafd)
60 self.sock.send(can_pkt)
62 def recv(self, flags=0):
63 can_pkt = self.sock.recv(72)
65 if len(can_pkt) == 16:
66 can_id, length, data = struct.unpack(self.FORMAT, can_pkt)
68 can_id, length, data = struct.unpack(self.FD_FORMAT, can_pkt)
70 can_id &= socket.CAN_EFF_MASK
71 return (can_id, data[:length])
73 class VehicleSimulator(object):
74 DEFAULT_IDLE_RPM = 600
77 self.CRUISEMODE = False
78 self.CRUISEACTIVE = False
82 self.vehicle_speed = 0
83 self.engine_speed = self.DEFAULT_IDLE_RPM
84 self.thread = threading.Thread(target=self.run, daemon=True)
85 self.lock = threading.Lock()
89 self.vehicle_speed = 0
90 self.engine_speed = self.DEFAULT_IDLE_RPM
95 def get_engine_speed(self):
97 return int(self.engine_speed)
99 def get_vehicle_speed(self):
101 return int(self.vehicle_speed)
103 def accelerate(self, target_speed, target_rpm, duration, bycruise = False):
104 if target_speed <= self.vehicle_speed:
106 v = (target_speed - self.vehicle_speed) / (duration * self.freq)
107 r = (target_rpm - self.engine_speed) / (duration * self.freq)
108 while self.vehicle_speed < target_speed and (not self.CRUISEACTIVE or bycruise):
110 self.vehicle_speed += v;
111 self.engine_speed += r;
112 time.sleep(1 / self.freq)
114 def brake(self, target_speed, target_rpm, duration, bycruise = False):
115 if target_speed >= self.vehicle_speed:
117 v = (self.vehicle_speed - target_speed) / (duration * self.freq)
118 r = (self.engine_speed - target_rpm) / (duration * self.freq)
119 while self.vehicle_speed > target_speed and (not self.CRUISEACTIVE or bycruise):
121 self.vehicle_speed -= v;
122 self.engine_speed -= r;
123 time.sleep(1 / self.freq)
125 def increase(self, bycruise = True):
126 if self.CRUISEACTIVE:
127 target_speed = self.vehicle_speed + 5
128 target_rpm = self.engine_speed * 1.1
129 self.accelerate(target_speed, target_rpm, 2, bycruise)
131 def decrease(self, bycruise = True):
132 if self.CRUISEACTIVE:
133 target_speed = self.vehicle_speed - 5
134 target_rpm = self.engine_speed * 0.9
135 self.brake(target_speed, target_rpm, 2, bycruise)
137 def resume(self, bycruise = True):
138 target_speed = self.CRUISESPEED
139 target_rpm = self.CRUISERPM
140 current_speed = self.get_vehicle_speed()
141 if target_speed > current_speed:
142 self.accelerate(target_speed, target_rpm, 2, bycruise)
144 self.brake(target_speed, target_rpm, 2, bycruise)
148 if not self.CRUISEACTIVE:
149 self.accelerate(80, 3000, 5)
150 self.accelerate(104, 4000, 3)
151 self.brake(80, 3000, 3)
152 self.accelerate(104, 4000, 6)
153 self.brake(40, 2000, 4)
154 self.accelerate(90, 3000, 5)
155 self.brake(1, 650, 5)
156 if not self.CRUISEACTIVE:
160 class DiagnosticMessageHandler(object):
161 def __init__(self, can_sock, simulator, verbose=False):
162 self.can_sock = can_sock
163 self.simulator = simulator
164 self.verbose = verbose
165 self.thread = threading.Thread(target=self.run, daemon=True)
172 can_id, data = self.can_sock.recv()
173 #print('%03X#%s' % (can_id, ''.join(format(x, '02X') for x in data)))
176 if data[1] == 0x01 and data[2] == 0x0C:
178 speed = self.simulator.get_engine_speed()
179 #print('engine speed = %d' % speed)
182 reply = [ 0x04, 0x41, 0x0C ]
183 reply.append(4 * speed // 256)
184 reply.append(4 * speed % 256)
185 # pad remaining bytes to make 8
189 self.can_sock.send(0x7e8, bytes(reply), 0)
190 elif data[1] == 0x01 and data[2] == 0x0D:
192 speed = int(self.simulator.get_vehicle_speed()) % 256
193 #print('vehicle speed = %d' % speed)
194 reply = [ 0x03, 0x41, 0x0D ]
196 # pad remaining bytes to make 8
201 self.can_sock.send(0x7e8, bytes(reply), 0)
203 class SteeringWheelMessageHandler(object):
204 def __init__(self, can_sock, simulator, verbose=False):
205 self.can_sock = can_sock
206 self.simulator = simulator
207 self.verbose = verbose
208 self.thread = threading.Thread(target=self.run, daemon=True)
209 self.buttonpressed = False
210 self.buttonenabled = False
211 self.buttoncancel = False
212 self.buttondec = False
213 self.buttoninc = False
214 self.cruisemode = False
215 self.cruiseactive = False
222 can_id, data = self.can_sock.recv()
223 #print('%03X#%s' % (can_id, ''.join(format(x, '02X') for x in data)))
225 #print('%03X#%s' % (can_id, ''.join(format(x, '02X') for x in data)))
228 #print('data6: %02X' % (data[6]))
229 if data[6] == 0x80 and not self.buttonpressed:
230 # we do skip any further lin messages
231 # two buttons at the same time won't work
232 # (aka unlikely w/o twisting fingers)
233 self.buttonpressed = True
234 self.buttonenabled = True
235 if data[6] == 0x08 and not self.buttonpressed:
236 self.buttonpressed = True
237 self.buttoncancel = True
238 if data[6] == 0x10 and not self.buttonpressed:
239 self.buttonpressed = True
240 self.buttondec = True
241 if data[6] == 0x40 and not self.buttonpressed:
242 self.buttonpressed = True
243 self.buttoninc = True
244 if data[6] == 0x00 and self.buttonpressed:
245 #now handle it as the button was released
246 if self.buttonenabled:
247 self.buttonenabled = False
248 self.cruisemode = not self.cruisemode
249 #print("set cruisemode to %s" % self.cruisemode)
250 self.simulator.CRUISEMODE = self.cruisemode
251 # disable/reset all if going off
252 if not self.cruisemode:
253 self.cruiseactive = False
254 self.simulator.CRUISEACTIVE = self.cruiseactive
255 self.simulator.CRUISESPEED = 0
256 self.simulator.CRUISERPM = 0
257 #print("set cruiseactive to %s" % self.cruiseactive)
258 if self.buttoncancel:
259 self.buttoncancel = False
260 self.simulator.CRUISESPEED = self.simulator.get_vehicle_speed()
261 self.simulator.CRUISERPM = self.simulator.get_engine_speed()
262 #print("set cruisespeed to %d" % self.simulator.CRUISESPEED )
263 #print("set cruiserpm to %d" % self.simulator.CRUISERPM )
264 self.cruiseactive = False
265 #print("set cruiseactive to %s" % self.cruiseactive )
266 self.simulator.CRUISEACTIVE = self.cruiseactive
268 self.buttondec = False
269 if self.cruiseactive:
271 self.simulator.decrease()
275 self.simulator.CRUISESPEED = self.simulator.get_vehicle_speed()
276 self.simulator.CRUISERPM = self.simulator.get_engine_speed()
277 #print("set cruisespeed to %d" % self.simulator.CRUISESPEED )
278 #print("set cruiserpm to %d" % self.simulator.CRUISERPM )
279 self.cruiseactive = not self.cruiseactive
280 #print("set cruiseactive to %s" % self.cruiseactive )
281 self.simulator.CRUISEACTIVE = self.cruiseactive
283 self.buttoninc = False
284 if self.cruiseactive:
286 self.simulator.increase()
288 if self.simulator.CRUISESPEED > 0:
290 self.cruiseactive = not self.cruiseactive
291 self.simulator.CRUISEACTIVE = self.cruiseactive
292 #print("set cruiseactive to %s" % self.cruiseactive )
294 self.simulator.resume()
295 self.buttonpressed = False
298 class StatusMessageSender(object):
299 def __init__(self, can_sock, simulator, verbose=False):
300 self.can_sock = can_sock
301 self.simulator = simulator
302 self.verbose = verbose
303 self.thread = threading.Thread(target=self.run, daemon=True)
311 speed = self.simulator.get_engine_speed()
313 print('engine speed = %d' % speed)
316 # Message is 1 byte unknown, 1 byte fuel level, 2 bytes engine speed (4x), fuel low @ bit 55
319 msg.append(speed // 256)
320 msg.append(speed % 256)
321 # pad remaining bytes to make 8
326 self.can_sock.send(0x3d9, bytes(msg), 0)
329 speed = int(self.simulator.get_vehicle_speed()) % 256
331 print('vehicle speed = %d' % speed)
332 # Message is 15 bits speed (64x), left aligned
334 # Note: extra 2x to yield required left-alignment
336 msg.append(speed // 256)
337 msg.append(speed % 256)
338 # pad remaining bytes to make 8
345 self.can_sock.send(0x3e9, bytes(msg), 0)
351 parser = argparse.ArgumentParser(description='Simple CAN vehicle simulator.')
352 parser.add_argument('interface', type=str, help='interface name (e.g. vcan0)')
353 parser.add_argument('--lin-interface', help='Separate LIN interface name (e.g. sllin0)')
354 parser.add_argument('-v', '--verbose', help='increase output verbosity', action='store_true')
355 args = parser.parse_args()
357 lin_interface = args.lin_interface
358 if lin_interface == None:
359 lin_interface = args.interface
362 can_sock = CANSocket(args.interface)
363 diag_can_sock = CANSocket(args.interface)
364 steeringwheel_can_sock = CANSocket(lin_interface)
366 sys.stderr.write('Could not listen on interface {0}\n'.format(args.interface))
369 print('Using {0}'.format(args.interface))
370 sim = VehicleSimulator()
371 status_sender = StatusMessageSender(can_sock, sim, args.verbose)
372 diag_handler = DiagnosticMessageHandler(diag_can_sock, sim, args.verbose)
373 steeringwheel_handler = SteeringWheelMessageHandler(steeringwheel_can_sock, sim, args.verbose)
375 status_sender.start()
377 steeringwheel_handler.start()
381 except (KeyboardInterrupt, SystemExit):
385 if __name__ == '__main__':