Reorder, categorise and clean up. Also now there
is a somewhat usefull readme.
This commit is contained in:
parent
262c297350
commit
ef535fa2df
4
.gitignore
vendored
4
.gitignore
vendored
@ -1,3 +1,5 @@
|
||||
venv
|
||||
__pycache__
|
||||
IP2Location
|
||||
IP2Location
|
||||
flows
|
||||
old
|
104
INFLUXDB.py.bck
104
INFLUXDB.py.bck
@ -1,104 +0,0 @@
|
||||
import netflow, socket, json, time, os, influxdb_client, ipaddress
|
||||
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||
from datetime import timedelta
|
||||
|
||||
# Netentry preconf
|
||||
WHAT_THE_NETFLOW_PORT = 2055
|
||||
WHAT_THE_NETFLOW_IP = "0.0.0.0"
|
||||
|
||||
|
||||
# INFLUXDB config
|
||||
|
||||
token = "L56MZGgLNOPcL_nfc4bYujQHPVB0xNGMEHXJj8CEWrE3lioi3PN4eH52NhnEyKdFTfwW1Tlqhl35PAGSva5qXw=="
|
||||
bucket = "NETFLOW-STAGING"
|
||||
org = "staging"
|
||||
url = "http://localhost:8086"
|
||||
measurement = "testNetFlowPython"
|
||||
LOCATION_TAG = "YUKIKAZE"
|
||||
INFLX_SEPARATE_POINTS = 0.1
|
||||
|
||||
# Initialize InfluxDB client
|
||||
inflxdb_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
|
||||
|
||||
# Other preconf
|
||||
bigDict = {}
|
||||
|
||||
# Get netentry data ig?
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind((WHAT_THE_NETFLOW_IP, WHAT_THE_NETFLOW_PORT))
|
||||
payload, client = sock.recvfrom(4096) # experimental, tested with 1464 bytes
|
||||
p = netflow.parse_packet(payload) # Test result: <ExportPacket v5 with 30 records>
|
||||
#print(p.entrys) # Test result: 5
|
||||
|
||||
|
||||
for i, entry in enumerate(p.flows, 1):
|
||||
# prep dict
|
||||
#tmpEntry = str(entry)
|
||||
#tmpEntry = tmpEntry[22:-1]
|
||||
#tmpEntry2 = tmpEntry.replace("'", '"')
|
||||
|
||||
#print(tmpEntry2)
|
||||
#print(entry
|
||||
#exit()
|
||||
#dictEntry = json.loads(tmpEntry2)
|
||||
#bigDict[i] = (dictEntry)
|
||||
|
||||
|
||||
# take data out from netentry
|
||||
inEntry = entry.data
|
||||
|
||||
# Convert IPs and time duration
|
||||
# IPs
|
||||
inEntry["IPV4_SRC_ADDR"] = str(ipaddress.IPv4Address(inEntry["IPV4_SRC_ADDR"]))
|
||||
inEntry["IPV4_DST_ADDR"] = str(ipaddress.IPv4Address(inEntry["IPV4_DST_ADDR"]))
|
||||
|
||||
# Convert time from ms to HH:MM:SS
|
||||
first = int(inEntry["FIRST_SWITCHED"])
|
||||
last = int(inEntry["LAST_SWITCHED"])
|
||||
|
||||
inEntry["FIRST_SWITCHED_HR"] = str(timedelta(milliseconds=first))
|
||||
inEntry["LAST_SWITCHED_HR"] = str(timedelta(milliseconds=last))
|
||||
|
||||
|
||||
# Prep InfluxDB data
|
||||
inflxdb_Data_To_Send = (
|
||||
influxdb_client.Point(f"{measurement}-script")
|
||||
.tag("LOCATION", LOCATION_TAG)
|
||||
.field("dstAddr", inEntry["IPV4_DST_ADDR"])
|
||||
.field("srcAddr", inEntry["IPV4_SRC_ADDR"])
|
||||
.field("nextHop", inEntry["NEXT_HOP"])
|
||||
.field("inptInt", inEntry["INPUT"])
|
||||
.field("outptInt", inEntry["OUTPUT"])
|
||||
.field("inPackt", inEntry["IN_PACKETS"])
|
||||
.field("outPakt", inEntry["IN_OCTETS"])
|
||||
.field("frstSwtchd", inEntry["FIRST_SWITCHED"])
|
||||
.field("lstSwtchd", inEntry["LAST_SWITCHED"])
|
||||
.field("srcPort", inEntry["SRC_PORT"])
|
||||
.field("dstPort", inEntry["DST_PORT"])
|
||||
.field("tcpFlags", inEntry["TCP_FLAGS"])
|
||||
.field("proto", inEntry["PROTO"])
|
||||
.field("tos", inEntry["TOS"])
|
||||
.field("srcAS", inEntry["SRC_AS"])
|
||||
.field("dstAS", inEntry["DST_AS"])
|
||||
.field("srcMask", inEntry["SRC_MASK"])
|
||||
.field("dstMask", inEntry["DST_MASK"])
|
||||
)
|
||||
|
||||
# idk
|
||||
write_api = inflxdb_client.write_api(write_options=SYNCHRONOUS)
|
||||
|
||||
# Send data to InfluxDB
|
||||
write_api.write(bucket=bucket, org="staging", record=inflxdb_Data_To_Send)
|
||||
time.sleep(INFLX_SEPARATE_POINTS) # separate points by 1 second
|
||||
|
||||
#i+=1
|
||||
#type(tmpEntry)
|
||||
#print(dictEntry)
|
||||
#print(tmpEntry.lstrip(20))
|
||||
|
||||
print("----------------")
|
||||
bigDict[i] = (inEntry)
|
||||
|
||||
#print(bigDict)
|
||||
print(f"{len(bigDict)} <--- This many entrys")
|
17
Main.py
17
Main.py
@ -1,17 +0,0 @@
|
||||
from scapy.all import *
|
||||
|
||||
while True:
|
||||
netflow = NetflowHeader()/NetflowHeaderV5(count=1)/NetflowRecordV5(dst="172.20.240.2")
|
||||
pkt = Ether()/IP()/UDP()/netflow
|
||||
|
||||
print(pkt)
|
||||
|
||||
UDP.payload_guess = [({}, NetflowHeader)]
|
||||
pkts = sniff(iface="VIP-NET")
|
||||
|
||||
print(netflow)
|
||||
|
||||
|
||||
|
||||
print(netflow)
|
||||
print(pkts)
|
21
README.md
21
README.md
@ -1,3 +1,20 @@
|
||||
# NetFlow collector and InfluxDB and Grafana
|
||||
# NetFlowV5 collector and InfluxDB and Grafana
|
||||
|
||||
It works.
|
||||
|
||||
## Python script
|
||||
Install required modules with
|
||||
```
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
Then change InfluxDB variables.
|
||||
|
||||
### INFLUXDB.py vs INFLUXDBmthrd.py
|
||||
First one runs on one thread and should work when there isn't much data on the network.
|
||||
|
||||
Second is when there are a ton of flows that need to be collected. More flows aka more data.
|
||||
|
||||
## sysctl.d
|
||||
Place it in /etc/sysctl.d/ and apply with ```sysctl -p```
|
||||
|
||||
It works.
|
File diff suppressed because one or more lines are too long
@ -1,2 +0,0 @@
|
||||
Ready
|
||||
{'IPV4_SRC_ADDR': 3232246278, 'IPV4_DST_ADDR': 100622484, 'NEXT_HOP': 1403798273, 'INPUT': 11, 'OUTPUT': 4, 'IN_PACKETS': 5, 'IN_OCTETS': 468, 'FIRST_SWITCHED': 2565993000, 'LAST_SWITCHED': 2565994000, 'SRC_PORT': 51413, 'DST_PORT': 36839, 'TCP_FLAGS': 25, 'PROTO': 6, 'TOS': 4, 'SRC_AS': 0, 'DST_AS': 0, 'SRC_MASK': 29, 'DST_MASK': 0}
|
31
main.py
31
main.py
@ -1,31 +0,0 @@
|
||||
import netflow
|
||||
import socket
|
||||
import json
|
||||
|
||||
i=1
|
||||
bigDict = {}
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind(("0.0.0.0", 2055))
|
||||
payload, client = sock.recvfrom(4096) # experimental, tested with 1464 bytes
|
||||
p = netflow.parse_packet(payload) # Test result: <ExportPacket v5 with 30 records>
|
||||
#print(p.flows) # Test result: 5
|
||||
|
||||
for entry in p.flows:
|
||||
# prep dict
|
||||
tmpEntry = str(entry)
|
||||
tmpEntry = tmpEntry[22:-1]
|
||||
tmpEntry2 = tmpEntry.replace("'", '"')
|
||||
|
||||
#print(tmpEntry2)
|
||||
|
||||
|
||||
|
||||
dictEntry = json.loads(tmpEntry2)
|
||||
bigDict[i] = (dictEntry)
|
||||
i+=1
|
||||
#type(tmpEntry)
|
||||
#print(dictEntry)
|
||||
#print(tmpEntry.lstrip(20))
|
||||
print("----------------")
|
||||
print(bigDict)
|
62
main2.py
62
main2.py
@ -1,62 +0,0 @@
|
||||
from scapy.all import *
|
||||
import struct
|
||||
|
||||
# Define NetFlow v5 header and record formats
|
||||
NETFLOW_V5_HEADER_LEN = 24
|
||||
NETFLOW_V5_RECORD_LEN = 48
|
||||
|
||||
def parse_netflow_v5(packet):
|
||||
if not packet.haslayer(UDP):
|
||||
return
|
||||
|
||||
udp_payload = bytes(packet[UDP].payload)
|
||||
|
||||
if len(udp_payload) < NETFLOW_V5_HEADER_LEN:
|
||||
print("Invalid NetFlow v5 header length")
|
||||
return
|
||||
|
||||
# Parse header
|
||||
header = struct.unpack('!HHIIIIBBH', udp_payload[:NETFLOW_V5_HEADER_LEN])
|
||||
version, count, uptime, unix_secs, unix_nsecs, flow_seq, engine_type, engine_id, sampling = header
|
||||
|
||||
print(f"\n--- NetFlow v5 Packet ---")
|
||||
print(f"Version: {version}, Record count: {count}, Sys uptime: {uptime}")
|
||||
print(f"Unix secs: {unix_secs}, Flow sequence: {flow_seq}")
|
||||
|
||||
# Parse flow records
|
||||
offset = NETFLOW_V5_HEADER_LEN
|
||||
for i in range(count):
|
||||
if offset + NETFLOW_V5_RECORD_LEN > len(udp_payload):
|
||||
print("Incomplete record")
|
||||
break
|
||||
|
||||
record = struct.unpack('!IIIHHIIIIHHBBBBHHBBH', udp_payload[offset:offset + NETFLOW_V5_RECORD_LEN])
|
||||
src_addr = inet_ntoa(struct.pack('!I', record[0]))
|
||||
dst_addr = inet_ntoa(struct.pack('!I', record[1]))
|
||||
next_hop = inet_ntoa(struct.pack('!I', record[2]))
|
||||
src_port = record[10]
|
||||
dst_port = record[11]
|
||||
packets = record[4]
|
||||
bytes_ = record[5]
|
||||
|
||||
print(f"\nFlow #{i+1}")
|
||||
print(f"Src IP: {src_addr}:{src_port} → Dst IP: {dst_addr}:{dst_port}")
|
||||
print(f"Next hop: {next_hop}, Packets: {packets}, Bytes: {bytes_}")
|
||||
|
||||
offset += NETFLOW_V5_RECORD_LEN
|
||||
|
||||
# Sniff on a given interface or from a pcap file
|
||||
def start_sniff(interface=None, pcap_file=None):
|
||||
if pcap_file:
|
||||
packets = rdpcap(pcap_file)
|
||||
for pkt in packets:
|
||||
parse_netflow_v5(pkt)
|
||||
elif interface:
|
||||
sniff(iface=interface, filter="udp port 2055", prn=parse_netflow_v5)
|
||||
|
||||
# Example usage
|
||||
if __name__ == "__main__":
|
||||
# Provide either an interface or a pcap file
|
||||
start_sniff(pcap_file="netflowv5_sample.pcap")
|
||||
# start_sniff(interface="eth0")
|
||||
|
14
sysctl.d/netflow.conf
Normal file
14
sysctl.d/netflow.conf
Normal file
@ -0,0 +1,14 @@
|
||||
# for less then 75k flows per second
|
||||
net.core.netdev_max_backlog=4096
|
||||
net.core.rmem_default=262144
|
||||
net.core.rmem_max=67108864
|
||||
net.ipv4.udp_rmem_min=131072
|
||||
net.ipv4.udp_mem=2097152 4194304 8388608
|
||||
|
||||
|
||||
# for more than 75k flows per second
|
||||
#net.core.netdev_max_backlog=8192
|
||||
#net.core.rmem_default=262144
|
||||
#net.core.rmem_max=134217728
|
||||
#net.ipv4.udp_rmem_min=131072
|
||||
#net.ipv4.udp_mem=4194304 8388608 16777216
|
8
systemdConfig/README.md
Normal file
8
systemdConfig/README.md
Normal file
@ -0,0 +1,8 @@
|
||||
# What you need to prepare
|
||||
|
||||
virtual enviroment in python with installed packages
|
||||
|
||||
python script and InfluxDB
|
||||
|
||||
## Change stuff
|
||||
In .service file change everything that has <SOMETHIN_LIKE_THIS> and /path/to/dir
|
@ -1,16 +1,19 @@
|
||||
[Unit]
|
||||
Description=Netflow to InfluxDB script
|
||||
After=multi-user.target network.target network-online.target
|
||||
# Place in /etc/systemd/system/
|
||||
|
||||
[Service]
|
||||
User=yuru
|
||||
Group=yuru
|
||||
Type=simple
|
||||
Restart=on-failure
|
||||
# EnvironmentFile=/etc/NetFlux/netflow.env
|
||||
# User=myuser
|
||||
WorkingDirectory=/etc/NetFlux/HQ/
|
||||
ExecStart=/etc/NetFlux/HQ/venv/bin/python3 /etc/NetFlux/HQ/HQnetflow.py --serve-in-foreground
|
||||
# ExecStart=/usr/bin/python3 /usr/bin/NetFlux/INFLUXDBmthrd.py
|
||||
# StandardInput=tty-force
|
||||
|
||||
#StandardInput=tty-force
|
||||
|
||||
# Log file will be create if it doesn't exist
|
||||
StandardOutput=append:/var/log/HQNetFlowInflux.log
|
@ -4,20 +4,19 @@ After=multi-user.target network.target network-online.target
|
||||
# Place in /etc/systemd/system/
|
||||
|
||||
[Service]
|
||||
User=root
|
||||
Group=root
|
||||
User=<CHANGE>
|
||||
Group=<CHANGE>
|
||||
Type=simple
|
||||
Restart=on-failure
|
||||
# EnvironmentFile=/etc/NetFlux/netflow.env
|
||||
# User=myuser
|
||||
WorkingDirectory=/etc/NetFlux/HAOS/
|
||||
ExecStart=/etc/NetFlux/HAOS/venv/bin/python3 /etc/NetFlux/HAOS/HAOSnetflow.py --serve-in-foreground
|
||||
# ExecStart=/usr/bin/python3 /usr/bin/NetFlux/INFLUXDBmthrd.py
|
||||
WorkingDirectory=/dir/to/script/
|
||||
ExecStart=/dir/to/script'sVENV/venv/bin/python3 /dir/to/script/NetFlowCollect.py --serve-in-foreground
|
||||
#StandardInput=tty-force
|
||||
|
||||
# Log file will be create if it doesn't exist
|
||||
StandardOutput=append:/var/log/HAOSNetFlowInflux.log
|
||||
StandardError=append:/var/log/HAOSNetFlowInflux.errlog
|
||||
StandardOutput=append:/var/log/NetFlowCollect.log
|
||||
StandardError=append:/var/log/NetFlowCollect.errlog
|
||||
|
||||
# StandardOutput=syslog
|
||||
# StandardError=syslog
|
@ -1,138 +0,0 @@
|
||||
import netflow, socket, json, time, os, influxdb_client, ipaddress
|
||||
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, WriteOptions
|
||||
from datetime import timedelta
|
||||
from proto import manWhatTheProto
|
||||
from IP2Loc import ermWhatTheCountry
|
||||
from whatDomain import ermWhatATheIpFromDomainYaCrazy, ermWhatAAAATheIpFromDomainYaCrazy
|
||||
|
||||
# Netentry preconf
|
||||
WHAT_THE_NETFLOW_PORT = 2055
|
||||
WHAT_THE_NETFLOW_IP = "0.0.0.0"
|
||||
|
||||
|
||||
# INFLUXDB config
|
||||
|
||||
token = "apg1gysUeCcxdcRTMmosJTenbEppmUNi9rXlANDB2oNadBdWAu2GVTDc_q_dyo0iyYsckKaOvPRm6ba2NK0y_A=="
|
||||
bucket = "NETFLOW-7"
|
||||
org = "staging"
|
||||
url = "http://localhost:8086"
|
||||
measurement = "testNetFlowPython"
|
||||
MACHINE_TAG = "YUKIKAZE"
|
||||
ROUTER_TAG = "HQ"
|
||||
INFLX_SEPARATE_POINTS = 0.1
|
||||
|
||||
# Initialize InfluxDB client and influxdb API
|
||||
inflxdb_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
|
||||
write_api = inflxdb_client.write_api(write_options=SYNCHRONOUS)
|
||||
|
||||
# Other preconf
|
||||
bigDict = {}
|
||||
inflxdb_Datazz_To_Send = []
|
||||
|
||||
# Bind
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind((WHAT_THE_NETFLOW_IP, WHAT_THE_NETFLOW_PORT))
|
||||
|
||||
print("Ready")
|
||||
|
||||
while True:
|
||||
# Get netentry data ig?
|
||||
payload, client = sock.recvfrom(4096) # experimental, tested with 1464 bytes
|
||||
p = netflow.parse_packet(payload) # Test result: <ExportPacket v5 with 30 records>
|
||||
#print(p.entrys) # Test result: 5
|
||||
|
||||
#yesyes = p.flows
|
||||
#print(yesyes.data)
|
||||
#exit()
|
||||
|
||||
|
||||
|
||||
for i, entry in enumerate(p.flows, 1):
|
||||
# prep dict
|
||||
#tmpEntry = str(entry)
|
||||
#tmpEntry = tmpEntry[22:-1]
|
||||
#tmpEntry2 = tmpEntry.replace("'", '"')
|
||||
|
||||
#print(tmpEntry2)
|
||||
#print(entry
|
||||
#exit()
|
||||
#dictEntry = json.loads(tmpEntry2)
|
||||
#bigDict[i] = (dictEntry)
|
||||
|
||||
|
||||
# take data out from netentry
|
||||
inEntry = entry.data
|
||||
|
||||
print(inEntry)
|
||||
exit()
|
||||
|
||||
# Convert IPs and time duration
|
||||
# IPs
|
||||
inEntry["IPV4_SRC_ADDR"] = str(ipaddress.IPv4Address(inEntry["IPV4_SRC_ADDR"]))
|
||||
inEntry["IPV4_DST_ADDR"] = str(ipaddress.IPv4Address(inEntry["IPV4_DST_ADDR"]))
|
||||
inEntry["NEXT_HOP"] = str(ipaddress.IPv4Address(inEntry["NEXT_HOP"]))
|
||||
|
||||
# Convert time from ms to HH:MM:SS
|
||||
first = int(inEntry["FIRST_SWITCHED"])
|
||||
last = int(inEntry["LAST_SWITCHED"])
|
||||
|
||||
inEntry["FIRST_SWITCHED_HR"] = str(timedelta(milliseconds=first))
|
||||
inEntry["LAST_SWITCHED_HR"] = str(timedelta(milliseconds=last))
|
||||
|
||||
|
||||
# Prep InfluxDB data
|
||||
inflxdb_Data_To_Send = (
|
||||
influxdb_client.Point(f"{measurement}-script")
|
||||
.tag("MACHINE", MACHINE_TAG)
|
||||
.tag("ROUTER", ROUTER_TAG)
|
||||
.field("dstAddr", inEntry["IPV4_DST_ADDR"])
|
||||
.field("srcAddr", inEntry["IPV4_SRC_ADDR"])
|
||||
.field("nextHop", inEntry["NEXT_HOP"])
|
||||
.field("inptInt", inEntry["INPUT"])
|
||||
.field("outptInt", inEntry["OUTPUT"])
|
||||
.field("inPackt", inEntry["IN_PACKETS"])
|
||||
.field("outPakt", inEntry["IN_OCTETS"])
|
||||
.field("frstSwtchd", inEntry["FIRST_SWITCHED"])
|
||||
.field("lstSwtchd", inEntry["LAST_SWITCHED"])
|
||||
.field("srcPort", inEntry["SRC_PORT"])
|
||||
.field("dstPort", inEntry["DST_PORT"])
|
||||
.field("tcpFlags", inEntry["TCP_FLAGS"])
|
||||
.tag("proto", manWhatTheProto(int(inEntry["PROTO"])))
|
||||
.field("tos", inEntry["TOS"])
|
||||
.field("srcAS", inEntry["SRC_AS"])
|
||||
.field("dstAS", inEntry["DST_AS"])
|
||||
.field("srcMask", inEntry["SRC_MASK"])
|
||||
.field("dstMask", inEntry["DST_MASK"])
|
||||
.field("dstCntr", ermWhatTheCountry(str(inEntry["IPV4_DST_ADDR"])))
|
||||
.field("srcCntr", ermWhatTheCountry(str(inEntry["IPV4_SRC_ADDR"])))
|
||||
)
|
||||
|
||||
inflxdb_Datazz_To_Send.append(inflxdb_Data_To_Send)
|
||||
|
||||
#i+=1
|
||||
#type(tmpEntry)
|
||||
#print(dictEntry)
|
||||
#print(tmpEntry.lstrip(20))
|
||||
|
||||
print("----------------")
|
||||
bigDict[i] = (inEntry)
|
||||
|
||||
# end while True
|
||||
|
||||
print()
|
||||
print(bigDict)
|
||||
exit()
|
||||
|
||||
# Send data to InfluxDB
|
||||
write_api.write(bucket=bucket, org="staging", record=inflxdb_Data_To_Send)
|
||||
time.sleep(INFLX_SEPARATE_POINTS) # separate points
|
||||
|
||||
print(f"{len(bigDict)} <--- This many entrys")
|
||||
|
||||
|
||||
# Clean up before another loop
|
||||
bigDict.clear()
|
||||
inflxdb_Datazz_To_Send.clear()
|
||||
|
||||
#print(bigDict)
|
@ -1,151 +0,0 @@
|
||||
import netflow, socket, json, time, os, influxdb_client, ipaddress
|
||||
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, WriteOptions
|
||||
from datetime import timedelta
|
||||
from proto import manWhatTheProto
|
||||
from IP2Loc import ermWhatTheCountry
|
||||
from whatDomain import ermWhatATheIpFromDomainYaCrazy, ermWhatAAAATheIpFromDomainYaCrazy
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
# Netentry preconf
|
||||
WHAT_THE_NETFLOW_PORT = 2055
|
||||
WHAT_THE_NETFLOW_IP = "0.0.0.0"
|
||||
|
||||
# INFLUXDB config
|
||||
token = "apg1gysUeCcxdcRTMmosJTenbEppmUNi9rXlANDB2oNadBdWAu2GVTDc_q_dyo0iyYsckKaOvPRm6ba2NK0y_A=="
|
||||
#token = os.getenv("INFLUX_TOKEN")
|
||||
bucket = "NETFLOW-7"
|
||||
# bucket = os.getenv("INFLUX_BUCKET")
|
||||
org = "staging"
|
||||
# org = os.getenv("INFLUX_ORG")
|
||||
url = "http://localhost:8086"
|
||||
# url = os.getenv("INFLUX_URL")
|
||||
measurement = "testNetFlowPython"
|
||||
# measurement = os.getenv("INFLUX_MEASUREMENT")
|
||||
MACHINE_TAG = "YUKIKAZE"
|
||||
# MACHINE_TAG = os.getenv("INFLUX_MACHINE_TAG")
|
||||
ROUTER_TAG = "HQ"
|
||||
# ROUTER_TAG = os.getenv("INFLUX_ROUTER_TAG")
|
||||
INFLX_SEPARATE_POINTS = 0.05
|
||||
|
||||
# Initialize InfluxDB client and influxdb API
|
||||
inflxdb_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
|
||||
#write_api = inflxdb_client.write_api(write_options=SYNCHRONOUS)
|
||||
write_api = inflxdb_client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=1000))
|
||||
|
||||
# Threaded flow processor
|
||||
def process_flow(i, entry):
|
||||
# prep dict
|
||||
#tmpEntry = str(entry)
|
||||
#tmpEntry = tmpEntry[22:-1]
|
||||
#tmpEntry2 = tmpEntry.replace("'", '"')
|
||||
|
||||
#print(tmpEntry2)
|
||||
#print(entry)
|
||||
#exit()
|
||||
#dictEntry = json.loads(tmpEntry2)
|
||||
#bigDict[i] = (dictEntry)
|
||||
|
||||
# take data out from netentry
|
||||
inEntry = entry.data
|
||||
|
||||
# Convert IPs and time duration
|
||||
# IPs
|
||||
inEntry["IPV4_SRC_ADDR"] = str(ipaddress.IPv4Address(inEntry["IPV4_SRC_ADDR"]))
|
||||
inEntry["IPV4_DST_ADDR"] = str(ipaddress.IPv4Address(inEntry["IPV4_DST_ADDR"]))
|
||||
inEntry["NEXT_HOP"] = str(ipaddress.IPv4Address(inEntry["NEXT_HOP"]))
|
||||
|
||||
# Convert time from ms to HH:MM:SS
|
||||
first = int(inEntry["FIRST_SWITCHED"])
|
||||
last = int(inEntry["LAST_SWITCHED"])
|
||||
|
||||
inEntry["FIRST_SWITCHED_HR"] = str(timedelta(milliseconds=first))
|
||||
inEntry["LAST_SWITCHED_HR"] = str(timedelta(milliseconds=last))
|
||||
|
||||
# Prep InfluxDB data
|
||||
inflxdb_Data_To_Send = (
|
||||
influxdb_client.Point(f"{measurement}-script")
|
||||
.tag("MACHINE", MACHINE_TAG)
|
||||
.tag("ROUTER", ROUTER_TAG)
|
||||
.field("dstAddr", inEntry["IPV4_DST_ADDR"])
|
||||
.field("srcAddr", inEntry["IPV4_SRC_ADDR"])
|
||||
.field("nextHop", inEntry["NEXT_HOP"])
|
||||
.field("inptInt", inEntry["INPUT"])
|
||||
.field("outptInt", inEntry["OUTPUT"])
|
||||
.field("inPackt", inEntry["IN_PACKETS"])
|
||||
.field("outPakt", inEntry["IN_OCTETS"])
|
||||
.field("frstSwtchd", inEntry["FIRST_SWITCHED"])
|
||||
.field("lstSwtchd", inEntry["LAST_SWITCHED"])
|
||||
.field("srcPort", inEntry["SRC_PORT"])
|
||||
.field("dstPort", inEntry["DST_PORT"])
|
||||
.field("tcpFlags", inEntry["TCP_FLAGS"])
|
||||
.tag("proto", manWhatTheProto(int(inEntry["PROTO"])))
|
||||
.field("tos", inEntry["TOS"])
|
||||
.field("srcAS", inEntry["SRC_AS"])
|
||||
.field("dstAS", inEntry["DST_AS"])
|
||||
.field("srcMask", inEntry["SRC_MASK"])
|
||||
.field("dstMask", inEntry["DST_MASK"])
|
||||
.field("dstCntr", ermWhatTheCountry(str(inEntry["IPV4_DST_ADDR"])))
|
||||
.field("srcCntr", ermWhatTheCountry(str(inEntry["IPV4_SRC_ADDR"])))
|
||||
)
|
||||
|
||||
print("----------------")
|
||||
return (i, inflxdb_Data_To_Send, inEntry)
|
||||
|
||||
# Bind
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind((WHAT_THE_NETFLOW_IP, WHAT_THE_NETFLOW_PORT))
|
||||
|
||||
print("Ready")
|
||||
|
||||
with ThreadPoolExecutor(max_workers=8) as executor:
|
||||
# With means that when exiting the "executor" will be cleanly shut down
|
||||
# as executor is an object that is then used to give jobs to
|
||||
|
||||
while True:
|
||||
# Get netentry data ig?
|
||||
sock.settimeout(5)
|
||||
|
||||
|
||||
payload, client = sock.recvfrom(4096) # experimental, tested with 1464 bytes
|
||||
# Tajes UPD packets that are at max 4096 bytes
|
||||
# payload has the raw netflow data
|
||||
# client has source IP address as well as port
|
||||
|
||||
p = netflow.parse_packet(payload) # Test result: <ExportPacket v5 with 30 records>
|
||||
#print(p.entrys) # Test result: 5
|
||||
|
||||
# Submit all entries to thread pool
|
||||
futures = [executor.submit(process_flow, i, entry) for i, entry in enumerate(p.flows, 1)]
|
||||
# Big thinkg happen here
|
||||
# Here I give an executor a job. That job is to run function <process_flow> with arguments i and entry. Then it becomes one thread
|
||||
# Furthermore for each entry, so flow record, we submit a task to a thread
|
||||
# In comparasion, without multithreading it only had one for function
|
||||
# for i, entry in enumerate(p.flows, 1)
|
||||
# And the results from a job on a thread (executor) are stored in futures ____list____
|
||||
|
||||
|
||||
bigDict = {}
|
||||
inflxdb_Datazz_To_Send = []
|
||||
|
||||
for future in futures:
|
||||
i, point, inEntry = future.result()
|
||||
# goes through every job done by executor.
|
||||
# i is being reused from the original enumerate
|
||||
# point is the InfluxDB-ready data object
|
||||
# inEntry is what a single flow in the 30 flow dictionary in Netflow
|
||||
|
||||
inflxdb_Datazz_To_Send.append(point)
|
||||
bigDict[i] = inEntry
|
||||
|
||||
# Send data to InfluxDB
|
||||
write_api.write(bucket=bucket, org=org, record=inflxdb_Datazz_To_Send)
|
||||
time.sleep(INFLX_SEPARATE_POINTS) # separate points
|
||||
|
||||
print(f"{len(bigDict)} <--- This many entrys")
|
||||
|
||||
# Clean up before another loop
|
||||
bigDict.clear()
|
||||
inflxdb_Datazz_To_Send.clear()
|
||||
#print(bigDict)
|
||||
|
Binary file not shown.
@ -1,18 +0,0 @@
|
||||
import IP2Location
|
||||
from typing import Optional, Annotated
|
||||
|
||||
# Load database once
|
||||
ip2loc_db = IP2Location.IP2Location("IP2LOCATION-LITE-DB9.BIN")
|
||||
|
||||
def ermWhatTheCountry(inpIpAddress: Annotated[str, "Some IP address that ya want to get country for"]):
|
||||
try:
|
||||
skibidi = ip2loc_db.get_all(inpIpAddress)
|
||||
|
||||
#return rec.country_long # Full country name, e.g. "Sweden"
|
||||
return skibidi.country_short
|
||||
|
||||
except Exception as errrrrr:
|
||||
return f"Error: {errrrrr}"
|
||||
|
||||
#print(ermWhatTheCountry("65.109.142.32"))
|
||||
|
@ -1,24 +0,0 @@
|
||||
[Unit]
|
||||
Description=Netflow to InfluxDB script
|
||||
After=multi-user.target network.target network-online.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
Restart=on-failure
|
||||
# EnvironmentFile=/etc/NetFlux/netflow.env
|
||||
# User=myuser
|
||||
WorkingDirectory=/etc/NetFlux/LH/
|
||||
ExecStart=/etc/NetFlux/LH/venv/bin/python3 /etc/NetFlux/LH/LHnetflow.py --serve-in-foreground
|
||||
# ExecStart=/usr/bin/python3 /usr/bin/NetFlux/INFLUXDBmthrd.py
|
||||
# StandardInput=tty-force
|
||||
|
||||
# Log file will be create if it doesn't exist
|
||||
StandardOutput=append:/var/log/LHNetFlowInflux.log
|
||||
StandardError=append:/var/log/LHNetFlowInflux.errlog
|
||||
|
||||
# StandardOutput=syslog
|
||||
# StandardError=syslog
|
||||
# SyslogIdentifier=NetFlowInflux
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
@ -1,13 +0,0 @@
|
||||
#Put it in /etc/rsyslog.d/
|
||||
|
||||
if $programname == 'NetFlowInflux' then /var/log/NetFlowInflux.log
|
||||
& stop
|
||||
|
||||
|
||||
#then run
|
||||
#
|
||||
#touch /var/log/yapyap
|
||||
#chown syslog /var/log/yapyap
|
||||
#ls -l /var/log/yapyap.log
|
||||
# And then
|
||||
# systemctl restart rsyslog
|
@ -1,7 +0,0 @@
|
||||
INFLUX_TOKEN=""
|
||||
INFLUX_BUCKET=""
|
||||
INFLUX_ORG=""
|
||||
INFLUX_URL=""
|
||||
INFLUX_MEASUREMENT=""
|
||||
INFLUX_MACHINE_TAG=""
|
||||
INFLUX_ROUTER_TAG=""
|
@ -1,24 +0,0 @@
|
||||
[Unit]
|
||||
Description=Netflow to InfluxDB script
|
||||
After=multi-user.target network.target network-online.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
Restart=on-failure
|
||||
# EnvironmentFile=/etc/NetFlux/netflow.env
|
||||
# User=myuser
|
||||
WorkingDirectory=/etc/NetFlux//
|
||||
ExecStart=/etc/NetFlux//venv/bin/python3 /etc/NetFlux//HAOSnetflow.py --serve-in-foreground
|
||||
# ExecStart=/usr/bin/python3 /usr/bin/NetFlux/INFLUXDBmthrd.py
|
||||
StandardInput=tty-force
|
||||
|
||||
# Log file will be create if it doesn't exist
|
||||
StandardOutput=append:/var/log/NetFlowInflux.log
|
||||
StandardError=append:/var/log/NetFlowInflux.errlog
|
||||
|
||||
# StandardOutput=syslog
|
||||
# StandardError=syslog
|
||||
# SyslogIdentifier=NetFlowInflux
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
@ -1,178 +0,0 @@
|
||||
from typing import Optional, Annotated
|
||||
|
||||
# Source
|
||||
# https://en.wikipedia.org/wiki/List_of_IP_protocol_numbers
|
||||
PROTO_MAP = {
|
||||
0: "HOPOPT",
|
||||
1: "ICMP",
|
||||
2: "IGMP",
|
||||
3: "GGP",
|
||||
4: "IPv4",
|
||||
5: "ST",
|
||||
6: "TCP",
|
||||
7: "CBT",
|
||||
8: "EGP",
|
||||
9: "IGP",
|
||||
10: "BBN-RCC-MON",
|
||||
11: "NVP-II",
|
||||
12: "PUP",
|
||||
13: "ARGUS",
|
||||
14: "EMCON",
|
||||
15: "XNET",
|
||||
16: "CHAOS",
|
||||
17: "UDP",
|
||||
18: "MUX",
|
||||
19: "DCN-MEAS",
|
||||
20: "HMP",
|
||||
21: "PRM",
|
||||
22: "XNS-IDP",
|
||||
23: "TRUNK-1",
|
||||
24: "TRUNK-2",
|
||||
25: "LEAF-1",
|
||||
26: "LEAF-2",
|
||||
27: "RDP",
|
||||
28: "IRTP",
|
||||
29: "ISO-TP4",
|
||||
30: "NETBLT",
|
||||
31: "MFE-NSP",
|
||||
32: "MERIT-INP",
|
||||
33: "DCCP",
|
||||
34: "3PC",
|
||||
35: "IDPR",
|
||||
36: "XTP",
|
||||
37: "DDP",
|
||||
38: "IDPR-CMTP",
|
||||
39: "TP++",
|
||||
40: "IL",
|
||||
41: "IPv6",
|
||||
42: "SDRP",
|
||||
43: "IPv6-Route",
|
||||
44: "IPv6-Frag",
|
||||
45: "IDRP",
|
||||
46: "RSVP",
|
||||
47: "GRE",
|
||||
48: "DSR",
|
||||
49: "BNA",
|
||||
50: "ESP",
|
||||
51: "AH",
|
||||
52: "I-NLSP",
|
||||
53: "SWIPE",
|
||||
54: "NARP",
|
||||
55: "MOBILE",
|
||||
56: "TLSP",
|
||||
57: "SKIP",
|
||||
58: "IPv6-ICMP",
|
||||
59: "IPv6-NoNxt",
|
||||
60: "IPv6-Opts",
|
||||
61: "ANY_HOST_INTERNAL",
|
||||
62: "CFTP",
|
||||
63: "ANY_LOCAL_NETWORK",
|
||||
64: "SAT-EXPAK",
|
||||
65: "KRYPTOLAN",
|
||||
66: "RVD",
|
||||
67: "IPPC",
|
||||
68: "ANY_DISTRIBUTED_FS",
|
||||
69: "SAT-MON",
|
||||
70: "VISA",
|
||||
71: "IPCV",
|
||||
72: "CPNX",
|
||||
73: "CPHB",
|
||||
74: "WSN",
|
||||
75: "PVP",
|
||||
76: "BR-SAT-MON",
|
||||
77: "SUN-ND",
|
||||
78: "WB-MON",
|
||||
79: "WB-EXPAK",
|
||||
80: "ISO-IP",
|
||||
81: "VMTP",
|
||||
82: "SECURE-VMTP",
|
||||
83: "VINES",
|
||||
84: "TTP",
|
||||
85: "NSFNET-IGP",
|
||||
86: "DGP",
|
||||
87: "TCF",
|
||||
88: "EIGRP",
|
||||
89: "OSPF",
|
||||
90: "Sprite-RPC",
|
||||
91: "LARP",
|
||||
92: "MTP",
|
||||
93: "AX.25",
|
||||
94: "IPIP",
|
||||
95: "MICP",
|
||||
96: "SCC-SP",
|
||||
97: "ETHERIP",
|
||||
98: "ENCAP",
|
||||
99: "ANY_PRIVATE_ENCRYPTION",
|
||||
100: "GMTP",
|
||||
101: "IFMP",
|
||||
102: "PNNI",
|
||||
103: "PIM",
|
||||
104: "ARIS",
|
||||
105: "SCPS",
|
||||
106: "QNX",
|
||||
107: "A/N",
|
||||
108: "IPComp",
|
||||
109: "SNP",
|
||||
110: "Compaq-Peer",
|
||||
111: "IPX-in-IP",
|
||||
112: "VRRP",
|
||||
113: "PGM",
|
||||
114: "ANY_0_HOP",
|
||||
115: "L2TP",
|
||||
116: "DDX",
|
||||
117: "IATP",
|
||||
118: "STP",
|
||||
119: "SRP",
|
||||
120: "UTI",
|
||||
121: "SMP",
|
||||
122: "SM",
|
||||
123: "PTP",
|
||||
124: "ISIS over IPv4",
|
||||
125: "FIRE",
|
||||
126: "CRTP",
|
||||
127: "CRUDP",
|
||||
128: "SSCOPMCE",
|
||||
129: "IPLT",
|
||||
130: "SPS",
|
||||
131: "PIPE",
|
||||
132: "SCTP",
|
||||
133: "FC",
|
||||
134: "RSVP-E2E-IGNORE",
|
||||
135: "Mobility Header",
|
||||
136: "UDPLite",
|
||||
137: "MPLS-in-IP",
|
||||
138: "manet",
|
||||
139: "HIP",
|
||||
140: "Shim6",
|
||||
141: "WESP",
|
||||
142: "ROHC",
|
||||
143: "Ethernet",
|
||||
144: "AGGFRAG",
|
||||
145: "NSH"
|
||||
|
||||
}
|
||||
|
||||
|
||||
def manWhatTheProto(inpProtoNumbrMaybe: Annotated[int, "Protocol number goes here"]):
|
||||
|
||||
if inpProtoNumbrMaybe <= 145:
|
||||
return PROTO_MAP.get(inpProtoNumbrMaybe)
|
||||
elif inpProtoNumbrMaybe >= 146 and inpProtoNumbrMaybe <= 252:
|
||||
return "Unassigned"
|
||||
elif inpProtoNumbrMaybe >= 253 and inpProtoNumbrMaybe <= 254:
|
||||
# Use for experimentation and testing
|
||||
return "RFC3692"
|
||||
elif inpProtoNumbrMaybe == 255:
|
||||
return "Reserved"
|
||||
elif inpProtoNumbrMaybe not in PROTO_MAP:
|
||||
return inpProtoNumbrMaybe
|
||||
else:
|
||||
return "no"
|
||||
|
||||
#outPotentialProtoNameIfItExistsInInternalList = PROTO_MAP.get(inpProtoNumbrMaybe)
|
||||
|
||||
|
||||
|
||||
|
||||
#print(manWhatTheProto(253))
|
||||
#print( PROTO_MAP.get(2))
|
@ -1,8 +0,0 @@
|
||||
netflow==0.12.2
|
||||
#socket==
|
||||
#json==
|
||||
influxdb-client==1.48.0
|
||||
ipaddress==1.0.23
|
||||
typing==3.7.4.3
|
||||
IP2Location==8.10.5
|
||||
nslookup==1.8.1
|
@ -1,34 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# -----------------------------
|
||||
# IP2Location DB1 Updater
|
||||
# -----------------------------
|
||||
|
||||
# Change this URL to your personal download link if needed
|
||||
DOWNLOAD_URL="https://www.ip2location.com/download/?token=MkyoKFL854ID0FOWoeTWVRsw0SVcbA7ey6tvuzHchsIQ6AMGy7YXIDfwrEEA4Ozn&file=DB9LITEBIN"
|
||||
|
||||
# Define filenames
|
||||
ZIP_FILE="IP2LOCATION-LITE-DB9.BIN.ZIP"
|
||||
BIN_FILE="IP2LOCATION-LITE-DB9.BIN"
|
||||
|
||||
echo "Downloading latest IP2Location DB1..."
|
||||
curl -L -o "$ZIP_FILE" "$DOWNLOAD_URL"
|
||||
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Download failed!"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Unzipping BIN file..."
|
||||
unzip -o "$ZIP_FILE"
|
||||
|
||||
if [ ! -f "$BIN_FILE" ]; then
|
||||
echo "Unzip failed or $BIN_FILE not found."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Cleaning up ZIP..."
|
||||
rm "$ZIP_FILE"
|
||||
|
||||
echo "Update complete. BIN file ready: $BIN_FILE"
|
||||
|
@ -1,78 +0,0 @@
|
||||
#from nslookup import Nslookup
|
||||
from typing import Optional, Annotated
|
||||
import dns, dns.resolver
|
||||
|
||||
# https://www.codeunderscored.com/nslookup-python/
|
||||
|
||||
def ermWhatATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]):
|
||||
#dns_query = Nslookup()
|
||||
"""
|
||||
Tells you what IPv4 address/es a domain point to.
|
||||
Returns:
|
||||
dict: A dictionary with IP addresses associated with that domain.
|
||||
|
||||
"""
|
||||
|
||||
# i = 0
|
||||
outDict = {}
|
||||
|
||||
#result = dns_query.dns_lookup("example.com")
|
||||
#result = Nslookup.dns_lookup(inpDomainNameOrSomething)
|
||||
result = dns.resolver.resolve(inpDomainNameOrSomething, 'A')
|
||||
for i, something in enumerate(result):
|
||||
outDict[i] = something.to_text()
|
||||
# i += 1
|
||||
|
||||
return outDict
|
||||
|
||||
def ermWhatAAAATheIpFromDomainYaCrazy(inpDomainNameOrSomething: Annotated[str, "Domain name to lookup IP for"]):
|
||||
#dns_query = Nslookup()
|
||||
"""
|
||||
Tells you what IPv6 address/es a domain point to.
|
||||
Returns:
|
||||
dict: A dictionary with IP addresses associated with that domain.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
# i = 0
|
||||
outDict = {}
|
||||
|
||||
#result = dns_query.dns_lookup("example.com")
|
||||
#result = Nslookup.dns_lookup(inpDomainNameOrSomething)
|
||||
result = dns.resolver.resolve(inpDomainNameOrSomething, 'AAAA')
|
||||
for i, something in enumerate(result):
|
||||
outDict[i] = something.to_text()
|
||||
# i += 1
|
||||
|
||||
return outDict
|
||||
|
||||
|
||||
def ermWhatPTRTheIpFromDomainYaCrazy(inpIpAddressOrSomething: Annotated[str, "IP address to lookup domain for"]):
|
||||
#dns_query = Nslookup()
|
||||
"""
|
||||
Tells you what IPv6 address/es a domain point to.
|
||||
Returns:
|
||||
dict: A dictionary with IP addresses associated with that domain.
|
||||
|
||||
"""
|
||||
|
||||
whatToCheck = inpIpAddressOrSomething + ".in-addr.arpa"
|
||||
|
||||
|
||||
# i = 0
|
||||
outDict = {}
|
||||
|
||||
#result = dns_query.dns_lookup("example.com")
|
||||
#result = Nslookup.dns_lookup(inpDomainNameOrSomething)
|
||||
result = dns.resolver.resolve(whatToCheck, 'PTR')
|
||||
for i, something in enumerate(result):
|
||||
outDict[i] = something.to_text()
|
||||
# i += 1
|
||||
|
||||
return outDict
|
||||
|
||||
|
||||
#print(ermWhatATheIpFromDomainYaCrazy("fubukus.net"))
|
||||
#print(ermWhatAAAATheIpFromDomainYaCrazy("fubukus.net"))
|
||||
#print(ermWhatPTRTheIpFromDomainYaCrazy("192.168.1.226"))
|
Loading…
x
Reference in New Issue
Block a user