Artificial Intelligence (AI)

Working with Window Functions in PySpark

Written by smirow

Introduction

Apprendre les fonctions de fenêtre dans PySpark peut être difficile mais cela en vaut la peine. Les fonctions de fenêtre sont un outil puissant pour analyser les données et peuvent vous aider à obtenir des informations que vous n'auriez peut-être pas vues autrement. En comprenant comment utiliser les fonctions Windows dans Spark ; vous pouvez faire passer vos compétences en analyse de données au niveau supérieur et prendre des décisions plus éclairées. Que vous travailliez avec des ensembles de données grands ou petits, l'apprentissage des fonctions de fenêtre dans Spark vous permettra de manipuler et d'analyser les données de manière nouvelle et passionnante.

Fonctions de fenêtre dans PySpark

Dans ce blog, nous comprendrons d'abord le concept des fonctions de fenêtre, puis discuterons de la façon de les utiliser avec Spark SQL et l'API PySpark DataFrame. Ainsi, à la fin de cet article, vous comprendrez comment utiliser les fonctions de fenêtre avec des ensembles de données réels et obtiendrez des informations essentielles pour les entreprises.

Objectifs d'apprentissage

  • Comprendre le concept des fonctions de fenêtre.
  • Travailler avec des fonctions de fenêtre à l'aide d'ensembles de données.
  • Découvrez les informations à l’aide des fonctions de la fenêtre.
  • Utilisez Spark SQL et l'API DataFrame pour travailler avec les fonctions de fenêtre.

Cet article a été publié dans le cadre du Blogathon sur la science des données.

Que sont les fonctions de fenêtre ?

Les fonctions de fenêtre permettent d'analyser les données au sein d'un groupe de lignes liées les unes aux autres. Ils permettent aux utilisateurs d'effectuer des transformations complexes sur les lignes d'une trame de données ou d'un ensemble de données associés les unes aux autres en fonction de certains critères de partitionnement et de classement.

Les fonctions de fenêtre fonctionnent sur une partition spécifique d'une trame de données ou d'un ensemble de données défini par un ensemble de colonnes de partitionnement. La clause ORDER BY partitionne les données dans une fonction de fenêtre pour les organiser dans un ordre spécifique. Les fonctions de fenêtre effectuent ensuite des calculs sur une fenêtre glissante de lignes qui inclut la ligne actuelle et un sous-ensemble des lignes précédentes « et »/« ou » suivantes, comme spécifié dans le cadre de la fenêtre.

Travailler avec les fonctions de fenêtre dans PySpark

Certains exemples courants de fonctions de fenêtre incluent le calcul de moyennes mobiles, le classement ou le tri des lignes en fonction d'une colonne ou d'un groupe de colonnes spécifique, le calcul des totaux cumulés et la recherche de la première ou de la dernière valeur d'un groupe de lignes. Grâce aux puissantes fonctions de fenêtre de Spark, les utilisateurs peuvent effectuer des analyses et des agrégations complexes sur de grands ensembles de données avec une relative facilité, ce qui en fait un outil populaire pour le traitement et l'analyse du Big Data.

"

Fonctions de fenêtre en SQL

Spark SQL prend en charge trois types de fonctions de fenêtre :

  • Fonctions de classement : – Ces fonctions attribuent un rang à chaque ligne dans une partition de l'ensemble de résultats. Par exemple, la fonction ROW_NUMBER() donne un numéro séquentiel unique à chaque ligne de la partition.
  • Fonctions d'analyse : – Ces fonctions calculent des valeurs agrégées sur une fenêtre de lignes. Par exemple, la fonction SUM() calcule la somme d'une colonne sur une fenêtre de lignes.
  • Fonctions de valeur : – Ces fonctions calculent une valeur analytique pour chaque ligne d'une partition, en fonction des valeurs des autres lignes de la même partition. Par exemple, la fonction LAG() renvoie la valeur d'une colonne de la ligne précédente de la partition.

Création de DataFrames

Nous allons créer un exemple de dataframe afin de pouvoir pratiquement travailler avec différentes fonctions de fenêtre. Nous essaierons également de répondre à quelques questions à l’aide de ces données et fonctions de fenêtre.

La base de données contient des détails sur les employés tels que leur nom, leur désignation, leur numéro d'employé, leur date d'embauche, leur salaire, etc. Au total, nous avons 8 colonnes qui sont les suivantes :

  • 'emno' : Cette colonne contient le numéro de l'employé.
  • 'ename' : cette colonne contient les noms des employés.
  • « emploi » : cette colonne contient des informations sur les titres de poste des employés.
  • « date d'embauche » : cette colonne indique la date d'embauche de l'employé.
  • 'sal' : les détails du salaire figurent dans cette colonne.
  • « comm » : cette colonne contient les détails des commissions des employés, le cas échéant.
  • 'deptno' : Le numéro du service auquel appartient l'employé se trouve dans cette colonne.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Nous allons maintenant vérifier le schéma :

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Créez une vue temporaire du DataFrame 'emp_df' avec le nom « emp ». Cela nous permet d'interroger le DataFrame en utilisant la syntaxe SQL dans Spark SQL comme s'il s'agissait d'une table. La vue temporaire n'est valide que pour la durée de la session Spark.

