Pyspark.sql.DataFrameWriter.saveAsTable()
Innanzitutto, vedremo come scrivere il DataFrame PySpark esistente nella tabella utilizzando la funzione write.saveAsTable(). Prende il nome della tabella e altri parametri opzionali come mode, partionBy, ecc., per scrivere il DataFrame nella tabella. Viene memorizzato come file parquet.
Sintassi:
dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
- Table_name è il nome della tabella creata da dataframe_obj.
- Possiamo aggiungere/sovrascrivere i dati della tabella utilizzando il parametro mode.
- Il partitionBy accetta le colonne singole/multiple per creare partizioni basate sui valori in queste colonne fornite.
Esempio 1:
Crea un PySpark DataFrame con 5 righe e 4 colonne. Scrivi questo Dataframe in una tabella denominata 'Agri_Table1'.
importa pyspark
da pyspark.sql importa SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()
# dati agricoli con 5 righe e 5 colonne
Agri =[{ 'Tipo_suolo' : 'Nero' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 2500 , 'Stato_suolo' : 'Asciutto' ,
'Paese' : 'STATI UNITI D'AMERICA' },
{ 'Tipo_suolo' : 'Nero' , 'Irrigazione_disponibilità' : 'SÌ' , 'Acri' : 3500 , 'Stato_suolo' : 'Bagnato' ,
'Paese' : 'India' },
{ 'Tipo_suolo' : 'Rosso' , 'Irrigazione_disponibilità' : 'SÌ' , 'Acri' : 210 , 'Stato_suolo' : 'Asciutto' ,
'Paese' : 'UK' },
{ 'Tipo_suolo' : 'Altro' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 1000 , 'Stato_suolo' : 'Bagnato' ,
'Paese' : 'STATI UNITI D'AMERICA' },
{ 'Tipo_suolo' : 'Sabbia' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 500 , 'Stato_suolo' : 'Asciutto' ,
'Paese' : 'India' }]
# crea il dataframe dai dati di cui sopra
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.mostra()
# Scrivi il DataFrame sopra nella tabella.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Tabella1' )
Produzione:
Possiamo vedere che un file parquet viene creato con i dati PySpark precedenti.
Esempio 2:
Considera il DataFrame precedente e scrivi 'Agri_Table2' nella tabella partizionando i record in base ai valori nella colonna 'Country'.
# Scrivi il DataFrame sopra nella tabella con il parametro partitionByagri_df.write.saveAsTable( 'Agri_Tabella2' ,partizionePer=[ 'Paese' ])
Produzione:
Nella colonna 'Paese' sono presenti tre valori univoci: 'India', 'Regno Unito' e 'USA'. Quindi, vengono create tre partizioni. Ogni partizione contiene i file parquet.
Pyspark.sql.DataFrameReader.table()
Carichiamo la tabella nel PySpark DataFrame usando la funzione spark.read.table(). Prende solo un parametro che è il nome del percorso/tabella. Carica direttamente la tabella nel DataFrame PySpark e tutte le funzioni SQL applicate al DataFrame PySpark possono essere applicate anche su questo DataFrame caricato.
Sintassi:
spark_app.read.table(percorso/'nome_tabella')In questo scenario, usiamo la tabella precedente che è stata creata dal PySpark DataFrame. Assicurati di dover implementare i frammenti di codice dello scenario precedente nel tuo ambiente.
Esempio:
Carica la tabella 'Agri_Table1' nel DataFrame denominato 'loaded_data'.
loading_data = linuxhint_spark_app.read.table( 'Agri_Tabella1' )dati_caricati.mostra()
Produzione:
Possiamo vedere che la tabella è caricata nel PySpark DataFrame.
Esecuzione delle query SQL
Ora eseguiamo alcune query SQL sul DataFrame caricato utilizzando la funzione spark.sql().
# Utilizzare il comando SELEZIONA per visualizzare tutte le colonne della tabella precedente.linuxhint_spark_app.sql( 'SELEZIONA * da Agri_Table1' ).spettacolo()
# Dove la clausola
linuxhint_spark_app.sql( 'SELEZIONA * da Agri_Table1 WHERE Soil_status='Secco' ' ).spettacolo()
linuxhint_spark_app.sql( 'SELEZIONA * da Agri_Table1 WHERE Acri > 2000 ' ).spettacolo()
Produzione:
- La prima query visualizza tutte le colonne e i record del DataFrame.
- La seconda query visualizza i record in base alla colonna 'Soil_status'. Ci sono solo tre record con l'elemento 'Dry'.
- L'ultima query restituisce due record con 'Acres' maggiori di 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Usando la funzione insertInto(), possiamo aggiungere il DataFrame nella tabella esistente. Possiamo usare questa funzione insieme a selectExpr() per definire i nomi delle colonne e quindi inserirli nella tabella. Questa funzione accetta anche tableName come parametro.
Sintassi:
DataFrame_obj.write.insertInto('Table_name')In questo scenario, usiamo la tabella precedente che è stata creata dal PySpark DataFrame. Assicurati di dover implementare i frammenti di codice dello scenario precedente nel tuo ambiente.
Esempio:
Crea un nuovo DataFrame con due record e inseriscili nella tabella “Agri_Table1”.
importa pysparkda pyspark.sql importa SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()
# dati agricoli con 2 righe
Agri =[{ 'Tipo_suolo' : 'Sabbia' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 2500 , 'Stato_suolo' : 'Asciutto' ,
'Paese' : 'STATI UNITI D'AMERICA' },
{ 'Tipo_suolo' : 'Sabbia' , 'Irrigazione_disponibilità' : 'NO' , 'Acri' : 1200 , 'Stato_suolo' : 'Bagnato' ,
'Paese' : 'Giappone' }]
# crea il dataframe dai dati di cui sopra
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.mostra()
# scrivi.inserisciInto()
agri_df2.selectExpr( 'Acri' , 'Paese' , 'Irrigazione_disponibilità' , 'Tipo_suolo' ,
'Stato_suolo' ).write.insertInto( 'Agri_Tabella1' )
# Mostra l'Agri_Table1 finale
linuxhint_spark_app.sql( 'SELEZIONA * da Agri_Table1' ).spettacolo()
Produzione:
Ora, il numero totale di righe presenti nel DataFrame è 7.
Conclusione
Ora capisci come scrivere il PySpark DataFrame nella tabella usando la funzione write.saveAsTable(). Accetta il nome della tabella e altri parametri facoltativi. Quindi, abbiamo caricato questa tabella nel PySpark DataFrame utilizzando la funzione spark.read.table(). Prende solo un parametro che è il nome del percorso/tabella. Se vuoi aggiungere il nuovo DataFrame alla tabella esistente, usa la funzione insertInto().