Cara Membaca dan Menulis Data Tabel di PySpark

Cara Membaca Dan Menulis Data Tabel Di Pyspark



Pemrosesan data di PySpark lebih cepat jika data dimuat dalam bentuk tabel. Dengan ini, menggunakan Ekspresi SQL, pemrosesan akan cepat. Jadi, mengonversi PySpark DataFrame/RDD menjadi tabel sebelum mengirimkannya untuk diproses adalah pendekatan yang lebih baik. Hari ini, kita akan melihat cara membaca data tabel ke dalam PySpark DataFrame, menulis PySpark DataFrame ke tabel, dan memasukkan DataFrame baru ke tabel yang ada menggunakan fungsi bawaan. Ayo pergi!

Pyspark.sql.DataFrameWriter.saveAsTable()

Pertama, kita akan melihat cara menulis PySpark DataFrame yang ada ke dalam tabel menggunakan fungsi write.saveAsTable() . Dibutuhkan nama tabel dan parameter opsional lainnya seperti mode, partionBy, dll., untuk menulis DataFrame ke tabel. Itu disimpan sebagai file parket.

Sintaksis:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
  1. Table_name adalah nama tabel yang dibuat dari dataframe_obj.
  2. Kita dapat menambahkan/menimpa data tabel menggunakan parameter mode.
  3. PartitionBy mengambil kolom tunggal/banyak untuk membuat partisi berdasarkan nilai di kolom yang disediakan ini.

Contoh 1:

Buat PySpark DataFrame dengan 5 baris dan 4 kolom. Tulis Dataframe ini ke tabel bernama “Agri_Table1”.



impor pyspark

dari pyspark.sql impor SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

# data pertanian dengan 5 baris dan 5 kolom

pertanian =[{ 'Jenis tanah' : 'Hitam' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 2500 , 'status_tanah' : 'Kering' ,
'Negara' : 'AMERIKA SERIKAT' },

{ 'Jenis tanah' : 'Hitam' , 'Irigasi_ketersediaan' : 'Ya' , 'Hektar' : 3500 , 'status_tanah' : 'Basah' ,
'Negara' : 'India' },

{ 'Jenis tanah' : 'Merah' , 'Irigasi_ketersediaan' : 'Ya' , 'Hektar' : 210 , 'status_tanah' : 'Kering' ,
'Negara' : 'Inggris' },

{ 'Jenis tanah' : 'Lainnya' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 1000 , 'status_tanah' : 'Basah' ,
'Negara' : 'AMERIKA SERIKAT' },

{ 'Jenis tanah' : 'Pasir' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 500 , 'status_tanah' : 'Kering' ,
'Negara' : 'India' }]



# buat kerangka data dari data di atas

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Tulis DataFrame di atas ke tabel.

agri_df.bergabung( 1 .write.saveAsTable( 'Agri_Tabel1' )

Keluaran:







Kita dapat melihat bahwa satu file parket dibuat dengan Data PySpark sebelumnya.



Contoh 2:

Pertimbangkan DataFrame sebelumnya dan tulis 'Agri_Table2' ke tabel dengan mempartisi catatan berdasarkan nilai di kolom 'Negara'.

# Tulis DataFrame di atas ke tabel dengan parameter partitionBy

agri_df.write.saveAsTable( 'Agri_Tabel2' ,partisiBy=[ 'Negara' ])

Keluaran:

Ada tiga nilai unik di kolom “Negara” – “India”, “Inggris”, dan “AS”. Jadi, tiga partisi dibuat. Setiap partisi menyimpan file parket.

Pyspark.sql.DataFrameReader.table()

Mari muat tabel ke dalam PySpark DataFrame menggunakan fungsi spark.read.table() . Hanya dibutuhkan satu parameter yaitu nama jalur/tabel. Itu langsung memuat tabel ke dalam PySpark DataFrame dan semua fungsi SQL yang diterapkan ke PySpark DataFrame juga dapat diterapkan pada DataFrame yang dimuat ini.

Sintaksis:

spark_app.read.table(path/'Table_name')

Dalam skenario ini, kami menggunakan tabel sebelumnya yang dibuat dari PySpark DataFrame. Pastikan Anda perlu mengimplementasikan cuplikan kode skenario sebelumnya di lingkungan Anda.

Contoh:

Muat tabel “Agri_Table1” ke dalam DataFrame bernama “loaded_data”.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

Keluaran:

Kita dapat melihat bahwa tabel dimuat ke dalam PySpark DataFrame.

Menjalankan Kueri SQL

Sekarang, kami menjalankan beberapa kueri SQL pada DataFrame yang dimuat menggunakan fungsi spark.sql().

# Gunakan perintah SELECT untuk menampilkan semua kolom dari tabel di atas.

linuxhint_spark_app.sql( 'PILIH * dari Agri_Table1' ).menunjukkan()

# DI MANA Klausa

linuxhint_spark_app.sql( 'PILIH * dari Agri_Table1 WHERE Soil_status='Kering' ' ).menunjukkan()

linuxhint_spark_app.sql( 'PILIH * dari Agri_Table1 WHERE Acres > 2000' ).menunjukkan()

Keluaran:

  1. Kueri pertama menampilkan semua kolom dan rekaman dari DataFrame.
  2. Permintaan kedua menampilkan catatan berdasarkan kolom 'Soil_status'. Hanya ada tiga rekaman dengan elemen 'Kering'.
  3. Kueri terakhir mengembalikan dua catatan dengan 'Acres' yang lebih besar dari 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Dengan menggunakan fungsi insertInto(), kita dapat menambahkan DataFrame ke dalam tabel yang sudah ada. Kita bisa menggunakan fungsi ini bersama dengan selectExpr() untuk menentukan nama kolom dan kemudian memasukkannya ke dalam tabel. Fungsi ini juga menggunakan tableName sebagai parameter.

Sintaksis:

DataFrame_obj.write.insertInto('Table_name')

Dalam skenario ini, kami menggunakan tabel sebelumnya yang dibuat dari PySpark DataFrame. Pastikan Anda perlu mengimplementasikan cuplikan kode skenario sebelumnya di lingkungan Anda.

Contoh:

Buat DataFrame baru dengan dua catatan dan masukkan ke dalam tabel 'Agri_Table1'.

impor pyspark

dari pyspark.sql impor SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

# data pertanian dengan 2 baris

pertanian =[{ 'Jenis tanah' : 'Pasir' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 2500 , 'status_tanah' : 'Kering' ,
'Negara' : 'AMERIKA SERIKAT' },

{ 'Jenis tanah' : 'Pasir' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 1200 , 'status_tanah' : 'Basah' ,
'Negara' : 'Jepang' }]

# buat kerangka data dari data di atas

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Hektar' , 'Negara' , 'Irigasi_ketersediaan' , 'Jenis tanah' ,
'Status_tanah' .write.insertInto( 'Agri_Tabel1' )

# Tampilkan Agri_Table1 akhir

linuxhint_spark_app.sql( 'PILIH * dari Agri_Table1' ).menunjukkan()

Keluaran:

Sekarang, jumlah baris yang ada di DataFrame adalah 7.

Kesimpulan

Anda sekarang mengerti cara menulis PySpark DataFrame ke tabel menggunakan fungsi write.saveAsTable() . Dibutuhkan nama tabel dan parameter opsional lainnya. Kemudian, kami memuat tabel ini ke dalam PySpark DataFrame menggunakan fungsi spark.read.table() . Hanya dibutuhkan satu parameter yaitu nama jalur/tabel. Jika Anda ingin menambahkan DataFrame baru ke dalam tabel yang sudah ada, gunakan fungsi insertInto().