Bagaimana Menerapkan Streaming Data Waktu Nyata dengan Python

Bagaimana Menerapkan Streaming Data Waktu Nyata Dengan Python



Menguasai implementasi streaming data real-time dengan Python merupakan keterampilan penting di dunia yang melibatkan data saat ini. Panduan ini mengeksplorasi langkah-langkah inti dan alat penting untuk memanfaatkan streaming data real-time dengan keaslian dengan Python. Dari memilih kerangka kerja yang sesuai seperti Apache Kafka atau Apache Pulsar hingga menulis kode Python untuk konsumsi data yang mudah, pemrosesan, dan visualisasi yang efektif, kita akan memperoleh keterampilan yang diperlukan untuk membangun saluran data real-time yang gesit dan efisien.

Contoh 1: Implementasi Streaming Data Real-Time dengan Python

Menerapkan streaming data real-time dengan Python sangat penting di era dan dunia yang didorong oleh data saat ini. Dalam contoh mendetail ini, kita akan memandu proses pembuatan sistem streaming data real-time menggunakan Apache Kafka dan Python di Google Colab.







Untuk menginisialisasi contoh sebelum kita memulai coding, membangun lingkungan spesifik di Google Colab sangatlah penting. Hal pertama yang perlu kita lakukan adalah menginstal perpustakaan yang diperlukan. Kami menggunakan perpustakaan “kafka-python” untuk integrasi Kafka.



! pip Install kafka-python


Perintah ini menginstal perpustakaan “kafka-python” yang menyediakan fungsi Python dan binding untuk Apache Kafka. Selanjutnya, kami mengimpor perpustakaan yang diperlukan untuk proyek kami. Mengimpor perpustakaan yang diperlukan termasuk “KafkaProducer” dan “KafkaConsumer” adalah kelas dari perpustakaan “kafka-python” yang memungkinkan kita berinteraksi dengan broker Kafka. JSON adalah pustaka Python untuk bekerja dengan data JSON yang kami gunakan untuk membuat serialisasi dan deserialisasi pesan.



dari kafka impor KafkaProducer, KafkaConsumer
impor json


Penciptaan Produser Kafka





Hal ini penting karena produser Kafka mengirimkan data ke topik Kafka. Dalam contoh kita, kita membuat produser untuk mengirimkan simulasi data real-time ke topik yang disebut “topik real-time.”

Kami membuat instance “KafkaProducer” yang menentukan alamat broker Kafka sebagai “localhost:9092”. Kemudian, kami menggunakan “value_serializer”, sebuah fungsi yang membuat serial data sebelum mengirimkannya ke Kafka. Dalam kasus kami, fungsi lambda mengkodekan data sebagai JSON yang dikodekan UTF-8. Sekarang, mari kita simulasikan beberapa data real-time dan kirimkan ke topik Kafka.



produser = KafkaProduser ( bootstrap_servers = 'host lokal:9092' ,
nilai_serializer =lambda v: json.dumps ( di dalam ) .menyandi ( 'utf-8' ) )
# Simulasi data waktu nyata
data = { 'sensor_id' : 1 , 'suhu' : 25.5 , 'kelembaban' : 60.2 }
# Mengirim data ke topik
produser.kirim ( 'topik waktu nyata' , data )


Pada baris ini, kami mendefinisikan kamus “data” yang mewakili data sensor yang disimulasikan. Kami kemudian menggunakan metode 'kirim' untuk mempublikasikan data ini ke 'topik waktu nyata'.

Lalu, kami ingin membuat konsumen Kafka, dan konsumen Kafka membaca data dari topik Kafka. Kami menciptakan konsumen untuk mengonsumsi dan memproses pesan dalam “topik waktu nyata”. Kami membuat instance “KafkaConsumer”, menentukan topik yang ingin kami konsumsi, misalnya (topik real-time) dan alamat broker Kafka. Kemudian, “value_deserializer” adalah fungsi yang melakukan deserialisasi data yang diterima dari Kafka. Dalam kasus kami, fungsi lambda mendekode data sebagai JSON yang dikodekan UTF-8.

konsumen = KafkaConsumer ( 'topik waktu nyata' ,
bootstrap_servers = 'host lokal:9092' ,
nilai_deserializer =lambda x: json.beban ( x.decode ( 'utf-8' ) ) )


Kami menggunakan perulangan berulang untuk terus menggunakan dan memproses pesan dari topik.

# Membaca dan memproses data waktu nyata
untuk pesan di dalam konsumen:
data = pesan.nilai
mencetak ( F 'Data yang Diterima: {data}' )


Kami mengambil setiap nilai pesan dan data sensor simulasi kami di dalam loop dan mencetaknya ke konsol. Menjalankan produsen dan konsumen Kafka melibatkan menjalankan kode ini di Google Colab dan mengeksekusi sel kode satu per satu. Produser mengirimkan data simulasi ke topik Kafka, dan konsumen membaca dan mencetak data yang diterima.


Analisis Output saat Kode Berjalan

Kami akan mengamati data real-time yang diproduksi dan dikonsumsi. Format data dapat bervariasi tergantung pada simulasi atau sumber data aktual kami. Dalam contoh mendetail ini, kami membahas seluruh proses penyiapan sistem streaming data real-time menggunakan Apache Kafka dan Python di Google Colab. Kami akan menjelaskan setiap baris kode dan signifikansinya dalam membangun sistem ini. Streaming data real-time adalah kemampuan yang hebat, dan contoh ini berfungsi sebagai landasan untuk aplikasi dunia nyata yang lebih kompleks.

Contoh 2: Menerapkan Streaming Data Real-Time dengan Python Menggunakan Data Pasar Saham

