Taipy is a powerful workflow orchestration tool whose easy-to-use framework can be applied to your existing data applications with little effort. It is built on a solid foundation of concepts (Scenarios, Tasks and DataNodes) that let developers easily model their pipelines, even when they rely on third-party packages that Taipy does not explicitly support.
If you’re already familiar with PySpark and Taipy, you can skip ahead to “2. The Taipy configuration (config.py)”. That section dives right into the nitty-gritty of defining a function for a Taipy task to run a PySpark application. Otherwise, read on!
This article will employ a simple example to demonstrate how we can integrate PySpark with Taipy to couple your big data processing needs with smart job execution.
We’ll use the Palmer Penguins dataset (penguins.csv), which contains only 344 records, hardly a dataset that requires Spark for processing. However, it is easily accessible, and its size is irrelevant to demonstrating the integration of Spark with Taipy. If you want to test this with a larger dataset, you can duplicate the data as many times as you need.
We’ll design a workflow which performs two main tasks:
1- Spark task (spark_process):
Load the data;
Group the data by “species”, “island” and “sex”;
Find the mean of the other columns (“bill_length_mm”, “bill_depth_mm”, “flipper_length_mm”, “body_mass_g”);
Save the data.
2- Python task (filter):
Load the output data saved previously by the Spark task;
Given a “species”, “island” and “sex”, return the aggregated values.
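The filter task needs nothing beyond pandas. A minimal sketch, assuming the aggregated data is loaded as a pandas DataFrame with species, island and sex columns; the name filter_penguins is hypothetical:

```python
import pandas as pd


def filter_penguins(df: pd.DataFrame, species: str, island: str, sex: str) -> pd.DataFrame:
    """Return the aggregated row(s) for one species/island/sex combination."""
    mask = (df["species"] == species) & (df["island"] == island) & (df["sex"] == sex)
    # Reset the index so callers get a clean 0-based frame to display.
    return df[mask].reset_index(drop=True)
```

Calling filter_penguins(df, "Gentoo", "Biscoe", "female") returns the matching aggregated row, or an empty DataFrame if no group matches that combination.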
Our little project will consist of 4 files: app/penguin_spark_app.py, app/config.py, app/main.py and app/penguins.csv.
You can find the contents of each file (other than penguins.csv, which you can get from the palmerpenguins repository) in code blocks within this article.
1. The Spark Application (penguin_spark_app.py)
Normally, we run PySpark applications with the spark-submit command-line utility. You can read more about the what and the why of submitting Spark jobs this way in Spark’s own documentation.
When using Taipy for our workflow orchestration, we can continue doing the same thing. The only difference is that instead of running a command in the command line, we have our workflow pipeline spawn a subprocess which runs the Spark application using spark-submit.
Before getting into that, let’s first take a look at our Spark application, penguin_spark_app.py, and what it does.
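A minimal sketch of such an application, using the PySpark DataFrame API, could look like the following. The function and constant names, the application name and the treatment of "NA" as a missing value are assumptions; only the two command-line options mirror the ones this article describes. As a real script, the file would end by calling main() under an if __name__ == "__main__" guard.

```python
import argparse

GROUP_COLUMNS = ["species", "island", "sex"]
MEASUREMENT_COLUMNS = ["bill_length_mm", "bill_depth_mm", "flipper_length_mm", "body_mass_g"]


def parse_args(argv=None):
    """Parse the two paths the application expects on its command line."""
    parser = argparse.ArgumentParser(description="Mean penguin measurements")
    parser.add_argument("--input-csv-path", required=True, help="Path to the input penguin CSV file")
    parser.add_argument("--output-csv-path", required=True, help="Where to save the aggregated CSV file")
    return parser.parse_args(argv)


def main(argv=None):
    # Imported here so parse_args can be used (and tested) without Spark installed.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    args = parse_args(argv)
    spark = SparkSession.builder.appName("penguin-means").getOrCreate()
    try:
        # Assumption: missing values in the CSV are written as "NA".
        df = spark.read.csv(args.input_csv_path, header=True, inferSchema=True, nullValue="NA")
        # Rows with a missing sex form their own (null) group; drop them first if unwanted.
        result = df.groupBy(*GROUP_COLUMNS).agg(
            *[F.mean(c).alias(c) for c in MEASUREMENT_COLUMNS]
        )
        # Collect to pandas to write one plain CSV file; Spark's own
        # DataFrame.write.csv would create a directory of part files instead.
        result.toPandas().to_csv(args.output_csv_path, index=False)
    finally:
        spark.stop()
```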
We can submit this Spark application for execution by running spark-submit from the terminal, which would do the following:
Submits the penguin_spark_app.py application for local execution on 8 CPU cores;
Loads data from the app/penguins.csv CSV file;
Groups by “species”, “island” and “sex”, then aggregates the remaining columns by mean;
Saves the resultant DataFrame to app/output.csv.
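In spark-submit’s general form, Spark options come before the application file and application arguments come after it. A sketch that assembles the terminal command for our paths, assuming --master local[8] (8 local worker threads, one per core) is how the 8 cores are requested and that the application options are named --input-csv-path and --output-csv-path:

```python
import shlex

# Spark options come before the application file; application arguments after it.
spark_args = ["--master", "local[8]"]  # run locally with 8 worker threads
app_path = "app/penguin_spark_app.py"
app_args = [
    "--input-csv-path", "app/penguins.csv",
    "--output-csv-path", "app/output.csv",
]

command = ["spark-submit", *spark_args, app_path, *app_args]

# shlex.join quotes each element for a POSIX shell; local[8] becomes 'local[8]'
# so that shells which glob square brackets (such as zsh) leave it alone.
print(shlex.join(command))
```

Running this prints spark-submit --master 'local[8]' app/penguin_spark_app.py --input-csv-path app/penguins.csv --output-csv-path app/output.csv, which can be pasted into a terminal.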
Thereafter, app/output.csv should contain one row per combination of species, island and sex, holding the mean of each remaining column.
Also, note that we have coded the Spark application to receive 2 command line parameters:
--input-csv-path: Path to the input penguin CSV file; and
--output-csv-path: Path to save the output CSV file after processing by the Spark app.
2. The Taipy configuration (config.py)
At this point, we have our penguin_spark_app.py PySpark application and need to create a Taipy task to run this PySpark application.
Next, let’s look at the app/config.py script, which defines the Taipy configuration.
You can also build the Taipy configuration using Taipy Studio, a Visual Studio Code extension which provides a graphical editor for building a Taipy .toml configuration file.
The PySpark task in Taipy
We are particularly interested in the code section which produces this part of the DAG:
Let’s extract and examine the relevant section of the config.py script which creates the “spark_process” Spark task (and its 3 associated data nodes) in Taipy as shown in the image above:
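As a sketch of such a section, here is one way to declare the task and its 3 data nodes with Taipy’s Python configuration API. It assumes Taipy 3.x signatures (check them against your version), and the data node IDs and default paths are illustrative; the task function is passed in as a parameter so the snippet stands alone.

```python
from datetime import timedelta


def configure_spark_task(spark_process):
    """Configure the spark_process task and its 3 data nodes.

    spark_process is the Python function the task runs: it receives the
    two paths and returns the processed DataFrame.
    """
    from taipy import Config  # imported lazily so this module loads without Taipy

    # Two pickle data nodes holding plain strings: the application's CLI arguments.
    input_csv_path_cfg = Config.configure_data_node(
        id="input_csv_path", default_data="app/penguins.csv"
    )
    output_csv_path_cfg = Config.configure_data_node(
        id="output_csv_path", default_data="app/output.csv"
    )

    # Parquet data node for the task output, considered stale after one day.
    processed_penguin_df_cfg = Config.configure_parquet_data_node(
        id="processed_penguin_df", validity_period=timedelta(days=1)
    )

    spark_process_task_cfg = Config.configure_task(
        id="spark_process",
        function=spark_process,
        skippable=True,  # reuse the cached output when the inputs are unchanged
        input=[input_csv_path_cfg, output_csv_path_cfg],
        output=processed_penguin_df_cfg,
    )
    return spark_process_task_cfg
```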
Since we designed the penguin_spark_app.py Spark application to receive 2 parameters (input_csv_path and output_csv_path), we chose to represent these 2 parameters as Taipy data nodes. Note that your use case may differ, and you can (and should!) modify the task, function and associated data nodes according to your needs. For example, you may:
Have a Spark task which performs some routine ETL and returns nothing;
Prefer to hard-code the input and output paths instead of persisting them as data nodes; or
Save additional application parameters as data nodes and pass them to the Spark application.
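For the last option, one approach is to turn each extra parameter into a --flag value pair appended to the application arguments. The helper below and its naming convention (underscores become hyphens, so argparse maps the flag back to the same name) are hypothetical, and the Spark application would need matching argparse options.

```python
from typing import Any, Dict, List


def to_app_args(params: Dict[str, Any]) -> List[str]:
    """Turn {"input_csv_path": "a.csv"} into ["--input-csv-path", "a.csv"].

    Hypothetical helper: values are converted with str(), so the Spark
    application must parse them back (e.g. with argparse's type=int).
    """
    args: List[str] = []
    for name, value in params.items():
        args += ["--" + name.replace("_", "-"), str(value)]
    return args
```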
Then, we run spark-submit as a Python subprocess, passing it the command as a list of strings.
Recall that the list elements must keep the same order as they would on the command line: the spark-submit script path first, then any Spark arguments, then the path to the Spark application, and finally the application arguments.
Again, depending on our use case, we could specify a different spark-submit script path, add Spark arguments (we supplied none in our example) or pass different application arguments.
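A minimal sketch of such a task function, assuming spark-submit is on the PATH and that a failed Spark job should fail the Taipy task: subprocess.run with check=True raises CalledProcessError when the command exits with a non-zero status. The helper names build_spark_submit_command and run_spark_submit are hypothetical.

```python
import subprocess


def build_spark_submit_command(app_path, app_args, spark_args=(), spark_submit="spark-submit"):
    """Assemble the command in the order spark-submit expects:
    the spark-submit script, Spark options, the application, its arguments."""
    return [spark_submit, *spark_args, app_path, *app_args]


def run_spark_submit(command):
    """Run the command, raising CalledProcessError on a non-zero exit status."""
    completed = subprocess.run(command, check=True)
    return completed.returncode


def spark_process(input_csv_path, output_csv_path):
    """Taipy task body: submit the Spark application with the two paths.

    No Spark options are passed here; supply e.g. ["--master", "local[8]"]
    through spark_args if your setup needs them.
    """
    command = build_spark_submit_command(
        app_path="app/penguin_spark_app.py",
        app_args=[
            "--input-csv-path", input_csv_path,
            "--output-csv-path", output_csv_path,
        ],
    )
    run_spark_submit(command)
```

This version returns None, which suits a Spark task whose output you don’t need; to feed an output data node, the function would instead read output_csv_path and return the result.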
Reading and returning output_csv_path
Notice how the spark_process function ends, after the spark-submit subprocess completes.
In our case, we want our Taipy task to output the data after it is processed by Spark, so that it can be written to the processed_penguin_df_cfg Parquet data node. One way to do this is to manually read from the output target (in this case, output_csv_path) and return it as a Pandas DataFrame.
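A sketch of that last step, assuming pandas is available in the Taipy process. The helper name read_spark_output is hypothetical; besides a single CSV file, it also accepts the directory of part files that Spark’s own DataFrame.write.csv(..., header=True) produces, in case your application writes its output that way.

```python
import glob
import os

import pandas as pd


def read_spark_output(output_csv_path: str) -> pd.DataFrame:
    """Load the CSV output of the Spark application as one pandas DataFrame."""
    if not os.path.isdir(output_csv_path):
        # A single file, e.g. written with toPandas().to_csv(...).
        return pd.read_csv(output_csv_path)

    # A Spark output directory: read every part file (each has its own header).
    part_files = sorted(glob.glob(os.path.join(output_csv_path, "part-*.csv")))
    # Skip zero-byte parts, which pandas cannot parse.
    frames = [pd.read_csv(p) for p in part_files if os.path.getsize(p) > 0]
    if not frames:
        raise ValueError(f"no CSV part files found in {output_csv_path}")
    return pd.concat(frames, ignore_index=True)
```

The task function would then end with return read_spark_output(output_csv_path), and Taipy writes the returned DataFrame to the Parquet data node.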
However, if you don’t need the return data of the Spark application, you can simply have your Taipy task (via the spark_process function) return None.
Caching the Spark Task
Since we configured spark_process_task_cfg with the skippable property set to True, when the scenario is re-executed, Taipy will skip the spark_process task as long as its input data nodes have not changed, and will reuse the persisted task output: the processed_penguin_df_cfg Pandas DataFrame.
However, we also defined a validity_period of 1 day for the processed_penguin_df_cfg data node, so Taipy will still re-run the task if the DataFrame was last cached more than a day ago.
3. Building a GUI (main.py)
We’ll complete our application by building the GUI which we saw at the beginning of this article:
If you’re unfamiliar with Taipy’s GUI capabilities, you can find a quickstart in the Taipy documentation. In any case, since the GUI isn’t our focus, you can simply copy the code for app/main.py from the project repository.
Then, run the main script (app/main.py) from the project folder.
Conclusion
Now that you’ve seen an example of how to use PySpark with Taipy, go on and try using these two tools to enhance your own data applications!
If you’ve struggled with other workflow orchestration tools slowing down your work and getting in your way, don’t let that deter you from trying Taipy. Taipy is easy to use and strives not to limit which third-party packages you can use with it; its robust and flexible framework makes it easy to adapt to any data application.
You can find all the code and data in this repository.