Pyspark Rdd - Transformations

Pyspark Rdd - Transformations
Dans Python, Pyspark est un module Spark utilisé pour fournir un type de traitement similaire comme Spark.

RDD signifie ensemble de données distribuées résilientes. Nous pouvons appeler RDD une structure de données fondamentale dans Apache Spark.

Nous devons importer RDD à partir du pyspark.module RDD.

Ainsi, dans Pyspark pour créer un RDD, nous pouvons utiliser la méthode parallelize ().

Syntaxe:

Spark_App.étincelle.paralléliser (données)

Où,

Les données peuvent être une dimension (données linéaires) ou des données bidimensionnelles (données en row-colonne).

Transformations RDD:

Un RDD de transformation est une opération qui est appliquée à un RDD pour créer de nouvelles données à partir du RDD existant. En utilisant des transformations, nous pouvons filtrer le RDD en appliquant certaines transformations.

Voyons les transformations qui sont effectuées sur le RDD donné.

Nous en discuterons un par un.

1. carte()

La transformation map () est utilisée pour cartographier une valeur aux éléments présents dans le RDD. Il faut une fonction anonyme comme un paramètre, comme Lambda et transforme les éléments dans un RDD.

Syntaxe:

Rdd_data.map (anonymous_function)

Paramètres:

Anonymous_Function ressemble:

Élément lambda: opération

Par exemple, l'opération consiste à ajouter / soustraire tous les éléments avec un nouvel élément.

Voyons mieux les exemples pour mieux comprendre cette transformation.

Exemple 1:

Dans cet exemple, nous créons un RDD nommé Student_marks avec 20 éléments et appliquons la transformation MAP () en ajoutant chaque élément avec 20 et en les affichant en utilisant l'action Collect ().

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les étudiants avec 20 éléments
Student_marks = Spark_App.étincelle.Parallélize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display les données dans RDD
Imprimer ("Données réelles dans RDD:", Student_marks.Carte (élément lambda: élément).collecter())
#Apply map () Transformation en ajoutant 20 à chaque élément de RDD
Imprimer ("Après en ajouter 20 à chaque élément de RDD:", Student_marks.Carte (élément lambda: élément + 20).collecter())

Sortir:

Données réelles dans RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Après avoir ajouté 20 à chaque élément de RDD: [109, 96, 98, 109, 110, 120, 54, 76, 74, 42, 65, 63, 43, 76, 98, 41, 54, 54, 76, 54]

À partir de la sortie ci-dessus, nous pouvons voir que l'élément 20 est ajouté à chaque élément de RDD via la fonction lambda en utilisant la transformation map ().

Exemple 2:

Dans cet exemple, nous créons un RDD nommé Student_marks avec 20 éléments et appliquons la transformation MAP () en soustrayant chaque élément par 15 et en les affichant en utilisant l'action Collect ().

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les étudiants avec 20 éléments
Student_marks = Spark_App.étincelle.Parallélize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display les données dans RDD
Imprimer ("Données réelles dans RDD:", Student_marks.Carte (élément lambda: élément).collecter())
#Apply map () Transformation en soustrayant 15 de chaque élément de RDD
Imprimer ("Après avoir soustrait 15 de chaque élément de RDD:", Student_marks.Carte (élément lambda: élément-15).collecter())

Sortir:

Données réelles dans RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Après avoir soustrait 15 de chaque élément de RDD: [74, 61, 63, 74, 75, 85, 19, 41, 39, 7, 30, 28, 8, 41, 63, 6, 19, 19, 41, 19]

À partir de la sortie ci-dessus, nous pouvons voir que l'élément 15 est soustrait à chaque élément de RDD via la fonction lambda en utilisant la transformation map ().

2. filtre()

La transformation filtre () est utilisée pour filtrer les valeurs du RDD. Il prend une fonction anonyme comme Lambda et renvoie les éléments en filtrant les éléments d'un RDD.

Syntaxe:

Rdd_data.filtre (anonymous_function)

Paramètres:

Anonymous_Function ressemble:

Élément lambda: condition / expression

Par exemple, la condition est utilisée pour spécifier les instructions expressives pour filtrer le RDD.

Voyons mieux les exemples pour mieux comprendre cette transformation.

Exemple 1:

Dans cet exemple, nous créons un RDD nommé Student_marks avec 20 éléments et appliquons une transformation filtrante () en filtrant uniquement les multiples de 5 et en les affichant en utilisant l'action Collect ().

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les étudiants avec 20 éléments
Student_marks = Spark_App.étincelle.Parallélize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display les données dans RDD
Imprimer ("Données réelles dans RDD:", Student_marks.Carte (élément lambda: élément).collecter())
#Apply Filter () Transformation en retournant des multiples intérieurs de 5.
Print ("Multiples de 5 à partir d'un RDD:", Student_Marks.Filtre (élément lambda: élément% 5 == 0).collecter())
)

Sortir:

