PySpark Pandas_Udf()

Pyspark Pandas Udf



Mengubah PySpark DataFrame dimungkinkan menggunakan fungsi pandas_udf() . Ini adalah fungsi yang ditentukan pengguna yang diterapkan pada PySpark DataFrame dengan panah. Kita dapat melakukan operasi vektor menggunakan pandas_udf(). Ini dapat diimplementasikan dengan melewatkan fungsi ini sebagai dekorator. Mari selami panduan ini untuk mengetahui sintaks, parameter, dan berbagai contoh.

Topik Isi:

Jika Anda ingin tahu tentang pemasangan PySpark DataFrame dan modul, baca ini artikel .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () tersedia di modul sql.functions di PySpark yang dapat diimpor menggunakan kata kunci “from”. Ini digunakan untuk melakukan operasi vektor pada PySpark DataFrame kami. Fungsi ini diimplementasikan seperti dekorator dengan melewatkan tiga parameter. Setelah itu, kita dapat membuat fungsi yang ditentukan pengguna yang mengembalikan data dalam format vektor (seperti kita menggunakan seri/NumPy untuk ini) menggunakan panah. Dalam fungsi ini, kami dapat mengembalikan hasilnya.



Struktur & Sintaks:



Pertama, mari kita lihat struktur dan sintaks dari fungsi ini:

@pandas_udf(tipe data)
def function_name(operasi) -> convert_format:
pernyataan pengembalian

Di sini, function_name adalah nama dari fungsi yang kita tentukan. Tipe data menentukan tipe data yang dikembalikan oleh fungsi ini. Kami dapat mengembalikan hasilnya menggunakan kata kunci 'kembalikan'. Semua operasi dilakukan di dalam fungsi dengan penetapan panah.





Pandas_udf (Fungsi dan Jenis Pengembalian)

  1. Parameter pertama adalah fungsi yang ditentukan pengguna yang diteruskan ke sana.
  2. Parameter kedua digunakan untuk menentukan tipe data pengembalian dari fungsi.

Data:

Di seluruh panduan ini, kami hanya menggunakan satu PySpark DataFrame untuk demonstrasi. Semua fungsi yang ditentukan pengguna yang kami definisikan diterapkan pada PySpark DataFrame ini. Pastikan Anda membuat DataFrame ini di lingkungan Anda terlebih dahulu setelah instalasi PySpark.



impor pyspark

dari pyspark.sql impor SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

dari pyspark.sql.fungsi impor pandas_udf

dari pyspark.sql.types impor *

impor panda sebagai panda

# detail sayuran

sayur =[{ 'jenis' : 'sayur-mayur' , 'nama' : 'tomat' , 'cari_negara' : 'AMERIKA SERIKAT' , 'kuantitas' : 800 },

{ 'jenis' : 'buah' , 'nama' : 'pisang' , 'cari_negara' : 'CINA' , 'kuantitas' : dua puluh },

{ 'jenis' : 'sayur-mayur' , 'nama' : 'tomat' , 'cari_negara' : 'AMERIKA SERIKAT' , 'kuantitas' : 800 },

{ 'jenis' : 'sayur-mayur' , 'nama' : 'Buah mangga' , 'cari_negara' : 'JEPANG' , 'kuantitas' : 0 },

{ 'jenis' : 'buah' , 'nama' : 'lemon' , 'cari_negara' : 'INDIA' , 'kuantitas' : 1700 },

{ 'jenis' : 'sayur-mayur' , 'nama' : 'tomat' , 'cari_negara' : 'AMERIKA SERIKAT' , 'kuantitas' : 1200 },

{ 'jenis' : 'sayur-mayur' , 'nama' : 'Buah mangga' , 'cari_negara' : 'JEPANG' , 'kuantitas' : 0 },

{ 'jenis' : 'buah' , 'nama' : 'lemon' , 'cari_negara' : 'INDIA' , 'kuantitas' : 0 }

]

# buat kerangka data pasar dari data di atas

market_df = linuxhint_spark_app.createDataFrame(sayuran)

market_df.show()

Keluaran:

Di sini, kami membuat DataFrame ini dengan 4 kolom dan 8 baris. Sekarang, kami menggunakan pandas_udf() untuk membuat fungsi yang ditentukan pengguna dan menerapkannya ke kolom ini.

Pandas_udf() dengan Tipe Data Berbeda

Dalam skenario ini, kami membuat beberapa fungsi yang ditentukan pengguna dengan pandas_udf() dan menerapkannya pada kolom dan menampilkan hasilnya menggunakan metode select(). Dalam setiap kasus, kami menggunakan pandas.Series saat kami melakukan operasi vektor. Ini menganggap nilai kolom sebagai larik satu dimensi dan operasi diterapkan pada kolom. Di dekorator itu sendiri, kami menentukan tipe pengembalian fungsi.

Contoh 1: Pandas_udf() dengan Tipe String

Di sini, kami membuat dua fungsi yang ditentukan pengguna dengan tipe pengembalian string untuk mengonversi nilai kolom tipe string menjadi huruf besar dan huruf kecil. Terakhir, kami menerapkan fungsi ini pada kolom “type” dan “locate_country”.

# Ubah kolom tipe menjadi huruf besar dengan pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

kembalikan i.str.upper()

# Ubah kolom loc_country menjadi huruf kecil dengan pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

kembalikan i.str.lower()

# Tampilkan kolom menggunakan pilih ()

pasar_df.pilih( 'jenis' ,ketik_huruf_besar( 'jenis' ), 'cari_negara' ,
country_lower_case( 'cari_negara' )).menunjukkan()

Keluaran:

Penjelasan:

Fungsi StringType() tersedia di modul pyspark.sql.types. Kami sudah mengimpor modul ini saat membuat PySpark DataFrame.

  1. Pertama, UDF (fungsi yang ditentukan pengguna) mengembalikan string dalam huruf besar menggunakan fungsi str.upper(). str.upper() tersedia dalam Struktur Data Seri (karena kami mengonversi ke seri dengan panah di dalam fungsi) yang mengubah string yang diberikan menjadi huruf besar. Terakhir, fungsi ini diterapkan pada kolom “type” yang ditentukan di dalam metode select(). Sebelumnya, semua string pada kolom type menggunakan huruf kecil. Sekarang, mereka diubah menjadi huruf besar.
  2. Kedua, UDF mengembalikan string dalam huruf besar menggunakan fungsi str.lower(). str.lower() tersedia dalam Struktur Data Seri yang mengubah string yang diberikan menjadi huruf kecil. Terakhir, fungsi ini diterapkan pada kolom “type” yang ditentukan di dalam metode select(). Sebelumnya, semua string pada kolom type menggunakan huruf besar. Sekarang, mereka diubah menjadi huruf kecil.

Contoh 2: Pandas_udf() dengan Tipe Integer

Mari buat UDF yang mengonversi kolom bilangan bulat PySpark DataFrame ke seri Pandas dan menambahkan 100 ke setiap nilai. Lewati kolom 'kuantitas' ke fungsi ini di dalam metode select().

# Tambahkan 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

mengembalikan i+ 100

# Lewati kolom kuantitas ke fungsi dan tampilan di atas.

pasar_df.pilih( 'kuantitas' ,tambah_100( 'kuantitas' )).menunjukkan()

Keluaran:

Penjelasan:

Di dalam UDF, kami mengulangi semua nilai dan mengonversinya menjadi Seri. Setelah itu, kami menambahkan 100 ke setiap nilai di Seri. Terakhir, kita meneruskan kolom 'kuantitas' ke fungsi ini dan kita dapat melihat bahwa 100 ditambahkan ke semua nilai.

Pandas_udf() dengan Tipe Data Berbeda Menggunakan Groupby() & Agg()

Mari kita lihat contoh untuk meneruskan UDF ke kolom gabungan. Di sini, nilai kolom dikelompokkan terlebih dahulu menggunakan fungsi groupby() dan agregasi dilakukan menggunakan fungsi agg(). Kami melewatkan UDF kami di dalam fungsi agregat ini.

Sintaksis:

pyspark_dataframe_object.groupby( 'pengelompokan_kolom' ).agg(UDF
(pyspark_dataframe_object[ 'kolom' ]))

Di sini, nilai di kolom pengelompokan dikelompokkan terlebih dahulu. Kemudian, agregasi dilakukan pada setiap data yang dikelompokkan sehubungan dengan UDF kami.

Contoh 1: Pandas_udf() dengan Agregat Mean()

Di sini, kami membuat fungsi yang ditentukan pengguna dengan float tipe pengembalian. Di dalam fungsi, kami menghitung rata-rata menggunakan fungsi mean(). UDF ini diteruskan ke kolom 'jumlah' untuk mendapatkan jumlah rata-rata untuk setiap jenis.

# kembalikan mean/rata-rata

@panda_udf( 'mengambang' )

def average_function(i: panda.Series) -> float:

kembalikan i.mean()

# Lewati kolom kuantitas ke fungsi dengan mengelompokkan kolom jenis.

market_df.groupby( 'jenis' .agg(fungsi_rata-rata(market_df[ 'kuantitas' ])).menunjukkan()

Keluaran:

Kami mengelompokkan berdasarkan elemen di kolom 'jenis'. Dua kelompok terbentuk - 'buah' dan 'sayuran'. Untuk setiap grup, rata-rata dihitung dan dikembalikan.

Contoh 2: Pandas_udf() dengan Agregat Max() dan Min()

Di sini, kami membuat dua fungsi yang ditentukan pengguna dengan tipe pengembalian integer (int). UDF pertama mengembalikan nilai minimum dan UDF kedua mengembalikan nilai maksimum.

# pandas_udf yang mengembalikan nilai minimum

@panda_udf( 'int' )

def min_(i: panda.Series) -> int:

kembalikan i.min()

# pandas_udf yang mengembalikan nilai maksimum

@panda_udf( 'int' )

def max_(i: panda.Series) -> int:

kembalikan i.max()

# Lewati kolom jumlah ke min_ pandas_udf dengan mengelompokkan loc_country.

market_df.groupby( 'cari_negara' .agg(min_(market_df[ 'kuantitas' ])).menunjukkan()

# Lewati kolom jumlah ke max_ pandas_udf dengan mengelompokkan loc_country.

market_df.groupby( 'cari_negara' .agg(max_(market_df[ 'kuantitas' ])).menunjukkan()

Keluaran:

Untuk mengembalikan nilai minimum dan maksimum, kami menggunakan fungsi min() dan max() dalam jenis pengembalian UDF. Sekarang, kami mengelompokkan data di kolom “locate_country”. Empat grup dibentuk ('CHINA', 'INDIA', 'JAPAN', 'USA'). Untuk setiap grup, kami mengembalikan jumlah maksimum. Demikian pula, kami mengembalikan jumlah minimum.

Kesimpulan

Pada dasarnya, pandas_udf () digunakan untuk melakukan operasi vektor pada DataFrame PySpark kami. Kami telah melihat cara membuat pandas_udf() dan menerapkannya ke PySpark DataFrame. Untuk pemahaman yang lebih baik, kami membahas berbagai contoh dengan mempertimbangkan semua tipe data (string, float, dan integer). Dimungkinkan untuk menggunakan pandas_udf() dengan groupby() melalui fungsi agg() .