Pyspark - fonction de plomb

Pyspark - fonction de plomb
La fonction lead () dans Pyspark est disponible dans le module de fenêtre qui est utilisé pour renvoyer les valeurs des lignes suivantes aux lignes actuelles. Tout d'abord, la fonction lead () renvoie le null pour la dernière ligne / s dans une partition. Il prend un paramètre de décalage qui représente le nombre total de lignes de telle sorte que les valeurs de ligne suivantes sont renvoyées aux lignes réelles. Les Nulls sont placés pour la première dernière ligne / s (décalage).

Il est possible de partitionner les lignes dans le dataframe à l'aide de la fonction de fenêtre. Il est disponible dans le pyspark.SQL.fenêtre module.

Syntaxe:

dataframe_obj.WithColumn ("Lead_Column", lead ("colonne", offset).sur (partition))

Il faut deux paramètres:

  1. Le Lead_Column est le nom de colonne dans Pyspark DataFrame dans lequel les valeurs de ligne de plomb sont placées en fonction des valeurs de cette colonne.
  2. Le décalage spécifie l'entier pour renvoyer ce nombre des lignes suivantes aux valeurs de ligne actuelles.

Pas:

  1. Créez un Pyspark DataFrame qui a des valeurs similaires dans au moins une colonne.
  2. Partitionne les données à l'aide de la méthode partition () disponible dans la fonction de fenêtre et commandez-les en fonction de la colonne à l'aide de la fonction OrderBy ().

Syntaxe:

partition = fenêtre.partitionby («colonne»).OrderBy («colonne»)

Nous pouvons commander les données partitionnées avec la colonne partitionnée ou toute autre colonne.

Maintenant, vous pouvez utiliser la fonction lead () sur les lignes partitionnées en utilisant le sur() fonction.

Nous ajoutons une colonne pour stocker le numéro de ligne à l'aide du withColumn () fonction.

Syntaxe:

dataframe_obj.WithColumn ("Lead_Column", lead ("colonne", offset).sur (partition))

Ici, le nom spécifie le nom de la ligne et le dataframe_obj est notre pyspark dataframe.

Implémentons le code.

Exemple 1:

Ici, nous créons un pyspark dataframe qui a 5 colonnes - [«sujet_id», «nom», «Âge», «Technology1», «Technology2»] avec 10 lignes et partitionner les lignes en fonction de la Technologie1 en utilisant la fonction de fenêtre. Après cela, nous dirigeons 1 ligne.

