Conversione di PySpark DataFrame in CSV

Conversione Di Pyspark Dataframe In Csv



Diamo un'occhiata ai quattro diversi scenari di conversione di PySpark DataFrame in CSV. Direttamente, usiamo il metodo write.csv() per convertire il PySpark DataFrame in CSV. Usando la funzione to_csv(), convertiamo il DataFrame di PySpark Pandas in CSV. Può anche essere possibile convertendolo nell'array NumPy.

Argomento dei contenuti:

Se vuoi conoscere PySpark DataFrame e l'installazione del modulo, passa attraverso questo articolo .







PySpark DataFrame in CSV convertendo in Pandas DataFrame

Il to_csv() è un metodo disponibile nel modulo Pandas che converte il DataFrame Pandas in CSV. Innanzitutto, dobbiamo convertire il nostro PySpark DataFrame in Pandas DataFrame. Il metodo toPandas() viene utilizzato per farlo. Vediamo la sintassi di to_csv() insieme ai suoi parametri.



Sintassi:



pandas_dataframe_obj.to_csv(percorso/ 'nome_file.csv' , intestazione ,indice,colonne,modo...)
  1. Dobbiamo specificare il nome file del file CSV. Se desideri archiviare il CSV scaricato in una posizione particolare sul tuo PC, puoi anche specificare il percorso insieme al nome del file.
  2. Le colonne sono incluse se l'intestazione è impostata su 'True'. Se non hai bisogno di colonne, imposta l'intestazione su 'False'.
  3. Gli indici vengono specificati se l'indice è impostato su 'True'. Se non hai bisogno di indici, imposta l'indice su 'False'.
  4. Il parametro Columns accetta un elenco di nomi di colonne in cui è possibile specificare quali colonne particolari vengono estratte nel file CSV.
  5. Siamo in grado di aggiungere i record a CSV utilizzando il parametro mode. Aggiungi - 'a' è usato per fare questo.

Esempio 1: con i parametri Header e Index

Crea il PySpark DataFrame 'skills_df' con 3 righe e 4 colonne. Converti questo DataFrame in CSV convertendolo prima nel Pandas DataFrame.





importa pyspark

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

# dati sulle competenze con 3 righe e 4 colonne

abilità =[{ 'id' : 123 , 'persona' : 'Miele' , 'abilità' : 'pittura' , 'premio' : 25000 },

{ 'id' : 112 , 'persona' : 'Muni' , 'abilità' : 'danza' , 'premio' : 2000 },

{ 'id' : 153 , 'persona' : 'Tulasì' , 'abilità' : 'lettura' , 'premio' : 1200 }

]

# crea il dataframe delle competenze dai dati di cui sopra

skills_df = linuxhint_spark_app.createDataFrame(skills)

competenze_df.show()

# Converti skills_df in panda DataFrame

pandas_skills_df= skills_df.toPandas()

print(pandas_skills_df)

# Converti questo DataFrame in csv con intestazione e indice

pandas_skills_df.to_csv( 'pandas_skills1.csv' , intestazione =Vero, indice=Vero)

Produzione:



Possiamo vedere che PySpark DataFrame viene convertito in Pandas DataFrame. Vediamo se viene convertito in CSV con nomi di colonne e indici:

Esempio 2: aggiungere i dati a CSV

Crea un altro PySpark DataFrame con 1 record e aggiungilo a CSV che viene creato come parte del nostro primo esempio. Assicurati di dover impostare l'intestazione su 'False' insieme al parametro mode. In caso contrario, anche i nomi delle colonne vengono aggiunti come riga.

importa pyspark

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

abilità =[{ 'id' : 90 , 'persona' : 'Bhargav' , 'abilità' : 'lettura' , 'premio' : 12000 }

]

# crea il dataframe delle competenze dai dati di cui sopra

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Converti skills_df in panda DataFrame

pandas_skills_df= skills_df.toPandas()

# Aggiungi questo DataFrame al file pandas_skills1.csv

