Come implementare lo streaming di dati in tempo reale in Python

Come Implementare Lo Streaming Di Dati In Tempo Reale In Python



Padroneggiare l'implementazione dello streaming di dati in tempo reale in Python rappresenta una competenza essenziale nel mondo odierno coinvolto nei dati. Questa guida esplora i passaggi principali e gli strumenti essenziali per utilizzare lo streaming di dati in tempo reale con autenticità in Python. Dalla selezione di un framework adeguato come Apache Kafka o Apache Pulsar alla scrittura di un codice Python per il consumo, l'elaborazione e la visualizzazione efficace dei dati senza sforzo, acquisiremo le competenze necessarie per costruire canali di dati in tempo reale agili ed efficienti.

Esempio 1: implementazione dello streaming di dati in tempo reale in Python

L’implementazione di uno streaming di dati in tempo reale in Python è fondamentale nell’era e nel mondo di oggi basati sui dati. In questo esempio dettagliato, esamineremo il processo di creazione di un sistema di streaming di dati in tempo reale utilizzando Apache Kafka e Python in Google Colab.







Per inizializzare l'esempio prima di iniziare a scrivere codice, è essenziale creare un ambiente specifico in Google Colab. La prima cosa che dobbiamo fare è installare le librerie necessarie. Utilizziamo la libreria 'kafka-python' per l'integrazione di Kafka.



! pip installare kafka-pitone


Questo comando installa la libreria 'kafka-python' che fornisce le funzioni Python e i collegamenti per Apache Kafka. Successivamente, importiamo le librerie richieste per il nostro progetto. Importando le librerie richieste tra cui “KafkaProducer” e “KafkaConsumer” sono le classi della libreria “kafka-python” che ci consentono di interagire con i broker Kafka. JSON è la libreria Python per lavorare con i dati JSON che utilizziamo per serializzare e deserializzare i messaggi.



da Kafka import KafkaProducer, KafkaConsumer
importa json


Creazione di un produttore Kafka





Questo è importante perché un produttore Kafka invia i dati a un argomento Kafka. Nel nostro esempio, creiamo un produttore per inviare dati simulati in tempo reale a un argomento chiamato 'argomento in tempo reale'.

Creiamo un'istanza 'KafkaProducer' che specifica l'indirizzo del broker Kafka come 'localhost:9092'. Quindi utilizziamo “value_serializer”, una funzione che serializza i dati prima di inviarli a Kafka. Nel nostro caso, una funzione lambda codifica i dati come JSON con codifica UTF-8. Ora simuliamo alcuni dati in tempo reale e inviamoli all'argomento Kafka.



produttore = KafkaProduttore ( bootstrap_servers = 'localhost:9092' ,
valore_serializzatore =lambda v: json.dumps ( In ) .codificare ( 'utf-8' ) )
# Dati simulati in tempo reale
dati = { 'id_sensore' : 1 , 'temperatura' : 25,5 , 'umidità' : 60.2 }
# Invio di dati all'argomento
produttore.send ( 'argomento in tempo reale' , dati )


In queste righe definiamo un dizionario “dati” che rappresenta i dati di un sensore simulato. Utilizziamo quindi il metodo 'invia' per pubblicare questi dati nell''argomento in tempo reale'.

Quindi, vogliamo creare un consumatore Kafka e un consumatore Kafka legge i dati da un argomento Kafka. Creiamo un consumatore per consumare ed elaborare i messaggi nell ''argomento in tempo reale'. Creiamo un'istanza 'KafkaConsumer', specificando l'argomento che vogliamo consumare, ad esempio (argomento in tempo reale) e l'indirizzo del broker Kafka. Quindi, 'value_deserializer' è una funzione che deserializza i dati ricevuti da Kafka. Nel nostro caso, una funzione lambda decodifica i dati come JSON con codifica UTF-8.

consumatore = KafkaConsumatore ( 'argomento in tempo reale' ,
bootstrap_servers = 'localhost:9092' ,
valore_deserializzatore =lambdax:json.loads ( x.decodifica ( 'utf-8' ) ) )


Usiamo un ciclo iterativo per consumare ed elaborare continuamente i messaggi dall'argomento.

# Lettura ed elaborazione dei dati in tempo reale
per Messaggio In consumatore:
dati = messaggio.valore
stampa ( F 'Dati ricevuti: {data}' )


Recuperiamo il valore di ogni messaggio e i dati dei nostri sensori simulati all'interno del loop e li stampiamo sulla console. L'esecuzione del produttore e del consumatore Kafka implica l'esecuzione di questo codice in Google Colab e l'esecuzione delle celle di codice individualmente. Il produttore invia i dati simulati all'argomento Kafka e il consumatore legge e stampa i dati ricevuti.


Analisi dell'output durante l'esecuzione del codice

Osserveremo i dati in tempo reale che vengono prodotti e consumati. Il formato dei dati può variare a seconda della nostra simulazione o della fonte dati effettiva. In questo esempio dettagliato, copriamo l'intero processo di configurazione di un sistema di streaming di dati in tempo reale utilizzando Apache Kafka e Python in Google Colab. Spiegheremo ogni riga di codice e il suo significato nella costruzione di questo sistema. Lo streaming di dati in tempo reale è una funzionalità potente e questo esempio funge da base per applicazioni del mondo reale più complesse.

Esempio 2: implementazione di uno streaming di dati in tempo reale in Python utilizzando dati di mercato azionario

