Conversione di PySpark DataFrame in JSON

Conversione Di Pyspark Dataframe In Json



La trasmissione di dati strutturati tramite JSON è possibile e consuma poca memoria. Rispetto a PySpark RDD o PySpark DataFrame, JSON consuma poca memoria e serializzazione che è possibile con JSON. Siamo in grado di convertire il PySpark DataFrame in JSON utilizzando il metodo pyspark.sql.DataFrameWriter.json(). Oltre a ciò, ci sono altri due modi per convertire il DataFrame in JSON.

Argomento dei contenuti:

Consideriamo un semplice DataFrame PySpark in tutti gli esempi e convertiamolo in JSON utilizzando le funzioni citate.







Modulo richiesto:

Installa la libreria PySpark nel tuo ambiente se non è ancora installata. È possibile fare riferimento al seguente comando per installarlo:



pip installa pyspark

Da PySpark DataFrame a JSON utilizzando To_json() con ToPandas()

Il metodo to_json() è disponibile nel modulo Pandas che converte Pandas DataFrame in JSON. Possiamo utilizzare questo metodo se convertiamo il nostro PySpark DataFrame in Pandas DataFrame. Per convertire il PySpark DataFrame in Pandas DataFrame, viene utilizzato il metodo toPandas(). Vediamo la sintassi di to_json() insieme ai suoi parametri.



Sintassi:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Orient viene utilizzato per visualizzare il JSON convertito come formato desiderato. Prende 'record', 'tabella', 'valori', 'colonne', 'indice', 'diviso'.
  2. Index viene utilizzato per includere/rimuovere l'indice dalla stringa JSON convertita. Se è impostato su 'True', vengono visualizzati gli indici. In caso contrario, gli indici non verranno visualizzati se l'orient è 'split' o 'table'.

Esempio 1: Orient come 'Record'

Crea un dataframe PySpark 'skills_df' con 3 righe e 4 colonne. Converti questo DataFrame in JSON specificando il parametro orient come 'record'.

importa pyspark

importare panda

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

# dati sulle competenze con 3 righe e 4 colonne

abilità =[{ 'id' : 123 , 'persona' : 'Miele' , 'abilità' : 'pittura' , 'premio' : 25000 },

{ 'id' : 112 , 'persona' : 'Muni' , 'abilità' : 'danza' , 'premio' : 2000 },

{ 'id' : 153 , 'persona' : 'Tulasì' , 'abilità' : 'lettura' , 'premio' : 1200 }

]

# crea il dataframe delle competenze dai dati di cui sopra

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Dati sulle competenze effettive

competenze_df.show()

# Converti in JSON usando to_json() con orient come 'record'

json_skills_data = skills_df.toPandas().to_json(orient= 'record' )

print(json_skills_data)

Produzione:



+---+------+-----+--------+

| id|persona|premio| abilità|

+---+------+-----+--------+

| 123 | Miele| 25000 |pittura|

| 112 | Mouni| 2000 | danza|

| 153 |Tulassi| 1200 | lettura|

+---+------+-----+--------+

[{ 'id' : 123 , 'persona' : 'Miele' , 'premio' : 25000 , 'abilità' : 'pittura' },{ 'id' : 112 , 'persona' : 'Muni' , 'premio' : 2000 , 'abilità' : 'danza' },{ 'id' : 153 , 'persona' : 'Tulas' , 'premio' : 1200 , 'abilità' : 'lettura' }]

Possiamo vedere che PySpark DataFrame viene convertito nell'array JSON con un dizionario di valori. Qui, le chiavi rappresentano il nome della colonna e il valore rappresenta il valore di riga/cella in PySpark DataFrame.

Esempio 2: Orient come 'Dividi'

Il formato JSON restituito dall'orient 'split' include i nomi delle colonne che hanno un elenco di colonne, un elenco di indici e un elenco di dati. Quello che segue è il formato dell'oriente 'diviso'.

# Converti in JSON usando to_json() con orient come 'split'

json_skills_data = skills_df.toPandas().to_json(orient= 'diviso' )

print(json_skills_data)

Produzione:

{ 'colonne' :[ 'id' , 'persona' , 'premio' , 'abilità' ], 'indice' :[ 0 , 1 , 2 ], 'dati' :[[ 123 , 'Miele' , 25000 , 'pittura' ],[ 112 , 'Muni' , 2000 , 'danza' ],[ 153 , 'Tulas' , 1200 , 'lettura' ]]}

Esempio 3: Orient come 'Indice'

Qui, ogni riga del PySpark DataFrame viene ritirata sotto forma di dizionario con la chiave come nome della colonna. Per ogni dizionario, la posizione dell'indice è specificata come chiave.

# Converti in JSON usando to_json() con orient come 'index'

json_skills_data = skills_df.toPandas().to_json(orient= 'indice' )

print(json_skills_data)

Produzione:

{ '0' :{ 'id' : 123 , 'persona' : 'Miele' , 'premio' : 25000 , 'abilità' : 'pittura' }, '1' :{ 'id' : 112 , 'persona' : 'Muni' , 'premio' : 2000 , 'abilità' : 'danza' }, '2' :{ 'id' : 153 , 'persona' : 'Tulas' , 'premio' : 1200 , 'abilità' : 'lettura' }}