emp_df.createOrReplaceTempView("emp")

Résolution des problèmes à l'aide des fonctions de fenêtre

Ici, nous allons résoudre plusieurs problèmes à l'aide des fonctions Windows :

T1. Classez le salaire au sein de chaque département.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Approche pour le code PySpark

  • La fonction Window partitionne les données par numéro de service à l'aide de partitionBy(col('deptno')), puis classe les données dans chaque partition par salaire par ordre décroissant à l'aide de orderBy(col('sal').desc()). La variable windowSpec contient la spécification finale de la fenêtre.
  • 'emp_df' est la trame de données qui contient les données des employés, y compris les colonnes empno, ename, job, deptno et sal.
  • La fonction de classement est appliquée à la colonne salaire à l'aide de « F.rank().over(windowSpec) » dans l'instruction select. La colonne résultante a un alias « rang ».
  • Il créera une trame de données, 'ranking_result_df', qui comprend l'emno, l'ename, l'emploi, le deptno et le salaire. Il comporte également une nouvelle colonne, « rang », qui représente le rang du salaire de l'employé au sein de son service.

Sortir:

Le résultat a un rang salarial dans chaque département.

Q2. Classement dense des salaires au sein de chaque département.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Approche pour le code PySpark

  • Tout d'abord, créez une spécification de fenêtre à l'aide de la fonction Window, qui partitionne le DataFrame 'emp_df' par deptno et l'ordonne en descendant la colonne 'sal'.
  • Ensuite, la fonction dense_rank() est appliquée sur la spécification de fenêtre, qui attribue un rang dense à chaque ligne de chaque partition en fonction de son ordre de tri.
  • Enfin, un nouveau DataFrame appelé “dense_ranking_df” est créé en sélectionnant des colonnes spécifiques dans emp_df (c'est-à-dire “empno”, “ename”, “job”, “deptno” et “sal”) et en ajoutant une nouvelle colonne “dense_rank” qui contient les valeurs de classement dense calculées par la fonction window.
  • Enfin, affichez le DataFrame résultant au format tabulaire.

Sortir:

Le résultat a un rang élevé en termes de salaire.

Q3. Numérotez la ligne au sein de chaque département.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Approche pour le code PySpark

  • La première ligne définit une spécification de fenêtre pour le calcul à l'aide des fonctions Window.partitionBy() et Window.orderBy(). Cette fenêtre est partitionnée par la colonne deptno et classée par la colonne sal par ordre décroissant.
  • La deuxième ligne crée un nouveau DataFrame appelé 'row_num_df', une projection de 'emp_df' avec une colonne supplémentaire appelée 'row_num' et contient les détails des numéros de ligne.
  • La fonction show() affiche le DataFrame résultant, qui affiche les colonnes empno, ename, job, deptno, sal et row_num de chaque employé.

Sortir:

La sortie aura le numéro de ligne de chaque employé au sein de son service en fonction de son salaire.

Q4. Montant total cumulé du salaire au sein de chaque département.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Approche pour le code PySpark

  • Tout d'abord, une spécification de fenêtre est définie à l'aide des méthodes « Window.partitionBy() » et « Window.orderBy() ». La méthode « partitionBy() » partitionne les données par colonne « deptno », tandis que la méthode « orderBy() » classe les données par colonne « sal » par ordre décroissant.
  • Ensuite, la fonction « sum() » est appliquée à la colonne « sal » en utilisant la méthode « over() » pour calculer le total cumulé des salaires au sein de chaque département. Le résultat sera dans un nouveau DataFrame appelé “running_sum_sal_df”, qui contient les colonnes “empno”, “ename”, “job”, “deptno”, “sal” et “running_total”.
  • Enfin, la méthode « show() » est appelée sur le DataFrame « running_sum_sal_df » pour afficher le résultat de la requête. Le DataFrame résultant affiche le total cumulé des salaires de chaque employé et d'autres détails tels que le nom, le numéro de service et le poste.

Sortir:

Le résultat contiendra un total cumulé des données salariales de chaque département.

Q5 : Le prochain salaire au sein de chaque département.

Pour trouver le prochain salaire au sein de chaque département, nous utilisons la fonction LEAD.

La fonction de fenêtre lead() permet d'obtenir la valeur de l'expression dans la ligne suivante de la partition de fenêtre. Il renvoie une colonne pour chaque colonne d'entrée, où chaque colonne contiendra la valeur de la colonne d'entrée pour la ligne de décalage au-dessus de la ligne actuelle dans la partition de fenêtre. La syntaxe de la fonction lead est :- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Approche pour le code PySpark

  • Premièrement, la fonction de fenêtre permet de partitionner les lignes du DataFrame par numéro de département (deptno) et de classer les salaires par ordre décroissant au sein de chaque partition.
  • La fonction lead() est ensuite appliquée à la colonne 'sal' ordonnée dans chaque partition pour renvoyer le salaire de l'employé suivant (avec un décalage de 1), et la valeur par défaut est 0 s'il n'y a pas d'employé suivant.
  • Le DataFrame résultant 'next_salary_df' contient des colonnes pour le numéro d'employé (empno), le nom (ename), le titre du poste (job), le numéro de département (deptno), le salaire actuel (sal) et le prochain salaire (next_val).

Sortir:

La sortie contient le salaire du prochain employé du département en fonction de l'ordre décroissant du salaire.

Q6. Salaire antérieur au sein de chaque département.

Pour calculer le salaire précédent, nous utilisons la fonction LAG.

La fonction lag renvoie la valeur d'une expression à un décalage donné avant la ligne actuelle dans la partition de fenêtre. La syntaxe de la fonction lag est :- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Approche pour le code PySpark

  • Le window.partitionBy(col('deptno')) spécifie la partition de la fenêtre. Cela signifie que la fonction fenêtre fonctionne séparément pour chaque département.
  • Ensuite, orderBy(col('sal').desc()) spécifie l'ordre des salaires et classera les salaires au sein de chaque département par ordre décroissant.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') crée une nouvelle colonne appelée prev_val dans le DataFrame 'prev_sal_df'.
  • Pour chaque ligne, cette colonne contient la valeur de la colonne 'sal' de la ligne précédente dans la fenêtre définie par windowSpec.
  • Le paramètre offset=1 indique que la ligne précédente doit être une ligne avant la ligne actuelle, et default=0 spécifie la valeur par défaut pour la première ligne de chaque partition (puisqu'il n'y a pas de ligne précédente pour la première ligne).
  • Enfin, prev_sal_df.show() affiche le DataFrame résultant.

