PySpark Pandas_Udf()

Pyspark Pandas Udf



La trasformazione di PySpark DataFrame è possibile utilizzando la funzione pandas_udf(). È una funzione definita dall'utente che viene applicata su PySpark DataFrame con la freccia. Possiamo eseguire le operazioni vettorializzate usando pandas_udf(). Può essere implementato passando questa funzione come decoratore. Immergiamoci in questa guida per conoscere la sintassi, i parametri e diversi esempi.

Argomento dei contenuti:

Se vuoi conoscere PySpark DataFrame e l'installazione del modulo, passa attraverso questo articolo .







Pyspark.sql.functions.pandas_udf()

Il pandas_udf () è disponibile nel modulo sql.functions in PySpark che può essere importato utilizzando la parola chiave 'from'. Viene utilizzato per eseguire le operazioni vettorializzate sul nostro PySpark DataFrame. Questa funzione è implementata come un decoratore passando tre parametri. Successivamente, possiamo creare una funzione definita dall'utente che restituisca i dati nel formato vettoriale (come usiamo serie/NumPy per questo) usando una freccia. All'interno di questa funzione, siamo in grado di restituire il risultato.



Struttura e sintassi:



Per prima cosa, diamo un'occhiata alla struttura e alla sintassi di questa funzione:

@pandas_udf(tipo di dati)
def nome_funzione(operazione) -> convert_format:
dichiarazione di ritorno

Qui, il nome_funzione è il nome della nostra funzione definita. Il tipo di dati specifica il tipo di dati restituito da questa funzione. Possiamo restituire il risultato usando la parola chiave 'return'. Tutte le operazioni vengono eseguite all'interno della funzione con l'assegnazione della freccia.





Pandas_udf (funzione e tipo di ritorno)

  1. Il primo parametro è la funzione definita dall'utente che gli viene passata.
  2. Il secondo parametro viene utilizzato per specificare il tipo di dati restituito dalla funzione.

Dati:

In questa intera guida, usiamo un solo PySpark DataFrame per la dimostrazione. Tutte le funzioni definite dall'utente che definiamo vengono applicate su questo PySpark DataFrame. Assicurati di creare questo DataFrame nel tuo ambiente prima dopo l'installazione di PySpark.



importa pyspark

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

da pyspark.sql.functions import pandas_udf

dall'importazione di pyspark.sql.types *

importa panda come panda

# dettagli vegetali

