Change Data Capture (CDC) is a common technique for tracking data changes in a database system. Instead of fully resyncing, downstream systems can then operate on the incremental changes: they can use the record changes to re-materialize the table to its latest state more efficiently, or use the transaction history to derive useful metrics and insights.

There are a few common mechanisms to capture changes:

  • Use a column maintained in the database table to know which records have been updated (for example, if the schema design contains an `updated_at` field that is set to the latest timestamp upon every record change).
  • Create database triggers that fire upon changes to the target table and write extra records into a “CDC” table.
  • Use a database-provided replication log that syncs the change events. When people refer to CDC casually, this is usually the mechanism they mean. It goes by many names: logical replication in PostgreSQL, change streams in MongoDB, binary logs in MySQL, etc.
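
As a concrete illustration of the first mechanism, here is a minimal polling sketch against SQLite; the `users` table, its columns, and the integer timestamps are all hypothetical:

```python
# A minimal sketch of mechanism 1: polling an (assumed) `updated_at` column.
# Table name, column names, and timestamps are hypothetical.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, updated_at INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?, ?)",
                 [(1, "ada", 100), (2, "bob", 205), (3, "eve", 310)])

def fetch_changes_since(conn, checkpoint):
    """Return rows modified after the last checkpoint, plus the new checkpoint."""
    rows = conn.execute(
        "SELECT id, name, updated_at FROM users WHERE updated_at > ? ORDER BY updated_at",
        (checkpoint,),
    ).fetchall()
    new_checkpoint = rows[-1][2] if rows else checkpoint
    return rows, new_checkpoint

changed, ckpt = fetch_changes_since(conn, 200)
print(changed)  # [(2, 'bob', 205), (3, 'eve', 310)]
print(ckpt)     # 310
```

Each subsequent poll passes the returned checkpoint back in, so only newly modified rows are fetched. Note that this approach cannot observe deletes, which is one reason log-based CDC is often preferred.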

Operating on the database-provided replication log has several advantages over other methods of achieving the same results (for instance, taking periodic table snapshots):

  • Lower latency: Subscribers to CDC event streams can get near real-time updates of the transactions occurring in the database, if needed.
  • Less load on source systems: Pulling entire table snapshots is expensive for large tables whose older records are only sporadically updated. If such pulls happen at a frequent interval, they may significantly impact the database’s normal functioning. Reading from the database logs stored in the file system, on the other hand, does not rely on querying production data.
  • Capturing deletes: Record deletion is often hard to track and may require building an external lookup table or modifying application code. Reading from the event stream, however, makes capturing deletes no different from insertions and updates: just another event type to deal with.

Capturing this change stream does have some tradeoffs:

  • Additional Setup / Different Security Boundaries: Most databases make capturing the change stream a different setup process from issuing regular queries. Additional setup is typically required because the change stream often spans the entire database rather than a single table or schema. For use cases with strict security requirements, access to the event stream may not be allowed, or may require re-architecting to split some data out into a separate database isolated from the data for which the event stream is desired.
  • Maintenance Overhead: Some databases will crash due to a lack of disk space if a replication slot is created to store change stream events and the consumer is either not consuming or not consuming fast enough. Other times there may be connectivity issues, and the change stream ends up with gaps because the outage lasts longer than the change stream’s retention period. These types of issues may increase maintenance overhead.

Acquiring CDC data

CDC with Debezium

Debezium (pronounced as “dee-BEE-zee-uhm”) is an open-source tool that abstracts away the complexities of working with low-level database events. A common Debezium architecture looks like this:

Currently, Debezium supports four databases: MySQL, MongoDB, PostgreSQL, and SQL Server. Additionally, support for Oracle servers and Cassandra is on the way.

If your use case is a low-latency one, that is, getting near real-time updates from database change events, Debezium would be a good fit. However, if you would prefer a “pull” rather than a “push” model, i.e., polling database changes at a regular interval and acting upon them in batches, Debezium may require more work downstream of the Kafka connectors.

Overall, while Debezium offloads the labor of decoding raw log formats from various databases, it still requires hands-on operational work, such as setting up and maintaining a cluster of Kafka brokers, as well as the ZooKeeper nodes that manage them. In addition, downstream subscribers must do extra work to handle duplicate events in the event of the Debezium service going down.
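
For instance, a consumer can make duplicate deliveries harmless by tracking which log positions it has already applied. This is a hedged sketch, not Debezium’s actual event format; the (op, key, value, lsn) tuples are hypothetical:

```python
# Idempotent handling of possibly-duplicated change events.
# The event shape (op, key, value, lsn) is hypothetical.
def apply_events(state, applied_lsns, events):
    """Apply events to `state` (a dict keyed by primary key), skipping any
    event whose log sequence number (lsn) was already applied."""
    for op, key, value, lsn in events:
        if lsn in applied_lsns:   # duplicate delivery: ignore
            continue
        applied_lsns.add(lsn)
        if op == "delete":
            state.pop(key, None)
        else:                     # insert / update both behave as an upsert
            state[key] = value
    return state

state, seen = {}, set()
events = [("insert", 1, "a", 10), ("update", 1, "b", 11),
          ("update", 1, "b", 11),   # duplicate redelivered after a restart
          ("delete", 1, None, 12)]
apply_events(state, seen, events)
print(state)  # {} — the delete wins despite the duplicate
```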

CDC with Ascend

Ascend provides a unified platform for data engineering work. Grabbing CDC data is made simple in Ascend with just a few clicks:

Internally, Ascend reads and parses the appropriate change stream provided by the database (for example, using logical replication in Postgres). Users can configure these readers (Ascend Connectors) to fetch at a given frequency. Ascend stores the last read checkpoint, pulls events occurring after the checkpoint, and then updates the checkpoint. As CDC data is strictly incremental, that is, historical data is never mutated, it works perfectly with Ascend’s notion of data partitions. Each Ascend partition contains a series of change events, and users can compute on them in parallel to construct powerful solutions.
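
The checkpointing loop can be modeled roughly like this (a simplified sketch, not Ascend’s actual API); each poll pulls only events past the stored checkpoint and emits them as an immutable partition:

```python
# Checkpointed, partitioned reads of a change log.
# `log` is a hypothetical list of (offset, event) pairs in ascending offset order.
def poll_partition(log, checkpoint):
    """Return the events past `checkpoint` as one batch, plus the new checkpoint."""
    batch = [(off, ev) for off, ev in log if off > checkpoint]
    new_checkpoint = batch[-1][0] if batch else checkpoint
    return batch, new_checkpoint

log = [(1, "insert"), (2, "update")]
partition_1, ckpt = poll_partition(log, 0)     # first poll: offsets 1-2
log += [(3, "delete"), (4, "insert")]          # more events arrive later
partition_2, ckpt = poll_partition(log, ckpt)  # next poll: only offsets 3-4
print(partition_2)  # [(3, 'delete'), (4, 'insert')]
```

Because each batch covers a disjoint range of offsets, the resulting partitions never need to be rewritten, which is what makes the dataset strictly incremental.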

Common CDC Usage Patterns

Regardless of whether you bring in CDC data via Debezium or Ascend, here are some common patterns for making use of it.


Most databases retain change logs only for a limited period (typically a few days). This practice allows databases to serve basic replication use cases while minimizing storage costs on the database servers. ETLing this changelog enables cost-effective retention of the full change stream in cheaper storage tiers, such as a cloud blob store instead of local disk. This archived copy of the full change stream allows for more complex use cases, such as time-travel across the database and deeper analytics than can be performed on the re-materialized state. In Ascend, the replicated change streams are already stored in the blob store that backs your Ascend deployment. These change streams are then fully accessible to enable the more complex use cases below.

Reconstruction and time-traveling

It may sound counter-intuitive: why would we reconstruct a table using the CDC event stream when we could just pull the latest table as-is?

As mentioned earlier, “SELECT *” is potentially a heavy operation to perform on a production database. Compared to a full table sync, the “incremental column” approach puts much less load on the system: each sync fetches only the rows changed since the last recorded checkpoint. These changed rows then get combined with the previous table snapshot, reconstructing the latest state of the table data. This is a recursive model that continuously applies new changes to the existing dataset.

CDC builds further on the incrementality idea. Since it reads log files instead of querying the database, it reduces system load even more.

The following diagram illustrates the few steps to re-materialize a database table to its latest state:

  1. We first take a one-time table snapshot before the CDC stream is enabled, and add an “operation_ts” column to all rows, set to the current timestamp.
  2. Read in a batch of CDC events from database logs. This is usually configured at an interval based on how frequently the downstream use cases need to have the dataset available.
  3. For each batch of CDC events, construct an “incremental table” containing all changed rows. This mini-table has the same schema as the source table, plus an “operation_ts” column recording the timestamp of the last transaction applied to each changed row.
  4. Union the initial snapshot with the “incremental table”. For any row that appears in both the “snapshot table” and the “incremental table”, compare their “operation_ts” columns and keep the newest version; if a row appears in just one table, use that version.
  5. The result of step 4 is our final, reconstructed table, representing the latest state of the original database. This state then becomes the initial snapshot for the next iteration of step 4, which makes the approach recursive.

In Ascend, this recursive type of transformation (snapshot + delta) is native to PySpark and Scala transformations. The code (in Python) looks similar to:

from pyspark.sql import DataFrame, SparkSession, Window
import pyspark.sql.functions as F
from typing import List

def transform(spark_session: SparkSession, inputs: List[DataFrame], credentials=None) -> DataFrame:
    df = inputs[0]     # latest batch of change events
    prior = inputs[1]  # previously reconstructed state (empty on the first run)
    if len(prior.columns) == 0:
        return df
    # Rank all versions of each primary key by transaction timestamp (newest first),
    # keep only the newest version, and drop records whose latest operation is a delete.
    return df \
        .union(prior) \
        .withColumn(
            "versions_ago",
            F.row_number().over(
                Window.partitionBy("primary_key_col_name").orderBy(F.col("server_ts").desc()))) \
        .where("versions_ago == 1") \
        .where("operation != 'delete'") \
        .drop("versions_ago")


It is quite straightforward to conduct “time-traveling” using CDC, that is, to reconstruct a source table to a point back in time (say, t1). To do that, we just start from the initial DB table state and build the rematerialized table using only transaction logs earlier than t1.
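
A minimal sketch of this replay, using plain Python dictionaries to stand in for tables (the event shape here is illustrative):

```python
# Time-travel: replay only events with operation_ts <= t1 onto the initial snapshot.
# Events are hypothetical (operation_ts, op, primary_key, row_value) tuples.
def rematerialize_at(snapshot, events, t1):
    state = dict(snapshot)                   # initial table state, keyed by primary key
    for ts, op, key, row in sorted(events):  # apply in timestamp order
        if ts > t1:                          # ignore everything after t1
            continue
        if op == "delete":
            state.pop(key, None)
        else:                                # insert / update both behave as an upsert
            state[key] = row
    return state

snapshot = {1: "v0"}
events = [(10, "update", 1, "v1"), (20, "insert", 2, "w0"), (30, "delete", 1, None)]
print(rematerialize_at(snapshot, events, 25))  # {1: 'v1', 2: 'w0'}
print(rematerialize_at(snapshot, events, 35))  # {2: 'w0'}
```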

Advanced usage patterns

CDC is also valuable for answering questions about a dataset that other methods can hardly address:

  • Auditing: Databases reflect the most up-to-date state of data and lose the trace of how they got there. CDC data can serve as bread crumbs and help answer compliance-related questions, sometimes even performing true-ups when records in various systems do not match.


  • Analytics: Questions like “How many times did users change their addresses in the past 6 months?” or “How often did a ticket get reopened last month?” cannot be answered with a simple “current state” table. Unlocking this analytical value typically requires application developers to store history explicitly. Alternatively, a CDC event stream brings in this data without additional development work.


  • Anomaly detection: Corrupt data, or unintended schema changes in source databases, often lead to unexpected chaos downstream. With the CDC event stream, one can build a notification system to alert on these undesired events. Even better, we can build logic to filter out or quarantine these changes, not allowing them to flow downstream at all when reconstructing tables for analytical purposes. This pattern is possible because the schema of the CDC feed is fixed by the schema of the change event, rather than by the target table’s own schema. In the change event, certain columns, like the record values or the schema of the record, are encoded in JSON, allowing for a semi-structured format that isn’t brittle against schema changes.
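
For example, the address-change question above can be answered with a simple aggregation over the change events; the event fields here (`user_id`, `changed`) are hypothetical:

```python
# Counting how often each user changed their address, directly from the
# change stream. Event field names are hypothetical.
from collections import Counter

events = [
    {"ts": 1, "op": "update", "user_id": 7, "changed": ["address"]},
    {"ts": 2, "op": "update", "user_id": 7, "changed": ["email"]},
    {"ts": 3, "op": "update", "user_id": 7, "changed": ["address"]},
    {"ts": 4, "op": "update", "user_id": 9, "changed": ["address"]},
]

def address_changes(events, since_ts):
    """Count address-changing update events per user since `since_ts`."""
    return Counter(e["user_id"] for e in events
                   if e["ts"] >= since_ts
                   and e["op"] == "update"
                   and "address" in e["changed"])

print(address_changes(events, since_ts=1))  # Counter({7: 2, 9: 1})
```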


These advanced use cases and more are all possible in Ascend by adding additional transformations on top of the change stream (since Ascend makes the full change stream accessible). Similar to the re-materialize example above, the same strategies of aggregating changes apply to creating slowly changing dimension tables, snapshot tables, or whatever other data set needs to be constructed. 

As the size of data in transactional databases keeps growing, CDC is a promising pattern because it fully harnesses the power of “incrementality”. At Ascend, we are excited to build with customers and explore the full potential of automated CDC dataflows.