Fonction de fenêtre de rang Pyspark

Fonction de fenêtre de rang Pyspark
La spécification d'un numéro de classement pour chaque rangée de Pyspark est la meilleure façon afin qu'il soit facile de trouver les lignes dans une partition.

Dans Pyspark, il est possible de partitionner les lignes dans le dataframe à l'aide de la fonction de fenêtre. Il est disponible dans le pyspark.SQL.fenêtre module.

L'avantage de la partition est d'atteindre la cohérence des données et le regroupement. Après avoir effectué la partition, nous pouvons définir les rangs sur les lignes de chaque partition en utilisant la fonction Rank ().

Il est possible de partitionner les lignes dans un dataframe en fonction des valeurs d'une colonne particulière. Donc, toutes les valeurs similaires sont définies sur chaque partition.

Fonction de fenêtre RANK ()

Rank in Pyspark est une fonction de fenêtre qui définit les rangs sur les lignes de chaque partition.

Le rang commence à partir de 1, et si des valeurs sont les mêmes en deux lignes ou plus, le même rang sera attribué. Cependant, pour la ligne suivante, le rang sera le nombre suivant comptant à partir des lignes liées (lignes avec le même rang).

Ici, les valeurs proviennent de la colonne où nous avons spécifié la colonne à l'intérieur commandé par(). Donc, sur la base des valeurs de cette colonne, rang() vérifie et attribue le numéro de rang.

L'essentiel est rang() n'est pas disponible directement. Nous devons l'importer à partir du pyspark.SQL.les fonctions module.

Module syntaxe pour importer

de Pyspark.SQL.Fonctions Import Rank

Sinon, nous pouvons également faire comme cela pour importer toutes les méthodes de ce module.

de Pyspark.SQL.Fonctions Import *

Nous devons suivre les étapes ci-dessous pour effectuer la partition et appliquer un rang sur les données partitionnées.

Pas:

  1. Créez un Pyspark DataFrame avec des valeurs similaires dans au moins une colonne.
  2. Partition Les données à l'aide de la méthode partition () disponible dans la fonction de fenêtre et les commandent en fonction de la colonne à l'aide de la fonction OrderBy ().

Syntaxe:

partition = fenêtre.partitionby («colonne»).OrderBy («colonne»)

Nous pouvons commander les données partitionnées avec la colonne partitionnée ou toute autre colonne.
Maintenant, en utilisant le sur() fonction sur les lignes partitionnées, vous pouvez utiliser le rang() fonction. Nous allons donc ajouter une colonne pour stocker le numéro de ligne à l'aide du withColumn () fonction.
Syntaxe:

dataframe_obj.withColumn ("name", rank ().sur (partition))

Donc, le nom spécifie le nom de la ligne ici, et DataFrame_OBJ est notre Pyspark DataFrame. Implémentons le code.

Exemple 1: Rank Technology1 par sujet_id

Ici, nous allons créer un pyspark dataframe qui a 5 colonnes: [«sujet_id», «nom», «âge», «Technology1», «Technology2»] avec 10 lignes et partitionner les lignes en fonction de Technologie1 en utilisant la fonction de fenêtre.

Enfin, nous donnerons des rangs aux lignes dans toutes les partitions en ajoutant une nouvelle colonne nommée RANG.