Mari kita lakukan contoh unik lainnya dalam mengimplementasikan streaming data real-time dengan Python menggunakan skenario berbeda; kali ini kita akan fokus pada data pasar saham. Kami membuat sistem streaming data real-time yang menangkap perubahan harga saham dan memprosesnya menggunakan Apache Kafka dan Python di Google Colab. Seperti yang ditunjukkan pada contoh sebelumnya, kita mulai dengan mengonfigurasi lingkungan kita di Google Colab. Pertama, kami menginstal perpustakaan yang diperlukan:

! pip Install kafka-python yfinance


Di sini, kami menambahkan perpustakaan “yfinance” yang memungkinkan kami mendapatkan data pasar saham secara real-time. Selanjutnya, kami mengimpor perpustakaan yang diperlukan. Kami terus menggunakan kelas “KafkaProducer” dan “KafkaConsumer” dari perpustakaan “kafka-python” untuk interaksi Kafka. Kami mengimpor JSON agar berfungsi dengan data JSON. Kami juga menggunakan “yfinance” untuk mendapatkan data pasar saham secara real-time. Kami juga mengimpor perpustakaan 'waktu' untuk menambahkan penundaan waktu guna mensimulasikan pembaruan waktu nyata.

dari kafka impor KafkaProducer, KafkaConsumer
impor json
impor keuangan sebagai kamuf
impor waktu


Sekarang, kami membuat produser Kafka untuk data stok. Produser Kafka kami mendapatkan data saham real-time dan mengirimkannya ke topik Kafka bernama “harga saham”.

produser = KafkaProduser ( bootstrap_servers = 'host lokal:9092' ,
nilai_serializer =lambda v: json.dumps ( di dalam ) .menyandi ( 'utf-8' ) )

ketika BENAR:
stok = yf.Ticker ( 'AAPL' ) # Contoh: Saham Apple Inc
stock_data = stok.sejarah ( periode = '1d' )
harga_terakhir = data_stok [ 'Menutup' ] .iloc [ - 1 ]
data = { 'simbol' : 'AAPL' , 'harga' : harga terakhir }
produser.kirim ( 'harga saham' , data )
waktu tidur ( 10 ) # Simulasikan pembaruan waktu nyata setiap 10 detik


Kami membuat instance “KafkaProducer” dengan alamat broker Kafka di kode ini. Di dalam loop, kami menggunakan “yfinance” untuk mendapatkan harga saham terbaru untuk Apple Inc. (“AAPL”). Kemudian, kami mengekstrak harga penutupan terakhir dan mengirimkannya ke topik “harga saham”. Pada akhirnya, kami memperkenalkan penundaan waktu untuk mensimulasikan pembaruan waktu nyata setiap 10 detik.

Mari kita buat konsumen Kafka untuk membaca dan memproses data harga saham dari topik “harga saham”.

konsumen = KafkaConsumer ( 'harga saham' ,
bootstrap_servers = 'host lokal:9092' ,
nilai_deserializer =lambda x: json.beban ( x.decode ( 'utf-8' ) ) )

untuk pesan di dalam konsumen:
stock_data = pesan.nilai
mencetak ( F 'Data Stok yang Diterima: {stock_data['symbol']} - Harga: {stock_data['price']}' )


Kode ini mirip dengan penyiapan konsumen pada contoh sebelumnya. Ia terus membaca dan memproses pesan dari topik “harga saham” dan mencetak simbol dan harga saham ke konsol. Kami mengeksekusi sel kode secara berurutan, misalnya satu per satu di Google Colab untuk menjalankan produsen dan konsumen. Produsen mendapatkan dan mengirimkan pembaruan harga saham secara real-time sementara konsumen membaca dan menampilkan data ini.

! pip Install kafka-python yfinance
dari kafka impor KafkaProducer, KafkaConsumer
impor json
impor keuangan sebagai kamuf
impor waktu
produser = KafkaProduser ( bootstrap_servers = 'host lokal:9092' ,
nilai_serializer =lambda v: json.dumps ( di dalam ) .menyandi ( 'utf-8' ) )

ketika BENAR:
stok = yf.Ticker ( 'AAPL' ) # Saham Apple Inc
stock_data = stok.sejarah ( periode = '1d' )
harga_terakhir = data_stok [ 'Menutup' ] .iloc [ - 1 ]

data = { 'simbol' : 'AAPL' , 'harga' : harga terakhir }

produser.kirim ( 'harga saham' , data )

waktu tidur ( 10 ) # Simulasikan pembaruan waktu nyata setiap 10 detik
konsumen = KafkaConsumer ( 'harga saham' ,
bootstrap_servers = 'host lokal:9092' ,
nilai_deserializer =lambda x: json.beban ( x.decode ( 'utf-8' ) ) )

untuk pesan di dalam konsumen:
stock_data = pesan.nilai
mencetak ( F 'Data Stok yang Diterima: {stock_data['symbol']} - Harga: {stock_data['price']}' )


Dalam analisis keluaran setelah kode dijalankan, kita akan mengamati pembaruan harga saham secara real-time untuk Apple Inc. yang diproduksi dan dikonsumsi.

Kesimpulan

Dalam contoh unik ini, kami mendemonstrasikan implementasi streaming data real-time dengan Python menggunakan Apache Kafka dan perpustakaan “yfinance” untuk menangkap dan memproses data pasar saham. Kami menjelaskan secara menyeluruh setiap baris kode. Streaming data real-time dapat diterapkan ke berbagai bidang untuk membangun aplikasi dunia nyata di bidang keuangan, IoT, dan banyak lagi.