Fonction Pyspark - LAG

Fonction Pyspark - LAG
La fonction LAG () dans Pyspark est disponible dans le module de fenêtre qui est utilisé pour renvoyer les valeurs des lignes précédentes aux lignes actuelles. Firstl, la fonction LAG () renvoie nul pour les lignes supérieures. Il prend un paramètre de décalage qui représente le nombre total de lignes de telle sorte que les valeurs de ligne précédentes sont renvoyées aux lignes suivantes. Pour les premières lignes supérieures, les nulls (décalés) sont placés.

Il est possible de partitionner les lignes dans le dataframe à l'aide de la fonction de fenêtre. Il est disponible dans le pyspark.SQL.fenêtre module.

Syntaxe:

dataframe_obj.WithColumn ("Lag_Column", LAG ("Colonne", Offset).sur (partition))

Il faut deux paramètres:

  1. La colonne est le nom de colonne dans le pyspark dataframe dans lequel les valeurs de ligne décalées sont placées en fonction des valeurs de cette colonne.
  2. Le décalage spécifie l'entier pour renvoyer ce nombre de lignes précédentes aux valeurs de ligne actuelles.

Pas:

  1. Créez un Pyspark DataFrame qui a des valeurs similaires dans au moins une colonne.
  2. Partitionne les données à l'aide de la méthode partition () disponible dans la fonction de fenêtre et commandez-les en fonction de la colonne à l'aide de la fonction OrderBy ().

Syntaxe:

partition = fenêtre.partitionby («colonne»).OrderBy («colonne»)

Nous pouvons commander les données partitionnées avec la colonne partitionnée ou toute autre colonne.

Maintenant, vous pouvez utiliser la fonction LAG () sur les lignes partitionnées en utilisant le sur() fonction.

Nous ajoutons une colonne pour stocker le numéro de ligne à l'aide du withColumn () fonction.

Syntaxe:

dataframe_obj.WithColumn ("Lag_Column", LAG ("Colonne", Offset).sur (partition))

Ici, le nom spécifie le nom de la ligne et le dataframe_obj est notre pyspark dataframe.

Implémentons le code.

Exemple 1:

Ici, nous créons un pyspark dataframe qui a 5 colonnes - [«sujet_id», «nom», «âge», «technologie1», «technologie2»] avec 10 lignes et partitionner les lignes en fonction de Technologie1 en utilisant la fonction de fenêtre. Après cela, nous sommes en retard 1 rangée.

