কিভাবে PySpark এ একটি টেবিল ডেটা পড়তে এবং লিখতে হয়

Kibhabe Pyspark E Ekati Tebila Deta Parate Ebam Likhate Haya



PySpark-এ ডেটা প্রসেসিং দ্রুততর হয় যদি ডেটা টেবিলের আকারে লোড করা হয়। এর সাথে, এসকিউএল এক্সপ্রেশন ব্যবহার করে, প্রক্রিয়াকরণ দ্রুত হবে। সুতরাং, প্রক্রিয়াকরণের জন্য পাঠানোর আগে PySpark DataFrame/RDD কে একটি টেবিলে রূপান্তর করা আরও ভাল পদ্ধতি। আজ, আমরা দেখব কীভাবে PySpark ডেটাফ্রেমে টেবিলের ডেটা পড়তে হয়, টেবিলে PySpark ডেটাফ্রেম লিখতে হয় এবং বিল্ট-ইন ফাংশন ব্যবহার করে বিদ্যমান টেবিলে নতুন ডেটাফ্রেম সন্নিবেশ করা যায়। চলো যাই!

Pyspark.sql.DataFrameWriter.saveAsTable()

প্রথমত, আমরা দেখব কিভাবে লেখা.saveAsTable() ফাংশন ব্যবহার করে টেবিলে বিদ্যমান PySpark DataFrame লিখতে হয়। টেবিলে ডেটাফ্রেম লিখতে এটি টেবিলের নাম এবং অন্যান্য ঐচ্ছিক প্যারামিটার যেমন মোড, partionBy ইত্যাদি লাগে। এটি একটি parquet ফাইল হিসাবে সংরক্ষণ করা হয়.

বাক্য গঠন:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
  1. Table_name হল টেবিলের নাম যা dataframe_obj থেকে তৈরি করা হয়।
  2. আমরা মোড প্যারামিটার ব্যবহার করে টেবিলের ডেটা যুক্ত/ওভাররাইট করতে পারি।
  3. এই প্রদত্ত কলামের মানের উপর ভিত্তি করে পার্টিশন তৈরি করতে partitionBy একক/মাল্টিপল কলাম নেয়।

উদাহরণ 1:

5টি সারি এবং 4টি কলাম সহ একটি PySpark ডেটাফ্রেম তৈরি করুন। এই ডেটাফ্রেমটিকে “Agri_Table1” নামের একটি টেবিলে লিখুন।



পাইসপার্ক আমদানি করুন

pyspark.sql থেকে SparkSession আমদানি করুন

