# See the License for the specific language governing permissions and
# limitations under the License.
-
from extras import config
import extras.Kuksa_Instance as kuksa_instance
# ========================================
+# Global Variables
Steering_Signal_Type = "Kuksa"
-
+Protocol = None
def create_animated_toggle():
return AnimatedToggle(
"""
super(self.__class__, self).__init__(parent)
self.setupUi(self)
+ self.client = None
self.SSL_toggle = create_animated_toggle()
self.Protocol_toggle = create_animated_toggle()
self.startClientBtn.setCheckable(True)
self.startClientBtn.setStyleSheet("border: 1px solid green;")
+ self.Hide_IC = self.findChild(QPushButton, "Hide_IC")
+ self.Hide_HVAC = self.findChild(QPushButton, "Hide_HVAC")
+ self.Hide_HUD = self.findChild(QPushButton, "Hide_HUD")
+
self.startClientBtn.clicked.connect(self.start_stop_client)
self.reconnectBtn.clicked.connect(self.reconnectClient)
self.SSL_toggle.clicked.connect(self.toggleSSL)
def start_stop_client(self):
if self.startClientBtn.isChecked():
self.set_instance()
- if self.client is not None:
+ elif self.client is not None:
self.client.stop()
self.refreshThread = RefreshThread(self)
Steering_Signal_Type = "Kuksa"
def get_protocol(self):
+ global Protocol
if (not self.Protocol_toggle.isChecked()):
+ Protocol = "ws"
return "ws"
else:
+ Protocol = "grpc"
return "grpc"
def set_instance(self):
"""
if (self.client is not None):
try:
- config = self.make_new_config()
self.client.stop()
- self.client = self.kuksa.reconnect(config, self.kuksa_token)
- self.client.start()
+ self.client = self.kuksa.reconnect(
+ self.make_new_config(), self.kuksa_token)
self.refreshThread = RefreshThread(self)
self.refreshThread.start()
except Exception as e:
logging.error(e)
- self.set_instance()
+ else:
+ self.set_instance()
- self.refreshThread = RefreshThread(self)
- self.refreshThread.start()
+ self.refreshThread = RefreshThread(self)
+ self.refreshThread.start()
def make_new_config(self):
"""
from PyQt5.QtWidgets import QWidget
from PyQt5.QtCore import QEasingCurve
from PyQt5.QtWidgets import QGraphicsOpacityEffect
+from kuksa_client.grpc import Field, SubscribeEntry, View
+from kuksa_client.grpc.aio import VSSClient
+from PyQt5.QtCore import pyqtSignal
+import asyncio
+from PyQt5.QtCore import QThread
+import pathlib
import logging
import json
from . import Kuksa_Instance as kuksa_instance
+from Widgets import settings
# Global variables
subscribed = False
-block_subscription_updates = False
-class UI_Handeler(MainWindow):
+class GrpcSubscriptionThread(QThread):
+ updateReceived = pyqtSignal(str, str)
+
+ def __init__(self):
+ QThread.__init__(self)
+ self.client = None
+
+ def run(self):
+ config = kuksa_instance.KuksaClientSingleton.instance().get_config()
+ token = kuksa_instance.KuksaClientSingleton.instance().get_token()
+
+ SUBSCRIPTION_ENTRIES = [
+ SubscribeEntry('Vehicle.Speed', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Powertrain.CombustionEngine.Speed', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Body.Lights.DirectionIndicator.Left.IsSignaling', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Body.Lights.DirectionIndicator.Right.IsSignaling', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Body.Lights.Hazard.IsSignaling', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Powertrain.FuelSystem.Level', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Powertrain.CombustionEngine.ECT', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Powertrain.Transmission.SelectedGear', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature', View.FIELDS, (Field.VALUE,)),
+ SubscribeEntry('Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed', View.FIELDS, (Field.VALUE,)),
+ ]
+
+ async def grpc_subscription(client):
+ try:
+ await client.connect()
+ async for updates in client.subscribe(entries=SUBSCRIPTION_ENTRIES):
+ for update in updates:
+ if update.entry.value is not None:
+ self.updateReceived.emit(str(update.entry.path),
+ str(update.entry.value.value))
+ client.disconnect()
+ except Exception as e:
+ logging.error(f"Error during gRPC subscription: {e}")
+
+ try:
+ client = VSSClient(host=config['ip'],
+ port=config['port'],
+ token=token,
+ root_certificates=pathlib.Path(config['cacertificate']),
+ tls_server_name=config['tls_server_name'])
+ asyncio.set_event_loop(asyncio.new_event_loop())
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(grpc_subscription(client))
+ except Exception as e:
+ logging.error(f"Error during gRPC subscription: {e}")
+
+
+class UI_Handeler(MainWindow):
def fullscreen(self):
self.headerContainer.hide()
self.setAttribute(QtCore.Qt.WA_TranslucentBackground, False)
self.showFullScreen()
- def display_sending_message(self):
- print("message sent")
-
def block_updates():
global block_subscription_updates
block_subscription_updates = True
"Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature",
"Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed"]
- for signal in signals:
- self.client.subscribe(
- signal, lambda data: UI_Handeler.VSS_callback(self, data), 'value')
+ if settings.Protocol == "ws":
+ for signal in signals:
+ self.client.subscribe(
+ signal, lambda data: UI_Handeler.VSS_callback(self, data), 'value')
+ if settings.Protocol == "grpc":
+ self.worker = GrpcSubscriptionThread()
+ self.worker.updateReceived.connect(
+ lambda path, value: UI_Handeler.VSS_callback(self=self, path=path, value=value))
+ self.worker.start()
subscribed = True
else:
subscribed = False
logging.error(
"Kuksa client is not connected, try reconnecting")
- def VSS_callback(self, data):
+ def VSS_callback(self, data=None, path=None, value=None):
"""
This method is the callback function for the VSS signals from Kuksa.
Args:
- data: The data received from the signal.
"""
- global block_subscription_updates
- if block_subscription_updates:
- return
IC_Page = self.stackedWidget.widget(1)
HVAC_Page = self.stackedWidget.widget(2)
- info = json.loads(data)
- path = info.get('data', {}).get('path')
- value = info.get('data', {}).get('dp', {}).get('value')
-
- print(f"Received subscription event: {path} {value}")
+ if data is not None:
+ info = json.loads(data)
+ path = info.get('data', {}).get('path')
+ value = info.get('data', {}).get('dp', {}).get('value')
- if path == "Vehicle.Speed":
+ if path == "Vehicle.Speed" and int(float(value)):
+ # block connection updates for IC_Page.Speed_slider.
+ IC_Page.Speed_slider.blockSignals(True)
+ IC_Page.Speed_slider.setValue(int(float(value)))
+ IC_Page.Speed_slider.blockSignals(False)
IC_Page.Speed_monitor.display(int(IC_Page.Speed_slider.value()))
- IC_Page.Speed_slider.setValue(int(value))
- if path == "Vehicle.Powertrain.CombustionEngine.Speed":
- IC_Page.RPM_slider.setValue(int(value))
+ if path == "Vehicle.Powertrain.CombustionEngine.Speed" and int(float(value)):
+ # block connection updates for IC_Page.RPM_slider.
+ IC_Page.RPM_slider.blockSignals(True)
IC_Page.RPM_monitor.display(int(IC_Page.RPM_slider.value()))
+ IC_Page.RPM_slider.setValue(int(float(value)))
+ IC_Page.RPM_slider.blockSignals(False)
if path == "Vehicle.Body.Lights.DirectionIndicator.Left.IsSignaling":
+ IC_Page.leftIndicatorBtn.blockSignals(True)
IC_Page.leftIndicatorBtn.setChecked(bool(value))
+ IC_Page.leftIndicatorBtn.blockSignals(False)
if path == "Vehicle.Body.Lights.DirectionIndicator.Right.IsSignaling":
+ IC_Page.rightIndicatorBtn.blockSignals(True)
IC_Page.rightIndicatorBtn.setChecked(bool(value))
+ IC_Page.rightIndicatorBtn.blockSignals(False)
if path == "Vehicle.Body.Lights.Hazard.IsSignaling":
+ IC_Page.hazardBtn.blockSignals(True)
IC_Page.hazardBtn.setChecked(bool(value))
+ IC_Page.hazardBtn.blockSignals(False)
if path == "Vehicle.Powertrain.FuelSystem.Level":
- IC_Page.fuelLevel_slider.setValue(int(value))
+ IC_Page.fuelLevel_slider.blockSignals(True)
+ IC_Page.fuelLevel_slider.setValue(int(float(value)))
+ IC_Page.fuelLevel_slider.blockSignals(False)
if path == "Vehicle.Powertrain.CombustionEngine.ECT":
- IC_Page.coolantTemp_slider.setValue(int(value))
+ IC_Page.coolantTemp_slider.blockSignals(True)
+ IC_Page.coolantTemp_slider.setValue(int(float(value)))
+ IC_Page.coolantTemp_slider.blockSignals(False)
if path == "Vehicle.Powertrain.Transmission.SelectedGear":
- if int(value) == 127:
+ if int(float(value)) == 127:
+ IC_Page.driveBtn.blockSignals(True)
IC_Page.driveBtn.setChecked(True)
- elif int(value) == 126:
+ IC_Page.driveBtn.blockSignals(False)
+ elif int(float(value)) == 126:
+ IC_Page.parkBtn.blockSignals(True)
IC_Page.parkBtn.setChecked(True)
- elif int(value) == -1:
+ IC_Page.parkBtn.blockSignals(False)
+ elif int(float(value)) == -1:
+ IC_Page.reverseBtn.blockSignals(True)
IC_Page.reverseBtn.setChecked(True)
- elif int(value) == 0:
+ IC_Page.reverseBtn.blockSignals(False)
+ elif int(float(value)) == 0:
+ IC_Page.neutralBtn.blockSignals(True)
IC_Page.neutralBtn.setChecked(True)
+ IC_Page.neutralBtn.blockSignals(False)
if path == "Vehicle.Cabin.HVAC.Station.Row1.Left.Temperature":
- HVAC_Page.left_temp.setValue(int(value))
+ HVAC_Page.leftTempList.blockSignals(True)
+ item = HVAC_Page.leftTempList.findItems(
+ str(int(float(value))) + "°C", QtCore.Qt.MatchExactly)[0]
+ HVAC_Page.leftTempList.setCurrentItem(item)
+ HVAC_Page.leftTempList.scrollToItem(item, 1)
+ HVAC_Page.leftTempList.blockSignals(False)
if path == "Vehicle.Cabin.HVAC.Station.Row1.Left.FanSpeed":
- HVAC_Page.left_fan.setValue(int(value))
+ HVAC_Page.leftFanSpeed_slider.blockSignals(True)
+ HVAC_Page.leftFanSpeed_slider.setValue(int(float(value)))
+ HVAC_Page.leftFanSpeed_slider.blockSignals(False)
if path == "Vehicle.Cabin.HVAC.Station.Row1.Right.Temperature":
- HVAC_Page.right_temp.setValue(int(value))
+ HVAC_Page.rightTempList.blockSignals(True)
+ item = HVAC_Page.leftTempList.findItems(
+ str(int(float(value))) + "°C", QtCore.Qt.MatchExactly)[0]
+ HVAC_Page.leftTempList.setCurrentItem(item)
+ HVAC_Page.rightTempList.scrollToItem(item, 1)
+ HVAC_Page.rightTempList.blockSignals(False)
if path == "Vehicle.Cabin.HVAC.Station.Row1.Right.FanSpeed":
- HVAC_Page.right_fan.setValue(int(value))
+ HVAC_Page.rightFanSpeed_slider.blockSignals(True)
+ HVAC_Page.rightFanSpeed_slider.setValue(int(float(value)))
+ HVAC_Page.rightFanSpeed_slider.blockSignals(False)
class FaderWidget(QWidget):