Recherche de site Web

Comment construire un tableau de bord de trafic réseau en temps réel avec Python et rationaliser


Avez-vous déjà eu envie de visualiser le trafic de votre réseau en temps réel ? Dans ce didacticiel, vous apprendrez à créer un tableau de bord interactif d'analyse du trafic réseau avec Python et Streamlit. Streamlit est un framework Python open source que vous pouvez utiliser pour développer des applications Web pour l'analyse et le traitement des données.

À la fin de ce didacticiel, vous saurez comment capturer les paquets réseau bruts de la NIC (Network Interface Card) de votre ordinateur, traiter les données et créer de superbes visualisations qui seront mises à jour en temps réel.

Table des matières

  • Pourquoi l’analyse du trafic réseau est-elle importante ?

  • Conditions préalables

  • Comment configurer votre projet

  • Comment construire les fonctionnalités principales

  • Comment créer les visualisations rationalisées

  • Comment capturer les paquets réseau

  • Tout rassembler

  • Améliorations futures

  • Conclusion

Pourquoi l’analyse du trafic réseau est-elle importante ?

L'analyse du trafic réseau est une exigence essentielle dans les entreprises où les réseaux constituent l'épine dorsale de presque toutes les applications et services. Au cœur de celui-ci, nous avons l'analyse des paquets réseau qui implique la surveillance du réseau, la capture de tout le trafic (entrée et sortie) et l'interprétation de ces paquets lorsqu'ils circulent à travers un réseau. Vous pouvez utiliser cette technique pour identifier les modèles de sécurité, détecter les anomalies et garantir la sécurité et l'efficacité du réseau.

Ce projet de preuve de concept sur lequel nous travaillerons dans ce tutoriel est particulièrement utile car il vous aide à visualiser et à analyser l'activité du réseau en temps réel. Et cela vous permettra de comprendre comment les problèmes de dépannage, les optimisations des performances et l'analyse de sécurité se font dans les systèmes d'entreprise.

Condition préalable

  • Python 3.8 ou une version plus récente installée sur votre système.

  • Une compréhension de base des concepts de réseaux informatiques.

  • Familiarité avec le langage de programmation Python et ses bibliothèques largement utilisées.

  • Connaissance de base des techniques de visualisation de données et des bibliothèques.

Comment configurer votre projet

Pour commencer, créez la structure du projet et installez les outils nécessaires avec Pip avec les commandes suivantes :

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

Nous utiliserons streamlit pour les visualisations du tableau de bord, pandas pour le traitement des données, Scapy pour la capture du paquet réseau et le traitement des paquets, et enfin Tracly pour tracer des graphiques avec nos données collectées.

Comment construire les fonctionnalités principales

Nous mettrons tout le code dans un seul fichier nommé dashboard.py. Tout d’abord, commençons par importer tous les éléments que nous allons utiliser :

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

Maintenant, configurons la journalisation en configurant une configuration de journalisation de base. Ceci sera utilisé pour suivre les événements et exécuter notre application en mode débogage. Nous avons actuellement défini le niveau de journalisation comme info , ce qui signifie que les événements avec le niveau info ou supérieur seront affichés. Si vous n'êtes pas familier avec la connexion à Python, je vous recommande de consulter cette pièce de documentation qui va en profondeur.

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

Ensuite, nous allons construire notre processeur de paquets. Nous implémenterons la fonctionnalité de traitement de nos paquets capturés dans cette classe.

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # Add TCP-specific information
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # Add UDP-specific information
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # Keep only last 10000 packets to prevent memory issues
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

Cette classe construira nos fonctionnalités de base et dispose de plusieurs fonctions utilitaires qui seront utilisées pour traiter les paquets.

Les paquets de réseau sont classés en deux au niveau du transport (TCP et UDP) et le protocole ICMP au niveau du réseau. Si vous n'êtes pas familier avec les concepts de TCP/IP, je vous recommande de consulter cet article sur FreeCodeCamp News.

Notre constructeur gardera la trace de tous les paquets vus qui sont classés dans ces seaux de type protocole TCP/IP que nous avons définis. Nous prendrons également note du temps de capture des paquets, des données capturées et du nombre de paquets capturés.

Nous tirons également parti d'un verrouillage de fil pour nous assurer qu'un seul paquet est traité en un seul moment. Cela peut être étendu pour permettre au projet d'avoir un traitement parallèle des paquets.

La fonction d'assistance get_protocol_name nous aide à obtenir le type correct de protocole en fonction de leurs numéros de protocole. Pour donner quelques informations à ce sujet, l'IANA (Internet Assigned Numbers Authority) attribue des numéros standardisés pour identifier les différents protocoles dans un paquet réseau. Au fur et à mesure que nous verrons ces numéros dans le paquet réseau analysé, nous saurons quel type de protocole est utilisé dans le paquet actuellement intercepté. Pour la portée de ce projet, nous mapperons uniquement sur TCP, UDP et ICMP (Ping). Si nous rencontrons un autre type de paquet, nous le catégoriserons comme OTHER().

La fonction process_packet gère notre fonctionnalité principale qui traitera ces paquets individuels. Si le paquet contient une couche IP, il prendra note des adresses IP source et de destination, du type de protocole, de la taille du paquet et du temps écoulé depuis le début de la capture du paquet.

Pour les paquets avec des protocoles de calque de transport spécifiques (comme TCP et UDP), nous capturerons les ports source et de destination ainsi que les drapeaux TCP pour les paquets TCP. Ces détails extraits seront stockés en mémoire dans la liste paquet_data . Nous garderons également une trace du packet_count au fur et à mesure que ces paquets sont traités.

