Pyspark Zip, ZipwithIndex et ZipwithUniqueID

Pyspark Zip, ZipwithIndex et ZipwithUniqueID
Dans cet article, nous présenterons et démontrerons Pyspark's Zip (), ZipWithIndex () et ZipWithUniqueID ().

Avant de commencer avec ces méthodes, nous devons importer RDD depuis le Pyspark.module RDD. RDD signifie ensemble de données distribuées résilientes. Nous pouvons appeler RDD comme une structure de données fondamentale dans Apache Spark. Ainsi, dans Pyspark pour créer un RDD, nous pouvons utiliser la méthode parallelize ().

Syntaxe:

Spark_App.étincelle.paralléliser (données)

Où: Les données peuvent être une dimension (données linéaires) ou des données bidimensionnelles (données de colonne de ligne).

Dans cet article de Pyspark, nous discuterons de Zip (), ZipWithIndex () & ZipWithUniqueID ().

Pyspark zip ()

La fonction Pyspark zip () est utilisée pour combiner des valeurs dans les deux RDD en paires en renvoyant un nouveau RDD.

Syntaxe:

DATA RDD1.zip (rdd_data2)

Ici:

  1. RDD_DATA1 est le premier RDD
  2. RDD_DATA2 est le deuxième RDD

Note que le nombre total d'éléments dans les RDD doit être le même. Sinon, il renverra une erreur.

Exemple 1:

Dans cet exemple, nous retournerons Zipped Rdd de Student_Marks1 et Student_Marks2 Numeric Rdds.

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les étudiants avec 5 éléments
Student_marks1 = Spark_App.étincelle.Parallélize ([89,76,78,89,90])
# Créer des données sur les étudiants avec 5 éléments
Student_marks2 = Spark_App.étincelle.parallélize ([1,2,3,4,5])
#Display les données dans RDD
Print ("Données réelles dans Student_Marks1:", Student_Marks1.Carte (élément lambda: élément).collecter())
print ("Données réelles dans Student_Marks2:", Student_Marks2.Carte (élément lambda: élément).collecter())
#zip les deux RDD à l'aide de zip ()
Imprimer (Student_marks1.zip (étudiant_marks2).collecter())

Sortir:

Données réelles dans Student_Marks1: [89, 76, 78, 89, 90]
Données réelles dans Student_marks2: ['1', 2, 3, 4, 5]
[(89, «1»), (76, 2), (78, 3), (89, 4), (90, 5)]

Nous pouvons voir que chaque valeur du premier RDD est combinée avec le deuxième RDD.

Exemple 2:

Dans cet exemple, nous retournerons RDD zippé de Student_Marks1 et Student_Marks2 String RDDS.

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les sujets étudiants avec 2 éléments
sujets1 = spark_app.étincelle.Parallélize (['Python', 'Java'])
# Créer des données sur les sujets étudiants avec 2 éléments
sujets2 = spark_app.étincelle.parallélize (['html', 'java'])
#Display les données dans RDD
Imprimer ("Données réelles dans Sujets1:", Sujets1.Carte (élément lambda: élément).collecter())
Print ("Données réelles dans les sujets2:", Sujets2.Carte (élément lambda: élément).collecter())
#zip les deux RDD à l'aide de zip ()
Impression (Sujets1.zip (sujets2).collecter())
Sortir:
Données réelles dans les sujets1: ['Python', 'Java']
Données réelles dans les sujets2: ['html', 'java']
[('python', 'html'), ('java', 'java')]]

Nous pouvons voir que les valeurs des deux RDD sont zippées.

Pyspark zipwithindex ()

La fonction Pyspark ZipwithIndex () est utilisée pour combiner les valeurs dans un seul RDD avec des valeurs. Ici, les valeurs par défaut commencent par 0.

Syntaxe:

Rdd_data.zipwithindex ()

Ici, RDD_DATA est le RDD

Exemple 1:

Dans cet exemple, nous avons créé un RDD avec 2 éléments de chaîne et zip avec des valeurs à l'aide de zipwithindex ().

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les sujets étudiants avec 2 éléments
sujets1 = spark_app.étincelle.Parallélize (['Python', 'Java'])
#Display les données dans RDD
Imprimer ("Données réelles dans Sujets1:", Sujets1.Carte (élément lambda: élément).collecter())
#zip les deux RDD à l'aide de zipwithindex ()
Impression (Sujets1.zipwithindex ().collecter())

Sortir:

Données réelles dans les sujets1: ['Python', 'Java']
[('Python', 0), ('Java', 1)]

Nous pouvons voir que la valeur Python est déchirée avec la valeur 0 et Java est zippé avec la valeur 1.

Exemple 2:

Dans cet exemple, nous avons créé un RDD avec 6 éléments de chaîne et zip avec des valeurs à l'aide de zipwithindex ().

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les sujets étudiants avec 6 éléments
sujets1 = spark_app.étincelle.Parallélize ([«Python», «Java», «Python», «Java», «Python», «Java»])
#Display les données dans RDD
Imprimer ("Données réelles dans Sujets1:", Sujets1.Carte (élément lambda: élément).collecter())
#zip les deux RDD à l'aide de zipwithindex ()
Impression (Sujets1.zipwithindex ().collecter())

Sortir:

Données réelles dans les sujets1: ['Python', 'Java', 'Python', 'Java', 'Python', 'Java']
[('Python', 0), ('Java', 1), ('Python', 2), ('Java', 3), ('Python', 4), ('Java', 5)]

Pyspark zipwithunitiqueid ()

La fonction Pyspark ZipWithUniqueID () est similaire à la méthode ci-dessus, mais les valeurs qui forment une paire se trouvent dans le modèle suivant:

k, 1 * n + k, 2 * n + k, 3 * n + k… .

n représente le nombre de partitions.

Syntaxe:

Rdd_data.zipwithunitiqueid ()

Ici, RDD_DATA est le RDD

Il peut y avoir de nombreuses lacunes entre les valeurs zippées.

Exemple:

# importer le module Pyspark
Importer Pyspark
#mport Sparkcession pour la création d'une session
de Pyspark.SQL IMPORT SPARKSESSE
# Importer RDD de Pyspark.RDD
de Pyspark.RDD IMPORT RDD
#create une application nommée Linuxhint
Spark_App = Sparkcession.constructeur.appname ('Linuxhint').getorCreate ()
# Créer des données sur les sujets étudiants avec 6 éléments
sujets1 = spark_app.étincelle.Parallélize ([«Python», «Java», «Python», «Java», «Python», «Java»])
#Display les données dans RDD
Imprimer ("Données réelles dans Sujets1:", Sujets1.Carte (élément lambda: élément).collecter())
#zip les deux RDD à l'aide de ZipWithUniqueID ()
Impression (Sujets1.zipwithunitiqueid ().collecter())

Sortir:

Données réelles dans les sujets1: ['Python', 'Java', 'Python', 'Java', 'Python', 'Java']
[('Python', 0), ('Java', 2), ('Python', 4), ('Java', 1), ('Python', 3), ('Java', 5)]

À partir de la sortie ci-dessus, nous pouvons voir que différentes valeurs sont zippées avec des valeurs réelles.

Conclusion

Dans ce tutoriel, nous avons vu comment zipper le RDD avec certaines valeurs. zip () est utilisé pour zip deux paires de RDD. ZipwithIndex () est utilisé pour zip avec des valeurs et zipwithuniqueid () est utilisé pour zip avec des valeurs basées sur des partitions.