pandas_skills_df.to_csv( 'pandas_skills1.csv' , modalità= 'UN' , intestazione =falso)

Uscita CSV:

Possiamo vedere che una nuova riga viene aggiunta al file CSV.

Esempio 3: con il parametro Columns

Prendiamo lo stesso DataFrame e convertiamolo in CSV con due colonne: 'persona' e 'premio'.

importa pyspark

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

# dati sulle competenze con 3 righe e 4 colonne

abilità =[{ 'id' : 123 , 'persona' : 'Miele' , 'abilità' : 'pittura' , 'premio' : 25000 },

{ 'id' : 112 , 'persona' : 'Muni' , 'abilità' : 'danza' , 'premio' : 2000 },

{ 'id' : 153 , 'persona' : 'Tulasì' , 'abilità' : 'lettura' , 'premio' : 1200 }

]

# crea il dataframe delle competenze dai dati di cui sopra

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Converti skills_df in panda DataFrame

pandas_skills_df= skills_df.toPandas()

# Converti questo DataFrame in csv con colonne specifiche

pandas_skills_df.to_csv( 'pandas_skills2.csv' , colonne=[ 'persona' , 'premio' ])

Uscita CSV:

Possiamo vedere che nel file CSV esistono solo le colonne 'persona' e 'premio'.

PySpark Pandas DataFrame in CSV utilizzando il metodo To_Csv()

Il to_csv() è un metodo disponibile nel modulo Pandas che converte il DataFrame Pandas in CSV. Innanzitutto, dobbiamo convertire il nostro PySpark DataFrame in Pandas DataFrame. Il metodo toPandas() viene utilizzato per farlo. Vediamo la sintassi di to_csv() insieme ai suoi parametri:

Sintassi:

pyspark_pandas_dataframe_obj.to_csv(percorso/ 'nome_file.csv' , intestazione ,indice,colonne,...)
  1. Dobbiamo specificare il nome file del file CSV. Se desideri archiviare il CSV scaricato in una posizione particolare sul tuo PC, puoi anche specificare il percorso insieme al nome del file.
  2. Le colonne sono incluse se l'intestazione è impostata su 'True'. Se non hai bisogno di colonne, imposta l'intestazione su 'False'.
  3. Gli indici vengono specificati se l'indice è impostato su 'True'. Se non hai bisogno di indici, imposta l'indice su 'False'.
  4. Il parametro columns accetta un elenco di nomi di colonne in cui possiamo specificare quali particolari colonne vengono estratte nel file CSV.

Esempio 1: con il parametro Columns

Crea un DataFrame PySpark Pandas con 3 colonne e convertilo in CSV utilizzando to_csv() con le colonne 'persona' e 'premio'.

da pyspark import panda

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persona' :[ 'Miele' , 'Muni' , 'lui stesso' , 'radha' ], 'premio' :[ 1 , 2 , 3 , 4 ]})

print(pyspark_pandas_dataframe)

# Converti questo DataFrame in csv con colonne specifiche

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , colonne=[ 'persona' , 'premio' ])

Produzione:

Possiamo vedere che il DataFrame di PySpark Pandas viene convertito in CSV con due partizioni. Ogni partizione contiene 2 record. Inoltre, le colonne nel CSV sono solo 'persona' e 'premio'.

File di partizione 1:

File di partizione 2:

Esempio 2: con il parametro Header

Utilizzare il DataFrame precedente e specificare il parametro dell'intestazione impostandolo su 'True'.

da pyspark import panda

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persona' :[ 'Miele' , 'Muni' , 'lui stesso' , 'radha' ], 'premio' :[ 1 , 2 , 3 , 4 ]})

# Converti questo DataFrame in csv con intestazione.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , intestazione =Vero)

Uscita CSV:

Possiamo vedere che il DataFrame di PySpark Pandas viene convertito in CSV con due partizioni. Ogni partizione contiene 2 record con nomi di colonna.

File di partizione 1:

File di partizione 2:

PySpark Pandas DataFrame in CSV convertendo in NumPy Array