La fonction get_dataframe nous aide à convertir la liste packet_data en une trame de données Pandas qui sera ensuite utilisée pour notre visualisation.

Comment créer les visualisations rationalisées

Il est maintenant temps pour nous de créer notre tableau de bord interactif Streamlit. Nous définirons une fonction appelée create_visualization dans le script dashboard.py (en dehors de notre classe de traitement de paquets).

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # Protocol distribution
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # Packets timeline
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # Top source IPs
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

Cette fonction prendra la trame de données en entrée et nous aidera à tracer trois graphiques/graphiques:

  1. Tableau de distribution des protocoles: ce graphique affichera la proportion de différents protocoles (par exemple, TCP, UDP, ICMP) dans le trafic de paquets capturés.

  2. Graphique chronologique des paquets : ce graphique affichera le nombre de paquets traités par seconde sur une période de temps.

  3. Tableau d'adresses IP des meilleures source: ce graphique mettra en évidence les 10 meilleures adresses IP qui ont envoyé le plus de paquets dans le trafic capturé.

Le graphique de distribution des protocoles est simplement un diagramme circulaire du nombre de protocoles pour les trois types différents (avec AUTRES). Nous utilisons les outils Python Streamlit et Plotly pour tracer ces graphiques. Puisque nous avons également noté l'horodatage depuis le début de la capture des paquets, nous utiliserons ces données pour tracer la tendance des paquets capturés au fil du temps.

Pour le deuxième graphique, nous effectuerons une opération groupby sur les données et obtiendrons le nombre de paquets capturés par seconde (S signifie secondes), puis enfin nous ferons tracer le graphique.

Enfin, pour le troisième tableau, nous compterons les IP source distincts observés et le tracé un tableau de l'IP compte pour afficher les 10 premiers IP.

Comment capturer les paquets réseau

Maintenant, créons la fonctionnalité pour nous permettre de capturer des données de paquets de réseau.

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

Il s'agit d'une fonction simple qui instancie la classe PacketProcessor , puis utilise la fonction sniff dans le module SCAPY pour commencer à capturer les paquets.

Nous utilisons le filetage ici pour nous permettre de capturer des paquets indépendamment du flux principal du programme. Cela garantit que l'opération de capture de paquets ne bloque pas d'autres opérations comme la mise à jour du tableau de bord en temps réel. Nous renvoyons également l'instance créée PacketProcessor afin qu'elle puisse être utilisée dans notre programme principal.

Tout rassembler

Maintenant, cousons toutes ces pièces avec notre fonction Main qui agira comme la fonction du pilote pour notre programme.

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # Initialize packet processor in session state
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # Create dashboard layout
    col1, col2 = st.columns(2)

    # Get current data
    df = st.session_state.processor.get_dataframe()

    # Display metrics
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # Display visualizations
    create_visualizations(df)

    # Display recent packets
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # Add refresh button
    if st.button('Refresh Data'):
        st.rerun()

    # Auto refresh
    time.sleep(2)
    st.rerun()

Cette fonction instanciera également le Streamlit Dashboard et intégrera tous nos composants ensemble. Nous définissons d'abord le titre de la page de notre Streamlit Dashboard, puis initialisons notre PacketProcessor . Nous utilisons l'état de session dans Streamlit pour nous assurer qu'une seule instance de capture de paquets est créée et que l'état est conservé.

Maintenant, nous obtiendrons dynamiquement le DataFrame à partir de l'état de session à chaque fois que les données sont traitées et commencerons à afficher les métriques et les visualisations. Nous afficherons également les paquets récemment capturés ainsi que des informations telles que l'horodatage, la source et les IP de destination, le protocole et la taille du paquet. Nous ajouterons également la possibilité à l'utilisateur de rafraîchir manuellement les données du tableau de bord alors que nous la rafraîchissons également toutes les deux secondes.

Exécutons enfin le programme avec la commande suivante:

sudo streamlit run dashboard.py

Notez que vous devrez exécuter le programme avec sudo car les capacités de capture de paquets nécessitent des privilèges administratifs. Si vous êtes sous Windows, ouvrez votre terminal en tant qu'administrateur, puis exécutez le programme sans le préfixe sudo .

Attendez un moment pour que le programme commence à capturer les paquets. Si tout se passe bien, vous devriez voir quelque chose comme ceci :

Ce sont toutes les visualisations que nous venons d'implémenter dans notre programme de tableau de bord Streamlit.

Améliorations futures

Sur ce, voici quelques idées d'améliorations futures que vous pourrez utiliser pour étendre les fonctionnalités du tableau de bord :

  1. Ajoutez des capacités d'apprentissage automatique pour la détection des anomalies

  2. Implémenter la cartographie IP géographique

  3. Créez des alertes personnalisées basées sur des modèles d'analyse du trafic

  4. Ajouter des options d'analyse de la charge utile des paquets

Conclusion

Félicitations! Vous avez maintenant construit avec succès un tableau de bord d'analyse du trafic réseau en temps réel avec Python et Streamlit . Ce programme fournira des informations précieuses sur le comportement du réseau et peut être étendue pour divers cas d'utilisation, de la surveillance de la sécurité à l'optimisation du réseau.

Avec cela, j'espère que vous avez appris certaines bases sur l'analyse du trafic réseau ainsi qu'un peu de programmation Python. Merci d'avoir lu!