checked PC_CONTROL_CODES with mypy

This commit is contained in:
YuruC3 2025-08-09 10:53:17 +02:00
parent 45459ee231
commit 224fae39f7
4 changed files with 30 additions and 22 deletions

1
PC_CONTROL_CODE/docker/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.mypy_cache

View File

@ -95,9 +95,11 @@ def getTemp() -> dict:
continue
# continue
return MDict
else:
return {"error": "unidentified"}
def setSpeed(inSpeeDict: dict) -> None:
def setSpeed(inSpeeDict: dict) -> int:
bpavrg = 0
# default
@ -109,18 +111,20 @@ def setSpeed(inSpeeDict: dict) -> None:
outfanprcntg = int((bpavrg / (HIGH_FAN_TRSHD - LOW_FAN_TRSHD)) * TEMP_FACTOR)
# os.system(f"echo setting {outfanprcntg}%")
return outfanprcntg
# Set fan speed
if outfanprcntg >= 20:
MDserial.write((f"{SETFANCMND} {str(outfanprcntg)} \n\r").encode())
print(f"setting {outfanprcntg}%", flush=True)
return 0
return outfanprcntg
else:
# Set default value
MDserial.write((f"{SETFANCMND} {str(DEFOUTPRCNTG)} \n\r").encode())
return 1
return DEFOUTPRCNTG
# If something goes super wrong
return -1
return 1

View File

@ -0,0 +1 @@
.mypy_cache

View File

@ -35,19 +35,19 @@ BACKOFFTIME: Final[float] = float(os.getenv("BACKOFFTIME", 5))
# INFLUXDB config
# token = "apg1gysUeCcxdcRTMmosJTenbEppmUNi9rXlANDB2oNadBdWAu2GVTDc_q_dyo0iyYsckKaOvPRm6ba2NK0y_A=="
INFLUXTOKEN: Final[str] = os.getenv("INFLUX_TOKEN")
INFLUXTOKEN: Final[str] = str(os.getenv("INFLUX_TOKEN", "0"))
# bucket = "JBOD"
INFLUXBUCKET: Final[str] = os.getenv("INFLUX_BUCKET")
INFLUXBUCKET: Final[str] = str(os.getenv("INFLUX_BUCKET", "0"))
# org = "staging"
INFLUXORG: Final[str] = os.getenv("INFLUX_ORG")
INFLUXORG: Final[str] = str(os.getenv("INFLUX_ORG", "0"))
# url = "http://localhost:8086"
INFLUXURL: Final[str] = os.getenv("INFLUX_URL")
INFLUXURL: Final[str] = str(os.getenv("INFLUX_URL", "0"))
# measurement = "MD1200"
INFLUXMEASUREMENT: Final[str] = os.getenv("INFLUX_MEASUREMENT")
INFLUXMEASUREMENT: Final[str] = str(os.getenv("INFLUX_MEASUREMENT", "0"))
# MACHINE_TAG = "CHONGUS1200"
MACHINE_TAG: Final[str] = os.getenv("INFLUX_MACHINE_TAG")
MACHINE_TAG: Final[str] = str(os.getenv("INFLUX_MACHINE_TAG", "0"))
# LOCATION = "HQ"
LOCATION: Final[str] = os.getenv("INFLUX_LOCATION")
LOCATION: Final[str] = str(os.getenv("INFLUX_LOCATION", "0"))
# INFLX_SEPARATE_POINTS = 0.1
# INFLUX_SEPARATE_POINTS = float(os.getenv("INFLUX_SEPARATE_POINTS"), 0.1)
@ -60,14 +60,14 @@ MDserial = serial.Serial(
bytesize=serial.EIGHTBITS,\
timeout=MDSERIALTIMEOUT)
lastTempReading = time.time()
MDtempDict = {}
MDict = {}
# lastTempReading: float = time.time()
MDtempDict: dict = {}
MDict: dict = {}
currentSerialUsage = threading.Lock()
fluxSending = False
currentTime = 0
lastTempReading = 0
inflxdb_LeData = []
fluxSending: bool = False
currentTime: float = 0
lastTempReading: float = 0
inflxdb_LeData: list = []
# Initialize InfluxDB client and influxdb API
# ---------------------UNCOMMENT-----------------------
inflxdb_client = influxdb_client.InfluxDBClient(url=INFLUXURL, token=INFLUXTOKEN, org=INFLUXORG)
@ -142,9 +142,11 @@ def getTemp() -> dict:
fluxSending = True
return MDict
else:
return {"error": "unidentified"}
def setSpeed(inSpeeDict: dict) -> None:
def setSpeed(inSpeeDict: dict) -> int:
bpavrg = 0
# default
@ -163,16 +165,16 @@ def setSpeed(inSpeeDict: dict) -> None:
MDserial.write((f"{SETFANCMND} {str(outfanprcntg)} \n\r").encode())
print(f"setting {outfanprcntg}%", flush=True)
return 0
return outfanprcntg
else:
# Set default value
MDserial.write((f"{SETFANCMND} {str(DEFOUTPRCNTG)} \n\r").encode())
return 1
return outfanprcntg
# If something goes super wrong
return -1
return 1