This commit is contained in:
YuruC3 2025-08-09 10:28:47 +02:00
parent a2950a7c14
commit 1fc5c263c6
4 changed files with 28 additions and 26 deletions

View File

@ -6,30 +6,31 @@ from proto import manWhatTheProto
from IP2Loc import ermWhatTheCountry
from whatDomain import ermWhatATheIpFromDomainYaCrazy, ermWhatAAAATheIpFromDomainYaCrazy
from concurrent.futures import ThreadPoolExecutor
from typing import Final
# Netentry preconf
WHAT_THE_NETFLOW_PORT = 2055
WHAT_THE_NETFLOW_IP = "0.0.0.0"
WHAT_THE_NETFLOW_PORT: Final[int] = 2055
WHAT_THE_NETFLOW_IP: Final[str] = "0.0.0.0"
# INFLUXDB config
token = "apg1gysUeCcxdcRTMmosJTenbEppmUNi9rXlANDB2oNadBdWAu2GVTDc_q_dyo0iyYsckKaOvPRm6ba2NK0y_A=="
INFLUXTOKEN: Final[str] = "apg1gysUeCcxdcRTMmosJTenbEppmUNi9rXlANDB2oNadBdWAu2GVTDc_q_dyo0iyYsckKaOvPRm6ba2NK0y_A=="
#token = os.getenv("INFLUX_TOKEN")
bucket = "NETFLOW-7"
INFLUXBUCKET: Final[str] = "NETFLOW-7"
# bucket = os.getenv("INFLUX_BUCKET")
org = "staging"
INFLUXORG: Final[str] = "staging"
# org = os.getenv("INFLUX_ORG")
url = "http://localhost:8086"
INFLUXURL: Final[str] = "http://localhost:8086"
# url = os.getenv("INFLUX_URL")
measurement = "testNetFlowPython"
INFLUXMEASUREMENT: Final[str] = "testNetFlowPython"
# measurement = os.getenv("INFLUX_MEASUREMENT")
MACHINE_TAG = "YUKIKAZE"
MACHINE_TAG: FINAL[str] = "YUKIKAZE"
# MACHINE_TAG = os.getenv("INFLUX_MACHINE_TAG")
ROUTER_TAG = "HQ"
ROUTER_TAG: Final[str] = "HQ"
# ROUTER_TAG = os.getenv("INFLUX_ROUTER_TAG")
INFLX_SEPARATE_POINTS = 0.05
INFLX_SEPARATE_POINTS: Final[float] = 0.05
# Initialize InfluxDB client and influxdb API
inflxdb_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
inflxdb_client = influxdb_client.InfluxDBClient(url=INFLUXURL, token=INFLUXTOKEN, org=INFLUXORG)
#write_api = inflxdb_client.write_api(write_options=SYNCHRONOUS)
write_api = inflxdb_client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=1000))
@ -64,7 +65,7 @@ def process_flow(i, entry):
# Prep InfluxDB data
inflxdb_Data_To_Send = (
influxdb_client.Point(f"{measurement}-script")
influxdb_client.Point(f"{INFLUXMEASUREMENT}-script")
.tag("MACHINE", MACHINE_TAG)
.tag("ROUTER", ROUTER_TAG)
.field("dstAddr", inEntry["IPV4_DST_ADDR"])
@ -140,7 +141,7 @@ with ThreadPoolExecutor(max_workers=8) as executor:
bigDict[i] = inEntry
# Send data to InfluxDB
write_api.write(bucket=bucket, org=org, record=inflxdb_Datazz_To_Send)
write_api.write(bucket=INFLUXBUCKET, org=INFLUXORG, record=inflxdb_Datazz_To_Send)
time.sleep(INFLX_SEPARATE_POINTS) # separate points
print(f"{len(bigDict)} <--- This many entrys")

View File

@ -2,9 +2,9 @@ import IP2Location
from typing import Optional, Annotated
# Load database once
ip2loc_db = IP2Location.IP2Location("IP2LOCATION-LITE-DB9.BIN")
ip2loc_db: IP2Location = IP2Location.IP2Location("IP2LOCATION-LITE-DB9.BIN")
def ermWhatTheCountry(inpIpAddress: Annotated[str, "Some IP address that ya want to get country for"]):
def ermWhatTheCountry(inpIpAddress: Annotated[str, "Some IP address that ya want to get country for"]) -> str:
try:
skibidi = ip2loc_db.get_all(inpIpAddress)
@ -14,7 +14,7 @@ def ermWhatTheCountry(inpIpAddress: Annotated[str, "Some IP address that ya want
except Exception as errrrrr:
return f"Error: {errrrrr}"
def ermWhatTheISP(inpIpAddress: Annotated[str, "Some IP address that ya want to get ISP for"]):
def ermWhatTheISP(inpIpAddress: Annotated[str, "Some IP address that ya want to get ISP for"]) -> str:
try:
skibidi = ip2loc_db.get_all(inpIpAddress)

View File

@ -153,7 +153,7 @@ PROTO_MAP = {
}
def manWhatTheProto(inpProtoNumbrMaybe: Annotated[int, "Protocol number goes here"]):
def manWhatTheProto(inpProtoNumbrMaybe: Annotated[int, "Protocol number goes here"]) -> int:
if inpProtoNumbrMaybe <= 145:
return PROTO_MAP.get(inpProtoNumbrMaybe)
@ -167,7 +167,7 @@ def manWhatTheProto(inpProtoNumbrMaybe: Annotated[int, "Protocol number goes her
elif inpProtoNumbrMaybe not in PROTO_MAP:
return inpProtoNumbrMaybe
else:
return "no"
return -1
#outPotentialProtoNameIfItExistsInInternalList = PROTO_MAP.get(inpProtoNumbrMaybe)

View File

@ -1,10 +1,11 @@
#from nslookup import Nslookup
from typing import Optional, Annotated
import dns, dns.resolver
from typing import Final
# https://www.codeunderscored.com/nslookup-python/
def ermWhatATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]):
def ermWhatATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]) -> dict:
#dns_query = Nslookup()
"""
Tells you what IPv4 address/es a domain point to.
@ -14,7 +15,7 @@ def ermWhatATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Dom
"""
# i = 0
outDict = {}
outDict: dict = {}
#result = dns_query.dns_lookup("example.com")
#result = Nslookup.dns_lookup(inpDomainNameOrSomething)
@ -42,7 +43,7 @@ def ermWhatATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Dom
return outDict
def ermWhatAAAATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]):
def ermWhatAAAATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]) -> dict:
#dns_query = Nslookup()
"""
Tells you what IPv6 address/es a domain point to.
@ -53,7 +54,7 @@ def ermWhatAAAATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "
# i = 0
outDict = {}
outDict: dict = {}
#result = dns_query.dns_lookup("example.com")
#result = Nslookup.dns_lookup(inpDomainNameOrSomething)
@ -82,7 +83,7 @@ def ermWhatAAAATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "
return outDict
def ermWhatPTRTheIpFromDomainYaCrazy(inpIpAddressOrSomething: Annotated[str, "IP address to lookup domain for"]):
def ermWhatPTRTheIpFromDomainYaCrazy(inpIpAddressOrSomething: Annotated[str, "IP address to lookup domain for"]) -> dict:
#dns_query = Nslookup()
"""
Tells you what IPv6 address/es a domain point to.
@ -91,16 +92,16 @@ def ermWhatPTRTheIpFromDomainYaCrazy(inpIpAddressOrSomething: Annotated[str, "IP
"""
whatToCheck = inpIpAddressOrSomething + ".in-addr.arpa"
WHATTOCHECK: Final[str] = inpIpAddressOrSomething + ".in-addr.arpa"
# i = 0
outDict = {}
outDict: dict = {}
#result = dns_query.dns_lookup("example.com")
#result = Nslookup.dns_lookup(inpDomainNameOrSomething)
try:
result = dns.resolver.resolve(whatToCheck, 'PTR')
result = dns.resolver.resolve(WHATTOCHECK, 'PTR')
except dns.resolver.NoAnswer:
print("\nDNS ERROR")
print("No answer from dns server.\n")