Pyspark.sql.DataFrameWriter.saveAsTable()
Pertama, kita akan melihat cara menulis PySpark DataFrame yang ada ke dalam tabel menggunakan fungsi write.saveAsTable() . Dibutuhkan nama tabel dan parameter opsional lainnya seperti mode, partionBy, dll., untuk menulis DataFrame ke tabel. Itu disimpan sebagai file parket.
Sintaksis:
dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
- Table_name adalah nama tabel yang dibuat dari dataframe_obj.
- Kita dapat menambahkan/menimpa data tabel menggunakan parameter mode.
- PartitionBy mengambil kolom tunggal/banyak untuk membuat partisi berdasarkan nilai di kolom yang disediakan ini.
Contoh 1:
Buat PySpark DataFrame dengan 5 baris dan 4 kolom. Tulis Dataframe ini ke tabel bernama “Agri_Table1”.
impor pyspark
dari pyspark.sql impor SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()
# data pertanian dengan 5 baris dan 5 kolom
pertanian =[{ 'Jenis tanah' : 'Hitam' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 2500 , 'status_tanah' : 'Kering' ,
'Negara' : 'AMERIKA SERIKAT' },
{ 'Jenis tanah' : 'Hitam' , 'Irigasi_ketersediaan' : 'Ya' , 'Hektar' : 3500 , 'status_tanah' : 'Basah' ,
'Negara' : 'India' },
{ 'Jenis tanah' : 'Merah' , 'Irigasi_ketersediaan' : 'Ya' , 'Hektar' : 210 , 'status_tanah' : 'Kering' ,
'Negara' : 'Inggris' },
{ 'Jenis tanah' : 'Lainnya' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 1000 , 'status_tanah' : 'Basah' ,
'Negara' : 'AMERIKA SERIKAT' },
{ 'Jenis tanah' : 'Pasir' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 500 , 'status_tanah' : 'Kering' ,
'Negara' : 'India' }]
# buat kerangka data dari data di atas
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Tulis DataFrame di atas ke tabel.
agri_df.bergabung( 1 .write.saveAsTable( 'Agri_Tabel1' )
Keluaran:
Kita dapat melihat bahwa satu file parket dibuat dengan Data PySpark sebelumnya.
Contoh 2:
Pertimbangkan DataFrame sebelumnya dan tulis 'Agri_Table2' ke tabel dengan mempartisi catatan berdasarkan nilai di kolom 'Negara'.
# Tulis DataFrame di atas ke tabel dengan parameter partitionByagri_df.write.saveAsTable( 'Agri_Tabel2' ,partisiBy=[ 'Negara' ])
Keluaran:
Ada tiga nilai unik di kolom “Negara” – “India”, “Inggris”, dan “AS”. Jadi, tiga partisi dibuat. Setiap partisi menyimpan file parket.
Pyspark.sql.DataFrameReader.table()
Mari muat tabel ke dalam PySpark DataFrame menggunakan fungsi spark.read.table() . Hanya dibutuhkan satu parameter yaitu nama jalur/tabel. Itu langsung memuat tabel ke dalam PySpark DataFrame dan semua fungsi SQL yang diterapkan ke PySpark DataFrame juga dapat diterapkan pada DataFrame yang dimuat ini.
Sintaksis:
spark_app.read.table(path/'Table_name')Dalam skenario ini, kami menggunakan tabel sebelumnya yang dibuat dari PySpark DataFrame. Pastikan Anda perlu mengimplementasikan cuplikan kode skenario sebelumnya di lingkungan Anda.
Contoh:
Muat tabel “Agri_Table1” ke dalam DataFrame bernama “loaded_data”.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
Keluaran:
Kita dapat melihat bahwa tabel dimuat ke dalam PySpark DataFrame.
Menjalankan Kueri SQL
Sekarang, kami menjalankan beberapa kueri SQL pada DataFrame yang dimuat menggunakan fungsi spark.sql().
# Gunakan perintah SELECT untuk menampilkan semua kolom dari tabel di atas.linuxhint_spark_app.sql( 'PILIH * dari Agri_Table1' ).menunjukkan()
# DI MANA Klausa
linuxhint_spark_app.sql( 'PILIH * dari Agri_Table1 WHERE Soil_status='Kering' ' ).menunjukkan()
linuxhint_spark_app.sql( 'PILIH * dari Agri_Table1 WHERE Acres > 2000' ).menunjukkan()
Keluaran:
- Kueri pertama menampilkan semua kolom dan rekaman dari DataFrame.
- Permintaan kedua menampilkan catatan berdasarkan kolom 'Soil_status'. Hanya ada tiga rekaman dengan elemen 'Kering'.
- Kueri terakhir mengembalikan dua catatan dengan 'Acres' yang lebih besar dari 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Dengan menggunakan fungsi insertInto(), kita dapat menambahkan DataFrame ke dalam tabel yang sudah ada. Kita bisa menggunakan fungsi ini bersama dengan selectExpr() untuk menentukan nama kolom dan kemudian memasukkannya ke dalam tabel. Fungsi ini juga menggunakan tableName sebagai parameter.
Sintaksis:
DataFrame_obj.write.insertInto('Table_name')Dalam skenario ini, kami menggunakan tabel sebelumnya yang dibuat dari PySpark DataFrame. Pastikan Anda perlu mengimplementasikan cuplikan kode skenario sebelumnya di lingkungan Anda.
Contoh:
Buat DataFrame baru dengan dua catatan dan masukkan ke dalam tabel 'Agri_Table1'.
impor pysparkdari pyspark.sql impor SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()
# data pertanian dengan 2 baris
pertanian =[{ 'Jenis tanah' : 'Pasir' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 2500 , 'status_tanah' : 'Kering' ,
'Negara' : 'AMERIKA SERIKAT' },
{ 'Jenis tanah' : 'Pasir' , 'Irigasi_ketersediaan' : 'TIDAK' , 'Hektar' : 1200 , 'status_tanah' : 'Basah' ,
'Negara' : 'Jepang' }]
# buat kerangka data dari data di atas
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Hektar' , 'Negara' , 'Irigasi_ketersediaan' , 'Jenis tanah' ,
'Status_tanah' .write.insertInto( 'Agri_Tabel1' )
# Tampilkan Agri_Table1 akhir
linuxhint_spark_app.sql( 'PILIH * dari Agri_Table1' ).menunjukkan()
Keluaran:
Sekarang, jumlah baris yang ada di DataFrame adalah 7.
Kesimpulan
Anda sekarang mengerti cara menulis PySpark DataFrame ke tabel menggunakan fungsi write.saveAsTable() . Dibutuhkan nama tabel dan parameter opsional lainnya. Kemudian, kami memuat tabel ini ke dalam PySpark DataFrame menggunakan fungsi spark.read.table() . Hanya dibutuhkan satu parameter yaitu nama jalur/tabel. Jika Anda ingin menambahkan DataFrame baru ke dalam tabel yang sudah ada, gunakan fungsi insertInto().