Comment utiliser les clusters Spark pour le traitement parallèle du Big Data
par Hari Santanam
Utilisez l'ensemble de données distribuées résilientes (RDD) d'Apache Spark avec Databricks
En raison de limitations physiques, le processeur informatique individuel a largement atteint le plafond supérieur en termes de vitesse avec les conceptions actuelles. Ainsi, les fabricants de matériel ont ajouté davantage de processeurs à la carte mère (cœurs de processeur parallèles, fonctionnant à la même vitesse).
Mais… la plupart des applications logicielles écrites au cours des dernières décennies n’ont pas été écrites pour un traitement parallèle.
De plus, la collecte de données est devenue exponentiellement importante, grâce à des appareils bon marché capables de collecter des données spécifiques (telles que la température, le son, la vitesse…).
Pour traiter ces données de manière plus efficace, de nouvelles méthodes de programmation étaient nécessaires.
Un cluster de processus informatiques est similaire à un groupe de travailleurs. Une équipe peut travailler mieux et plus efficacement qu’un seul travailleur. Ils mutualisent les ressources. Cela signifie qu'ils partagent des informations, répartissent les tâches et collectent des mises à jour et des résultats pour aboutir à un ensemble unique de résultats.
Tout comme les agriculteurs sont passés du travail sur un seul champ à l'utilisation de moissonneuses-batteuses et de tracteurs pour produire efficacement des aliments à partir d'exploitations plus grandes et plus nombreuses, et que les coopératives agricoles ont facilité la transformation, le cluster travaille ensemble pour s'attaquer à la collecte et au traitement de données plus vastes et plus complexes.
Le calcul en cluster et le traitement parallèle étaient les réponses, et nous disposons aujourd'hui du framework Apache Spark. Databricks est une plate-forme d'analyse unifiée utilisée pour lancer le cluster computing Spark de manière simple et conviviale.
Qu’est-ce que Spark ?
Apache Spark est un moteur d'analyse unifié ultra-rapide pour le Big Data et l'apprentissage automatique. Il a été initialement développé à l'UC Berkeley.
Spark est rapide. Il tire parti du calcul en mémoire et d'autres optimisations. Il détient actuellement le record du tri sur disque à grande échelle.
Spark utilise des ensembles de données distribués résilients (RDD) pour effectuer un traitement parallèle sur un cluster ou des processeurs informatiques.
Il dispose d'API faciles à utiliser pour fonctionner sur de grands ensembles de données, dans divers langages de programmation. Il dispose également d'API pour transformer les données et d'API de trames de données familières pour manipuler des données semi-structurées.
Fondamentalement, Spark utilise un gestionnaire de cluster pour coordonner le travail sur un cluster d'ordinateurs. Un cluster est un groupe d'ordinateurs connectés et coordonnés les uns avec les autres pour traiter les données et calculer.
Les applications Spark se composent d'un processus pilote et de processus exécuteurs.
En bref, le processus pilote exécute la fonction principale, analyse et répartit le travail entre les exécuteurs. Les exécuteurs accomplissent réellement les tâches assignées : exécuter le code et rendre compte au nœud pilote.
Dans les applications du monde réel en entreprise et dans la programmation émergente de l’IA, le traitement parallèle devient une nécessité pour l’efficacité, la rapidité et la complexité.
Excellent - alors, qu'est-ce que Databricks ?
Databricks est une plateforme d'analyse unifiée conçue par les créateurs d'Apache Spark. Il facilite le lancement de clusters Spark optimisés pour le cloud en quelques minutes.
Considérez-le comme un package tout-en-un pour écrire votre code. Vous pouvez utiliser Spark (sans vous soucier des détails sous-jacents) et produire des résultats.
Il comprend également des blocs-notes Jupyter qui peuvent être partagés, ainsi que l'intégration GitHub, des connexions à de nombreux outils largement utilisés et une surveillance, une planification et un débogage automatisés. Voir ici pour plus d'informations.
Vous pouvez vous inscrire gratuitement avec l'édition communautaire. Cela vous permettra de jouer avec les clusters Spark. Les autres avantages, selon le plan, comprennent :
- Obtenez des clusters opérationnels en quelques secondes sur les instances CPU et GPU AWS et Azure pour une flexibilité maximale.
- Démarrez rapidement grâce à l'intégration prête à l'emploi de TensorFlow, Keras et de leurs dépendances sur les clusters Databricks.
Commençons. Si vous avez déjà utilisé Databricks auparavant, passez à la partie suivante. Sinon, vous pouvez vous inscrire ici et sélectionner « édition communautaire » pour l’essayer gratuitement.
Suivez les instructions là-bas. Ils sont clairs, concis et simples :
- Créer un cluster
- Attachez un notebook au cluster et exécutez des commandes dans le notebook sur le cluster
- Manipuler les données et créer un graphique
- Opérations sur l'API Python DataFrame ; créer un DataFrame à partir d'un ensemble de données Databricks
- Manipuler les données et afficher les résultats
Maintenant que vous avez créé un programme de données sur cluster, passons à un autre ensemble de données, avec plus d'opérations afin que vous puissiez avoir plus de données.
L'ensemble de données est le Rapport sur le bonheur dans le monde 2017 par pays, basé sur différents facteurs tels que le PIB, la générosité, la confiance, la famille et autres. Les champs et leurs descriptions sont répertoriés plus bas dans l'article.
J'ai précédemment téléchargé l'ensemble de données, puis je l'ai déplacé dans le DBFS (DataBricks Files System) de Databricks par simple glisser-déposer dans la fenêtre de Databricks.
Vous pouvez également cliquer sur Données dans le volet de navigation de gauche, cliquer sur Ajouter des données, puis glisser-déposer ou parcourir et ajouter.
# File location and type#this file was dragged and dropped into Databricks from stored #location; https://www.kaggle.com/unsdsn/world-happiness#2017.csv
file_location = "/FileStore/tables/2017.csv"file_type = "csv"
# CSV options# The applied options are for CSV files. For other file types, these # will be ignored: Schema is inferred; first row is header - I # deleted header row in editor and intentionally left it 'false' to #contrast with later rdd parsing, #delimiter # separated, #file_location; if you don't delete header row, instead of reading #C0, C1, it would read "country", "dystopia" etc.infer_schema = "true"first_row_is_header = "false"delimiter = ","df = spark.read.format(file_type) \ .option("inferSchema", infer_schema) \ .option("header", first_row_is_header) \ .option("sep", delimiter) \ .load(file_location)
display(df)
Maintenant, chargeons le fichier dans l’ensemble de données distribuées résilientes (RDD) de Spark mentionné précédemment. RDD effectue un traitement parallèle sur un cluster ou des processeurs informatiques et rend les opérations de données plus rapides et plus efficaces.
#load the file into Spark's Resilient Distributed Dataset(RDD)data_file = "/FileStore/tables/2017.csv"raw_rdd = sc.textFile(data_file).cache()#show the top 5 lines of the fileraw_rdd.take(5)
Notez les « Spark Jobs » ci-dessous, juste au-dessus de la sortie. Cliquez sur Afficher pour voir les détails, comme indiqué dans la fenêtre en encart à droite.
Databricks et Sparks ont d'excellentes visualisations des processus.
Dans Spark, un job est associé à une chaîne de dépendances RDD organisées dans un graphe acyclique direct (DAG). Dans un DAG, les branches sont dirigées d’un nœud à un autre, sans bouclage. Les tâches sont soumises au planificateur, qui les exécute en utilisant le pipeline pour optimiser le travail et les transformer en étapes minimales.
Ne vous inquiétez pas si les éléments ci-dessus semblent compliqués. Il existe des instantanés visuels des processus se produisant au cours de l'étape spécifique pour laquelle vous avez appuyé sur le bouton d'affichage Spark Job. Vous pouvez ou non avoir besoin de ces informations - elles sont là si vous en avez besoin.
Les entrées RDD sont séparées par des virgules, que nous devons diviser avant d'analyser et de construire une trame de données. Nous prendrons ensuite des colonnes spécifiques de l'ensemble de données à utiliser.
#split RDD before parsing and building dataframecsv_rdd = raw_rdd.map(lambda row: row.split(","))#print 2 rowsprint(csv_rdd.take(2))#print typesprint(type(csv_rdd))print('potential # of columns: ', len(csv_rdd.take(1)[0]))
#use specific columns from dataset
from pyspark.sql import Row
parsed_rdd = csv_rdd.map(lambda r: Row( country = r[0], #country, position 1, type=string happiness_rank = r[1], happiness_score = r[2], gdp_per_capita = r[5], family = r[6], health = r[7], freedom = r[8], generosity = r[9], trust = r[10], dystopia = r[11], label = r[-1] ))parsed_rdd.take(5)
Voici les colonnes et les définitions de l'ensemble de données Bonheur :
Colonnes et définitions de l'ensemble de données sur le bonheur
Pays — Nom du pays.
Région — Région à laquelle appartient le pays.
Rang de bonheur - Classement du pays basé sur le score de bonheur.
Score de bonheur — Une mesure mesurée en 2015 en posant la question aux personnes échantillonnées : « Comment évalueriez-vous votre bonheur sur une échelle de 0 à 10, 10 étant le plus heureux. »
Économie (PIB par habitant) — La mesure dans laquelle le PIB (produit intérieur brut) contribue au calcul du score de bonheur
Famille — La mesure dans laquelle la famille contribue au calcul du score de bonheur
Santé — (Espérance de vie) La mesure dans laquelle l'espérance de vie a contribué au calcul du score de bonheur
Liberté — La mesure dans laquelle la liberté a contribué au calcul du score de bonheur.
Confiance — (Corruption gouvernementale) La mesure dans laquelle la perception de la corruption contribue au score de bonheur.
Générosité — La mesure dans laquelle la générosité a contribué au calcul du score de bonheur.
Dystopie résiduelle — La mesure dans laquelle la dystopie résiduelle a contribué au calcul du score de bonheur (dystopie=lieu ou état imaginé dans lequel tout est désagréable ou mauvais, généralement totalitaire ou dégradé sur le plan environnemental. Résiduel — ce qui reste ou ce qui reste après que tout soit autrement est comptabilisé ou emporté).
# Create a view or table
temp_table_name = "2017_csv"
df.createOrReplaceTempView(temp_table_name)
#build dataframe from RDD created earlierdf = sqlContext.createDataFrame(parsed_rdd)display(df.head(10)#view the dataframe's schemadf.printSchema()
#build temporary table to run SQL commands#table only alive for the session#table scoped to the cluster; highly optimizeddf.registerTempTable("happiness")#display happiness_score counts using dataframe syntaxdisplay(df.groupBy('happiness_score') .count() .orderBy('count', ascending=False) )
df.registerTempTable("happiness")
#display happiness_score counts using dataframe syntaxdisplay(df.groupBy('happiness_score') .count() .orderBy('count', ascending=False) )
Maintenant, utilisons SQL pour exécuter une requête afin de faire la même chose. Le but est de vous montrer différentes manières de traiter les données et de comparer les méthodes.
#use SQL to run query to do same thing as previously done with dataframe (count by happiness_score)happ_query = sqlContext.sql(""" SELECT happiness_score, count(*) as freq FROM happiness GROUP BY happiness_score ORDER BY 2 DESC """)display(happ_query)
Une autre requête SQL pour pratiquer notre traitement de données :
#another sql queryhapp_stats = sqlContext.sql(""" SELECT country, happiness_rank, dystopia FROM happiness WHERE happiness_rank > 20 """)display(happ_stats)
Là! Vous l'avez fait : vous avez créé un cluster alimenté par Spark et terminé un processus de requête d'ensemble de données à l'aide de ce cluster. Vous pouvez l'utiliser avec vos propres ensembles de données pour traiter et générer vos projets Big Data.
Vous pouvez également jouer avec les graphiques : cliquez sur l'icône graphique/graphique au bas de n'importe quelle sortie, spécifiez les valeurs et le type de graphique et voyez ce qui se passe. C'est amusant.
Le code est publié dans un cahier ici sur le forum public Databricks et sera disponible pendant environ 6 mois selon Databricks.
- Pour plus d'informations sur l'utilisation de Sparks avec Deep Learning, lisez cet excellent article de Favio Vázquez
Merci d'avoir lu! J'espère que vous avez des programmes intéressants avec Databricks et que vous l'apprécierez autant que moi. Veuillez applaudir si vous l'avez trouvé intéressant ou utile.
Pour une liste complète de mes articles, voir ici.