What is... the reason my code is running slow?
There are a few reasons why your code may be running slowly:
- Data shuffling - Your data may be held in one geographic location but processed in another, meaning the data has to be copied between regions. This not only hurts performance but also increases costs, as moving data between regions incurs a charge. To avoid this, make sure your data and your clusters are in the same region.
- Wrong cluster size - Your cluster may be the wrong size for your needs: if memory is too low the process will spill too much to disk, or the code may simply be asking more of the cluster than it can handle.
- Wrong cluster type - If your task is a scheduled job, it should run on a job cluster, not a personal cluster. Job clusters are siloed from other workloads and all of their resources are dedicated to that task, whereas a personal cluster may have conflicting work running while the job is underway.
- Poor code - It may be that your approach needs to be re-evaluated. Spark is well optimised, but it can only do so much. Consider whether there is a more efficient way of performing the transformations you require.
- Disk usage - You may be writing back to disk too often. Spark is highly optimised for in-memory operations, so there are several things we can look at to help:
- Rewrite any SQL as Python DataFrame operations. Working in memory takes advantage of Spark's optimisations (a short sketch follows the chunking example below).
- Stop updating/merging/deleting rows in SQL tables. When you perform one of these operations, the ENTIRE underlying file is read and then rewritten back to disk, even if you are only updating one record.
- Consider working on a chunk of the data rather than the whole dataset. This can also be beneficial during testing, to limit the amount of data being read and written at once. The example below processes a table in fixed ID ranges rather than in a single pass.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, round

# Note: in a Databricks notebook, `spark` (a SparkSession) is already available.

# Function to perform the transformation for a given ID range and append it to an existing table
def process_marital_status_range(start_id, end_id, output_table):
    # Use the specified database
    spark.sql("USE insight.enhance_IMP")

    # Read only the rows in the current ID range
    enhance_2_maritalstatus = spark.sql(
        f"SELECT * FROM Enhance_2_Age__FINAL WHERE ID BETWEEN {start_id} AND {end_id}"
    )
    enhance_2_maritalstatus_2 = spark.sql(
        f"SELECT ID, i_MaritalStatus AS i_MaritalStatus_2, d_MaritalStatus_1, d_MaritalStatus_2, "
        f"d_MaritalStatus_3, d_MaritalStatus_4 "
        f"FROM Enhance_2_MaritalStatus_2 WHERE ID BETWEEN {start_id} AND {end_id}"
    )

    # Perform the join and transformation
    enhance_2_maritalstatus_3 = (
        enhance_2_maritalstatus
        .join(
            enhance_2_maritalstatus_2,
            enhance_2_maritalstatus["ID"] == enhance_2_maritalstatus_2["ID"]
        )
        .select(
            enhance_2_maritalstatus["*"],
            when(col("i_MaritalStatus_2") == 1, 1).otherwise(0).alias("IVIB_MaritalStatus_1"),
            round(col("d_MaritalStatus_1"), 5).alias("IVIC_MaritalStatus_1")
        )
    )

    # Append the transformed chunk to the output table (created if it doesn't exist)
    enhance_2_maritalstatus_3.write.mode("append").saveAsTable(output_table)

# Retrieve the minimum and maximum ID values to define the overall range
min_id_result = spark.sql("SELECT MIN(ID) AS ID FROM Enhance_2_Age__FINAL")
max_id_result = spark.sql("SELECT MAX(ID) AS ID FROM Enhance_2_Age__FINAL")
start = min_id_result.collect()[0]["ID"]
end = max_id_result.collect()[0]["ID"]
# Uncomment to test on a small range
# start = 1
# end = 10
step = 10000000

# Loop through the full ID range one chunk at a time
for i in range(start, end + 1, step):
    current_start = i
    current_end = min(i + step - 1, end)
    # Print the current start and end of the range
    print(f"Current Range: {current_start}-{current_end}")
    process_marital_status_range(current_start, current_end, 'Enhance_2_MaritalStatus_3')
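As an example of the first two points, a change that might otherwise be written as a SQL UPDATE can instead be expressed as a DataFrame transformation and written out once. This is a minimal sketch: it reuses the Enhance_2_MaritalStatus_2 table and its d_MaritalStatus_1 column from the example above, while the output table name Enhance_2_MaritalStatus_rounded is a hypothetical stand-in.

from pyspark.sql.functions import col, round

spark.sql("USE insight.enhance_IMP")

# SQL equivalent that would rewrite the underlying files in place:
#   UPDATE Enhance_2_MaritalStatus_2 SET d_MaritalStatus_1 = ROUND(d_MaritalStatus_1, 5)
df = spark.table("Enhance_2_MaritalStatus_2")

# Apply the change in memory as a column transformation instead of a row-level update
df_rounded = df.withColumn("d_MaritalStatus_1", round(col("d_MaritalStatus_1"), 5))

# Write the result out once to a new table (hypothetical name), rather than updating in place
df_rounded.write.mode("overwrite").saveAsTable("Enhance_2_MaritalStatus_rounded")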
- Instead of writing all your data back to SQL tables, create temporary views. Like their SQL Server counterparts, these are not physical tables and exist only in memory, which makes them great for intermediate datasets that don't need to persist. There are two ways of creating them (see the examples below). There are, however, a few things to be aware of:
- They only exist within the notebook or query you are running, so they will need to be recreated in each notebook if they are needed again.
- You can nest temporary views inside other temporary views. HOWEVER, when you actually use the dataset, the first view's data has to be built in memory before the next one runs, so you should limit the number of nested views; too many can lead to writing to disk or running out of memory.
- Temporary views are READ ONLY, so use them for intermediate steps that only need to be read from. If you need to append or insert data, consider using DataFrames and saving to a table instead.
-- Option 1: create the temporary view in SQL
create or replace temporary view Enhance_2_Age_3 as
select Enhance_2_Age_discrim.*
    ,substring(DateOfBirth, 1, 4) as YearOfBirth
    ,Enhance_2_Age_2.r_age
    ,0 as Age_Work
    ,RUNDATE
    ,DateOfBirth
from Enhance_2_Age_discrim
inner join Enhance_2_Age_2 on Enhance_2_Age_2.ID = Enhance_2_Age_discrim.ID
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, schema=columns)
# Create a temporary view of the DataFrame
df.createOrReplaceTempView("people")
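Once registered, a temporary view can be queried like any other table, and further views can be layered on top of it (bearing in mind the nesting caveat above). This is a minimal sketch using the people view created above; the adults view name is illustrative.

# Query the temporary view like a table
spark.sql("SELECT Name, Age FROM people WHERE Age > 28").show()

# A view nested on top of another view (hypothetical name); the underlying
# people view has to be evaluated whenever adults is used
spark.sql("CREATE OR REPLACE TEMPORARY VIEW adults AS SELECT * FROM people WHERE Age >= 30")
spark.sql("SELECT * FROM adults").show()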
See Also: