Data Modeling with Apache Spark + Delta Lake + Presto
Intro
We want to present an integrated solution for building a columnar, pre-aggregated dataset in a single table that can be extended as requirements grow. The idea is to use GROUPING SETS in Spark SQL to compute the aggregates, store the result in Delta Lake, which is columnar storage, and finally query it with Presto in the presentation layer wherever we need it. This approach speeds up queries because the calculated values are stored per defined grouping set.
Problem
Suppose we have columns such as Date, Brand, Category, Product, and Sale. In classic multidimensional data modeling we build dimension tables such as Dim Date, Dim Category, and so on around a fact table that stores the dimension keys and, for example, Sale as a measure in a star schema (sketched below). Then we traditionally run an OLAP process that folds the data warehouse into cubes, pre-aggregating the data to answer complex aggregation queries. We want to simplify this process and at the same time keep it fast for the big data we have.
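For context, a minimal sketch of what the classic star model for these columns could look like (all table and column names here are hypothetical, shown only to make the comparison concrete):
CREATE TABLE dim_date     (date_key INT, sale_date DATE);
CREATE TABLE dim_brand    (brand_key INT, brand_name STRING);
CREATE TABLE dim_category (category_key INT, category_name STRING);
CREATE TABLE dim_product  (product_key INT, product_name STRING);
CREATE TABLE fact_sale    (date_key INT, brand_key INT, category_key INT, product_key INT, sale DOUBLE);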
Solution
Now we propose a way to run these calculations as fast as possible with PySpark, using Delta Lake as columnar storage, with more flexibility than classic modeling. We can unfold our data aggregation with GROUPING SETS, which is supported in Spark SQL. In the following we explain how it works and how we can apply the idea with Spark SQL. Note that related constructs such as ROLLUP and CUBE also exist: they aggregate over all hierarchical or all possible combinations of the columns, whereas GROUPING SETS aggregates only over the custom groups we define, as the sketch below illustrates.
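For a quick comparison (illustrative only, over the same data table used in the query further below), these are the groupings each construct produces on the columns (Brand, Category):
-- CUBE (Brand, Category)   => (Brand, Category), (Brand), (Category), ()   i.e. every combination
-- ROLLUP (Brand, Category) => (Brand, Category), (Brand), ()               i.e. hierarchical prefixes
-- GROUPING SETS (...)      => exactly the sets listed, nothing more
SELECT Brand, Category, SUM(Sale) AS Sum_Sale
FROM data
GROUP BY CUBE (Brand, Category)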
In our example, suppose we need sales aggregated by the following sets of columns:
{Date, Brand}
{Brand, Category}
{Category}
GROUPING SETS lets us get the result as a single union table, built from the different GROUP BY sets, as below.
SELECT Date, Brand, Category, SUM(Sale) AS Sum_Sale
FROM data
GROUP BY GROUPING SETS (
(Date, Brand),
(Brand, Category),
(Category)
)
Then we can filter the result and derive different reports from a single dataset: in each row, the columns that are not part of that row's grouping set come out as NULL, so a simple NULL filter selects one report. Because of the columnar structure of Delta Lake, storing the data this way is efficient and takes much less space than in row-based databases.
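One caveat about filtering on NULL: if the source columns can themselves contain NULL values, a plain NULL filter is ambiguous. A possible variation (not used in the rest of this article) is to tag each row with Spark SQL's grouping_id() function and filter on that value downstream instead:
SELECT Date, Brand, Category, SUM(Sale) AS Sum_Sale,
grouping_id() AS gid
FROM data
GROUP BY GROUPING SETS (
(Date, Brand),
(Brand, Category),
(Category)
)
Each grouping set then gets its own gid value, so a single WHERE gid = ... filter selects one report unambiguously.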
Now let's implement this solution with PySpark and query the results from Delta Lake with Presto.
Apache Spark
Apache Spark is a unified analytics engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing.
Delta Lake
Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.
Presto
Presto is an open source distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes.
Code
Configuring SparkSession for Delta Lake:
from pyspark.sql import SparkSession

# nb_partitions and hive_metastore_uri are assumed to be defined earlier in the job
sparkSession = SparkSession.builder \
    .master("local[*]") \
    .appName("My App") \
    .config("spark.driver.maxResultSize", "2000m") \
    .config("spark.sql.shuffle.partitions", nb_partitions) \
    .config("spark.hadoop.dfs.replication", 1) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.hive.metastore.uris", hive_metastore_uri) \
    .config("spark.sql.catalogImplementation", "hive") \
    .getOrCreate()
Reading the data from the source and computing the pre-aggregation:
from pyspark.sql.functions import col

# Load the source data (here from a Delta table; any other source works as well)
# and flatten the nested 'data' struct into top-level columns
df = sparkSession.read.format("delta") \
    .load(read_path) \
    .select(col('data.*'))

df.persist()  # cache the source, it feeds the aggregation below
df.createOrReplaceTempView("dataset")

result = sparkSession.sql("""
SELECT Date, Brand, Category, SUM(Sale) AS Sum_Sale
FROM dataset
GROUP BY GROUPING SETS (
(Date, Brand),
(Brand, Category),
(Category)
)
""")
Writing the result to Delta Lake with PySpark:
result.write.format("delta").mode("overwrite").save(write_path)
Integrating Presto with Delta Lake for our result table:
sparkSession.sql("""GENERATE symlink_format_manifest FOR TABLE delta.`{}`""".format(
table_path))
ddl = """CREATE EXTERNAL TABLE if not exists {}({})
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{}/_symlink_format_manifest'""" \
.format(table_name, col_name_type, table_path)
sparkSession.sql(ddl)
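The manifest reflects the table as of the moment GENERATE ran, so it has to be regenerated after every overwrite of the result. If the Delta Lake version in use supports it, the table can instead be asked to keep its manifest up to date automatically; a minimal sketch, issued through sparkSession.sql with the same table_path placeholder as above:
ALTER TABLE delta.`{}` SET TBLPROPERTIES (
  delta.compatibility.symlinkFormatManifest.enabled = true
)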
Finally, querying Delta Lake with Presto:
SELECT Category, Sum_Sale
FROM result
WHERE Category IS NOT NULL
AND Brand IS NULL
AND Date IS NULL
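Because every grouping set lives in the same table, other reports come from the same place just by changing the filter; for example, sales per Date and Brand (same assumed table name, result, as above):
SELECT Date, Brand, Sum_Sale
FROM result
WHERE Date IS NOT NULL
AND Brand IS NOT NULL
AND Category IS NULL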
Conclusion
We have proposed an approach that combines the speed of Apache Spark for computation, the power of Delta Lake as columnar storage for big data, and the flexibility of Presto as a SQL query engine, while applying a pre-aggregation technique similar to OLAP systems. With this solution we get reports faster, without occupying as much space as row-based databases.