Sortir:

Le résultat représente le salaire précédent de chaque employé au sein de chaque département, en fonction du classement des salaires par ordre décroissant.

Q7. Premier salaire au sein de chaque département et comparé à celui de chaque membre de chaque département.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Approche pour le code PySpark

  • Tout d'abord, créez un objet WindowSpec qui partitionne les données par numéro de service (deptno) et les classe par salaire (sal) par ordre décroissant.
  • Applique ensuite la fonction analytique first() à la colonne 'sal' sur la fenêtre définie par windowSpec. Cette fonction renvoie la première valeur de la colonne « sal » dans chaque partition (c'est-à-dire chaque groupe deptno) classée par « sal » décroissant. La colonne résultante a un nouveau nom, « first_val ».
  • Attribue désormais le DataFrame résultant, qui contient les colonnes sélectionnées et une nouvelle colonne, « first_val », qui affiche le premier salaire le plus élevé pour chaque département en fonction de l'ordre décroissant des valeurs de salaire, à une nouvelle variable appelée « first_value_df ».

Sortir:

La sortie affiche le premier salaire le plus élevé pour chaque département dans un DataFrame d’employé.

Conclusion

Dans cet article, nous découvrons les fonctions des fenêtres. Spark SQL propose trois types de fonctions de fenêtre : les fonctions de classement, les fonctions d'agrégation et les fonctions de valeur. À l'aide de cette fonction, nous avons travaillé sur un ensemble de données pour trouver des informations importantes et précieuses. Les fonctions Spark Window offrent de puissants outils d'analyse de données tels que le classement, l'analyse et les calculs de valeur. Qu'il s'agisse d'analyser les informations salariales par département ou d'utiliser des exemples pratiques avec PySpark et SQL, ces fonctions fournissent des outils essentiels pour un traitement et une analyse efficaces des données dans Spark.

Points clés à retenir

  • Nous avons découvert les fonctions de fenêtre et travaillé avec elles à l'aide de Spark SQL et de l'API PySpark DataFrame.
  • Nous utilisons des fonctions telles que Rank, dense_rank, Row_number, Lag, Lead, GroupBy, PartitionBy et d'autres fonctions pour fournir une analyse appropriée.
  • Nous avons également vu les solutions détaillées étape par étape au problème et analysé le résultat à la fin de chaque énoncé du problème.

Cette étude de cas vous aide à mieux comprendre les fonctions de PySpark. Si vous avez des opinions ou des questions, commentez ci-dessous. Connectez-vous avec moi sur LinkedIn pour en savoir plus. Continue d'apprendre!!!

Les médias présentés dans cet article n'appartiennent pas à Analytics Vidhya et sont utilisés à la discrétion de l'auteur.

About the author

smirow

Leave a Comment