verdura =[{ 'tipo' : 'verdura' , 'nome' : 'pomodoro' , 'localizza_paese' : 'STATI UNITI D'AMERICA' , 'quantità' : 800 },

{ 'tipo' : 'frutta' , 'nome' : 'banana' , 'localizza_paese' : 'CINA' , 'quantità' : venti },

{ 'tipo' : 'verdura' , 'nome' : 'pomodoro' , 'localizza_paese' : 'STATI UNITI D'AMERICA' , 'quantità' : 800 },

{ 'tipo' : 'verdura' , 'nome' : 'Mango' , 'localizza_paese' : 'GIAPPONE' , 'quantità' : 0 },

{ 'tipo' : 'frutta' , 'nome' : 'limone' , 'localizza_paese' : 'INDIA' , 'quantità' : 1700 },

{ 'tipo' : 'verdura' , 'nome' : 'pomodoro' , 'localizza_paese' : 'STATI UNITI D'AMERICA' , 'quantità' : 1200 },

{ 'tipo' : 'verdura' , 'nome' : 'Mango' , 'localizza_paese' : 'GIAPPONE' , 'quantità' : 0 },

{ 'tipo' : 'frutta' , 'nome' : 'limone' , 'localizza_paese' : 'INDIA' , 'quantità' : 0 }

]

# crea il dataframe di mercato dai dati di cui sopra

market_df = linuxhint_spark_app.createDataFrame(vegetale)

mercato_df.show()

Produzione:

Qui, creiamo questo DataFrame con 4 colonne e 8 righe. Ora usiamo pandas_udf() per creare le funzioni definite dall'utente e applicarle a queste colonne.

Pandas_udf() con diversi tipi di dati

In questo scenario, creiamo alcune funzioni definite dall'utente con pandas_udf() e le applichiamo alle colonne e visualizziamo i risultati usando il metodo select(). In ogni caso, usiamo pandas.Series mentre eseguiamo le operazioni vettorializzate. Questo considera i valori della colonna come un array unidimensionale e l'operazione viene applicata alla colonna. Nello stesso decoratore, specifichiamo il tipo restituito dalla funzione.

Esempio 1: Pandas_udf() con tipo stringa

Qui creiamo due funzioni definite dall'utente con il tipo di ritorno stringa per convertire i valori della colonna di tipo stringa in lettere maiuscole e minuscole. Infine, applichiamo queste funzioni sulle colonne 'type' e 'locate_country'.

# Converti la colonna del tipo in maiuscolo con pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

return i.str.upper()

# Converti la colonna locate_country in minuscolo con pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

return i.str.lower()

# Visualizza le colonne usando select()

mercato_df.select( 'tipo' ,type_upper_case( 'tipo' ), 'localizza_paese' ,
country_lower_case( 'localizza_paese' )).spettacolo()

Produzione:

Spiegazione:

La funzione StringType() è disponibile nel modulo pyspark.sql.types. Abbiamo già importato questo modulo durante la creazione del PySpark DataFrame.

  1. Innanzitutto, UDF (funzione definita dall'utente) restituisce le stringhe in maiuscolo utilizzando la funzione str.upper(). Lo str.upper() è disponibile nella struttura dati della serie (poiché stiamo convertendo in serie con una freccia all'interno della funzione) che converte la stringa data in maiuscolo. Infine, questa funzione viene applicata alla colonna 'type' specificata all'interno del metodo select(). In precedenza, tutte le stringhe nella colonna type erano in minuscolo. Ora sono cambiati in maiuscolo.
  2. In secondo luogo, UDF restituisce le stringhe in maiuscolo utilizzando la funzione str.lower(). Lo str.lower() è disponibile nella struttura dei dati della serie che converte la stringa data in minuscolo. Infine, questa funzione viene applicata alla colonna 'type' specificata all'interno del metodo select(). In precedenza, tutte le stringhe nella colonna type erano in maiuscolo. Ora sono cambiati in minuscolo.

Esempio 2: Pandas_udf() con tipo intero

Creiamo una UDF che converte la colonna intera PySpark DataFrame nella serie Pandas e aggiungiamo 100 a ciascun valore. Passa la colonna 'quantity' a questa funzione all'interno del metodo select().

# Aggiungi 100

@pandas_udf(IntegerType())

def add_100(i: panda.Serie) -> panda.Serie:

ritorna io+ 100

# Passa la colonna della quantità alla funzione sopra e visualizza.

market_df.select( 'quantità' ,aggiungi_100( 'quantità' )).spettacolo()

Produzione:

Spiegazione:

All'interno dell'UDF, iteriamo tutti i valori e li convertiamo in Series. Successivamente, aggiungiamo 100 a ciascun valore della serie. Infine, passiamo la colonna 'quantità' a questa funzione e possiamo vedere che 100 viene aggiunto a tutti i valori.

Pandas_udf() con diversi tipi di dati utilizzando Groupby() e Agg()

Diamo un'occhiata agli esempi per passare l'UDF alle colonne aggregate. Qui, i valori della colonna vengono prima raggruppati utilizzando la funzione groupby() e l'aggregazione viene eseguita utilizzando la funzione agg(). Passiamo la nostra UDF all'interno di questa funzione aggregata.

Sintassi:

pyspark_dataframe_object.groupby( 'raggruppamento_colonna' ).agg(UDF
(pyspark_dataframe_object[ 'colonna' ]))

Qui, i valori nella colonna di raggruppamento vengono raggruppati per primi. Quindi, l'aggregazione viene eseguita su ciascun dato raggruppato rispetto alla nostra UDF.

Esempio 1: Pandas_udf() con media aggregata()

Qui creiamo una funzione definita dall'utente con un tipo di ritorno float. All'interno della funzione, calcoliamo la media usando la funzione mean(). Questa FDU viene passata alla colonna 'quantità' per ottenere la quantità media per ogni tipo.

# restituisce la media/media

@pandas_udf( 'galleggiante' )

def media_funzione(i: panda.Serie) -> float:

return i.mean()

# Passa la colonna quantità alla funzione raggruppando la colonna tipo.

market_df.groupby( 'tipo' ).agg(media_funzione(mercato_df[ 'quantità' ])).spettacolo()

Produzione:

Stiamo raggruppando in base agli elementi nella colonna 'tipo'. Si formano due gruppi: 'frutta' e 'verdura'. Per ogni gruppo viene calcolata e restituita la media.

Esempio 2: Pandas_udf() con Aggregate Max() e Min()

Qui creiamo due funzioni definite dall'utente con il tipo di ritorno intero (int). La prima UDF restituisce il valore minimo e la seconda UDF restituisce il valore massimo.

# pandas_udf che restituiscono il valore minimo

@pandas_udf( 'int' )

def min_(i: panda.Serie) -> int:

ritorno i.min()

# pandas_udf che restituiscono il valore massimo

@pandas_udf( 'int' )

def max_(i: panda.Serie) -> int:

restituisce i.max()

# Passa la colonna quantità a min_ pandas_udf raggruppando locate_country.

market_df.groupby( 'localizza_paese' ).agg(min_(mercato_df[ 'quantità' ])).spettacolo()

# Passa la colonna della quantità a max_ pandas_udf raggruppando locate_country.

market_df.groupby( 'localizza_paese' ).agg(max_(mercato_df[ 'quantità' ])).spettacolo()

Produzione:

Per restituire i valori minimo e massimo, utilizziamo le funzioni min() e max() nel tipo restituito delle UDF. Raggruppiamo ora i dati nella colonna 'locate_country'. Si formano quattro gruppi (“CHINA”, “INDIA”, “JAPAN”, “USA”). Per ogni gruppo, restituiamo la quantità massima. Allo stesso modo, restituiamo la quantità minima.

Conclusione

Fondamentalmente, pandas_udf () viene utilizzato per eseguire le operazioni vettorializzate sul nostro PySpark DataFrame. Abbiamo visto come creare la pandas_udf() e applicarla al PySpark DataFrame. Per una migliore comprensione, abbiamo discusso i diversi esempi considerando tutti i tipi di dati (stringa, float e intero). È possibile utilizzare pandas_udf() con groupby() tramite la funzione agg().