Added constant def and return types

This commit is contained in:
YuruC3 2025-08-09 10:37:35 +02:00
parent 978dd0dc5d
commit 45459ee231
3 changed files with 50 additions and 47 deletions

View File

@ -1,29 +1,30 @@
import serial, time, os
from typing import Final
# setting consts that can be customized
# baud rate. Prob not needed as 38400 is standard
MD1200BAUD = int(os.getenv("MD1200BAUD", 38400))
MD1200BAUD: Final[int] = int(os.getenv("MD1200BAUD", 38400))
# used if you want to run it on multiple JBODs
SERIALADAPTER = os.getenv("SERIALADAPTER", "/dev/ttyUSB0")
SERIALADAPTER: Final[str] = os.getenv("SERIALADAPTER", "/dev/ttyUSB0")
# Factor that defines how aggressive the temperature curve is
TEMP_FACTOR = int(os.getenv("TEMP_FACTOR", 16))
TEMP_FACTOR: Final[int] = int(os.getenv("TEMP_FACTOR", 16))
# time between sending command to get temp and storing it. It's there to allow JBOD to answer
EPPYSLEEPY = float(os.getenv("EPPYSLEEPY", 1))
EPPYSLEEPY: Final[float] = float(os.getenv("EPPYSLEEPY", 1))
LOW_FAN_TRSHD = int(os.getenv("LOW_FAN_TRSHD", 21))
HIGH_FAN_TRSHD = int(os.getenv("HIGH_FAN_TRSHD", 40))
LOW_FAN_TRSHD: Final[int] = int(os.getenv("LOW_FAN_TRSHD", 21))
HIGH_FAN_TRSHD: Final[int] = int(os.getenv("HIGH_FAN_TRSHD", 40))
GETTMPCMND = os.getenv("GETTMPCMND", "_temp_rd")
SETFANCMND = os.getenv("SETFANCMND", "set_speed")
GETTMPCMND: Final[str] = os.getenv("GETTMPCMND", "_temp_rd")
SETFANCMND: Final[str] = os.getenv("SETFANCMND", "set_speed")
DEFOUTPRCNTG = int(os.getenv("DEFOUTPRCNTG", 24))
DEFOUTPRCNTG: Final[int] = int(os.getenv("DEFOUTPRCNTG", 24))
MDSERIALTIMEOUT = float(os.getenv("MDSERIALTIMEOUT", 1))
MDSERIALTIMEOUT: Final[float] = float(os.getenv("MDSERIALTIMEOUT", 1))
TEMPREADINTERVAL = int(os.getenv("TEMPREADINTERVAL", 15))
TEMPREADINTERVAL: Final[int] = int(os.getenv("TEMPREADINTERVAL", 15))
GETTEMPTIMESLEEP = int(os.getenv("GETTEMPTIMESLEEP", 1))
GETTEMPTIMESLEEP: Final[int] = int(os.getenv("GETTEMPTIMESLEEP", 1))
# init
MDserial = serial.Serial(
@ -38,7 +39,7 @@ lastTempReading = time.time()
MDtempDict = {}
def getTemp():
def getTemp() -> dict:
MDserial.write(f"{GETTMPCMND}\n\r".encode())
time.sleep(GETTEMPTIMESLEEP)
@ -96,7 +97,7 @@ def getTemp():
return MDict
def setSpeed(inSpeeDict: dict):
def setSpeed(inSpeeDict: dict) -> None:
bpavrg = 0
# default

View File

@ -3,17 +3,18 @@ from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, WriteOptions
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor
from typing import Final
# INFLUXDB config
# token = "apg1gysUeCcxdcRTMmosJTenbEppmUNi9rXlANDB2oNadBdWAu2GVTDc_q_dyo0iyYsckKaOvPRm6ba2NK0y_A=="
token = os.getenv("INFLUX_TOKEN")
INFLUXTOKEN: Final[str] = os.getenv("INFLUX_TOKEN")
# bucket = "JBOD"
bucket = os.getenv("INFLUX_BUCKET")
# INFLUXBUCKET: Final[str] = os.getenv("INFLUX_BUCKET")
# org = "staging"
org = os.getenv("INFLUX_ORG")
INFLUXORG: Final[str] = os.getenv("INFLUX_ORG")
# url = "http://localhost:8086"
url = os.getenv("INFLUX_URL")
INFLUXURL: Final[str] = os.getenv("INFLUX_URL")
# measurement = "MD1200"
measurement = os.getenv("INFLUX_MEASUREMENT")
# MACHINE_TAG = "CHONGUS1200"
@ -24,7 +25,7 @@ LOCATION = os.getenv("INFLUX_LOCATION")
INFLUX_SEPARATE_POINTS = int(os.getenv("INFLUX_SEPARATE_POINTS"))
# Initialize InfluxDB client and influxdb API
inflxdb_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
inflxdb_client = influxdb_client.InfluxDBClient(url=INFLUXURL, token=INFLUXTOKEN, org=INFLUXORG)
#write_api = inflxdb_client.write_api(write_options=SYNCHRONOUS)
write_api = inflxdb_client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=1000))