Données réelles dans RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Multiples de 5 à partir d'un RDD: [90, 100, 45]

À partir de la sortie ci-dessus, nous pouvons voir que des multiples de 5 éléments sont filtrés à partir du RDD.

Exemple 2:

Dans cet exemple, nous créons un RDD nommé Student_marks avec 20 éléments et appliquons une transformation filtrante () en filtrant des éléments supérieurs à 45 et en les affichant en utilisant l'action collecture ().

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les étudiants avec 20 éléments
Student_marks = Spark_App.étincelle.Parallélize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display les données dans RDD
Imprimer ("Données réelles dans RDD:", Student_marks.Carte (élément lambda: élément).collecter())
#Apply filter () Transformation en filtrant des valeurs supérieures à 45
Imprimer ("Valeurs supérieures à 45:", Student_marks.Filtre (élément lambda: élément> 45).collecter())

Sortir:

Données réelles dans RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Valeurs supérieures à 45: [89, 76, 78, 89, 90, 100, 56, 54, 56, 78, 56]

À partir de la sortie ci-dessus, nous pouvons voir que ces éléments supérieurs à 45 sont filtrés du RDD.

3. syndicat()

La transformation Union () est utilisée pour combiner deux RDD. Nous pouvons effectuer cette transformation sur deux RDD…

Syntaxe:

Rdd_data1.Union (RDD_DATA2)

Voyons mieux les exemples pour mieux comprendre cette transformation.

Exemple 1:

Dans cet exemple, nous créerons un seul RDD avec des données sur les marques d'étudiant et générerons deux RDD à partir du RDD unique en filtrant certaines valeurs à l'aide de la transformation filtre (). Après cela, nous pouvons effectuer une transformation Union () sur les deux RDD filtrés.

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les étudiants avec 20 éléments
Student_marks = Spark_App.étincelle.Parallélize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
#Display les données dans RDD
Imprimer ("Données réelles dans RDD:", Student_marks.Carte (élément lambda: élément).collecter())
First_Filter = Student_marks.Filtre (élément lambda: élément> 90)
second_filter = Student_marks.Filtre (élément lambda: élément <40)
#display première transformation filtrée
Imprimer ("Éléments dans RDD supérieur à 90", First_Filter.collecter())
#display deuxième transformation filtrée
Imprimer ("Éléments dans RDD Moins de 40", Second_Filter.collecter())
#Apply Union () Transformation en effectuant un syndicat sur les 2 filtres ci-dessus
Print ("Transformation de l'Union sur deux données filtrées", First_Filter.Union (Second_Filter).collecter())

Sortir:

Données réelles dans RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Éléments de RDD supérieurs à 90 [100]
Éléments dans RDD Moins de 40 [34, 22, 23, 21, 34, 34, 34]
Transformation de l'Union sur deux données filtrées [100, 34, 22, 23, 21, 34, 34, 34]

D'après la sortie ci-dessus, vous pouvez voir que nous avons effectué Union sur First_Filter et Second_Filter.

First_Filter est obtenu en obtenant des éléments d'étudiants RDD supérieurs à 90 et Second_Filter est obtenu en obtenant des éléments de Studentsmarks Rdd moins de 40 en utilisant la transformation filtrante ().

Exemple 2:

Dans cet exemple, nous créerons deux RDD telles que le premier RDD a 20 éléments et le deuxième RDD a 10 éléments. Suite à cela, nous pouvons appliquer une transformation union () à ces deux RDD.

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les étudiants avec 20 éléments
Student_marks1 = Spark_App.étincelle.Parallélize ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34]))
# Créer des données sur les notes des étudiants avec 10 éléments
Student_marks2 = Spark_App.étincelle.Parallélize ([45,43,23,56,78,21,34,34,56,34])
#Display les données dans RDD
Imprimer ("Données réelles dans les marques des étudiants 1 RDD:", Student_Marks1.Carte (élément lambda: élément).collecter())
#Display les données dans RDD
Print ("Données réelles dans les marques des étudiants 2 RDD:", Student_Marks2.Carte (élément lambda: élément).collecter())
#Apply Union () Transformation en effectuant un syndicat sur les 2 RDD ci-dessus
Print ("Transformation syndicale sur deux RDD", Student_Marks1.Union (Student_marks2).collecter())

Sortir:

Données réelles dans les marques des étudiants 1 RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Données réelles dans les marques des étudiants 2 RDD: [45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Transformation de l'Union sur deux RDD [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Nous pouvons voir que deux RDD sont combinés à l'aide de la transformation Union ().

Conclusion

De ce tutoriel Pyspark, nous voyons trois transformations appliquées à RDD. La transformation map () est utilisée pour mapper en transformant des éléments dans un RDD, filter () est utilisé pour effectuer des opérations de filtre et créer un nouveau RDD filtré à partir du RDD existant. Enfin, nous avons discuté de Union () RDD qui est utilisé pour combiner deux RDD.