diff --git a/Python/INFLUXDBmthrd.py b/Python/INFLUXDBmthrd.py index 764078c..3913f69 100644 --- a/Python/INFLUXDBmthrd.py +++ b/Python/INFLUXDBmthrd.py @@ -6,30 +6,31 @@ from proto import manWhatTheProto from IP2Loc import ermWhatTheCountry from whatDomain import ermWhatATheIpFromDomainYaCrazy, ermWhatAAAATheIpFromDomainYaCrazy from concurrent.futures import ThreadPoolExecutor +from typing import Final # Netentry preconf -WHAT_THE_NETFLOW_PORT = 2055 -WHAT_THE_NETFLOW_IP = "0.0.0.0" +WHAT_THE_NETFLOW_PORT: Final[int] = 2055 +WHAT_THE_NETFLOW_IP: Final[str] = "0.0.0.0" # INFLUXDB config -token = "apg1gysUeCcxdcRTMmosJTenbEppmUNi9rXlANDB2oNadBdWAu2GVTDc_q_dyo0iyYsckKaOvPRm6ba2NK0y_A==" +INFLUXTOKEN: Final[str] = "apg1gysUeCcxdcRTMmosJTenbEppmUNi9rXlANDB2oNadBdWAu2GVTDc_q_dyo0iyYsckKaOvPRm6ba2NK0y_A==" #token = os.getenv("INFLUX_TOKEN") -bucket = "NETFLOW-7" +INFLUXBUCKET: Final[str] = "NETFLOW-7" # bucket = os.getenv("INFLUX_BUCKET") -org = "staging" +INFLUXORG: Final[str] = "staging" # org = os.getenv("INFLUX_ORG") -url = "http://localhost:8086" +INFLUXURL: Final[str] = "http://localhost:8086" # url = os.getenv("INFLUX_URL") -measurement = "testNetFlowPython" +INFLUXMEASUREMENT: Final[str] = "testNetFlowPython" # measurement = os.getenv("INFLUX_MEASUREMENT") -MACHINE_TAG = "YUKIKAZE" +MACHINE_TAG: FINAL[str] = "YUKIKAZE" # MACHINE_TAG = os.getenv("INFLUX_MACHINE_TAG") -ROUTER_TAG = "HQ" +ROUTER_TAG: Final[str] = "HQ" # ROUTER_TAG = os.getenv("INFLUX_ROUTER_TAG") -INFLX_SEPARATE_POINTS = 0.05 +INFLX_SEPARATE_POINTS: Final[float] = 0.05 # Initialize InfluxDB client and influxdb API -inflxdb_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org) +inflxdb_client = influxdb_client.InfluxDBClient(url=INFLUXURL, token=INFLUXTOKEN, org=INFLUXORG) #write_api = inflxdb_client.write_api(write_options=SYNCHRONOUS) write_api = inflxdb_client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=1000)) @@ -64,7 +65,7 @@ def process_flow(i, entry): # Prep InfluxDB data inflxdb_Data_To_Send = ( - influxdb_client.Point(f"{measurement}-script") + influxdb_client.Point(f"{INFLUXMEASUREMENT}-script") .tag("MACHINE", MACHINE_TAG) .tag("ROUTER", ROUTER_TAG) .field("dstAddr", inEntry["IPV4_DST_ADDR"]) @@ -140,7 +141,7 @@ with ThreadPoolExecutor(max_workers=8) as executor: bigDict[i] = inEntry # Send data to InfluxDB - write_api.write(bucket=bucket, org=org, record=inflxdb_Datazz_To_Send) + write_api.write(bucket=INFLUXBUCKET, org=INFLUXORG, record=inflxdb_Datazz_To_Send) time.sleep(INFLX_SEPARATE_POINTS) # separate points print(f"{len(bigDict)} <--- This many entrys") diff --git a/Python/IP2Loc.py b/Python/IP2Loc.py index 64d587f..e08136f 100644 --- a/Python/IP2Loc.py +++ b/Python/IP2Loc.py @@ -2,9 +2,9 @@ import IP2Location from typing import Optional, Annotated # Load database once -ip2loc_db = IP2Location.IP2Location("IP2LOCATION-LITE-DB9.BIN") +ip2loc_db: IP2Location = IP2Location.IP2Location("IP2LOCATION-LITE-DB9.BIN") -def ermWhatTheCountry(inpIpAddress: Annotated[str, "Some IP address that ya want to get country for"]): +def ermWhatTheCountry(inpIpAddress: Annotated[str, "Some IP address that ya want to get country for"]) -> str: try: skibidi = ip2loc_db.get_all(inpIpAddress) @@ -14,7 +14,7 @@ def ermWhatTheCountry(inpIpAddress: Annotated[str, "Some IP address that ya want except Exception as errrrrr: return f"Error: {errrrrr}" -def ermWhatTheISP(inpIpAddress: Annotated[str, "Some IP address that ya want to get ISP for"]): +def ermWhatTheISP(inpIpAddress: Annotated[str, "Some IP address that ya want to get ISP for"]) -> str: try: skibidi = ip2loc_db.get_all(inpIpAddress) diff --git a/Python/proto.py b/Python/proto.py index bb2a536..05e20a6 100644 --- a/Python/proto.py +++ b/Python/proto.py @@ -153,7 +153,7 @@ PROTO_MAP = { } -def manWhatTheProto(inpProtoNumbrMaybe: Annotated[int, "Protocol number goes here"]): +def manWhatTheProto(inpProtoNumbrMaybe: Annotated[int, "Protocol number goes here"]) -> int: if inpProtoNumbrMaybe <= 145: return PROTO_MAP.get(inpProtoNumbrMaybe) @@ -167,7 +167,7 @@ def manWhatTheProto(inpProtoNumbrMaybe: Annotated[int, "Protocol number goes her elif inpProtoNumbrMaybe not in PROTO_MAP: return inpProtoNumbrMaybe else: - return "no" + return -1 #outPotentialProtoNameIfItExistsInInternalList = PROTO_MAP.get(inpProtoNumbrMaybe) diff --git a/Python/whatDomain.py b/Python/whatDomain.py index 70cb81e..68efcfe 100644 --- a/Python/whatDomain.py +++ b/Python/whatDomain.py @@ -1,10 +1,11 @@ #from nslookup import Nslookup from typing import Optional, Annotated import dns, dns.resolver +from typing import Final # https://www.codeunderscored.com/nslookup-python/ -def ermWhatATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]): +def ermWhatATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]) -> dict: #dns_query = Nslookup() """ Tells you what IPv4 address/es a domain point to. @@ -14,7 +15,7 @@ def ermWhatATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Dom """ # i = 0 - outDict = {} + outDict: dict = {} #result = dns_query.dns_lookup("example.com") #result = Nslookup.dns_lookup(inpDomainNameOrSomething) @@ -42,7 +43,7 @@ def ermWhatATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Dom return outDict -def ermWhatAAAATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]): +def ermWhatAAAATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]) -> dict: #dns_query = Nslookup() """ Tells you what IPv6 address/es a domain point to. @@ -53,7 +54,7 @@ def ermWhatAAAATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, " # i = 0 - outDict = {} + outDict: dict = {} #result = dns_query.dns_lookup("example.com") #result = Nslookup.dns_lookup(inpDomainNameOrSomething) @@ -82,7 +83,7 @@ def ermWhatAAAATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, " return outDict -def ermWhatPTRTheIpFromDomainYaCrazy(inpIpAddressOrSomething: Annotated[str, "IP address to lookup domain for"]): +def ermWhatPTRTheIpFromDomainYaCrazy(inpIpAddressOrSomething: Annotated[str, "IP address to lookup domain for"]) -> dict: #dns_query = Nslookup() """ Tells you what IPv6 address/es a domain point to. @@ -91,16 +92,16 @@ def ermWhatPTRTheIpFromDomainYaCrazy(inpIpAddressOrSomething: Annotated[str, "IP """ - whatToCheck = inpIpAddressOrSomething + ".in-addr.arpa" + WHATTOCHECK: Final[str] = inpIpAddressOrSomething + ".in-addr.arpa" # i = 0 - outDict = {} + outDict: dict = {} #result = dns_query.dns_lookup("example.com") #result = Nslookup.dns_lookup(inpDomainNameOrSomething) try: - result = dns.resolver.resolve(whatToCheck, 'PTR') + result = dns.resolver.resolve(WHATTOCHECK, 'PTR') except dns.resolver.NoAnswer: print("\nDNS ERROR") print("No answer from dns server.\n")