linuxhint_spark_app = SparkSession.builder.appName( 'লিনাক্স ইঙ্গিত' .getOrCreate()

# 5টি সারি এবং 5টি কলাম সহ কৃষি তথ্য

কৃষি =[{ 'মাটির_প্রকার' : 'কালো' , 'সেচ_প্রাপ্যতা' : 'না' , 'একর' : 2500 , 'মাটির_স্থিতি' : 'শুকনো' ,
'দেশ' : 'আমেরিকা' },

{ 'মাটির_প্রকার' : 'কালো' , 'সেচ_প্রাপ্যতা' : 'হ্যাঁ' , 'একর' : 3500 , 'মাটির_স্থিতি' : 'ভেজা' ,
'দেশ' : 'ভারত' },

{ 'মাটির_প্রকার' : 'লাল' , 'সেচ_প্রাপ্যতা' : 'হ্যাঁ' , 'একর' : 210 , 'মাটির_স্থিতি' : 'শুকনো' ,
'দেশ' : 'ইউকে' },

{ 'মাটির_প্রকার' : 'অন্য' , 'সেচ_প্রাপ্যতা' : 'না' , 'একর' : 1000 , 'মাটির_স্থিতি' : 'ভেজা' ,
'দেশ' : 'আমেরিকা' },

{ 'মাটির_প্রকার' : 'বালি' , 'সেচ_প্রাপ্যতা' : 'না' , 'একর' : 500 , 'মাটির_স্থিতি' : 'শুকনো' ,
'দেশ' : 'ভারত' }]



# উপরের ডেটা থেকে ডেটাফ্রেম তৈরি করুন

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# উপরের ডেটাফ্রেমটি টেবিলে লিখুন।

agri_df.coalesce( 1 .write.saveAsTable( 'কৃষি_সারণী 1' )

আউটপুট:







আমরা দেখতে পাচ্ছি যে আগের PySpark ডেটা দিয়ে একটি parquet ফাইল তৈরি করা হয়েছে।



উদাহরণ 2:

আগের ডেটাফ্রেমটি বিবেচনা করুন এবং 'দেশ' কলামের মানগুলির উপর ভিত্তি করে রেকর্ডগুলিকে বিভাজন করে টেবিলে 'Agri_Table2' লিখুন।

# partitionBy প্যারামিটার সহ টেবিলে উপরের ডেটাফ্রেমটি লিখুন

agri_df.write.saveAsTable( 'কৃষি_সারণী 2' ,বিভাজন দ্বারা =[ 'দেশ' ])

আউটপুট:

'দেশ' কলামে তিনটি অনন্য মান রয়েছে - 'ভারত', 'ইউকে', এবং 'মার্কিন যুক্তরাষ্ট্র'। সুতরাং, তিনটি পার্টিশন তৈরি করা হয়। প্রতিটি পার্টিশনে parquet ফাইল থাকে।

Pyspark.sql.DataFrameReader.table()

spark.read.table() ফাংশন ব্যবহার করে PySpark ডেটাফ্রেমে টেবিল লোড করা যাক। এটি শুধুমাত্র একটি প্যারামিটার লাগে যা পাথ/টেবিল নাম। এটি সরাসরি PySpark DataFrame-এ টেবিল লোড করে এবং PySpark DataFrame-এ প্রয়োগ করা সমস্ত SQL ফাংশনও এই লোড করা ডেটাফ্রেমে প্রয়োগ করা যেতে পারে।

বাক্য গঠন:

spark_app.read.table(পাথ/'টেবিল_নাম')

এই পরিস্থিতিতে, আমরা আগের টেবিলটি ব্যবহার করি যা PySpark ডেটাফ্রেম থেকে তৈরি করা হয়েছিল। আপনার পরিবেশে পূর্ববর্তী দৃশ্য কোড স্নিপেটগুলি বাস্তবায়ন করতে হবে তা নিশ্চিত করুন।

উদাহরণ:

'লোডেড_ডেটা' নামের ডেটাফ্রেমে 'Agri_Table1' টেবিলটি লোড করুন।

loaded_data = linuxhint_spark_app.read.table( 'কৃষি_সারণী1' )

loaded_data.show()

আউটপুট:

আমরা দেখতে পাচ্ছি যে টেবিলটি PySpark ডেটাফ্রেমে লোড হয়েছে।

এসকিউএল কোয়েরি নির্বাহ করা হচ্ছে

এখন, আমরা spark.sql() ফাংশন ব্যবহার করে লোড করা ডেটাফ্রেমে কিছু SQL কোয়েরি চালাই।

# উপরের টেবিল থেকে সমস্ত কলাম প্রদর্শন করতে SELECT কমান্ডটি ব্যবহার করুন।

linuxhint_spark_app.sql( 'Agri_Table1 থেকে * নির্বাচন করুন' দেখান()

# যেখানে ধারা

linuxhint_spark_app.sql( 'কৃষি_সারণী 1 থেকে * নির্বাচন করুন যেখানে মাটি_স্থিতি='শুষ্ক'' দেখান()

linuxhint_spark_app.sql( 'Agri_Table1 থেকে * নির্বাচন করুন যেখানে একর > 2000' দেখান()

আউটপুট:

  1. প্রথম প্রশ্নটি ডেটাফ্রেম থেকে সমস্ত কলাম এবং রেকর্ড প্রদর্শন করে।
  2. দ্বিতীয় প্রশ্নটি 'মাটির_স্থিতি' কলামের উপর ভিত্তি করে রেকর্ডগুলি প্রদর্শন করে। 'শুষ্ক' উপাদানের সাথে মাত্র তিনটি রেকর্ড রয়েছে।
  3. শেষ ক্যোয়ারীটি 'একর' সহ দুটি রেকর্ড প্রদান করে যা 2000 এর চেয়ে বেশি।

Pyspark.sql.DataFrameWriter.insertInto()

insertInto() ফাংশন ব্যবহার করে, আমরা বিদ্যমান টেবিলে ডেটাফ্রেম যুক্ত করতে পারি। আমরা এই ফাংশনটি সিলেক্টএক্সপ্র() এর সাথে ব্যবহার করে কলামের নাম নির্ধারণ করতে পারি এবং তারপর এটি টেবিলে ঢোকাতে পারি। এই ফাংশনটি একটি প্যারামিটার হিসাবে টেবিলের নামও নেয়।

বাক্য গঠন:

DataFrame_obj.write.insertInto(’টেবিল_নাম’)

এই পরিস্থিতিতে, আমরা আগের টেবিলটি ব্যবহার করি যা PySpark ডেটাফ্রেম থেকে তৈরি করা হয়েছিল। আপনার পরিবেশে পূর্ববর্তী দৃশ্য কোড স্নিপেটগুলি বাস্তবায়ন করতে হবে তা নিশ্চিত করুন।

উদাহরণ:

দুটি রেকর্ড সহ একটি নতুন ডেটাফ্রেম তৈরি করুন এবং সেগুলিকে 'Agri_Table1' টেবিলে ঢোকান।

পাইসপার্ক আমদানি করুন

pyspark.sql থেকে SparkSession আমদানি করুন

linuxhint_spark_app = SparkSession.builder.appName( 'লিনাক্স ইঙ্গিত' .getOrCreate()

# 2 সারি সহ কৃষি তথ্য

কৃষি =[{ 'মাটির_প্রকার' : 'বালি' , 'সেচ_প্রাপ্যতা' : 'না' , 'একর' : 2500 , 'মাটির_স্থিতি' : 'শুকনো' ,
'দেশ' : 'আমেরিকা' },

{ 'মাটির_প্রকার' : 'বালি' , 'সেচ_প্রাপ্যতা' : 'না' , 'একর' : 1200 , 'মাটির_স্থিতি' : 'ভেজা' ,
'দেশ' : 'জাপান' }]

# উপরের ডেটা থেকে ডেটাফ্রেম তৈরি করুন

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'একর' , 'দেশ' , 'সেচ_প্রাপ্যতা' , 'মাটির_প্রকার' ,
'মাটির_স্থিতি' .write.insertInto( 'কৃষি_সারণী 1' )

# চূড়ান্ত Agri_Table1 প্রদর্শন করুন

linuxhint_spark_app.sql( 'Agri_Table1 থেকে * নির্বাচন করুন' দেখান()

আউটপুট:

এখন, ডেটাফ্রেমে থাকা মোট সারির সংখ্যা 7টি।

উপসংহার

আপনি এখন বুঝতে পারছেন কিভাবে write.saveAsTable() ফাংশন ব্যবহার করে টেবিলে PySpark ডেটাফ্রেম লিখতে হয়। এটি টেবিলের নাম এবং অন্যান্য ঐচ্ছিক পরামিতি নেয়। তারপর, আমরা spark.read.table() ফাংশন ব্যবহার করে এই টেবিলটি PySpark ডেটাফ্রেমে লোড করেছি। এটি শুধুমাত্র একটি প্যারামিটার লাগে যা পাথ/টেবিল নাম। আপনি যদি বিদ্যমান টেবিলে নতুন ডেটাফ্রেম যুক্ত করতে চান, তাহলে insertInto() ফাংশনটি ব্যবহার করুন।