Comment construire un tableau de bord de trafic réseau en temps réel avec Python et rationaliser
Avez-vous déjà eu envie de visualiser le trafic de votre réseau en temps réel ? Dans ce didacticiel, vous apprendrez à créer un tableau de bord interactif d'analyse du trafic réseau avec Python et Streamlit
. Streamlit
est un framework Python open source que vous pouvez utiliser pour développer des applications Web pour l'analyse et le traitement des données.
À la fin de ce didacticiel, vous saurez comment capturer les paquets réseau bruts de la NIC (Network Interface Card) de votre ordinateur, traiter les données et créer de superbes visualisations qui seront mises à jour en temps réel.
Table des matières
Pourquoi l’analyse du trafic réseau est-elle importante ?
Conditions préalables
-
Comment configurer votre projet
Comment construire les fonctionnalités principales
Comment créer les visualisations rationalisées
Comment capturer les paquets réseau
Tout rassembler
Améliorations futures
Conclusion
Pourquoi l’analyse du trafic réseau est-elle importante ?
L'analyse du trafic réseau est une exigence essentielle dans les entreprises où les réseaux constituent l'épine dorsale de presque toutes les applications et services. Au cœur de celui-ci, nous avons l'analyse des paquets réseau qui implique la surveillance du réseau, la capture de tout le trafic (entrée et sortie) et l'interprétation de ces paquets lorsqu'ils circulent à travers un réseau. Vous pouvez utiliser cette technique pour identifier les modèles de sécurité, détecter les anomalies et garantir la sécurité et l'efficacité du réseau.
Ce projet de preuve de concept sur lequel nous travaillerons dans ce tutoriel est particulièrement utile car il vous aide à visualiser et à analyser l'activité du réseau en temps réel. Et cela vous permettra de comprendre comment les problèmes de dépannage, les optimisations des performances et l'analyse de sécurité se font dans les systèmes d'entreprise.
Condition préalable
Python 3.8 ou une version plus récente installée sur votre système.
-
Une compréhension de base des concepts de réseaux informatiques.
Familiarité avec le langage de programmation Python et ses bibliothèques largement utilisées.
Connaissance de base des techniques de visualisation de données et des bibliothèques.
Comment configurer votre projet
Pour commencer, créez la structure du projet et installez les outils nécessaires avec Pip avec les commandes suivantes :
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
Nous utiliserons streamlit
pour les visualisations du tableau de bord, pandas
pour le traitement des données, Scapy
pour la capture du paquet réseau et le traitement des paquets, et enfin Tracly
pour tracer des graphiques avec nos données collectées.
Comment construire les fonctionnalités principales
Nous mettrons tout le code dans un seul fichier nommé dashboard.py
. Tout d’abord, commençons par importer tous les éléments que nous allons utiliser :
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
Maintenant, configurons la journalisation en configurant une configuration de journalisation de base. Ceci sera utilisé pour suivre les événements et exécuter notre application en mode débogage. Nous avons actuellement défini le niveau de journalisation comme info
, ce qui signifie que les événements avec le niveau info
ou supérieur seront affichés. Si vous n'êtes pas familier avec la connexion à Python, je vous recommande de consulter cette pièce de documentation qui va en profondeur.
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Ensuite, nous allons construire notre processeur de paquets. Nous implémenterons la fonctionnalité de traitement de nos paquets capturés dans cette classe.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Add TCP-specific information
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Add UDP-specific information
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Keep only last 10000 packets to prevent memory issues
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
Cette classe construira nos fonctionnalités de base et dispose de plusieurs fonctions utilitaires qui seront utilisées pour traiter les paquets.
Les paquets de réseau sont classés en deux au niveau du transport (TCP et UDP) et le protocole ICMP au niveau du réseau. Si vous n'êtes pas familier avec les concepts de TCP/IP, je vous recommande de consulter cet article sur FreeCodeCamp News.
Notre constructeur gardera la trace de tous les paquets vus qui sont classés dans ces seaux de type protocole TCP/IP que nous avons définis. Nous prendrons également note du temps de capture des paquets, des données capturées et du nombre de paquets capturés.
Nous tirons également parti d'un verrouillage de fil pour nous assurer qu'un seul paquet est traité en un seul moment. Cela peut être étendu pour permettre au projet d'avoir un traitement parallèle des paquets.
La fonction d'assistance get_protocol_name
nous aide à obtenir le type correct de protocole en fonction de leurs numéros de protocole. Pour donner quelques informations à ce sujet, l'IANA (Internet Assigned Numbers Authority) attribue des numéros standardisés pour identifier les différents protocoles dans un paquet réseau. Au fur et à mesure que nous verrons ces numéros dans le paquet réseau analysé, nous saurons quel type de protocole est utilisé dans le paquet actuellement intercepté. Pour la portée de ce projet, nous mapperons uniquement sur TCP, UDP et ICMP (Ping). Si nous rencontrons un autre type de paquet, nous le catégoriserons comme OTHER(
.
La fonction process_packet
gère notre fonctionnalité principale qui traitera ces paquets individuels. Si le paquet contient une couche IP, il prendra note des adresses IP source et de destination, du type de protocole, de la taille du paquet et du temps écoulé depuis le début de la capture du paquet.
Pour les paquets avec des protocoles de calque de transport spécifiques (comme TCP et UDP), nous capturerons les ports source et de destination ainsi que les drapeaux TCP pour les paquets TCP. Ces détails extraits seront stockés en mémoire dans la liste paquet_data
. Nous garderons également une trace du packet_count
au fur et à mesure que ces paquets sont traités.
La fonction get_dataframe
nous aide à convertir la liste packet_data
en une trame de données Pandas
qui sera ensuite utilisée pour notre visualisation.
Comment créer les visualisations rationalisées
Il est maintenant temps pour nous de créer notre tableau de bord interactif Streamlit. Nous définirons une fonction appelée create_visualization
dans le script dashboard.py
(en dehors de notre classe de traitement de paquets).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# Protocol distribution
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Packets timeline
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Top source IPs
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
Cette fonction prendra la trame de données en entrée et nous aidera à tracer trois graphiques/graphiques:
Tableau de distribution des protocoles: ce graphique affichera la proportion de différents protocoles (par exemple, TCP, UDP, ICMP) dans le trafic de paquets capturés.
Graphique chronologique des paquets : ce graphique affichera le nombre de paquets traités par seconde sur une période de temps.
Tableau d'adresses IP des meilleures source: ce graphique mettra en évidence les 10 meilleures adresses IP qui ont envoyé le plus de paquets dans le trafic capturé.
Le graphique de distribution des protocoles est simplement un diagramme circulaire du nombre de protocoles pour les trois types différents (avec AUTRES). Nous utilisons les outils Python Streamlit
et Plotly
pour tracer ces graphiques. Puisque nous avons également noté l'horodatage depuis le début de la capture des paquets, nous utiliserons ces données pour tracer la tendance des paquets capturés au fil du temps.
Pour le deuxième graphique, nous effectuerons une opération groupby
sur les données et obtiendrons le nombre de paquets capturés par seconde (S
signifie secondes), puis enfin nous ferons tracer le graphique.
Enfin, pour le troisième tableau, nous compterons les IP source distincts observés et le tracé un tableau de l'IP compte pour afficher les 10 premiers IP.
Comment capturer les paquets réseau
Maintenant, créons la fonctionnalité pour nous permettre de capturer des données de paquets de réseau.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
Il s'agit d'une fonction simple qui instancie la classe PacketProcessor
, puis utilise la fonction sniff
dans le module SCAPY
pour commencer à capturer les paquets.
Nous utilisons le filetage ici pour nous permettre de capturer des paquets indépendamment du flux principal du programme. Cela garantit que l'opération de capture de paquets ne bloque pas d'autres opérations comme la mise à jour du tableau de bord en temps réel. Nous renvoyons également l'instance créée PacketProcessor
afin qu'elle puisse être utilisée dans notre programme principal.
Tout rassembler
Maintenant, cousons toutes ces pièces avec notre fonction Main
qui agira comme la fonction du pilote pour notre programme.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# Initialize packet processor in session state
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Create dashboard layout
col1, col2 = st.columns(2)
# Get current data
df = st.session_state.processor.get_dataframe()
# Display metrics
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# Display visualizations
create_visualizations(df)
# Display recent packets
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Add refresh button
if st.button('Refresh Data'):
st.rerun()
# Auto refresh
time.sleep(2)
st.rerun()
Cette fonction instanciera également le Streamlit
Dashboard et intégrera tous nos composants ensemble. Nous définissons d'abord le titre de la page de notre Streamlit
Dashboard, puis initialisons notre PacketProcessor
. Nous utilisons l'état de session dans Streamlit
pour nous assurer qu'une seule instance de capture de paquets est créée et que l'état est conservé.
Maintenant, nous obtiendrons dynamiquement le DataFrame à partir de l'état de session à chaque fois que les données sont traitées et commencerons à afficher les métriques et les visualisations. Nous afficherons également les paquets récemment capturés ainsi que des informations telles que l'horodatage, la source et les IP de destination, le protocole et la taille du paquet. Nous ajouterons également la possibilité à l'utilisateur de rafraîchir manuellement les données du tableau de bord alors que nous la rafraîchissons également toutes les deux secondes.
Exécutons enfin le programme avec la commande suivante:
sudo streamlit run dashboard.py
Notez que vous devrez exécuter le programme avec sudo
car les capacités de capture de paquets nécessitent des privilèges administratifs. Si vous êtes sous Windows, ouvrez votre terminal en tant qu'administrateur, puis exécutez le programme sans le préfixe sudo
.
Attendez un moment pour que le programme commence à capturer les paquets. Si tout se passe bien, vous devriez voir quelque chose comme ceci :
Ce sont toutes les visualisations que nous venons d'implémenter dans notre programme de tableau de bord Streamlit
.
Améliorations futures
Sur ce, voici quelques idées d'améliorations futures que vous pourrez utiliser pour étendre les fonctionnalités du tableau de bord :
Ajoutez des capacités d'apprentissage automatique pour la détection des anomalies
Implémenter la cartographie IP géographique
Créez des alertes personnalisées basées sur des modèles d'analyse du trafic
Ajouter des options d'analyse de la charge utile des paquets
Conclusion
Félicitations! Vous avez maintenant construit avec succès un tableau de bord d'analyse du trafic réseau en temps réel avec Python et Streamlit
. Ce programme fournira des informations précieuses sur le comportement du réseau et peut être étendue pour divers cas d'utilisation, de la surveillance de la sécurité à l'optimisation du réseau.
Avec cela, j'espère que vous avez appris certaines bases sur l'analyse du trafic réseau ainsi qu'un peu de programmation Python. Merci d'avoir lu!