
Core

is_read_change_feed_enabled(spark, path)

Check if delta.enableChangeDataFeed is enabled

Parameters:

    spark (SparkSession): The Spark session. Required.
    path (str): Path to the data location. Required.

Returns:

    bool: True if change data feed is enabled for the table.

Source code in delta_utils/core.py
def is_read_change_feed_enabled(spark: SparkSession, path: str) -> bool:
    """Check if delta.enableChangeDataFeed is enabled

    Args:
        spark (SparkSession): The spark session
        path (str): path to the data location

    Returns:
        if read change feed is enabled
    """
    table = table_from_path(path)
    return (
        spark.sql(f"SHOW TBLPROPERTIES {table}")
        .where(
            (F.col("key") == "delta.enableChangeDataFeed") & (F.col("value") == "true")
        )
        .count()
        > 0
    )
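
Usage sketch (a minimal example, assuming an active SparkSession bound to spark, as on Databricks; the S3 path is hypothetical):

from delta_utils.core import is_read_change_feed_enabled

path = "s3://my-bucket/bronze/events"  # hypothetical Delta table location

if not is_read_change_feed_enabled(spark, path):
    # Enable the change data feed so read_change_feed can be used on this table
    spark.sql(
        f"ALTER TABLE delta.`{path}` "
        "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
    )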

last_written_timestamp_for_delta_path(spark, path)

Returns the last written timestamp for a delta table, or None if it cannot be determined

Source code in delta_utils/core.py
def last_written_timestamp_for_delta_path(
    spark: SparkSession, path: str
) -> Optional[datetime]:
    """Returns the last written timestamp for a delta table"""
    table = table_from_path(path)
    try:
        response = (
            spark.sql(f"DESCRIBE HISTORY {table}")
            .where(
                F.col("operation").isin(["WRITE", "MERGE", "CREATE TABLE AS SELECT"])
            )
            .orderBy(F.col("timestamp").desc())
            .select("timestamp")
            .first()
        )
    except AnalysisException as e:
        print(f"AnalysisException: {e}")
        return None
    if not response:
        return None
    return response["timestamp"]
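
Usage sketch (assumes an active SparkSession bound to spark; the path is hypothetical):

from delta_utils.core import last_written_timestamp_for_delta_path

ts = last_written_timestamp_for_delta_path(spark, "s3://my-bucket/bronze/events")
if ts is None:
    # No WRITE, MERGE, or CREATE TABLE AS SELECT entry found in the history
    print("No write recorded for this table")
else:
    print(f"Last written at {ts.isoformat()}")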

location_for_hive_table(spark, table)

Parameters:

    spark (SparkSession): The Spark session. Required.
    table (str): The table name, preferably following the UC three-level namespace {catalog}.{schema}.{table}. Required.

Returns:

    str: The full AWS S3 path of the table.

Source code in delta_utils/core.py
def location_for_hive_table(spark: SparkSession, table: str) -> str:
    """
    Args:
        spark (SparkSession): The spark session
        table (str): The table name. Preferably according to the UC three level namespace {catalog}.{schema}.{table}

    Returns:
        AWS S3 full path
    """
    return (
        spark.sql(f"DESCRIBE EXTENDED {table}")  # type: ignore
        .where(F.col("col_name") == "Location")
        .select("data_type")
        .first()["data_type"]
    )
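
Usage sketch (the Unity Catalog table name and the returned path are hypothetical):

from delta_utils.core import location_for_hive_table

location = location_for_hive_table(spark, "prod.sales.orders")
print(location)  # e.g. "s3://my-bucket/warehouse/sales/orders"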

read_change_feed(spark, path, **kwargs)

Read changes from a delta table, or raise NoNewDataException if the timestamp is after the last written timestamp

If the delta table doesn't have delta.enableChangeDataFeed set to true, raises a ReadChangeFeedDisabled exception

Source code in delta_utils/core.py
def read_change_feed(spark: SparkSession, path: str, **kwargs) -> DataFrame:
    """Read changes from delta table or raise NoNewDataExcpetion if the timestamp is after the last written timestamp

    If the delta table doesn't have delta.enableChangeDataFeed set to true, raises ReadChangeFeedDisabled exception
    """
    if not is_read_change_feed_enabled(spark, path):
        raise ReadChangeFeedDisabled(path)
    table = table_from_path(path)
    try:
        spark_option = spark.read.option("readChangeFeed", True)
        for key, value in kwargs.items():
            spark_option = spark_option.option(key, value)
        dataframe = spark_option.table(table)
        if dataframe.first() is None:
            raise NoNewDataException()
        return dataframe
    except AnalysisException as e:
        error_msg = str(e)
        print(error_msg)
        if (
            error_msg.startswith("The provided timestamp")
            and "is after the latest version available to this" in error_msg
        ):
            raise NoNewDataException(error_msg)
        else:
            raise e
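
Usage sketch (assumes an active SparkSession bound to spark and a hypothetical path; any extra keyword arguments are passed straight through as Spark read options, e.g. startingTimestamp or startingVersion). NoNewDataException is assumed to be importable from delta_utils.core, where it is raised:

from delta_utils.core import NoNewDataException, read_change_feed

try:
    changes = read_change_feed(
        spark, "s3://my-bucket/bronze/events", startingTimestamp="2023-01-01"
    )
    changes.show()
except NoNewDataException:
    print("No new changes since the given starting point")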

spark_current_timestamp(spark)

Get the current timestamp from Spark

Parameters:

    spark (SparkSession): The Spark session. Required.

Returns:

    datetime: The current timestamp from Spark.

Source code in delta_utils/core.py
def spark_current_timestamp(spark: SparkSession) -> datetime:
    """Check if delta.enableChangeDataFeed is enabled

    Args:
        spark (SparkSession): The spark session

    Returns:
        the current timestamp from spark
    """
    return spark.sql("SELECT current_timestamp()").first()[0]  # type: ignore
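
Usage sketch (assumes an active SparkSession bound to spark):

from delta_utils.core import spark_current_timestamp

# Naive datetime; the value reflects the Spark session timezone setting
now = spark_current_timestamp(spark)
print(now.isoformat())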

table_from_path(path)

Returns a table name from a path

Source code in delta_utils/core.py
def table_from_path(path: str) -> str:
    """Returns a table name from a path"""
    if is_path(path):
        return f"delta.`{path}`"
    # The path is most likely already a table
    return path
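
Behavior sketch (the path and table name are hypothetical; assumes is_path recognizes S3 URIs as paths):

from delta_utils.core import table_from_path

table_from_path("s3://my-bucket/bronze/events")
# -> "delta.`s3://my-bucket/bronze/events`"

table_from_path("prod.sales.orders")
# -> "prod.sales.orders" (treated as an existing table name)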