View File

@ -2,51 +2,52 @@ import time, os, influxdb_client, serial, threading
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, WriteOptions
from concurrent.futures import ThreadPoolExecutor
from typing import Final
# setting consts that can be customized
# baud rate. Prob not needed as 38400 is standard
MD1200BAUD = int(os.getenv("MD1200BAUD", 38400))
MD1200BAUD: Final[int] = int(os.getenv("MD1200BAUD", 38400))
# used if you want to run it on multiple JBODs
SERIALADAPTER = os.getenv("SERIALADAPTER", "/dev/ttyUSB0")
SERIALADAPTER: Final[str] = os.getenv("SERIALADAPTER", "/dev/ttyUSB0")
# Factor that defines how aggressive the temperature curve is
TEMP_FACTOR = float(os.getenv("TEMP_FACTOR", 19))
TEMP_FACTOR: Final[float] = float(os.getenv("TEMP_FACTOR", 19))
# time between sending command to get temp and storing it. It's there to allow JBOD to answer
EPPYSLEEPY = float(os.getenv("EPPYSLEEPY", 1))
EPPYSLEEPY: Final[float] = float(os.getenv("EPPYSLEEPY", 1))
LOW_FAN_TRSHD = float(os.getenv("LOW_FAN_TRSHD", 21))
HIGH_FAN_TRSHD = float(os.getenv("HIGH_FAN_TRSHD", 40))
LOW_FAN_TRSHD: Final[float] = float(os.getenv("LOW_FAN_TRSHD", 21))
HIGH_FAN_TRSHD: Final[float] = float(os.getenv("HIGH_FAN_TRSHD", 40))
GETTMPCMND = os.getenv("GETTMPCMND", "_temp_rd")
SETFANCMND = os.getenv("SETFANCMND", "set_speed")
GETTMPCMND: Final[str] = os.getenv("GETTMPCMND", "_temp_rd")
SETFANCMND: Final[str] = os.getenv("SETFANCMND", "set_speed")
DEFOUTPRCNTG = float(os.getenv("DEFOUTPRCNTG", 24))
DEFOUTPRCNTG: Final[float] = float(os.getenv("DEFOUTPRCNTG", 24))
MDSERIALTIMEOUT = float(os.getenv("MDSERIALTIMEOUT", 1))
MDSERIALTIMEOUT: Final[float] = float(os.getenv("MDSERIALTIMEOUT", 1))
TEMPREADINTERVAL = float(os.getenv("TEMPREADINTERVAL", 15))
TEMPREADINTERVAL: Final[float] = float(os.getenv("TEMPREADINTERVAL", 15))
TEMPSETING = bool(os.getenv("TEMPSETING", True))
TEMPSETING: Final[bool] = bool(os.getenv("TEMPSETING", True))
PROCESSTEMPWAITTIME = float(os.getenv("PROCESSTEMPWAITTIME", 0.75))
PROCESSTEMPWAITTIME: Final[float] = float(os.getenv("PROCESSTEMPWAITTIME", 0.75))
BACKOFFTIME = float(os.getenv("BACKOFFTIME", 5))
BACKOFFTIME: Final[float] = float(os.getenv("BACKOFFTIME", 5))
# INFLUXDB config
# token = "apg1gysUeCcxdcRTMmosJTenbEppmUNi9rXlANDB2oNadBdWAu2GVTDc_q_dyo0iyYsckKaOvPRm6ba2NK0y_A=="
token = os.getenv("INFLUX_TOKEN")
INFLUXTOKEN: Final[str] = os.getenv("INFLUX_TOKEN")
# bucket = "JBOD"
bucket = os.getenv("INFLUX_BUCKET")
INFLUXBUCKET: Final[str] = os.getenv("INFLUX_BUCKET")
# org = "staging"
org = os.getenv("INFLUX_ORG")
INFLUXORG: Final[str] = os.getenv("INFLUX_ORG")
# url = "http://localhost:8086"
url = os.getenv("INFLUX_URL")
INFLUXURL: Final[str] = os.getenv("INFLUX_URL")
# measurement = "MD1200"
measurement = os.getenv("INFLUX_MEASUREMENT")
INFLUXMEASUREMENT: Final[str] = os.getenv("INFLUX_MEASUREMENT")
# MACHINE_TAG = "CHONGUS1200"
MACHINE_TAG = os.getenv("INFLUX_MACHINE_TAG")
MACHINE_TAG: Final[str] = os.getenv("INFLUX_MACHINE_TAG")
# LOCATION = "HQ"
LOCATION = os.getenv("INFLUX_LOCATION")
LOCATION: Final[str] = os.getenv("INFLUX_LOCATION")
# INFLX_SEPARATE_POINTS = 0.1
# INFLUX_SEPARATE_POINTS = float(os.getenv("INFLUX_SEPARATE_POINTS"), 0.1)
@ -69,12 +70,12 @@ lastTempReading = 0
inflxdb_LeData = []
# Initialize InfluxDB client and influxdb API
# ---------------------UNCOMMENT-----------------------
inflxdb_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
inflxdb_client = influxdb_client.InfluxDBClient(url=INFLUXURL, token=INFLUXTOKEN, org=INFLUXORG)
write_api = inflxdb_client.write_api(write_options=SYNCHRONOUS)
# ---------------------UNCOMMENT-----------------------
def getTemp():
def getTemp() -> dict:
global MDict, fluxSending
@ -143,7 +144,7 @@ def getTemp():
return MDict
def setSpeed(inSpeeDict: dict):
def setSpeed(inSpeeDict: dict) -> None:
bpavrg = 0
# default
@ -186,7 +187,7 @@ def setSpeed(inSpeeDict: dict):
# print("Port allready opened.\nTry closing it first")
# Threaded flow processor
def process_temps():
def process_temps() -> None:
global MDict, fluxSending
while True:
@ -197,7 +198,7 @@ def process_temps():
# Prep InfluxDB data
try:
inflxdb_Data_To_Send = (
influxdb_client.Point(f"{measurement}-script")
influxdb_client.Point(f"{INFLUXMEASUREMENT}-script")
.tag("MACHINE", MACHINE_TAG)
.tag("LOCATION", LOCATION)
.field("Backplane1", MDict["bp1"])
@ -215,7 +216,7 @@ def process_temps():
# Prep/append data
inflxdb_LeData.append(inflxdb_Data_To_Send)
# Send data to InfluxDB
write_api.write(bucket=bucket, org=org, record=inflxdb_Data_To_Send)
write_api.write(bucket=INFLUXBUCKET, org=INFLUXORG, record=inflxdb_Data_To_Send)
# Clean up before another lo#op, 0.75
@ -234,7 +235,7 @@ def process_temps():
MDict = getTemp()
lastTempReading = time.time()
def mainCodeHere():
def mainCodeHere() -> None:
while True:
global MDict, fluxSending, currentTime, lastTempReading
# https://stackoverflow.com/questions/52578122/not-able-to-send-the-enter-command-on-pyserial