In case you haven’t been paying attention, Lakehouses are kind of a big deal, with more and more organizations pivoting to adopt the technology (or is it a lifestyle?) as a way to wrangle multiple data sources, formats, and usage patterns in one unified platform. Vendors may vary on how they implement them, but my opinion (which, to be clear, isn’t totally influenced by where I work as of the writing of this blog post) is that the open Lakehouse built with Databricks, which embraces the concept of decoupling compute and storage, is still the best bet.
For the uninitiated: the Lakehouse architecture hinges on that one crucial point: your storage is decoupled from your compute. This means you can lean in hard on cheap cloud storage options like Azure Data Lake Storage Gen2, Amazon S3, and Google Cloud Storage. A technology like Databricks, which helped develop the open storage format Delta, also means you control where your data lives, who can access it, and where they can access it from, keeping tight controls on things like private network connectivity.
Since compute and storage are now separated, any time you want to work with your data you need some form of compute engine capable of connecting to and reading your data from your storage locations. Compute engines vary, but one of the best is Apache Spark, which gives you a great distributed compute layer suitable for all sorts of workloads, whether they be analytical and ad-hoc queries, dashboard or BI workloads, data engineering jobs, or even data science and AI/ML use cases. It really can do it all, and it does it very well.
But what about operational use cases? For instance, let’s say your Lakehouse is hosting data that is critical to customer-facing systems that demand low-latency response times, such as real-time user lookups, API interfaces, or event-driven systems. The overhead required to take a query, schedule it, and run it can be in the hundreds of milliseconds. For some workloads, that’s a lifetime.
For instance, let’s say I have a simple API microservice that looks up a recent taxi trip by some sort of key value; a key lookup, a staple of any information system. Here’s a quick and dirty Python function written for Uvicorn that handles it (but you can just as easily recreate this in your language and/or framework of choice). I can hook it up to Databricks, and have it query a Serverless SQL Warehouse endpoint to get my piece of data:
@app.get("/direct-sql-query-test") def queryDBSQL(trip_id: int = 0): from databricks import sql import json import os from datetime import date, datetime connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"), http_path = os.getenv("DATABRICKS_HTTP_PATH"), access_token = os.getenv("DATABRICKS_TOKEN")) with connection.cursor() as cursor: cursor.execute("select * from drewfurgiuele.sample.trips WHERE trip_id = %(trip_id)s", {'trip_id': trip_id }) result = cursor.fetchone() response = { "trip_id" : result.trip_id, "tpep_pickup_datetime" : str(result.tpep_pickup_datetime), "tpep_dropoff_datetime" : str(result.tpep_dropoff_datetime), "trip_distance" : float(result.trip_distance), "fare_amount" : float(result.fare_amount), "pickup_zip" : result.pickup_zip, "dropoff_zip" : result.dropoff_zip } return JSONResponse(content=response)
The response times are relatively consistent, with responses coming back anywhere from lows of ~250ms to a higher end of ~500ms, and all sorts of times in between. These are also cached responses, meaning I ran the query several times and observed these numbers; first-run numbers trend higher, by anywhere from 150 to 200ms on average. That’s by no means terrible, but if our calling service is latency-averse, clearly we need to make this faster.
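Your numbers will of course vary. If you want to reproduce this kind of measurement yourself, here’s a minimal sketch, assuming the service above is running locally on port 8000 and using a hypothetical trip_id:

import time
import requests

# Hypothetical local endpoint; adjust the host, port, and trip_id to match your setup
URL = "http://localhost:8000/direct-sql-query-test?trip_id=7477"

timings = []
for _ in range(20):
    start = time.perf_counter()
    requests.get(URL).raise_for_status()
    timings.append((time.perf_counter() - start) * 1000)

print(f"min: {min(timings):.0f}ms  max: {max(timings):.0f}ms  avg: {sum(timings)/len(timings):.0f}ms")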
At this point, data architects and systems engineers will probably want to pivot to either extracting or exporting their data to a relational database system that can return the data faster, like Postgres, or to adding a caching layer with something like Redis. And while this is a solid plan, it introduces new challenges: how do you plan to keep your data in sync, where do you host the service, and how much is it going to cost you? And will the target service even meet your latency requirements?
I tend to lean toward the caching solutions, mostly because this is exactly what they’re designed for. At this point, your more traditional DBAs will roll their eyes at you, or send you memes with things like “lol no sql,” but pay them no mind, as they’re mostly unhappy people with a limited worldview. Instead, let’s focus on being solutions oriented.
So you’ve decided to roll with a caching service to get these responses to the calling applications and services faster, except you have a problem: how are you going to sync your data? And how will you continue to sync your data? Well, that’s where this story takes a unique twist: Spark and Databricks make this especially trivial. No, really!
First, we need to establish connectivity to our Redis instance. Redis has its own Spark driver that makes reading and writing data to Redis databases very simple. For our purposes, we need to do three things.
First, create a Databricks cluster and set some Spark configuration options. Specifically, we need three settings:
spark.redis.host <the FQDN of your Redis instance>
spark.redis.port <the port your Redis instance is listening on>
spark.redis.auth <your instance password>
These settings will be used by the Redis driver (below) to establish connectivity between Spark and Redis. Note that this is a very basic configuration; other settings can be provided in case you’re using ACLs, SSL, or other connection options. The full list of options can be found on the driver’s GitHub page, here: spark-redis/doc/configuration.md at master · RedisLabs/spark-redis · GitHub. As a best practice, you should also put these values into a Databricks secret store.
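In fact, cluster Spark configs can reference Databricks secrets directly with the {{secrets/<scope>/<key>}} syntax, so assuming you’ve stored the password in a scope named redis under a key named password (names here are just for illustration), the auth setting could look like this instead:

spark.redis.auth {{secrets/redis/password}}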
Next, we need the spark-redis driver. You can either download the source code and build it yourself, or grab it via Maven in the Databricks Library installation UI. For this demonstration, I’m using the spark-redis_2.12 package.
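If you go the Maven route, the coordinate to search for in the Library UI looks like the one referenced in the job definition later in this post:

com.redislabs:spark-redis_2.12:3.1.0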
Finally, once the cluster is running and the driver is installed, we can test writing our data to the Redis database. Using PySpark, this is pretty trivial indeed: we just read our records into a Spark dataframe, then turn around and write the data with a .write() method:
# Read the entire contents of our trips table
trips_df = spark.read.table("drewfurgiuele.sample.trips")

# Get a count of the records, just to see how much data we're about to commit
trips_df.count()

# Write to Redis
trips_df.write.format("org.apache.spark.sql.redis") \
    .mode("append") \
    .option("table", "trips_cache") \
    .option("key.column", "trip_id") \
    .save()
Let’s break this down: first, we grab our data into a Spark dataframe. Next, just for funsies, we do a .count() on the dataframe to see how many records we’re going to write. The real magic then happens when we use the .write() method on the dataframe and specify the format as org.apache.spark.sql.redis. This will take into account the Spark configuration settings we set on our cluster. We then specify a couple of options, such as the name of the table we’re writing to and the key column of our data set (aka some value unique to the row).
The real secret weapon here is the mode() option in the command. In Spark, there are several modes available when you write data; the actual behavior depends on the underlying driver. In the case of the Redis driver, when we use the append mode, any existing keys are updated and any new keys are added. This means I don’t need to expire changed records and overwrite my entire dataset each time: if I know which records changed, I can just use the append mode each time to update my cache!
If only there were some way to do that <foreshadowing intensifies>.
Once the data is in our Redis database, we can test our responses. We’ll do a quick API test just like we did when we hit the Databricks Serverless SQL endpoint directly:
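I won’t belabor the endpoint code, but here’s a rough sketch of what a Redis-backed version of the earlier lookup might look like. It assumes the default spark-redis key layout of <table>:<key column value> and uses the redis-py client with placeholder connection details:

from fastapi import FastAPI
from fastapi.responses import JSONResponse
import os
import redis

app = FastAPI()

# Placeholder connection details; point these at your own Redis instance
cache = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    password=os.getenv("REDIS_PASSWORD"),
    decode_responses=True
)

@app.get("/redis-query-test")
def query_redis(trip_id: int = 0):
    # spark-redis writes each row as a Redis hash under <table>:<key>, e.g. trips_cache:7477
    record = cache.hgetall(f"trips_cache:{trip_id}")
    return JSONResponse(content=record)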
This particular query took 7ms, with the highest observed value hitting 15ms. That’s a gulf of latency, especially when you consider how frequently a service may be requesting data, and any concurrent queries that may need to retrieve the data at once. What’s more, the direct-query approach needs some compute running alongside your Lakehouse to service these requests; this approach just requires whatever service/system is running your Redis instance.
Okay, so far we’ve shown this is a faster way to get at your data, and we’ve shown that it can be done pretty easily with the power of Spark. But we’ve still got two big unanswered questions: one, how do we handle updates to our data, especially deletes, and two, how do we build a reliable pipeline to detect these changes and apply them? Well, I’ve got great news: Databricks and the open power of the Lakehouse once again solve this problem for us.
First, let’s talk about data updates. We need to know what changed in our Lakehouse before we can do anything, and for that, we’ll take advantage of the Change Data Feed capability of Delta (see “Use Delta Lake change data feed on Databricks”). We need to enable this on the table we’re syncing with our Redis cache, and we can do it with a simple SQL statement:
ALTER TABLE drewfurgiuele.sample.trips SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
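As an aside, if you’re creating a table from scratch, you can set the same table property at creation time; here’s a quick sketch with a placeholder column list:

CREATE TABLE drewfurgiuele.sample.some_new_table (
  trip_id BIGINT
  -- ...the rest of your columns
)
TBLPROPERTIES (delta.enableChangeDataFeed = true)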
With it enabled, some new capabilities open up to us, specifically the ability to see every change that occurs to our table. Consider the following series of changes to our data:
UPDATE drewfurgiuele.sample.trips SET fare_amount = 13.5 WHERE trip_id = 7477;

INSERT INTO drewfurgiuele.sample.trips (tpep_pickup_datetime, tpep_dropoff_datetime, trip_distance, fare_amount, pickup_zip, dropoff_zip)
VALUES ('2024-03-29 12:11:00', '2023-11-9 12:43:00', 8.1, 25.0, 43017, 43017);

DELETE FROM drewfurgiuele.sample.trips WHERE trip_id = 3129;
One update, one insert, and one delete. These changes need to be fed to our Redis database, but without a full refresh of our data, how do we know what changes to push? We’ll use a different SQL statement, DESCRIBE HISTORY, to tell us what table “version” we’re on:
DESCRIBE HISTORY drewfurgiuele.sample.trips
Which yields the following:
The important bits here are the version and operation columns. The version number represents the state of the data in the table as of the given operation. We can use this version number to either revert changes or filter for changes as of a certain version.
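If you’d rather grab the latest version programmatically instead of eyeballing the DESCRIBE HISTORY output, a small sketch using the Delta Lake Python API looks like this:

from delta.tables import DeltaTable

# Pull just the most recent history entry and read its version number and operation
latest = (
    DeltaTable.forName(spark, "drewfurgiuele.sample.trips")
    .history(1)
    .select("version", "operation")
    .collect()[0]
)
print(latest["version"], latest["operation"])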
We can also use a SQL function, table_changes(), to return row-level data on what changed; the _change_type column tells us the nature of the change (for updates, we want the “update_postimage” row). This function takes the starting and ending versions of changes to return, so using the results from the DESCRIBE command, we can get down to our latest changes:
SELECT * FROM table_changes('drewfurgiuele.sample.trips', 3, 7) order by _commit_timestamp
Now that we know how to get our changes, we can pull them into a new dataframe and then do the same append we did before to update the cache. Since we don’t really “delete” anything from Redis, we’ll put our deletes into a separate dataframe and set their time to live (TTL) to 5 seconds, which tells Redis to remove them from the cache 5 seconds after receiving the update:
# First, get changes (inserts and updates)
change_feed_df = spark.read \
    .option("readChangeData", True) \
    .option("startingVersion", 8) \
    .table("drewfurgiuele.sample.trips") \
    .filter("_change_type = 'update_postimage' or _change_type = 'insert'")

# Then get deletes in a separate dataframe
delete_feed_df = spark.read \
    .option("readChangeData", True) \
    .option("startingVersion", 8) \
    .table("drewfurgiuele.sample.trips") \
    .filter("_change_type = 'delete'")

# Append the changes to the Redis cache
change_feed_df.write.format("org.apache.spark.sql.redis") \
    .mode("append") \
    .option("table", "trips_cache") \
    .option("key.column", "trip_id") \
    .save()

# "Append" the deletes; we set the ttl on these records to 5 seconds, which means
# once the record is updated, it will continue to exist in the cache for 5 more
# seconds, then be removed
delete_feed_df.write.format("org.apache.spark.sql.redis") \
    .mode("append") \
    .option("table", "trips_cache") \
    .option("key.column", "trip_id") \
    .option("ttl", 5) \
    .save()
The other interesting thing is that, since I appended from my change feed data, my Redis record has some additional interesting data added to it, which may be of use to you. When I look at it with a tool like RedisInsight, we can see it:
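If you don’t have RedisInsight handy, a quick redis-py sketch does the same job; the connection details below are placeholders, and the keys come from the earlier example changes:

import redis

# Placeholder connection; use the same host, port, and password as your Spark config
cache = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Inspect an updated row: spark-redis keys look like <table>:<key column value>
print(cache.hgetall("trips_cache:7477"))

# Check the countdown on a "deleted" row (-2 means Redis has already evicted it)
print(cache.ttl("trips_cache:3129"))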
So far so good. Now it’s time to take this to the REAL next level: a real end-to-end pipeline built in Databricks that keeps this data in sync for us.
Now it’s time to build something truly special. We know how to write and change data in our Redis instance, and we know how to get the changes from our Lakehouse tables. It’s time to build a full pipeline that will not only push these changes, but automatically detect them for us. Once our changes have been processed into our table, we’ll turn around and capture those changes into a dataset we can use to update our cache. The tools in our Databricks toolbox we’ll use for these two tasks are Delta Live Tables (DLT) and Databricks Workflows.
Delta Live Tables is a declarative ETL framework that lets us use the languages we all love, Python or SQL, to define a source and destination for our changes, and then run that pipeline either on demand or continuously to process and publish our incoming changes to our warehouse table via change data capture feeds. What makes this extra special is that DLT is built on top of Spark Structured Streaming, without nearly the amount of code or complication, and it allows us to specify our Delta change feed as a source. That means it will be aware of any and all changes in our source table when the pipeline runs, and we can use it to build datasets to control changes and deletions.
To accomplish this, we first need to author a notebook that our DLT job will use. Here’s a sample notebook that does this work, and we’ll walk through it below.
import dlt
from pyspark.sql.functions import col

@dlt.table
def trips_changes():
    return spark.read.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", 2) \
        .table("drewfurgiuele.sample.trips")

@dlt.table
def trips_upserts():
    return dlt.read("trips_changes").filter("_change_type = 'update_postimage' or _change_type = 'insert'")

@dlt.table
def trips_deletes():
    return dlt.read("trips_changes").filter("_change_type = 'delete'")
So what is this notebook, and why does this code look all strange? That’s DLT: we’re using a declarative framework to specify that the source of our changes is the change feed from our source table, then turning around and creating materialized views of the data that comes out of it. Each time this pipeline runs, it will refresh our materialized views of our changes based on the same filters we used above. When we run the pipeline, here’s what we see:
We see that the trips_changes table that we defined at the start of the code read 4 rows; those four rows represent the changes in the change feed since we last ran the pipeline. Our other tables are defined with filters on the first table we declared. You may be wondering: with one change in trips_deletes and two in trips_upserts, why are there 4 rows in the trips_changes table? Remember: for each update statement, there are two rows in the change feed; we only want the “postimage” update types, so our pipeline filters the others out in our materialized views. Once this pipeline runs, these tables can be queried like any other table or view in our Lakehouse. If we re-run the pipeline now, without any changes, the pipeline wouldn’t show anything except zeroes, since DLT remembers state (it already read and “remembered” that it captured those rows from the change feed).
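For example, once the pipeline has published its tables to a target schema (drewfurgiuele.redischanges, in my case), checking the pending upserts is just an ordinary query:

SELECT * FROM drewfurgiuele.redischanges.trips_upserts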
Now that we have a place to query our changes, we’ll create one more notebook to process the changes into our Redis cache. Here are the four cells; two Python cells, and two SQL cells:
import pyspark.sql.functions as F
import datetime, time

# The workflow will pass in the job trigger time (in milliseconds) via this widget
dbutils.widgets.text("commitTimeMax", "", "Timestamp Window (in ms) max")

# Fallback timestamp for the very first run
default_timestamp = time.time()
%sql
CREATE TABLE IF NOT EXISTS drewfurgiuele.redischanges.processed_timestamps (
  runid BIGINT GENERATED ALWAYS AS IDENTITY,
  process_timestamp TIMESTAMP
)
commitTimeMaxMS = dbutils.widgets.get("commitTimeMax")

# Grab the most recent run we've recorded, if any
max_processed_timestamp = spark.read.table("drewfurgiuele.redischanges.processed_timestamps") \
    .orderBy("runid", ascending=False).limit(1)

if max_processed_timestamp.count() == 0:
    # First run: fall back to the default timestamp
    timestamp_filter = datetime.datetime.fromtimestamp(default_timestamp)
else:
    # commitTimeMax arrives as milliseconds (a string), so convert to seconds
    timestamp_filter = datetime.datetime.fromtimestamp(int(commitTimeMaxMS) / 1000)

# Push the upserts into the Redis cache
upserts_df = spark.read.table("drewfurgiuele.redischanges.trips_upserts")
upserts_df.write.format("org.apache.spark.sql.redis") \
    .mode("append") \
    .option("table", "trips_cache") \
    .option("key.column", "trip_id") \
    .save()

# Push the deletes, with a TTL so Redis expires them shortly after
deletes_df = spark.read.table("drewfurgiuele.redischanges.trips_deletes")
deletes_df.write.format("org.apache.spark.sql.redis") \
    .mode("append") \
    .option("table", "trips_cache") \
    .option("key.column", "trip_id") \
    .option("ttl", 15) \
    .save()
%sql INSERT INTO drewfurgiuele.redischanges.processed_timestamps (process_timestamp) VALUES ('{timestamp_filter}')
These four notebook cells do a few crucial things: the first sets up a commitTimeMax widget (which our workflow will populate) and a default timestamp to fall back on; the second creates a small tracking table, if it doesn’t already exist, to record which timestamps we’ve processed; the third works out the timestamp for this run and pushes the upserts and the deletes (with a TTL) from our DLT tables into the Redis cache; and the fourth records the timestamp we just processed back into the tracking table.
You might be asking: why split this into a separate notebook instead of doing it all in the DLT pipeline? Well, for a couple of reasons. The compute that Delta Live Tables code runs on doesn’t support installing custom Java/Maven libraries, which is what we need for our Redis connection. I wish it did, and we could just have one pipeline; since it can’t, we need to break this out into two steps. And since our second notebook job step is technically a batch update, we have to take it upon ourselves to track the “state” of our last update. It’s an extra step, sure, but we can still run this job in a continuous mode if we wanted, or roll our own Spark Structured Streaming job and run it 24/7. This is just a quick demonstration of one way to accomplish this task.
The final step is to stitch this together into a workflow:
For those of you that use Databricks already, here’s a representation of the job YAML:
resources:
  jobs:
    Process_Changes_To_Redis:
      name: Process Changes To Redis
      tasks:
        - task_key: Run_DLT_Pipeline
          pipeline_task:
            pipeline_id: c8cbb9e1-87e9-4881-b1d0-7272ae4b71f2
            full_refresh: false
        - task_key: Push_Changes_To_Redis
          depends_on:
            - task_key: Run_DLT_Pipeline
          notebook_task:
            notebook_path: /Users/drew.furgiuele@databricks.com/Redis Demo/Push Changes To Redis
            base_parameters:
              commitTimeMax: "{{job.trigger.time.timestamp_ms}}"
            source: WORKSPACE
          job_cluster_key: Job_cluster
          libraries:
            - maven:
                coordinates: com.redislabs:spark-redis_2.12:3.1.0
      job_clusters:
        - job_cluster_key: Job_cluster
          new_cluster:
            cluster_name: ""
            spark_version: 13.3.x-scala2.12
            azure_attributes:
              first_on_demand: 1
              availability: ON_DEMAND_AZURE
              spot_bid_max_price: -1
            node_type_id: Standard_D4ds_v5
            spark_env_vars:
              PYSPARK_PYTHON: /databricks/python3/bin/python3
            enable_elastic_disk: true
            data_security_mode: USER_ISOLATION
            runtime_engine: PHOTON
            num_workers: 2
What isn’t captured in this definition is that for our second task, we need to make sure we configure our job cluster with the same required Spark configurations for our Redis connection, as well as specifying that we should use a shared, multi-node cluster for our compute type:
Now, when we run our workflow, we can automatically pick up our changes, persist them, and then apply them to our cache. We can also configure our job to run on a schedule, or continuously, to keep things in sync at whatever interval we choose.
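For example, a cron-style schedule can be dropped into the job definition above, alongside its name and tasks; the cadence here is just an illustration:

schedule:
  quartz_cron_expression: "0 0/15 * * * ?"
  timezone_id: "UTC"
  pause_status: UNPAUSED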
As you can see, in relatively few steps, with very little code, we can create these robust and powerful pipelines for identifying our changes, staging them, and pushing them to our cache for additions, changes, and even expirations. This means we can be extremely targeted in our pipelines and keep our updates small and manageable, and by extension, cheap. And for those cases where entire cache rebuilds need to happen, we can do full refreshes on the initial datasets from the Lakehouse, and full refreshes of every change from our DLT tables as well.
Whether you’re just embarking on the journey of building your organization’s next-generation data platform, or you’re well on your way, it’s worth thinking about what needs to consume your data downstream, how you’ll handle customers that need low-latency access, and how you’ll sync your changes to a low-cost solution with an equally cost-effective process. The solution we walked through in this blog is just one such example, but you can take it and apply it to pretty much any target that fits your fancy. The key thing to keep in mind is that duplicating data is usually bad, or hard, or even both. Make sure you’re aware of your true requirements before you start, and choose the best option for you and your users. Once you do, by leveraging both the power of Spark and the features of a platform like Databricks, it becomes an easily manageable process.