পাইস্পার্ক পান্ডাস_ইউডিএফ()

Pa Isparka Pandasa I Udi Epha



pandas_udf() ফাংশন ব্যবহার করে PySpark DataFrame রূপান্তর করা সম্ভব। এটি একটি ব্যবহারকারী সংজ্ঞায়িত ফাংশন যা PySpark ডেটাফ্রেমে তীর সহ প্রয়োগ করা হয়। আমরা pandas_udf() ব্যবহার করে ভেক্টরাইজড অপারেশন করতে পারি। এটি একটি ডেকোরেটর হিসাবে এই ফাংশন পাস করে বাস্তবায়ন করা যেতে পারে। চলুন সিনট্যাক্স, প্যারামিটার এবং বিভিন্ন উদাহরণ জানতে এই গাইডে ডুব দেওয়া যাক।

বিষয়বস্তুর বিষয়:

আপনি যদি PySpark DataFrame এবং মডিউল ইনস্টলেশন সম্পর্কে জানতে চান তবে এটির মাধ্যমে যান নিবন্ধ .







Pyspark.sql.functions.pandas_udf()

pandas_udf () PySpark-এর sql.functions মডিউলে পাওয়া যায় যা 'from' কীওয়ার্ড ব্যবহার করে আমদানি করা যেতে পারে। এটি আমাদের PySpark ডেটাফ্রেমে ভেক্টরাইজড অপারেশন করতে ব্যবহৃত হয়। এই ফাংশনটি তিনটি পরামিতি অতিক্রম করে একটি ডেকোরেটরের মতো বাস্তবায়িত হয়। এর পরে, আমরা একটি ব্যবহারকারী-সংজ্ঞায়িত ফাংশন তৈরি করতে পারি যা একটি তীর ব্যবহার করে ভেক্টর ফর্ম্যাটে ডেটা ফেরত দেয় (যেমন আমরা এর জন্য সিরিজ/NumPy ব্যবহার করি)। এই ফাংশনের মধ্যে, আমরা ফলাফল ফেরত দিতে সক্ষম।



গঠন ও সিনট্যাক্স:



প্রথমে, আসুন এই ফাংশনের গঠন এবং সিনট্যাক্স দেখি:

@পান্ডাস_উডিএফ(ডেটাটাইপ)
def function_name(operation) -> convert_format:
ফেরত বিবৃতি

এখানে, function_name হল আমাদের সংজ্ঞায়িত ফাংশনের নাম। ডেটা টাইপ এই ফাংশন দ্বারা ফেরত দেওয়া ডেটা টাইপ নির্দিষ্ট করে। আমরা 'রিটার্ন' কীওয়ার্ড ব্যবহার করে ফলাফল ফেরত দিতে পারি। সমস্ত ক্রিয়াকলাপ তীর অ্যাসাইনমেন্টের সাথে ফাংশনের ভিতরে সঞ্চালিত হয়।





পান্ডাস_উডিএফ (ফাংশন এবং রিটার্ন টাইপ)

  1. প্রথম প্যারামিটারটি ব্যবহারকারী-সংজ্ঞায়িত ফাংশন যা এটিতে পাস করা হয়।
  2. দ্বিতীয় প্যারামিটারটি ফাংশন থেকে রিটার্ন ডেটা টাইপ নির্দিষ্ট করতে ব্যবহৃত হয়।

তথ্য:

এই সম্পূর্ণ গাইডে, আমরা প্রদর্শনের জন্য শুধুমাত্র একটি PySpark ডেটাফ্রেম ব্যবহার করি। আমরা সংজ্ঞায়িত সমস্ত ব্যবহারকারী-সংজ্ঞায়িত ফাংশন এই PySpark ডেটাফ্রেমে প্রয়োগ করা হয়। নিশ্চিত করুন যে আপনি PySpark ইনস্টল করার পরে প্রথমে আপনার পরিবেশে এই DataFrame তৈরি করেছেন।



পাইসপার্ক আমদানি করুন

pyspark.sql থেকে SparkSession আমদানি করুন

