Delta

Read only the changes made to a Delta table since the last written timestamp, and fall back to reading the whole dataset if nothing has been written before. NoNewDataException is raised when there is nothing new to read.

from delta_utils import (
    read_change_feed,
    last_written_timestamp_for_delta_path,
    NoNewDataException,
)


path = "/path/to/delta/table"

last_timestamp = last_written_timestamp_for_delta_path(spark, path)
if last_timestamp:
    # Read the changes only
    try:
        df = read_change_feed(spark, path, startingTimestamp=last_timestamp)
    except NoNewDataException:
        # Exit the databricks notebook
        dbutils.notebook.exit("No new data")
else:
    # Read the whole dataset
    df = spark.read.load(path, format="delta")
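
Under the hood this reads Delta Lake's change data feed. A roughly equivalent plain Spark read, shown only to illustrate what read_change_feed presumably wraps (not the library's actual implementation):

# Sketch only: reading the change feed directly with Delta's Spark options.
# The source table needs delta.enableChangeDataFeed = true for this to work.
df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", last_timestamp)
    .load(path)
)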

DeltaChanges

Append only

Let's read from /path/events and keep only the events for Anna. If we've written to /path/anna_events before, we will only read the changes since the last time we wrote.

from delta_utils import DeltaChanges, NoNewDataException

from pyspark.sql import functions as F


from_path = "/path/events"
to_path = "/path/anna_events"
person_name = "Anna"

dc = DeltaChanges(spark, to_path)

try:
    df = dc.read_changes(from_path)
except NoNewDataException:
    dbutils.notebook.exit("No new changes")

df = (
    df
    .where(F.col("name") == person_name)
    .drop("_change_type", "_commit_version", "_commit_timestamp")
)
# The delta table will be written with mode="append"
# We're assuming that only inserts have been made to /path/events
dc.save(df)
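
For reference, dc.save(df) appends the dataframe to the target path. A rough hand-written equivalent, assuming nothing beyond the append described in the comment above (the real method may do more bookkeeping):

# Sketch only: roughly what the append-only save amounts to.
df.write.format("delta").mode("append").save(to_path)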

NonDeltaLastWrittenTimestamp

Let's do the same as above but write to a JDBC target instead. JDBC targets don't support Delta's change data feed, so the last written timestamp can't be worked out from the target itself; that is why the NonDeltaLastWrittenTimestamp class exists.

from delta_utils import NonDeltaLastWrittenTimestamp, NoNewDataException

from pyspark.sql import functions as F


from_path = "/path/events"
to_path = "jdbc:..."
person_name = "Anna"

# The path can be global and used for multiple tables.
written_timestamps = NonDeltaLastWrittenTimestamp(spark, "/path/global/")

try:
    df = written_timestamps.read_changes("annas-jdbc-connector", from_path)
except NoNewDataException:
    dbutils.notebook.exit("No new changes")

(
    df
    .where(F.col("name") == person_name)
    .drop("_change_type", "_commit_version", "_commit_timestamp")
    .write.format("jdbc")
    .option("url", to_path)
    .option("dbtable", "anna_events")  # example table name
    .mode("append")
    .save()
)
written_timestamps.set_all_last_written_timestamps()  # Update the timestamps
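
Because the timestamp store is keyed by a name, one store can serve several flows in the same run. The sketch below is only an illustration of that pattern: the second connector name, its source path and the target table names are made up, and it assumes that set_all_last_written_timestamps persists the timestamps for every flow read during the run.

# Illustration only: two flows sharing the same timestamp store.
flows = {
    # name: (source path, target JDBC table) - hypothetical except the first source
    "annas-jdbc-connector": ("/path/events", "anna_events"),
    "bobs-jdbc-connector": ("/path/other_events", "bob_events"),
}

for name, (source_path, table) in flows.items():
    try:
        changes = written_timestamps.read_changes(name, source_path)
    except NoNewDataException:
        continue  # nothing new for this flow
    (
        changes.drop("_change_type", "_commit_version", "_commit_timestamp")
        .write.format("jdbc")
        .option("url", to_path)
        .option("dbtable", table)
        .mode("append")
        .save()
    )

written_timestamps.set_all_last_written_timestamps()  # persist timestamps for all flows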

New and updated

new_and_updated returns a dataframe with only the rows that can be upserted into the Delta table. It takes an id_field argument that names the unique ID column in the dataframe.

from delta_utils import DeltaChanges, NoNewDataException, new_and_updated

from pyspark.sql import functions as F


from_path = "/path/events"
to_path = "/path/anna_events"
person_name = "Anna"

dc = DeltaChanges(spark, to_path)

try:
    df = dc.read_changes(from_path)
except NoNewDataException:
    dbutils.notebook.exit("No new changes")

df = (
    new_and_updated(df.where(F.col("name") == person_name), "unique_id")
    .drop("_change_type", "_commit_version", "_commit_timestamp")
)
dc.upsert(df, join_fields=("unique_id", "timestamp",))
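
The upsert merges on the given join_fields. As a point of reference only, a hand-rolled equivalent with the delta-spark API would look roughly like this (a sketch of the idea, not the library's implementation):

# Sketch only: roughly what an upsert on ("unique_id", "timestamp") amounts to.
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, to_path)
(
    target.alias("t")
    .merge(
        df.alias("s"),
        "t.unique_id = s.unique_id AND t.timestamp = s.timestamp",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)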