Come leggere e scrivere i dati di una tabella in PySpark

Come Leggere E Scrivere I Dati Di Una Tabella In Pyspark



L'elaborazione dei dati in PySpark è più veloce se i dati vengono caricati sotto forma di tabella. Con questo, utilizzando le espressioni SQl, l'elaborazione sarà rapida. Pertanto, convertire PySpark DataFrame/RDD in una tabella prima di inviarlo per l'elaborazione è l'approccio migliore. Oggi vedremo come leggere i dati della tabella nel PySpark DataFrame, scrivere il PySpark DataFrame nella tabella e inserire un nuovo DataFrame nella tabella esistente utilizzando le funzioni integrate. Andiamo!

Pyspark.sql.DataFrameWriter.saveAsTable()

Innanzitutto, vedremo come scrivere il DataFrame PySpark esistente nella tabella utilizzando la funzione write.saveAsTable(). Prende il nome della tabella e altri parametri opzionali come mode, partionBy, ecc., per scrivere il DataFrame nella tabella. Viene memorizzato come file parquet.

Sintassi:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
  1. Table_name è il nome della tabella creata da dataframe_obj.
  2. Possiamo aggiungere/sovrascrivere i dati della tabella utilizzando il parametro mode.
  3. Il partitionBy accetta le colonne singole/multiple per creare partizioni basate sui valori in queste colonne fornite.

Esempio 1:

Crea un PySpark DataFrame con 5 righe e 4 colonne. Scrivi questo Dataframe in una tabella denominata 'Agri_Table1'.



importa pyspark

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

# dati agricoli con 5 righe e 5 colonne

Agri =[{ 'Tipo_suolo' : 'Nero' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 2500 , 'Stato_suolo' : 'Asciutto' ,
'Paese' : 'STATI UNITI D'AMERICA' },

{ 'Tipo_suolo' : 'Nero' , 'Irrigazione_disponibilità' : 'SÌ' , 'Acri' : 3500 , 'Stato_suolo' : 'Bagnato' ,
'Paese' : 'India' },

{ 'Tipo_suolo' : 'Rosso' , 'Irrigazione_disponibilità' : 'SÌ' , 'Acri' : 210 , 'Stato_suolo' : 'Asciutto' ,
'Paese' : 'UK' },

{ 'Tipo_suolo' : 'Altro' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 1000 , 'Stato_suolo' : 'Bagnato' ,
'Paese' : 'STATI UNITI D'AMERICA' },

{ 'Tipo_suolo' : 'Sabbia' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 500 , 'Stato_suolo' : 'Asciutto' ,
'Paese' : 'India' }]



# crea il dataframe dai dati di cui sopra

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.mostra()

# Scrivi il DataFrame sopra nella tabella.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Tabella1' )

Produzione:







Possiamo vedere che un file parquet viene creato con i dati PySpark precedenti.



Esempio 2:

Considera il DataFrame precedente e scrivi 'Agri_Table2' nella tabella partizionando i record in base ai valori nella colonna 'Country'.

# Scrivi il DataFrame sopra nella tabella con il parametro partitionBy

agri_df.write.saveAsTable( 'Agri_Tabella2' ,partizionePer=[ 'Paese' ])

Produzione:

Nella colonna 'Paese' sono presenti tre valori univoci: 'India', 'Regno Unito' e 'USA'. Quindi, vengono create tre partizioni. Ogni partizione contiene i file parquet.

Pyspark.sql.DataFrameReader.table()

Carichiamo la tabella nel PySpark DataFrame usando la funzione spark.read.table(). Prende solo un parametro che è il nome del percorso/tabella. Carica direttamente la tabella nel DataFrame PySpark e tutte le funzioni SQL applicate al DataFrame PySpark possono essere applicate anche su questo DataFrame caricato.

Sintassi:

spark_app.read.table(percorso/'nome_tabella')

In questo scenario, usiamo la tabella precedente che è stata creata dal PySpark DataFrame. Assicurati di dover implementare i frammenti di codice dello scenario precedente nel tuo ambiente.

Esempio:

Carica la tabella 'Agri_Table1' nel DataFrame denominato 'loaded_data'.

loading_data = linuxhint_spark_app.read.table( 'Agri_Tabella1' )

dati_caricati.mostra()

Produzione:

Possiamo vedere che la tabella è caricata nel PySpark DataFrame.

Esecuzione delle query SQL

Ora eseguiamo alcune query SQL sul DataFrame caricato utilizzando la funzione spark.sql().

# Utilizzare il comando SELEZIONA per visualizzare tutte le colonne della tabella precedente.

linuxhint_spark_app.sql( 'SELEZIONA * da Agri_Table1' ).spettacolo()

# Dove la clausola

linuxhint_spark_app.sql( 'SELEZIONA * da Agri_Table1 WHERE Soil_status='Secco' ' ).spettacolo()

linuxhint_spark_app.sql( 'SELEZIONA * da Agri_Table1 WHERE Acri > 2000 ' ).spettacolo()

Produzione:

  1. La prima query visualizza tutte le colonne e i record del DataFrame.
  2. La seconda query visualizza i record in base alla colonna 'Soil_status'. Ci sono solo tre record con l'elemento 'Dry'.
  3. L'ultima query restituisce due record con 'Acres' maggiori di 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Usando la funzione insertInto(), possiamo aggiungere il DataFrame nella tabella esistente. Possiamo usare questa funzione insieme a selectExpr() per definire i nomi delle colonne e quindi inserirli nella tabella. Questa funzione accetta anche tableName come parametro.

Sintassi:

DataFrame_obj.write.insertInto('Table_name')

In questo scenario, usiamo la tabella precedente che è stata creata dal PySpark DataFrame. Assicurati di dover implementare i frammenti di codice dello scenario precedente nel tuo ambiente.

Esempio:

Crea un nuovo DataFrame con due record e inseriscili nella tabella “Agri_Table1”.

importa pyspark

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

# dati agricoli con 2 righe

Agri =[{ 'Tipo_suolo' : 'Sabbia' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 2500 , 'Stato_suolo' : 'Asciutto' ,
'Paese' : 'STATI UNITI D'AMERICA' },

{ 'Tipo_suolo' : 'Sabbia' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 1200 , 'Stato_suolo' : 'Bagnato' ,
'Paese' : 'Giappone' }]

# crea il dataframe dai dati di cui sopra

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.mostra()

# scrivi.inserisciInto()

agri_df2.selectExpr( 'Acri' , 'Paese' , 'Irrigazione_disponibilità' , 'Tipo_suolo' ,
'Stato_suolo' ).write.insertInto( 'Agri_Tabella1' )

# Mostra l'Agri_Table1 finale

linuxhint_spark_app.sql( 'SELEZIONA * da Agri_Table1' ).spettacolo()

Produzione:

Ora, il numero totale di righe presenti nel DataFrame è 7.

Conclusione

Ora capisci come scrivere il PySpark DataFrame nella tabella usando la funzione write.saveAsTable(). Accetta il nome della tabella e altri parametri facoltativi. Quindi, abbiamo caricato questa tabella nel PySpark DataFrame utilizzando la funzione spark.read.table(). Prende solo un parametro che è il nome del percorso/tabella. Se vuoi aggiungere il nuovo DataFrame alla tabella esistente, usa la funzione insertInto().