Facciamo un altro esempio unico di implementazione di uno streaming di dati in tempo reale in Python utilizzando uno scenario diverso; questa volta ci concentreremo sui dati del mercato azionario. Creiamo un sistema di streaming di dati in tempo reale che cattura le variazioni del prezzo delle azioni e le elabora utilizzando Apache Kafka e Python in Google Colab. Come dimostrato nell'esempio precedente, iniziamo configurando il nostro ambiente in Google Colab. Per prima cosa installiamo le librerie richieste:

! pip installare kafka-python yfinance


Qui aggiungiamo la libreria “yfinance” che ci consente di ottenere dati sul mercato azionario in tempo reale. Successivamente, importiamo le librerie necessarie. Continuiamo a utilizzare le classi 'KafkaProducer' e 'KafkaConsumer' dalla libreria 'kafka-python' per l'interazione con Kafka. Importiamo JSON per lavorare con i dati JSON. Usiamo anche 'yfinance' per ottenere dati sul mercato azionario in tempo reale. Importiamo anche la libreria 'time' per aggiungere un ritardo temporale per simulare gli aggiornamenti in tempo reale.

da Kafka import KafkaProducer, KafkaConsumer
importa json
importare finanza COME
importare tempo


Ora creiamo un produttore Kafka per i dati azionari. Il nostro produttore Kafka ottiene dati azionari in tempo reale e li invia a un argomento Kafka denominato 'prezzo delle azioni'.

produttore = KafkaProduttore ( bootstrap_servers = 'localhost:9092' ,
valore_serializzatore =lambda v: json.dumps ( In ) .codificare ( 'utf-8' ) )

Mentre VERO:
azioni = yf.Ticker ( 'AAPL' ) # Esempio: azioni Apple Inc.
stock_data = stock.storia ( periodo = '1d' )
ultimo_prezzo = dati_stock [ 'Vicino' ] .iloc [ - 1 ]
dati = { 'simbolo' : 'AAPL' , 'prezzo' : ultimo prezzo }
produttore.send ( 'prezzo delle azioni' , dati )
tempo.di.sonno ( 10 ) # Simula aggiornamenti in tempo reale ogni 10 secondi


Creiamo un'istanza 'KafkaProducer' con l'indirizzo del broker Kafka in questo codice. All'interno del ciclo, utilizziamo 'yfinance' per ottenere l'ultimo prezzo delle azioni di Apple Inc. ('AAPL'). Quindi, estraiamo l'ultimo prezzo di chiusura e lo inviamo all'argomento 'prezzo delle azioni'. Alla fine, introduciamo un ritardo temporale per simulare gli aggiornamenti in tempo reale ogni 10 secondi.

Creiamo un consumatore Kafka per leggere ed elaborare i dati sui prezzi delle azioni dall'argomento 'prezzo delle azioni'.

consumatore = KafkaConsumatore ( 'prezzo delle azioni' ,
bootstrap_servers = 'localhost:9092' ,
valore_deserializzatore =lambdax:json.loads ( x.decodifica ( 'utf-8' ) ) )

per Messaggio In consumatore:
dati_stock = messaggio.valore
stampa ( F 'Dati azionari ricevuti: {stock_data['symbol']} - Prezzo: {stock_data['price']}' )


Questo codice è simile alla configurazione consumer dell'esempio precedente. Legge ed elabora continuamente i messaggi dall'argomento 'prezzo delle azioni' e stampa il simbolo delle azioni e il prezzo sulla console. Eseguiamo le celle di codice in sequenza, ad esempio una per una in Google Colab per eseguire il produttore e il consumatore. Il produttore riceve e invia gli aggiornamenti dei prezzi delle azioni in tempo reale mentre il consumatore legge e visualizza questi dati.

! pip installare kafka-python yfinance
da Kafka import KafkaProducer, KafkaConsumer
importa json
importare finanza COME
importare tempo
produttore = KafkaProduttore ( bootstrap_servers = 'localhost:9092' ,
valore_serializzatore =lambda v: json.dumps ( In ) .codificare ( 'utf-8' ) )

Mentre VERO:
azioni = yf.Ticker ( 'AAPL' ) # azioni Apple Inc
stock_data = stock.storia ( periodo = '1d' )
ultimo_prezzo = dati_stock [ 'Vicino' ] .iloc [ - 1 ]

dati = { 'simbolo' : 'AAPL' , 'prezzo' : ultimo prezzo }

produttore.send ( 'prezzo delle azioni' , dati )

tempo.di.sonno ( 10 ) # Simula aggiornamenti in tempo reale ogni 10 secondi
consumatore = KafkaConsumatore ( 'prezzo delle azioni' ,
bootstrap_servers = 'localhost:9092' ,
valore_deserializzatore =lambdax:json.loads ( x.decodifica ( 'utf-8' ) ) )

per Messaggio In consumatore:
dati_stock = messaggio.valore
stampa ( F 'Dati azionari ricevuti: {stock_data['symbol']} - Prezzo: {stock_data['price']}' )


Nell'analisi dell'output dopo l'esecuzione del codice, osserveremo gli aggiornamenti dei prezzi delle azioni in tempo reale per Apple Inc. prodotti e consumati.

Conclusione

In questo esempio unico, abbiamo dimostrato l'implementazione dello streaming di dati in tempo reale in Python utilizzando Apache Kafka e la libreria 'yfinance' per acquisire ed elaborare i dati del mercato azionario. Abbiamo spiegato approfonditamente ogni riga del codice. Lo streaming di dati in tempo reale può essere applicato a vari campi per creare applicazioni reali nel campo della finanza, dell'IoT e altro ancora.