Importer Pyspark
de Pyspark.Importation SQL *
Spark_App = Sparkcession.constructeur.nom de l'application('_').getorCreate ()
étudiants = [(4, «sravan», 23, «php», «test»),
(4, «sravan», 23, «php», «test»),
(46, «Mounika», 22, '.Net ',' html '),
(4, «Deepika», 21, «Oracle», «html»),
(46, «Mounika», 22, «Oracle», «Testing»),
(12, «Chandrika», 22, «Hadoop», «C #»),
(12, «Chandrika», 22, «Oracle», «Testing»),
(4, «Sravan», 23, «Oracle», «C #»),
(4, «Deepika», 21, «Php», «C #»),
(46, «Mounika», 22, '.Net ',' test ')
]]
dataframe_obj = spark_app.CreatedataFrame (étudiants, [«sujet_id», «nom», «âge», «technologie1», «technologie2»])
Imprimer ("---------- réelle DataFrame ----------")
dataframe_obj.montrer()
# Importer la fonction de fenêtre
de Pyspark.SQL.Fenêtre d'importation de fenêtre
# importer le décalage de Pyspark.SQL.les fonctions
de Pyspark.SQL.fonctions importation décalage
#Partition le dataframe en fonction des valeurs de la colonne Technology1 et
#ordre les lignes dans chaque partition basée sur la colonne sujette
partition = fenêtre.partitionby ("Technology1").OrderBy ('Subject_id')
print ("---------- partitionné DataFrame ----------")
#Now mentionner le décalage avec Offset-1 basé sur Subject_id
dataframe_obj.WithColumn ("LAG", LAG ("Subject_id", 1).sur (partition)).montrer()

Sortir:

Explication:

Dans la première sortie représente les données réelles présentes dans le dataframe. Dans la deuxième sortie, la partition est effectuée sur la base du Technologie1 colonne.

Le nombre total de partitions est 4.

Partition 1:

Le .Le filet s'est produit deux fois dans la première partition. Puisque nous avons spécifié le décalage du décalage comme 1, le premier .La valeur nette est nulle et la suivante .La valeur nette est la valeur de la ligne précédente.

Partition 2:

Hadoop s'est produit une fois dans la deuxième partition. Donc, le retard est nul.

Partition 3:

Oracle s'est produit quatre fois dans la troisième partition.

Pour le premier oracle, le décalage est nul.

Pour le deuxième oracle, la valeur de décalage est de 4 (car la valeur de la ligne précédente est 4).

Pour le troisième oracle, la valeur de décalage est de 4 (car la valeur de la ligne précédente est 4).

Pour le quatrième oracle, la valeur de décalage est de 12 (puisque la valeur de la ligne précédente est 12).

Partition 4:

PHP s'est produit trois fois dans la quatrième partition.

La valeur de décalage pour le 1er php est nul.

La valeur de décalage pour le 2ème PHP est de 4 (car la valeur de la ligne précédente est 4).

La valeur de décalage pour le 3ème PHP est de 4 (car la valeur de la ligne précédente est 4).

Exemple 2:

En traîne les lignes par 2. Assurez-vous que vous avez créé le Pyspark DataFrame comme le montre l'exemple 1.

# Importer la fonction de fenêtre
de Pyspark.SQL.Fenêtre d'importation de fenêtre
# importer le décalage de Pyspark.SQL.les fonctions
de Pyspark.SQL.fonctions importation décalage
#Partition le dataframe en fonction des valeurs de la colonne Technology1 et
#ordre les lignes dans chaque partition basée sur la colonne sujette
partition = fenêtre.partitionby ("Technology1").OrderBy ('Subject_id')
print ("---------- partitionné DataFrame ----------")
#Now mentionner le retard avec Offset-2 basé sur Subject_id
dataframe_obj.WithColumn ("LAG", LAG ("Subject_id", 2).sur (partition)).montrer()

Sortir:

Explication:

La partition est effectuée sur la base du Technologie1 colonne. Le nombre total de partitions est 4.

Partition 1:

Le .Le filet s'est produit deux fois dans la première partition. Puisque nous avons spécifié le décalage du décalage comme 2, le décalage est nul pour les deux valeurs.

Partition 2:

Hadoop s'est produit une fois dans la deuxième partition. Donc, le retard est nul.

Partition 3:

Oracle s'est produit quatre fois dans la troisième partition.

Pour le premier et le deuxième oracle, le retard est nul.

Pour le troisième oracle, la valeur de décalage est de 4 (car la valeur de sujets de 2 lignes précédentes est 4).

Pour le quatrième oracle, la valeur de décalage est de 4 (puisque la valeur de sujets de 2 lignes précédentes est 4).

Partition 4:

PHP s'est produit trois fois dans la quatrième partition.

La valeur de décalage pour les 1er et 2e PHP est nul.

La valeur de décalage pour le 3e PHP est de 4 (car la valeur sublaine_id de 2 lignes précédentes est 4).

Exemple 3:

Song les lignes par 2 en fonction de la colonne d'âge. Assurez-vous que vous avez créé le Pyspark DataFrame comme le montre l'exemple 1.

# Importer la fonction de fenêtre
de Pyspark.SQL.Fenêtre d'importation de fenêtre
# importer le décalage de Pyspark.SQL.les fonctions
de Pyspark.SQL.fonctions importation décalage
#Partition le dataframe en fonction des valeurs de la colonne Technology1 et
#ordre les lignes dans chaque partition en fonction de la colonne d'âge
partition = fenêtre.partitionby ("Technology1").OrderBy («Age»)
print ("---------- partitionné DataFrame ----------")
#Now mentionner le décalage avec Offset-2 en fonction de l'âge
dataframe_obj.WithColumn ("LAG", LAG ("Age", 2).sur (partition)).montrer()

Sortir:

Explication:

La partition est effectuée sur la base du Technologie1 la colonne et le décalage sont définis en fonction de la colonne d'âge. Le nombre total de partitions est 4.

Partition 1:

Le .Le filet s'est produit deux fois dans la première partition. Puisque nous avons spécifié le décalage du décalage comme 2, le décalage est nul pour les deux valeurs.

Partition 2:

Hadoop s'est produit une fois dans la deuxième partition. Donc, le retard est nul.

Partition 3:

Oracle s'est produit quatre fois dans la troisième partition.

Pour le premier et le deuxième oracle, le retard est nul.

Pour le troisième oracle, la valeur de décalage est de 21 (la valeur d'âge des deux lignes précédentes est de 21).

Pour le quatrième oracle, la valeur de décalage est de 22 (la valeur d'âge des deux lignes précédentes est de 22).

Partition 4:

PHP s'est produit trois fois dans la quatrième partition.

La valeur de décalage pour les 1er et 2e PHP est nul.

La valeur de décalage pour le 3e HP est de 21 (la valeur d'âge des deux lignes précédentes est de 21).

Conclusion

Nous avons appris à obtenir les valeurs de décalage dans le Pyspark DataFrame dans les lignes partitionnées. La fonction LAG () dans Pyspark est disponible dans le module de fenêtre qui est utilisé pour renvoyer les valeurs des lignes précédentes aux lignes actuelles. Nous avons appris les différents exemples en définissant les différents décalages.