Esempio 4: Orient come 'Colonne'

Le colonne sono la chiave per ogni record. Ogni colonna contiene un dizionario che accetta i valori della colonna con i numeri di indice.

# Converti in JSON usando to_json() con orient come 'colonne'

json_skills_data = skills_df.toPandas().to_json(orient= 'colonne' )

print(json_skills_data)

Produzione:

{ 'id' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persona' :{ '0' : 'Miele' , '1' : 'Muni' , '2' : 'Tulas' }, 'premio' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'abilità' :{ '0' : 'pittura' , '1' : 'danza' , '2' : 'lettura' }}

Esempio 5: Orient come 'Valori'

Se hai bisogno solo dei valori in JSON, puoi scegliere l'orientamento dei 'valori'. Visualizza ogni riga in un elenco. Infine, tutte le liste vengono memorizzate in una lista. Questo JSON è del tipo elenco nidificato.

# Converti in JSON usando to_json() con orient come 'valori'

json_skills_data = skills_df.toPandas().to_json(orient= 'valori' )

print(json_skills_data)

Produzione:

[[ 123 , 'Miele' , 25000 , 'pittura' ],[ 112 , 'Muni' , 2000 , 'danza' ],[ 153 , 'Tulas' , 1200 , 'lettura' ]]

Esempio 6: Orient come 'Tabella'

L'orientamento 'tabella' restituisce il JSON che include lo schema con i nomi dei campi insieme ai tipi di dati della colonna, l'indice come chiave primaria e la versione Pandas. I nomi delle colonne con i valori vengono visualizzati come 'dati'.

# Converti in JSON usando to_json() con orient come 'table'

json_skills_data = skills_df.toPandas().to_json(orient= 'tavolo' )

print(json_skills_data)

Produzione:

{ 'schema' :{ 'campi' :[{ 'nome' : 'indice' , 'tipo' : 'numero intero' },{ 'nome' : 'id' , 'tipo' : 'numero intero' },{ 'nome' : 'persona' , 'tipo' : 'corda' },{ 'nome' : 'premio' , 'tipo' : 'numero intero' },{ 'nome' : 'abilità' , 'tipo' : 'corda' }], 'chiave primaria' :[ 'indice' ], 'versione_panda' : '1.4.0' }, 'dati' :[{ 'indice' : 0 , 'id' : 123 , 'persona' : 'Miele' , 'premio' : 25000 , 'abilità' : 'pittura' },{ 'indice' : 1 , 'id' : 112 , 'persona' : 'Muni' , 'premio' : 2000 , 'abilità' : 'danza' },{ 'indice' : 2 , 'id' : 153 , 'persona' : 'Tulas' , 'premio' : 1200 , 'abilità' : 'lettura' }]}

Esempio 7: con parametro indice

Innanzitutto, passiamo il parametro index impostandolo su 'True'. Vedrai per ogni valore di colonna che la posizione dell'indice viene restituita come chiave in un dizionario.

Nel secondo output, vengono restituiti solo i nomi delle colonne ('colonne') e i record ('dati') senza le posizioni dell'indice poiché l'indice è impostato su 'False'.

# Converti in JSON usando to_json() con index=True

json_skills_data = skills_df.toPandas().to_json(index=True)

print(json_skills_data, ' \N ' )

# Converti in JSON usando to_json() con index=False

json_skills_data= skills_df.toPandas().to_json(index=False,orient= 'diviso' )

print(json_skills_data)

Produzione:

{ 'id' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persona' :{ '0' : 'Miele' , '1' : 'Muni' , '2' : 'Tulas' }, 'premio' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'abilità' :{ '0' : 'pittura' , '1' : 'danza' , '2' : 'lettura' }}

{ 'colonne' :[ 'id' , 'persona' , 'premio' , 'abilità' ], 'dati' :[[ 123 , 'Miele' , 25000 , 'pittura' ],[ 112 , 'Muni' , 2000 , 'danza' ],[ 153 , 'Tulas' , 1200 , 'lettura' ]]

Da PySpark DataFrame a JSON utilizzando ToJSON()

Il metodo toJSON() viene utilizzato per convertire il PySpark DataFrame in un oggetto JSON. Fondamentalmente, restituisce una stringa JSON che è circondata da un elenco. IL ['{colonna:valore,...}',.... ] è il formato restituito da questa funzione. Qui, ogni riga del PySpark DataFrame viene restituita come dizionario con il nome della colonna come chiave.

Sintassi:

dataframe_object.toJSON()

Può essere possibile passare i parametri come l'indice, le etichette delle colonne e il tipo di dati.

Esempio:

Crea un PySpark DataFrame 'skills_df' con 5 righe e 4 colonne. Converti questo DataFrame in JSON utilizzando il metodo toJSON().

importa pyspark

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

# dati sulle competenze con 5 righe e 4 colonne

abilità =[{ 'id' : 123 , 'persona' : 'Miele' , 'abilità' : 'pittura' , 'premio' : 25000 },

{ 'id' : 112 , 'persona' : 'Muni' , 'abilità' : 'musica/danza' , 'premio' : 2000 },

{ 'id' : 153 , 'persona' : 'Tulasì' , 'abilità' : 'lettura' , 'premio' : 1200 },

{ 'id' : 173 , 'persona' : 'Corso' , 'abilità' : 'musica' , 'premio' : 2000 },

{ 'id' : 43 , 'persona' : 'Kamala' , 'abilità' : 'lettura' , 'premio' : 10000 }

]

# crea il dataframe delle competenze dai dati di cui sopra

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Dati sulle competenze effettive

competenze_df.show()

# Converti in array JSON

json_skills_data = skills_df.toJSON().collect()

print(json_skills_data)

Produzione:

+---+------+-----+-----------+

| id|persona|premio| abilità|

+---+------+-----+-----------+

| 123 | Miele| 25000 | pittura|

| 112 | Mouni| 2000 |musica/danza|

| 153 |Tulassi| 1200 | lettura|

| 173 | Corse| 2000 | musica|

| 43 |Kamala| 10000 | lettura|

+---+------+-----+-----------+

[ '{'id':123,'person':'Tesoro','premio':25000,'skill':'pittura'}' , '{'id':112,'person':'Mouni','premio':2000,'skill':'musica/danza'}' , '{'id':153,'person':'Tulasi','premio':1200,'skill':'lettura'}' , '{'id':173,'person':'Corso','premio':2000,'skill':'musica'}' , '{'id':43,'person':'Kamala','premio':10000,'skill':'lettura'}' ]

Ci sono 5 righe nel PySpark DataFrame. Tutte queste 5 righe vengono restituite come un dizionario di stringhe separate da virgola.

PySpark DataFrame in JSON utilizzando Write.json()

Il metodo write.json() è disponibile in PySpark che scrive/salva il PySpark DataFrame in un file JSON. Prende il nome/percorso del file come parametro. Fondamentalmente, restituisce il JSON in più file (file partizionati). Per unirli tutti in un unico file, possiamo usare il metodo coalesce().

Sintassi:

dataframe_object.coalesce( 1 ).write.json('nome_file')
  1. Modalità di aggiunta – dataframe_object.write.mode('append').json('file_name')
  2. Modalità sovrascrittura – dataframe_object.write.mode('overwrite').json('file_name')

È possibile aggiungere/sovrascrivere il JSON esistente. Usando write.mode(), possiamo aggiungere i dati passando 'append' o sovrascrivere i dati JSON esistenti passando 'overwrite' a questa funzione.

Esempio 1:

Crea un dataframe PySpark 'skills_df' con 3 righe e 4 colonne. Scrivi questo DataFrame in JSON.

importa pyspark

importare panda

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

# dati sulle competenze con 3 righe e 4 colonne

abilità =[{ 'id' : 123 , 'persona' : 'Miele' , 'abilità' : 'pittura' , 'premio' : 25000 },

{ 'id' : 112 , 'persona' : 'Muni' , 'abilità' : 'danza' , 'premio' : 2000 },

{ 'id' : 153 , 'persona' : 'Tulasì' , 'abilità' : 'lettura' , 'premio' : 1200 }

]

# crea il dataframe delle competenze dai dati di cui sopra

skills_df = linuxhint_spark_app.createDataFrame(skills)

# write.json()

skills_df.coalesce( 1 ).write.json( 'dati_competenze' )

File JSON:

Possiamo vedere che la cartella skills_data include i dati JSON partizionati.

Apriamo il file JSON. Possiamo vedere che tutte le righe del PySpark DataFrame vengono convertite in JSON.

Ci sono 5 righe nel PySpark DataFrame. Tutte queste 5 righe vengono restituite come un dizionario di stringhe separate da virgola.

Esempio 2:

Crea un PySpark DataFrame 'skills2_df' con una riga. Aggiungi una riga al file JSON precedente specificando la modalità come 'append'.

importa pyspark

importare panda

da pyspark.sql importa SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggerimento per Linux' ).getOrCreate()

competenze2 =[{ 'id' : 78 , 'persona' : 'Maria' , 'abilità' : 'cavalcare' , 'premio' : 8960 }

]

# crea il dataframe delle competenze dai dati di cui sopra

skills2_df = linuxhint_spark_app.createDataFrame(skills2)

# write.json() con modalità di aggiunta.

skills2_df.write.mode( 'aggiungere' ).json( 'dati_competenze' )

File JSON:

Possiamo vedere i file JSON partizionati. Il primo file contiene i primi record DataFrame e il secondo file contiene il secondo record DataFrame.

Conclusione

Esistono tre modi diversi per convertire PySpark DataFrame in JSON. Innanzitutto, abbiamo discusso il metodo to_json() che converte in JSON convertendo il PySpark DataFrame nel Pandas DataFrame con diversi esempi considerando diversi parametri. Successivamente, abbiamo utilizzato il metodo toJSON(). Infine, abbiamo imparato come utilizzare la funzione write.json() per scrivere PySpark DataFrame in JSON. Con questa funzione è possibile aggiungere e sovrascrivere.