Traitement parallèle de gros fichiers en Python
Apprenez diverses techniques pour réduire le temps de traitement des données en utilisant le multitraitement, joblib et tqdm concurrent.
Pour le traitement parallèle, nous divisons notre tâche en sous-unités. Cela augmente le nombre de tâches traitées par le programme et réduit le temps de traitement global.
Par exemple, si vous travaillez avec un fichier CSV volumineux et que vous souhaitez modifier une seule colonne. Nous transmettrons les données sous forme de tableau à la fonction, et celle-ci traitera en parallèle plusieurs valeurs à la fois en fonction du nombre de travailleurs disponibles. Ces travailleurs sont basés sur le nombre de cœurs de votre processeur.
Remarque : l'utilisation du traitement parallèle sur un ensemble de données plus petit n'améliorera pas le temps de traitement.
Dans ce blog, nous apprendrons comment réduire le temps de traitement des fichiers volumineux à l'aide des packages multiprocessing, joblib et tqdm Python. Il s'agit d'un didacticiel simple qui peut s'appliquer à n'importe quel fichier, base de données, image, vidéo et audio.
Remarque : nous utilisons le notebook Kaggle pour les expériences. Le temps de traitement peut varier d'une machine à l'autre.
Commencer
Nous utiliserons l'ensemble de données sur les accidents aux États-Unis (2016-2021) de Kaggle, qui comprend 2,8 millions d'enregistrements et 47 colonnes.
Nous importerons multiprocessing
, joblib
et tqdm
pour le traitement parallèle, pandas
pour ingestions de données, et re
, nltk
et string
pour le traitement de texte.
# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
# Data Ingestion
import pandas as pd
# Text Processing
import re
from nltk.corpus import stopwords
import string
Avant de commencer, définissons n_workers
en doublant cpu_count()
. Comme vous pouvez le constater, nous avons 8 ouvriers.
n_workers = 2 * mp.cpu_count()
print(f"{n_workers} workers are available")
>>> 8 workers are available
Dans la prochaine étape, nous allons ingérer des fichiers CSV volumineux à l'aide de la fonction pandas read_csv
. Ensuite, imprimez la forme du dataframe, le nom des colonnes et le temps de traitement.
Remarque : La fonction magique de Jupyter
%%time
peut afficher les temps CPU et le temps mural à la fin du processus.
%%time
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)
print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")
Sortie
Shape:(2845342, 47)
Column Names:
Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
'Astronomical_Twilight'],
dtype='object')
CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s
Nettoyer le texte
Le clean_text
est une fonction simple pour traiter et nettoyer le texte. Nous obtiendrons les mots vides en anglais en utilisant nltk.copus
et nous l'utiliserons pour filtrer les mots vides de la ligne de texte. Après cela, nous supprimerons les caractères spéciaux et les espaces supplémentaires de la phrase. Ce sera la fonction de base pour déterminer le temps de traitement pour le traitement série, parallèle et par lots .
def clean_text(text):
# Remove stop words
stops = stopwords.words("english")
text = " ".join([word for word in text.split() if word not in stops])
# Remove Special Characters
text = text.translate(str.maketrans('', '', string.punctuation))
# removing the extra spaces
text = re.sub(' +',' ', text)
return text
Traitement en série
Pour le traitement en série, nous pouvons utiliser la fonction pandas .apply()
, mais si vous souhaitez voir la barre de progression, vous devez activer tqdm pour pandas< puis utilisez la fonction .progress_apply()
.
Nous allons traiter les 2,8 millions d'enregistrements et enregistrer le résultat dans la colonne « Description ».
%%time
tqdm.pandas()
df['Description'] = df['Description'].progress_apply(clean_text)
Sortie
Il a fallu 9 minutes et 5 secondes au processeur haut de gamme pour traiter en série 2,8 millions de lignes.
100% 2845342/2845342 [09:05<00:00, 5724.25it/s]
CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s
Multitraitement
Il existe différentes manières de traiter le fichier en parallèle, et nous allons toutes les découvrir. Le multiprocessing
est un package Python intégré couramment utilisé pour le traitement parallèle de fichiers volumineux.
Nous allons créer un Pool multitraitement avec 8 travailleurs et utiliser la fonction map pour lancer le processus. Pour afficher les barres de progression, nous utilisons tqdm.
La fonction de carte se compose de deux sections. Le premier nécessite la fonction et le second nécessite un argument ou une liste d'arguments.
Apprenez-en davantage en lisant la documentation.
%%time
p = mp.Pool(n_workers)
df['Description'] = p.map(clean_text,tqdm(df['Description']))
Sortie
Nous avons amélioré notre temps de traitement de près de 3X. Le temps de traitement est passé de 9 minutes 5 secondes à 3 minutes 51 secondes.
100% 2845342/2845342 [02:58<00:00, 135646.12it/s]
CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s
Parallèle
Nous allons maintenant découvrir un autre package Python pour effectuer un traitement parallèle. Dans cette section, nous utiliserons les fonctions Parallèle et retardée de joblib pour répliquer la fonction map .
Le Parallèle nécessite deux arguments : n_jobs=8 et backend=multiprocessing.
- Ensuite, nous ajouterons clean_text à la fonction retardée.
- Créez une boucle pour alimenter une seule valeur à la fois.
Le processus ci-dessous est assez générique et vous pouvez modifier votre fonction et votre tableau en fonction de vos besoins. Je l'ai utilisé pour traiter des milliers de fichiers audio et vidéo sans aucun problème.
Recommandé : ajoutez la gestion des exceptions à l'aide de try:
et sauf :
def text_parallel_clean(array):
result = Parallel(n_jobs=n_workers,backend="multiprocessing")(
delayed(clean_text)
(text)
for text in tqdm(array)
)
return result
Ajoutez la colonne « Description » à text_parallel_clean()
.
%%time
df['Description'] = text_parallel_clean(df['Description'])
Sortie
Cela a pris à notre fonction 13 secondes de plus que le multitraitement du Pool. Même dans ce cas, le traitement Parallèle est 4 minutes et 59 secondes plus rapide que le traitement série.
100% 2845342/2845342 [04:03<00:00, 10514.98it/s]
CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s
Traitement par lots parallèle
Il existe une meilleure façon de traiter des fichiers volumineux en les divisant en lots et en les traitant en parallèle. Commençons par créer une fonction batch qui exécutera une clean_function
sur un seul lot de valeurs.
Fonction de traitement par lots
def proc_batch(batch):
return [
clean_text(text)
for text in batch
]
Diviser le fichier en lots
La fonction ci-dessous divisera le fichier en plusieurs lots en fonction du nombre de travailleurs. Dans notre cas, nous obtenons 8 lots.
def batch_file(array,n_workers):
file_len = len(array)
batch_size = round(file_len / n_workers)
batches = [
array[ix:ix+batch_size]
for ix in tqdm(range(0, file_len, batch_size))
]
return batches
batches = batch_file(df['Description'],n_workers)
>>> 100% 8/8 [00:00<00:00, 280.01it/s]
Exécution d'un traitement par lots parallèle
Enfin, nous utiliserons le Parallèle et le différé pour traiter les lots.
Remarque : Pour obtenir un seul tableau de valeurs, nous devons exécuter la compréhension de liste comme indiqué ci-dessous.
%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(
delayed(proc_batch)
(batch)
for batch in tqdm(batches)
)
df['Description'] = [j for i in batch_output for j in i]
Sortie
Nous avons amélioré le délai de traitement. Cette technique est célèbre pour traiter des données complexes et former des modèles d’apprentissage profond.
100% 8/8 [00:00<00:00, 2.19it/s]
CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s
tqdm simultané
tqdm fait passer le multitraitement à un niveau supérieur. C'est simple et puissant. Je le recommanderai à tous les data scientists.
Consultez la documentation pour en savoir plus sur le multitraitement.
Le process_map
nécessite :
Nom de la fonction
- Colonne Dataframe
- max_workers
- la taille du mandrin est similaire à la taille du lot. Nous calculerons la taille du lot en utilisant le nombre de travailleurs ou vous pourrez ajouter le nombre en fonction de vos préférences.
%%time
from tqdm.contrib.concurrent import process_map
batch = round(len(df)/n_workers)
df["Description"] = process_map(
clean_text, df["Description"], max_workers=n_workers, chunksize=batch
)
Sortie
Avec une seule ligne de code, nous obtenons le meilleur résultat.
100% 2845342/2845342 [03:48<00:00, 1426320.93it/s]
CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s
Conclusion
Vous devez trouver un équilibre et sélectionner la technique qui convient le mieux à votre cas. Il peut s'agir d'un traitement en série, en parallèle ou par lots. Le traitement parallèle peut se retourner contre vous si vous travaillez avec un ensemble de données plus petit et moins complexe.
Dans ce mini-tutoriel, nous avons découvert divers packages et techniques Python qui nous permettent de traiter en parallèle nos fonctions de données.
Si vous travaillez uniquement avec un ensemble de données tabulaires et souhaitez améliorer vos performances de traitement, je vous suggère d'essayer Dask, datatable et RAPIDS.
Référence
Traitement par lots parallèle en Python | de Dennis Bakhuis | Vers la science des données
- Traitement parallèle de gros fichiers en Python · Nurda Bolatov