Importer Pyspark
de Pyspark.Importation SQL *
Spark_App = Sparkcession.constructeur.nom de l'application('_').getorCreate ()
étudiants = [(4, «sravan», 23, «php», «test»),
(4, «sravan», 23, «php», «test»),
(46, «Mounika», 22, '.Net ',' html '),
(4, «Deepika», 21, «Oracle», «html»),
(46, «Mounika», 22, «Oracle», «Testing»),
(12, «Chandrika», 22, «Hadoop», «C #»),
(12, «Chandrika», 22, «Oracle», «Testing»),
(4, «Sravan», 23, «Oracle», «C #»),
(4, «Deepika», 21, «Php», «C #»),
(46, «Mounika», 22, '.Net ',' test ')
]]
dataframe_obj = spark_app.CreatedataFrame (étudiants, [«sujet_id», «nom», «âge», «technologie1», «technologie2»])
Imprimer ("---------- réelle DataFrame ----------")
dataframe_obj.montrer()
# Importer la fonction de fenêtre
de Pyspark.SQL.Fenêtre d'importation de fenêtre
#IMPORT L'ADENCE DE PYSPARK.SQL.les fonctions
de Pyspark.SQL.Fonctions Importer lead
#Partition le dataframe en fonction des valeurs de la colonne Technology1 et
#ordre les lignes dans chaque partition basée sur la colonne sujette
partition = fenêtre.partitionby ("Technology1").OrderBy ('Subject_id')
print ("---------- partitionné DataFrame ----------")
#Now mentionner le lead avec Offset-1 basé sur Subject_id
dataframe_obj.withColumn ("lead", lead ("suject_id", 1).sur (partition)).montrer()

Sortir:

Explication:

La première sortie représente les données réelles présentes dans le dataframe. Dans la deuxième sortie, la partition est effectuée sur la base du Technologie1 colonne.

Le nombre total de partitions est 4.

Partition 1:

Le .Le filet s'est produit deux fois dans la première partition. Puisque nous avons spécifié le compensation de plomb comme 1, le dernier .La valeur nette est nulle et la première .La valeur nette est la valeur sujette de la ligne suivante - 46.

Partition 2:

Hadoop s'est produit une fois dans la deuxième partition. Donc, le plomb est nul.

Partition 3:

Oracle s'est produit quatre fois dans la troisième partition.

  1. Pour le dernier oracle, le plomb est nul.
  2. Pour le premier oracle, la valeur du plomb est de 4 (car la valeur sujette de la ligne suivante est 4).
  3. Pour le troisième oracle, la valeur du plomb est de 12 (car la valeur sujette de la ligne suivante est 12).

Pour le quatrième oracle, la valeur du plomb est de 46 (car la valeur suivante de la ligne suivante est 46).

Partition 4:

PHP s'est produit trois fois dans la quatrième partition.

  1. La valeur de plomb pour le 3e php est nul.
  2. La valeur du plomb pour le 1er PHP est de 4 (car la valeur suivante de la ligne suivante est 4).
  3. La valeur du plomb pour le 2e PHP est de 4 (car la valeur sublasse_id suivante est 4).

Exemple 2:

Conduire les lignes par 2. Assurez-vous que vous avez créé le Pyspark DataFrame comme le montre l'exemple 1.

# Importer la fonction de fenêtre
de Pyspark.SQL.Fenêtre d'importation de fenêtre
#IMPORT L'ADENCE DE PYSPARK.SQL.les fonctions
de Pyspark.SQL.Fonctions Importer lead
#Partition le dataframe en fonction des valeurs de la colonne Technology1 et
#ordre les lignes dans chaque partition basée sur la colonne sujette
partition = fenêtre.partitionby ("Technology1").OrderBy ('Subject_id')
print ("---------- partitionné DataFrame ----------")
#Now mentionner le lead avec Offset-2 basé sur Subject_id
dataframe_obj.WithColumn ("Lead", lead ("suject_id", 2).sur (partition)).montrer()

Sortir:

Explication:

La partition est effectuée sur la base du Technologie1 colonne.

Le nombre total de partitions est 4.

Partition 1:

Le .Le filet s'est produit deux fois dans la première partition. Puisque nous avons spécifié le décalage du plomb comme 2, le décalage est nul pour les deux valeurs.

Partition 2:

Hadoop s'est produit une fois dans la deuxième partition. Donc, le plomb est nul.

Partition 3:

Oracle s'est produit quatre fois dans la troisième partition.

  • Dans les deux derniers oracle, le plomb est nul.
  • Pour le premier oracle, la valeur du plomb est de 12 (car les 2 prochaines lignes Subject_ID sont 12).
  • Pour le deuxième oracle, la valeur du plomb est de 46 (car la valeur sujette des 2 lignes suivantes est de 46).

Partition 4:

PHP s'est produit trois fois dans la quatrième partition.

  • Dans les deux derniers oracle, le plomb est nul.
  • Pour le premier PHP, la valeur du plomb est de 4 (car la valeur sujette_id des 2 lignes suivantes est 4).

Exemple 3:

Mener les lignes par 2 en fonction de la colonne d'âge. Assurez-vous que vous avez créé le Pyspark DataFrame comme le montre l'exemple 1.

# Importer la fonction de fenêtre
de Pyspark.SQL.Fenêtre d'importation de fenêtre
#IMPORT L'ADENCE DE PYSPARK.SQL.les fonctions
de Pyspark.SQL.Fonctions Importer lead
#Partition le dataframe en fonction des valeurs de la colonne Technology1 et
#ordre les lignes dans chaque partition basée sur la colonne sujette
partition = fenêtre.partitionby ("Technology1").OrderBy ('Subject_id')
print ("---------- partitionné DataFrame ----------")
#Now mentionner le plomb avec Offset-2 en fonction de l'âge
dataframe_obj.WithColumn ("plomb", plomb ("âge", 2).sur (partition)).montrer()

Sortir:

Explication:

La partition est effectuée sur la base du Technologie1 colonne et le plomb est définie en fonction de la colonne d'âge.

Le nombre total de partitions est 4.

Partition 1:

Le .Le filet s'est produit deux fois dans la première partition. Puisque nous avons spécifié le décalage du plomb comme 2, le décalage est nul pour les deux valeurs.

Partition 2:

Hadoop s'est produit une fois dans la deuxième partition. Donc, le plomb est nul.

Partition 3:

Oracle s'est produit quatre fois dans la troisième partition.

  • Pour les deux derniers oracle, le plomb est nul.
  • Pour le premier oracle, la valeur du plomb est de 22 (car la valeur d'âge des 2 lignes suivante est de 22).
  • Pour le deuxième oracle, la valeur du plomb est de 22 (car la valeur d'âge des 2 lignes suivantes est de 22).

Partition 4:

PHP s'est produit trois fois dans la quatrième partition.

  • Pour les deux derniers oracle, le plomb est nul.
  • Pour le premier PHP, la valeur du plomb est de 21 (car la valeur d'âge des 2 lignes suivante est de 21).

Conclusion

Nous avons appris à obtenir les valeurs de tête dans Pyspark DataFrame dans les lignes partitionnées. La fonction lead () dans Pyspark est disponible dans le module de fenêtre qui est utilisé pour renvoyer les valeurs de ligne suivantes aux lignes actuelles. Il prend un paramètre de décalage qui représente le nombre total de lignes de telle sorte que les valeurs de ligne suivantes sont renvoyées aux lignes réelles. Pour la première dernière ligne / s, les nulls (décalés) sont placés. Nous avons appris les différents exemples en définissant les différents décalages.