Skip to content

Lineage

Lineage dataclass

Represents a lineage object for tracking dependencies between tables in a Databricks workspace.

Parameters:

Name Type Description Default
databricks_workspace_url str

The URL of the Databricks workspace.

required
databricks_token str

The access token for authentication with the Databricks API.

required

Methods:

Name Description
downstream_tables

Returns a set of table names that are dependent upon the specified source table. Signature: downstream_tables(source_table: str) -> Set[str].

Example
# Instantiate a Lineage object
lineage = Lineage(
    databricks_workspace_url="https://example.databricks.com",
    databricks_token="abc123",
)

# Get downstream tables for a source table
downstream_tables = lineage.downstream_tables("source_table")
Source code in delta_utils/lineage.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@dataclass
class Lineage:
    """
    Represents a lineage object for tracking dependencies between tables in a Databricks workspace.

    Args:
        databricks_workspace_url (str): The URL of the Databricks workspace.
        databricks_token (str): The access token for authentication with the Databricks API.

    Methods:
        downstream_tables: Returns a set of table names that are dependent upon the specified source table.

    Example:
        ```python
        # Instantiate a Lineage object
        lineage = Lineage(
            databricks_workspace_url="https://example.databricks.com",
            databricks_token="abc123",
        )

        # Get downstream tables for a source table
        downstream_tables = lineage.downstream_tables("source_table")
        ```
    """

    databricks_workspace_url: str
    databricks_token: str

    # Upper bound (in seconds) on how long a single lineage request may block.
    # Deliberately left un-annotated so the dataclass does not treat it as a field.
    _REQUEST_TIMEOUT_SECONDS = 60

    def downstream_tables(self, source_table: str) -> Set[str]:
        """
        Retrieves a set of table names that are dependent upon the specified source table.

        This method sends a GET request to the workspace's
        ``/api/2.0/lineage-tracking/table-lineage`` endpoint, authenticated with the
        configured bearer token, and collects the tables listed under ``downstreams``
        in the response.

        Args:
            source_table (str): The name of the source table.

        Returns:
            Set[str]: Names of the dependent tables, each formatted as
                ``catalog_name.schema_name.name``. Empty when the response lists no
                downstream tables.

        Raises:
            requests.exceptions.HTTPError:
                If the source_table is not found or if there is an error retrieving the downstream tables.
            requests.exceptions.Timeout:
                If the workspace does not answer within the request timeout.

        Example:
            ```python
            # Get downstream tables for a source table
            downstream_tables = lineage.downstream_tables("source_table")
            ```
        """
        # Tolerate a workspace URL configured with a trailing slash so the
        # request path never contains a double slash.
        base_url = self.databricks_workspace_url.rstrip("/")
        resp = requests.get(
            f"{base_url}/api/2.0/lineage-tracking/table-lineage",
            json={
                "table_name": source_table,
                "include_entity_lineage": True,
            },
            headers={
                "Accept": "application/json",
                "Authorization": f"Bearer {self.databricks_token}",
            },
            # Without a timeout a stalled connection would block the caller forever.
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        resp.raise_for_status()

        return self._table_names(resp.json())

    @staticmethod
    def _table_names(lineage_info: dict) -> Set[str]:
        """Extract fully qualified downstream table names from a decoded lineage response."""
        return {
            "{catalog_name}.{schema_name}.{name}".format_map(row["tableInfo"])
            for row in lineage_info.get("downstreams", [])
            # Entries without a "tableInfo" key carry no table name and are skipped.
            if "tableInfo" in row
        }

downstream_tables(source_table)

Retrieves a set of table names that are dependent upon the specified source table.

This method queries the Databricks workspace using the provided URL and access token to identify tables that have a dependency on the specified source table.

Parameters:

Name Type Description Default
source_table str

The name of the source table.

required

Returns:

Type Description
Set[str]

A set of table names that are dependent upon the source table.

Raises:

Type Description
HTTPError

If the source_table is not found or if there is an error retrieving the downstream tables.

Example
# Get downstream tables for a source table
downstream_tables = lineage.downstream_tables("source_table")
Source code in delta_utils/lineage.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def downstream_tables(self, source_table: str) -> Set[str]:
    """
    Retrieves a set of table names that are dependent upon the specified source table.

    This method sends a GET request to the workspace's
    ``/api/2.0/lineage-tracking/table-lineage`` endpoint, authenticated with the
    configured bearer token, and collects the tables listed under ``downstreams``
    in the response.

    Args:
        source_table (str): The name of the source table.

    Returns:
        Set[str]: Names of the dependent tables, each formatted as
            ``catalog_name.schema_name.name``. Empty when the response lists no
            downstream tables.

    Raises:
        requests.exceptions.HTTPError:
            If the source_table is not found or if there is an error retrieving the downstream tables.
        requests.exceptions.Timeout:
            If the workspace does not answer within the request timeout.

    Example:
        ```python
        # Get downstream tables for a source table
        downstream_tables = lineage.downstream_tables("source_table")
        ```
    """
    # Tolerate a workspace URL configured with a trailing slash so the
    # request path never contains a double slash.
    base_url = self.databricks_workspace_url.rstrip("/")
    resp = requests.get(
        f"{base_url}/api/2.0/lineage-tracking/table-lineage",
        json={
            "table_name": source_table,
            "include_entity_lineage": True,
        },
        headers={
            "Accept": "application/json",
            "Authorization": f"Bearer {self.databricks_token}",
        },
        # Without a timeout a stalled connection would block the caller forever.
        timeout=60,
    )
    resp.raise_for_status()

    lineage_info = resp.json()
    return {
        "{catalog_name}.{schema_name}.{name}".format_map(row["tableInfo"])
        for row in lineage_info.get("downstreams", [])
        # Entries without a "tableInfo" key carry no table name and are skipped.
        if "tableInfo" in row
    }