Importer Pyspark
de Pyspark.Importation SQL *
Spark_App = Sparkcession.constructeur.nom de l'application('_').getorCreate ()
étudiants = [(4, «sravan», 23, «php», «test»),
(4, «sravan», 23, «php», «test»),
(46, «Mounika», 22, '.Net ',' html '),
(4, «Deepika», 21, «Oracle», «html»),
(46, «Mounika», 22, «Oracle», «Testing»),
(12, «Chandrika», 22, «Hadoop», «C #»),
(12, «Chandrika», 22, «Oracle», «Testing»),
(4, «Sravan», 23, «Oracle», «C #»),
(4, «Deepika», 21, «Php», «C #»),
(46, «Mounika», 22, '.Net ',' test ')
]]
dataframe_obj = spark_app.CreatedataFrame (étudiants, [«sujet_id», «nom», «âge», «technologie1», «technologie2»])
Imprimer ("---------- réelle DataFrame ----------")
dataframe_obj.montrer()
# Importer la fonction de fenêtre
de Pyspark.SQL.Fenêtre d'importation de fenêtre
# importer le rang de Pyspark.SQL.les fonctions
de Pyspark.SQL.Fonctions Import Rank
#Partition le dataframe en fonction des valeurs de la colonne Technology1 et
#ordre les lignes dans chaque partition basée sur la colonne sujette
partition = fenêtre.partitionby ("Technology1").OrderBy ('Subject_id')
print ("---------- partitionné DataFrame ----------")
#Now mentionner le rang pour chaque ligne dans la colonne de rang
dataframe_obj.withColumn ("Rank", Rank ().sur (partition)).montrer()

Sortir:

Explication
La première sortie représente les données réelles présentes dans le dataframe. Dans la deuxième sortie, la partition est effectuée sur la base du Technologie1 colonne.

Partitions totales: 4

Partition 1:
.Le filet s'est produit deux fois dans la première partition. Les valeurs dans la colonne subs_id sont les mêmes. Par conséquent, la fonction Rank () renvoie le même rang pour les deux .Valeurs nettes.

Partition 2:
Hadoop s'est produit une fois dans la deuxième partition. Donc, le rang est 1.

Partition 3:

  • Oracle s'est produit quatre fois dans la troisième partition.
  • Pour les deux premiers oracle, les valeurs sujets. Donc, pour les deux, le rang 1 est affecté.
  • Pour le troisième oracle, le rang est de 3 (le rang 2 n'existera pas car, pour les deux oracles ci-dessus, 2 rangs ont été donnés).
  • Pour le dernier oracle, le rang est de 4.

Partition 4:
PHP s'est produit trois fois dans la quatrième partition, et les valeurs dans la colonne Subject_ID sont les mêmes pour tous. Par conséquent, le rang est 1 pour tous les php.

Exemple 2: Rank Technology2 by suject_id

Ici, nous partisons les lignes en fonction Technologie2 en utilisant la fonction de fenêtre. Enfin, nous donnerons des rangs aux lignes basées sur la colonne Subject_ID dans toutes les partitions en ajoutant une nouvelle colonne nommée "RANG".

(Si vous n'avez pas créé le Pyspark DataFrame, créez-le en utilisant le premier exemple.)

# Importer la fonction de fenêtre
de Pyspark.SQL.Fenêtre d'importation de fenêtre
# importer le rang de Pyspark.SQL.les fonctions
de Pyspark.SQL.Fonctions Import Rank
#Partition le dataframe en fonction des valeurs de la colonne Technology2 et
#ordre les lignes dans chaque partition basée sur la colonne sujette
partition = fenêtre.partitionby ("Technology2").OrderBy ('Subject_id')
print ("---------- partitionné DataFrame ----------")
#Now mentionner le rang pour chaque ligne dans la colonne de rang
dataframe_obj.withColumn ("Rank", Rank ().sur (partition)).montrer()

Sortir:

Explication
La sortie représente les données réelles présentes dans le dataframe.

Dans la deuxième sortie, la partition est effectuée sur la base du Technologie2 colonne et commandée sur la base de la colonne Subject_ID.

Partitions totales: 3

Partition 1:
C # s'est produit trois fois dans la première partition. Les deux premières valeurs dans la colonne Subject_ID sont les mêmes. Par conséquent, la fonction RANK () renvoie le même rang 1 pour les deux premiers C #, et pour le troisième C #, le rang est 3.

Partition 2:
HTML s'est produit deux fois dans la deuxième partition, et les valeurs Subject_id sont différentes pour ces deux. Ainsi, le rang 1 est attribué pour le premier HTML, et le rang 2 est attribué pour le second.

Partition 3:

  • Les tests se sont produits cinq fois dans la troisième partition.
  • Pour les deux premiers tests, les valeurs Subject_id sont les mêmes. Donc, pour les deux, le rang 1 est affecté.
  • Pour le troisième test, le rang est de 3 (le rang 2 n'existera pas car, pour les deux tests précédents, 2 rangs ont été donnés).
  • Pour les derniers tests, le rang était de 4.

Exemple 3: Rank Technology2 par âge

Ici, nous partisons les lignes en fonction Technologie2 en utilisant la fonction de fenêtre.

Enfin, nous donnerons des rangs aux lignes en fonction de la colonne d'âge dans toutes les partitions en ajoutant une nouvelle colonne nommée «RANG".

(Si vous n'avez pas créé le Pyspark DataFrame, créez-le en utilisant le premier exemple.)

# Importer la fonction de fenêtre
de Pyspark.SQL.Fenêtre d'importation de fenêtre
# importer le rang de Pyspark.SQL.les fonctions
de Pyspark.SQL.Fonctions Import Rank
#Partition le dataframe en fonction des valeurs de la colonne Technology2 et
#ordre les lignes dans chaque partition en fonction de la colonne d'âge
partition = fenêtre.partitionby ("Technology2").OrderBy («Age»)
print ("---------- partitionné DataFrame ----------")
#Now mentionner le rang pour chaque ligne dans la colonne de rang
dataframe_obj.withColumn ("Rank", Rank ().sur (partition)).montrer()

Sortir:

Explication
La sortie représente les données réelles présentes dans le dataframe. Dans la deuxième sortie, la partition est effectuée sur la base du Technologie2 colonne et commandée en fonction de la colonne d'âge.

Partitions totales: 3

Partition 1:
C # s'est produit trois fois dans la première partition. Et toutes les valeurs de la colonne d'âge sont différentes. Par conséquent, la fonction Rank () renvoie les rangs 1, 2 et 3 pour trois valeurs C #.

Partition 2:
HTML s'est produit deux fois dans la première partition. Toutes les valeurs de la colonne d'âge sont différentes. Par conséquent, la fonction Rank () renvoie les rangs 1 et 2 pour deux valeurs HTML.

Partition 3:

  • Les tests se sont produits cinq fois dans la troisième partition.
  • Pour les trois premiers tests, les valeurs d'âge sont les mêmes. Donc, pour trois, le rang 1 est affecté.
  • Pour les quatrième et cinquième tests, les valeurs d'âge sont les mêmes. Donc, pour deux, le rang 4 est affecté.

Conclusion

Nous avons discuté de partitionner les lignes dans le pyspark dataframe et de définir le rang dans chaque partition à l'aide de la fonction de fenêtre Rank (). Rank in Pyspark est une fonction de fenêtre qui définit les rangs sur les lignes de chaque partition. Assurez-vous d'utiliser cette commande tout en utilisant cette fonction de Pyspark.SQL.Fonctions Import Rank.