Il est possible de partitionner les lignes dans le dataframe à l'aide de la fonction de fenêtre. Il est disponible dans le pyspark.SQL.fenêtre module.
Syntaxe:
dataframe_obj.WithColumn ("Lag_Column", LAG ("Colonne", Offset).sur (partition))Il faut deux paramètres:
Pas:
Syntaxe:
partition = fenêtre.partitionby («colonne»).OrderBy («colonne»)Nous pouvons commander les données partitionnées avec la colonne partitionnée ou toute autre colonne.
Maintenant, vous pouvez utiliser la fonction LAG () sur les lignes partitionnées en utilisant le sur() fonction.
Nous ajoutons une colonne pour stocker le numéro de ligne à l'aide du withColumn () fonction.
Syntaxe:
dataframe_obj.WithColumn ("Lag_Column", LAG ("Colonne", Offset).sur (partition))Ici, le nom spécifie le nom de la ligne et le dataframe_obj est notre pyspark dataframe.
Implémentons le code.
Exemple 1:
Ici, nous créons un pyspark dataframe qui a 5 colonnes - [«sujet_id», «nom», «âge», «technologie1», «technologie2»] avec 10 lignes et partitionner les lignes en fonction de Technologie1 en utilisant la fonction de fenêtre. Après cela, nous sommes en retard 1 rangée.
Importer PysparkSortir:
Explication:
Dans la première sortie représente les données réelles présentes dans le dataframe. Dans la deuxième sortie, la partition est effectuée sur la base du Technologie1 colonne.
Le nombre total de partitions est 4.
Partition 1:
Le .Le filet s'est produit deux fois dans la première partition. Puisque nous avons spécifié le décalage du décalage comme 1, le premier .La valeur nette est nulle et la suivante .La valeur nette est la valeur de la ligne précédente.
Partition 2:
Hadoop s'est produit une fois dans la deuxième partition. Donc, le retard est nul.
Partition 3:
Oracle s'est produit quatre fois dans la troisième partition.
Pour le premier oracle, le décalage est nul.
Pour le deuxième oracle, la valeur de décalage est de 4 (car la valeur de la ligne précédente est 4).
Pour le troisième oracle, la valeur de décalage est de 4 (car la valeur de la ligne précédente est 4).
Pour le quatrième oracle, la valeur de décalage est de 12 (puisque la valeur de la ligne précédente est 12).
Partition 4:
PHP s'est produit trois fois dans la quatrième partition.
La valeur de décalage pour le 1er php est nul.
La valeur de décalage pour le 2ème PHP est de 4 (car la valeur de la ligne précédente est 4).
La valeur de décalage pour le 3ème PHP est de 4 (car la valeur de la ligne précédente est 4).
Exemple 2:
En traîne les lignes par 2. Assurez-vous que vous avez créé le Pyspark DataFrame comme le montre l'exemple 1.
# Importer la fonction de fenêtreSortir:
Explication:
La partition est effectuée sur la base du Technologie1 colonne. Le nombre total de partitions est 4.
Partition 1:
Le .Le filet s'est produit deux fois dans la première partition. Puisque nous avons spécifié le décalage du décalage comme 2, le décalage est nul pour les deux valeurs.
Partition 2:
Hadoop s'est produit une fois dans la deuxième partition. Donc, le retard est nul.
Partition 3:
Oracle s'est produit quatre fois dans la troisième partition.
Pour le premier et le deuxième oracle, le retard est nul.
Pour le troisième oracle, la valeur de décalage est de 4 (car la valeur de sujets de 2 lignes précédentes est 4).
Pour le quatrième oracle, la valeur de décalage est de 4 (puisque la valeur de sujets de 2 lignes précédentes est 4).
Partition 4:
PHP s'est produit trois fois dans la quatrième partition.
La valeur de décalage pour les 1er et 2e PHP est nul.
La valeur de décalage pour le 3e PHP est de 4 (car la valeur sublaine_id de 2 lignes précédentes est 4).
Exemple 3:
Song les lignes par 2 en fonction de la colonne d'âge. Assurez-vous que vous avez créé le Pyspark DataFrame comme le montre l'exemple 1.
# Importer la fonction de fenêtreSortir:
Explication:
La partition est effectuée sur la base du Technologie1 la colonne et le décalage sont définis en fonction de la colonne d'âge. Le nombre total de partitions est 4.
Partition 1:
Le .Le filet s'est produit deux fois dans la première partition. Puisque nous avons spécifié le décalage du décalage comme 2, le décalage est nul pour les deux valeurs.
Partition 2:
Hadoop s'est produit une fois dans la deuxième partition. Donc, le retard est nul.
Partition 3:
Oracle s'est produit quatre fois dans la troisième partition.
Pour le premier et le deuxième oracle, le retard est nul.
Pour le troisième oracle, la valeur de décalage est de 21 (la valeur d'âge des deux lignes précédentes est de 21).
Pour le quatrième oracle, la valeur de décalage est de 22 (la valeur d'âge des deux lignes précédentes est de 22).
Partition 4:
PHP s'est produit trois fois dans la quatrième partition.
La valeur de décalage pour les 1er et 2e PHP est nul.
La valeur de décalage pour le 3e HP est de 21 (la valeur d'âge des deux lignes précédentes est de 21).
Conclusion
Nous avons appris à obtenir les valeurs de décalage dans le Pyspark DataFrame dans les lignes partitionnées. La fonction LAG () dans Pyspark est disponible dans le module de fenêtre qui est utilisé pour renvoyer les valeurs des lignes précédentes aux lignes actuelles. Nous avons appris les différents exemples en définissant les différents décalages.