linuxhint_spark_app = SparkSession.builder.appName( 'লিনাক্স ইঙ্গিত' .getOrCreate()

pyspark.sql.functions থেকে pandas_udf আমদানি করুন

pyspark.sql.types থেকে আমদানি *

পান্ডা হিসাবে পান্ডা আমদানি করুন

# সবজির বিবরণ

সবজি =[{ 'টাইপ' : 'শাকসবজি' , 'নাম' : 'টমেটো' , 'স্থান_দেশ' : 'আমেরিকা' , 'পরিমাণ' : 800 },

{ 'টাইপ' : 'ফল' , 'নাম' : 'কলা' , 'স্থান_দেশ' : 'চীন' , 'পরিমাণ' : বিশ },

{ 'টাইপ' : 'শাকসবজি' , 'নাম' : 'টমেটো' , 'স্থান_দেশ' : 'আমেরিকা' , 'পরিমাণ' : 800 },

{ 'টাইপ' : 'শাকসবজি' , 'নাম' : 'আম' , 'স্থান_দেশ' : 'জাপান' , 'পরিমাণ' : 0 },

{ 'টাইপ' : 'ফল' , 'নাম' : 'লেবু' , 'স্থান_দেশ' : 'ভারত' , 'পরিমাণ' : 1700 },

{ 'টাইপ' : 'শাকসবজি' , 'নাম' : 'টমেটো' , 'স্থান_দেশ' : 'আমেরিকা' , 'পরিমাণ' : 1200 },

{ 'টাইপ' : 'শাকসবজি' , 'নাম' : 'আম' , 'স্থান_দেশ' : 'জাপান' , 'পরিমাণ' : 0 },

{ 'টাইপ' : 'ফল' , 'নাম' : 'লেবু' , 'স্থান_দেশ' : 'ভারত' , 'পরিমাণ' : 0 }

]

# উপরের ডেটা থেকে মার্কেট ডেটাফ্রেম তৈরি করুন

market_df = linuxhint_spark_app.createDataFrame(সবজি)

market_df.show()

আউটপুট:

এখানে, আমরা 4টি কলাম এবং 8টি সারি দিয়ে এই DataFrame তৈরি করি। এখন, আমরা ব্যবহারকারী-সংজ্ঞায়িত ফাংশন তৈরি করতে pandas_udf() ব্যবহার করি এবং এই কলামগুলিতে প্রয়োগ করি।

পান্ডাস_উডিএফ() বিভিন্ন ধরনের ডেটা সহ

এই পরিস্থিতিতে, আমরা pandas_udf() দিয়ে কিছু ব্যবহারকারী-সংজ্ঞায়িত ফাংশন তৈরি করি এবং সেগুলিকে কলামে প্রয়োগ করি এবং নির্বাচন() পদ্ধতি ব্যবহার করে ফলাফল প্রদর্শন করি। প্রতিটি ক্ষেত্রে, আমরা pandas.Series ব্যবহার করি যেহেতু আমরা ভেক্টরাইজড অপারেশন করি। এটি কলামের মানগুলিকে এক-মাত্রিক অ্যারে হিসাবে বিবেচনা করে এবং অপারেশনটি কলামে প্রয়োগ করা হয়। ডেকোরেটরে নিজেই, আমরা ফাংশন রিটার্ন টাইপ নির্দিষ্ট করি।

উদাহরণ 1: স্ট্রিং টাইপ সহ Pandas_udf()

এখানে, আমরা স্ট্রিং রিটার্ন টাইপ সহ দুটি ব্যবহারকারী-সংজ্ঞায়িত ফাংশন তৈরি করি যাতে স্ট্রিং টাইপ কলামের মানগুলিকে বড় হাতের এবং ছোট হাতের অক্ষরে রূপান্তর করা যায়। অবশেষে, আমরা এই ফাংশনগুলি 'টাইপ' এবং 'locate_country' কলামগুলিতে প্রয়োগ করি।

# pandas_udf দিয়ে টাইপ কলামকে বড় হাতের অক্ষরে রূপান্তর করুন

@pandas_udf(স্ট্রিংটাইপ())

def type_upper_case(i: panda.Series) -> panda.Series:

ফেরত i.str.upper()

# locate_country কলামকে pandas_udf দিয়ে ছোট হাতের অক্ষরে রূপান্তর করুন

@pandas_udf(স্ট্রিংটাইপ())

def country_lower_case(i: panda.Series) -> panda.Series:

ফেরত i.str.lower()

# নির্বাচন () ব্যবহার করে কলামগুলি প্রদর্শন করুন

market_df.select( 'প্রকার' ,টাইপ_অপার_হাস( 'প্রকার' ), 'স্থান_দেশ' ,
দেশ_লোয়ার_হাস( 'স্থান_দেশ' )) শো()

আউটপুট:

ব্যাখ্যা:

StringType() ফাংশন pyspark.sql.types মডিউলে উপলব্ধ। PySpark DataFrame তৈরি করার সময় আমরা ইতিমধ্যে এই মডিউলটি আমদানি করেছি।

  1. প্রথমত, UDF (ব্যবহারকারী-সংজ্ঞায়িত ফাংশন) str.upper() ফাংশন ব্যবহার করে বড় হাতের স্ট্রিংগুলি ফেরত দেয়। str.upper() সিরিজ ডেটা স্ট্রাকচারে উপলব্ধ (যেমন আমরা ফাংশনের ভিতরে একটি তীর দিয়ে সিরিজে রূপান্তর করছি) যা প্রদত্ত স্ট্রিংটিকে বড় হাতের অক্ষরে রূপান্তর করে। অবশেষে, এই ফাংশনটি 'টাইপ' কলামে প্রয়োগ করা হয় যা সিলেক্ট() মেথডের ভিতরে নির্দিষ্ট করা আছে। পূর্বে, টাইপ কলামের সমস্ত স্ট্রিং ছোট হাতের অক্ষরে ছিল। এখন, তারা বড় হাতের অক্ষরে পরিবর্তিত হয়.
  2. দ্বিতীয়ত, UDF str.lower() ফাংশন ব্যবহার করে বড় হাতের স্ট্রিংগুলি ফেরত দেয়। str.lower() সিরিজ ডেটা স্ট্রাকচারে উপলব্ধ যা প্রদত্ত স্ট্রিংকে ছোট হাতের অক্ষরে রূপান্তর করে। অবশেষে, এই ফাংশনটি 'টাইপ' কলামে প্রয়োগ করা হয় যা সিলেক্ট() মেথডের ভিতরে নির্দিষ্ট করা আছে। পূর্বে, টাইপ কলামের সমস্ত স্ট্রিং বড় হাতের অক্ষরে ছিল। এখন, তারা ছোট হাতের অক্ষরে পরিবর্তিত হয়.

উদাহরণ 2: পূর্ণসংখ্যার ধরন সহ Pandas_udf()

আসুন একটি UDF তৈরি করি যা PySpark DataFrame পূর্ণসংখ্যা কলামকে Pandas সিরিজে রূপান্তর করে এবং প্রতিটি মানের সাথে 100 যোগ করে। সিলেক্ট() পদ্ধতির ভিতরে এই ফাংশনে 'পরিমাণ' কলামটি পাস করুন।

# 100 যোগ করুন

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

i+ ফেরত দিন 100

# উপরের ফাংশন এবং প্রদর্শনে পরিমাণ কলাম পাস করুন।

market_df.select( 'পরিমাণ' ,add_100( 'পরিমাণ' )) শো()

আউটপুট:

ব্যাখ্যা:

UDF-এর ভিতরে, আমরা সমস্ত মান পুনরাবৃত্তি করি এবং সেগুলিকে সিরিজে রূপান্তর করি। এর পরে, আমরা সিরিজের প্রতিটি মানের সাথে 100 যোগ করি। অবশেষে, আমরা এই ফাংশনে 'পরিমাণ' কলামটি পাস করি এবং আমরা দেখতে পাচ্ছি যে 100 সমস্ত মানের সাথে যোগ করা হয়েছে।

Groupby() এবং Agg() ব্যবহার করে বিভিন্ন ডেটা টাইপ সহ পান্ডাস_উডিএফ()

সমষ্টিগত কলামগুলিতে UDF পাস করার উদাহরণগুলি দেখুন। এখানে, গ্রুপবাই() ফাংশন ব্যবহার করে কলামের মানগুলি প্রথমে গ্রুপ করা হয় এবং agg() ফাংশন ব্যবহার করে একত্রিত করা হয়। আমরা এই সমষ্টিগত ফাংশনের মধ্যে আমাদের UDF পাস করি।

বাক্য গঠন:

pyspark_dataframe_object.groupby( 'গ্রুপিং_কলাম' .agg(UDF
(pyspark_dataframe_object[ 'কলাম' ]))

এখানে, গ্রুপিং কলামের মানগুলি প্রথমে গ্রুপ করা হয়। তারপরে, আমাদের UDF এর ক্ষেত্রে প্রতিটি গোষ্ঠীবদ্ধ ডেটাতে একত্রীকরণ করা হয়।

উদাহরণ 1: পান্ডাস_উডিএফ() সমষ্টিগত গড় () সহ

এখানে, আমরা রিটার্ন টাইপ ফ্লোট সহ একটি ব্যবহারকারী-সংজ্ঞায়িত ফাংশন তৈরি করি। ফাংশনের ভিতরে, আমরা গড় () ফাংশন ব্যবহার করে গড় গণনা করি। প্রতিটি প্রকারের গড় পরিমাণ পেতে এই UDF 'পরিমাণ' কলামে পাঠানো হয়।

# গড়/গড় ফেরত দিন

@পান্ডাস_উডিএফ( 'ভাসা' )

def average_function(i: panda.Series) -> float:

ফেরত i.mean()

# টাইপ কলাম গ্রুপ করে ফাংশনে পরিমাণ কলাম পাস করুন।

market_df.groupby( 'প্রকার' .agg(গড়_ফাংশন(বাজার_ডিএফ[ 'পরিমাণ' ])) শো()

আউটপুট:

আমরা 'টাইপ' কলামের উপাদানগুলির উপর ভিত্তি করে গ্রুপিং করছি। দুটি গ্রুপ গঠিত হয় - 'ফল' এবং 'সবজি'। প্রতিটি গ্রুপের জন্য, গড় গণনা করা হয় এবং ফেরত দেওয়া হয়।

উদাহরণ 2: Pandas_udf() সমষ্টি সর্বোচ্চ() এবং Min() সহ

এখানে, আমরা পূর্ণসংখ্যা (int) রিটার্ন টাইপ সহ দুটি ব্যবহারকারী-সংজ্ঞায়িত ফাংশন তৈরি করি। প্রথম UDF সর্বনিম্ন মান প্রদান করে এবং দ্বিতীয় UDF সর্বোচ্চ মান প্রদান করে।

# pandas_udf যা সর্বনিম্ন মান প্রদান করে

@পান্ডাস_উডিএফ( 'int' )

def min_(i: panda.Series) -> int:

ফেরত i.min()

# pandas_udf যা সর্বোচ্চ মান প্রদান করে

@পান্ডাস_উডিএফ( 'int' )

def max_(i: panda.Series) -> int:

ফেরত i.max()

# locate_country গোষ্ঠীবদ্ধ করে min_pandas_udf-এ পরিমাণ কলাম পাস করুন।

market_df.groupby( 'স্থান_দেশ' .agg(min_(market_df[ 'পরিমাণ' ])) শো()

# locate_country গোষ্ঠীবদ্ধ করে পরিমাণ কলামটিকে সর্বাধিক_পান্ডাস_উডিএফ-এ পাস করুন।

market_df.groupby( 'স্থান_দেশ' .agg(max_(market_df[ 'পরিমাণ' ])) শো()

আউটপুট:

সর্বনিম্ন এবং সর্বোচ্চ মান ফেরত দিতে, আমরা UDF-এর রিটার্ন প্রকারে min() এবং max() ফাংশন ব্যবহার করি। এখন, আমরা 'locate_country' কলামে ডেটা গ্রুপ করি। চারটি দল গঠিত হয় ('চীন', 'ভারত', 'জাপান', 'মার্কিন যুক্তরাষ্ট্র')। প্রতিটি গ্রুপের জন্য, আমরা সর্বাধিক পরিমাণ ফেরত দিই। একইভাবে, আমরা সর্বনিম্ন পরিমাণ ফেরত দিই।

উপসংহার

মূলত, pandas_udf() আমাদের PySpark ডেটাফ্রেমে ভেক্টরাইজড অপারেশন করতে ব্যবহৃত হয়। আমরা দেখেছি কিভাবে pandas_udf() তৈরি করতে হয় এবং PySpark ডেটাফ্রেমে প্রয়োগ করতে হয়। আরও ভালভাবে বোঝার জন্য, আমরা সমস্ত ডেটাটাইপ (স্ট্রিং, ফ্লোট এবং পূর্ণসংখ্যা) বিবেচনা করে বিভিন্ন উদাহরণ নিয়ে আলোচনা করেছি। agg() ফাংশনের মাধ্যমে groupby() এর সাথে pandas_udf() ব্যবহার করা সম্ভব।