Abbiamo un'opzione per convertire il DataFrame PySpark Pandas in CSV convertendolo nell'array Numpy. to_numpy() è un metodo disponibile nel modulo PySpark Pandas che converte il DataFrame PySpark Pandas nell'array NumPy.

Sintassi:

pyspark_pandas_dataframe_obj.to_numpy()

Non richiederà alcun parametro.

Utilizzo del metodo Tofile()

Dopo la conversione nell'array NumPy, possiamo utilizzare il metodo tofile() per convertire NumPy in CSV. Qui, memorizza ogni record in una nuova cella colonnare nel file CSV.

Sintassi:

array_obj.to_numpy(nomefile/percorso,sep=' ')

Prende il nome del file o il percorso di un CSV e un separatore.

Esempio:

Crea PySpark Pandas DataFrame con 3 colonne e 4 record e convertilo in CSV convertendolo prima in un array NumPy.

da pyspark import panda

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persona' :[ 'Miele' , 'Muni' , 'lui stesso' , 'radha' ], 'premio' :[ 1 , 2 , 3 , 4 ]})

# Converti il ​​​​DataFrame sopra in un array numpy

convertito = pyspark_pandas_dataframe.to_numpy()

stampa (convertito)

# Usando tofile()

convertito.tofile( 'convertito1.csv' , settembre = ',' )

Produzione:

[[ 90 'Miele' 1 ]

[ 78 'Muni' 2 ]

[ 90 'lui stesso' 3 ]

[ 57 'radha' 4 ]]

Possiamo vedere che il DataFrame PySpark Pandas viene convertito in un array NumPy (12 valori). Se riesci a vedere i dati CSV, memorizza ogni valore di cella in una nuova colonna.

PySpark DataFrame in CSV utilizzando il metodo Write.Csv()

Il metodo write.csv() prende il nome/percorso del file in cui dobbiamo salvare il file CSV come parametro.

Sintassi:

dataframe_object.coalesce( 1 ).write.csv( 'nome del file' )

In realtà, il CSV viene salvato come partizioni (più di una). Per sbarazzarcene, uniamo tutti i file CSV partizionati in uno solo. In questo scenario, usiamo la funzione coalesce(). Ora possiamo vedere solo un file CSV con tutte le righe dal PySpark DataFrame.

Esempio:

Considera il PySpark DataFrame con 4 record con 4 colonne. Scrivi questo DataFrame in CSV con il file denominato 'market_details'.

importa pyspark

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

# dati di mercato con 4 righe e 4 colonne

mercato =[{ 'm_id' : 'mz-001' , 'm_name' : 'ABC' , 'm_città' : 'Delhi' , 'm_stato' : 'Delhi' },

{ 'm_id' : 'mz-002' , 'm_name' : 'XYZ' , 'm_città' : 'patna' , 'm_stato' : 'fortuna' },

{ 'm_id' : 'mz-003' , 'm_name' : 'PQR' , 'm_città' : 'Florida' , 'm_stato' : 'uno' },

{ 'm_id' : 'mz-004' , 'm_name' : 'ABC' , 'm_città' : 'Delhi' , 'm_stato' : 'fortuna' }

]



# crea il dataframe di mercato dai dati di cui sopra

market_df = linuxhint_spark_app.createDataFrame(mercato)

# Dati di mercato effettivi

mercato_df.show()

# scrivi.csv()

mercato_df.coalesce( 1 ).write.csv( 'dettagli_mercato' )

Produzione:

Controlliamo il file:

Apri l'ultimo file per vedere i record.

Conclusione

Abbiamo appreso i quattro diversi scenari che convertono il PySpark DataFrame in CSV con esempi considerando diversi parametri. Quando lavori con PySpark DataFrame, hai due opzioni per convertire questo DataFrame in CSV: un modo è usare il metodo write() e un altro è usare il metodo to_csv() convertendo in Pandas DataFrame. Se stai lavorando con PySpark Pandas DataFrame, puoi anche utilizzare to_csv() e tofile() convertendo in NumPy array.