2 # Copyright (c) 2022 Aakash Solanki, tech2aks@gmail.com
3 # Copyright (c) 2024 Scott Murray <scott.murray@konsulko.com>
5 # SPDX-License-Identifier: MIT
8 from pathlib import Path
11 import concurrent.futures
12 from kuksa_client.grpc.aio import VSSClient
13 from kuksa_client.grpc import Datapoint
14 from systemd.daemon import notify
# Built-in defaults.  The configuration file may replace hostname,
# token_filename and ca_cert_filename through its "hostname",
# "token-file" and "ca-certificate" keys.

# Where the settings themselves are read from (parsed as YAML).
config_filename = "/etc/xdg/AGL/agl-vss-helper.yaml"

# KUKSA.val databroker endpoint and the server name given to the client
# for TLS.
hostname = "localhost"
tls_server_name = "localhost"

# Credentials: the CA certificate passed to the client as its root
# certificate, and the file holding the authorization token.
ca_cert_filename = "/etc/kuksa-val/CA.pem"
token_filename = "/etc/xdg/AGL/agl-vss-helper/agl-vss-helper.token"
# Body of the coroutine that talks to the databroker.  It connects, writes
# the configured initial values, then echoes every received target value of
# the mocked signals back as that signal's current value.
# NOTE(review): the enclosing `async def` line is not part of this listing.

# Build the client from the module-level connection settings.
# NOTE(review): the listing skips one line after the first argument and one
# before ensure_startup_connection; presumably the port and token
# arguments - confirm against the full file.
26 client = VSSClient(hostname,
28 root_certificates=Path(ca_cert_filename),
29 tls_server_name=tls_server_name,
31 ensure_startup_connection=True)
32 await client.connect()
33 print(f"Connected to KUKSA.val databroker at {hostname}:{port}")
# "initialize" is optional and must be a list.  Every entry that has both a
# "signal" and a "value" key gets that value written as the signal's
# current value; entries missing either key are ignored.
34 if "initialize" in config and isinstance(config["initialize"], list):
35 for entry in config["initialize"]:
36 if "signal" in entry and "value" in entry:
# NOTE(review): the listing skips a line before this print (possibly a
# verbosity guard) - confirm.
38 print(f"Setting {entry['signal']} to {entry['value']}")
39 await client.set_current_values({ entry["signal"] : Datapoint(entry["value"]) })
# "mock" is optional and must be a non-empty list of signal names for the
# subscription below to be set up.
43 if "mock" in config and isinstance(config["mock"], list):
44 if len(config["mock"]) != 0:
45 print(f"Mocking actuators:")
# NOTE(review): the body of this loop is missing from the listing
# (presumably it prints each signal name) - confirm.
46 for signal in config["mock"]:
# Iterate over target-value updates for the mocked signals.  Each update is
# indexed by signal name; entries that are None are skipped, the others have
# their .value written back as the signal's current value.
48 async for updates in client.subscribe_target_values(config["mock"]):
49 for signal in updates:
50 if updates[signal] is not None:
# NOTE(review): the listing skips a line before this print (possibly a
# verbosity guard) - confirm.
52 print(f"Actuating {signal} to {updates[signal].value}")
53 await client.set_current_values({ signal : Datapoint(updates[signal].value) })
# Load the YAML configuration into `config`.  Opening the file in a `with`
# block closes it on every path, including when parsing raises.
try:
    with open(config_filename, "r") as config_file:
        config = yaml.safe_load(config_file)
except yaml.YAMLError as exc:
    # f-strings interpolate with {...}; a "$" in front of the braces would
    # be printed literally ahead of the parser error.
    print(f"Could not parse configuration: {exc}")
# NOTE(review): the handler line guarding the message below is not visible
# in the listing; OSError (failure to open or read the file) is assumed -
# confirm against the full file.
except OSError:
    print("Could not read configuration")
# Apply optional overrides from the parsed configuration.  A key is honoured
# only when its value has the expected type; otherwise the current setting is
# left untouched.  String options are checked against the built-in `str`
# (`string` is not a type, so the check would raise as soon as one of those
# keys was present).
if "verbose" in config and isinstance(config["verbose"], bool):
    verbose = config["verbose"]
if "hostname" in config and isinstance(config["hostname"], str):
    hostname = config["hostname"]
if "port" in config and isinstance(config["port"], int):
    # NOTE(review): the body of this branch is missing from the listing; the
    # assignment is reconstructed by analogy with the other options - confirm.
    port = config["port"]
if "use-tls" in config and isinstance(config["use-tls"], bool):
    use_tls = config["use-tls"]
if "token-file" in config and isinstance(config["token-file"], str):
    token_filename = config["token-file"]
if "ca-certificate" in config and isinstance(config["ca-certificate"], str):
    ca_cert_filename = config["ca-certificate"]
# Read the authorization token into `token`, unless the token file name has
# been set to the empty string.
if token_filename != "":
    # NOTE(review): the listing skips a line before this print (possibly a
    # verbosity guard) - confirm.
    print(f"Reading authorization token {token_filename}")
    # The context manager closes the file as soon as the token is read.
    with open(token_filename, "r") as token_file:
        token = token_file.read()
92 except KeyboardInterrupt: