Personally Identifiable Information

Warning

Ask your Databricks administrators to set the environment variable PII_TABLE before you get started.

Example: PII_TABLE=db_admin.gdpr.one_time_deletes
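
A minimal sketch of how a notebook might verify the variable before using this module (the fail-fast check is an assumption; only the variable name and example value come from this page):

```python
import os

# Fail fast if the administrators have not configured PII_TABLE yet
pii_table = os.environ.get("PII_TABLE")
if pii_table is None:
    raise RuntimeError(
        "PII_TABLE is not set, e.g. PII_TABLE=db_admin.gdpr.one_time_deletes"
    )
print(f"Removal requests are stored in {pii_table}")
```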

Consumer dataclass

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `spark` | `SparkSession` | the active Spark session | *required* |
| `consumer` | `str` | the consuming catalog name | *required* |

Example: Get all the removal requests and mark one as completed.

```python
consumer = Consumer(spark, "beta_live")

consumer.get_removal_requests().display()

# After handling the deletion of a request
consumer.mark_as_completed("abc123")
```
Source code in delta_utils/pii.py
```python
@dataclass
class Consumer:
    """
    Args:
        spark (SparkSession): the active spark session
        consumer (str): the consuming catalog name
    Example:
        # Get all the removal requests and mark one as completed
        ```python
        consumer = Consumer(spark, "beta_live")

        consumer.get_removal_requests().display()

        # After handling the deletion of a request
        consumer.mark_as_completed("abc123")
        ```
    """

    spark: SparkSession
    consumer: str

    def __post_init__(self):
        self.dt = get_pii_table(self.spark)

    def get_removal_requests(self) -> DataFrame:
        """
        Get all removal requests

        Returns:
            DataFrame: a dataframe that can be displayed with all the delete requests
        """
        return (
            self.dt.toDF()
            .where(F.col("affected_table").startswith(f"{self.consumer}."))
            .where(~F.col("deleted"))
            .drop("deleted")
        )

    def mark_as_completed(self, id: str):
        """
        Mark the removal request as completed

        Args:
            id (str): the UUID from the request
        """
        self.dt.update(
            F.col("id") == id,
            set={"deleted": "true"},
        )
```

get_removal_requests()

Get all removal requests

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | a dataframe that can be displayed with all the delete requests |

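A minimal usage sketch (the request column names follow the schema in the source below; iterating with `collect()` is an assumption, not the only way to consume the result):

```python
consumer = Consumer(spark, "beta_live")

# Inspect each open removal request for this catalog
for request in consumer.get_removal_requests().collect():
    print(request.id, request.affected_table, request.source_identifying_attributes)
```
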
Source code in delta_utils/pii.py
```python
def get_removal_requests(self) -> DataFrame:
    """
    Get all removal requests

    Returns:
        DataFrame: a dataframe that can be displayed with all the delete requests
    """
    return (
        self.dt.toDF()
        .where(F.col("affected_table").startswith(f"{self.consumer}."))
        .where(~F.col("deleted"))
        .drop("deleted")
    )
```

mark_as_completed(id)

Mark the removal request as completed

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `id` | `str` | the UUID from the request | *required* |
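
A hedged sketch of the intended flow: the caller performs the actual deletion itself (the `DELETE` statement below is illustrative, not part of this library), then flags the request as handled:

```python
consumer = Consumer(spark, "beta_live")

for request in consumer.get_removal_requests().collect():
    # Build a WHERE clause from the identifying attributes (illustrative;
    # adapt the quoting and column mapping to your own table layout)
    predicate = " AND ".join(
        f"{column} = '{value}'"
        for column, value in request.source_identifying_attributes
    )
    spark.sql(f"DELETE FROM {request.affected_table} WHERE {predicate}")

    # Remove the request from the open list
    consumer.mark_as_completed(request.id)
```
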
Source code in delta_utils/pii.py
```python
def mark_as_completed(self, id: str):
    """
    Mark the removal request as completed

    Args:
        id (str): the UUID from the request
    """
    self.dt.update(
        F.col("id") == id,
        set={"deleted": "true"},
    )
```

Producer dataclass

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `spark` | `SparkSession` | the active Spark session | *required* |
Example: Create a removal request.

```python
producer = Producer(spark)

producer.create_removal_request(
    affected_table="beta_live.world.adults_only",
    source_table="alpha_live.world.people",
    source_identifying_attributes=[("id", "1")],
)
```
Source code in delta_utils/pii.py
```python
@dataclass
class Producer:
    """
    Args:
        spark (SparkSession): the active spark session

    Example:
        # Create a removal request
        ```python
        producer = Producer(spark)

        producer.create_removal_request(
            affected_table="beta_live.world.adults_only",
            source_table="alpha_live.world.people",
            source_identifying_attributes=[("id", "1")],
        )
        ```
    """

    spark: SparkSession

    def __post_init__(self):
        self.dt = get_pii_table(self.spark)

    def create_removal_request(
        self,
        *,
        affected_table: str,
        source_table: str,
        source_columns: Optional[List[str]] = None,
        source_identifying_attributes: List[Tuple[str, str]],
        when_to_delete: Optional[datetime] = None,
    ):
        """
        Creates a personally identifiable information (PII) removal request.

        This function generates a request to remove personally identifiable information (PII) from a specified affected table
        by utilizing the source table and associated columns with identifying attributes. The request can optionally include
        a specific date and time for when the deletion should occur.

        Args:
            affected_table (str): The name of the affected table from which PII needs to be removed.
            source_table (str): The name of the source table that contains the associated columns for identifying attributes.
            source_columns (Optional[List[str]], optional): A list of column names in the source table that hold PII.
                Defaults to None.
            source_identifying_attributes (List[Tuple[str, str]]): A list of tuples representing the identifying attributes
                to match the PII records in the affected table. Each tuple consists of a column name in the source table
                and the value of the PII records.
            when_to_delete (Optional[datetime], optional): An optional datetime object representing the specific date and
                time when the PII deletion should occur. Defaults to None.

        Raises:
            ValueError: If the affected_table or source_table is not provided or if source_identifying_attributes is empty.
        """

        if source_columns:
            invalid_source_columns = ", ".join(
                col for col in source_columns if "." in col
            )
            if invalid_source_columns:
                raise ValueError(
                    f"Can't use the columns: {invalid_source_columns}. Only root columns can be used."
                )
        df_update = self.spark.createDataFrame(
            [
                (
                    str(uuid.uuid4()),
                    datetime.now(),
                    affected_table,
                    source_table,
                    source_columns,
                    source_identifying_attributes,
                    when_to_delete or datetime.utcnow(),
                    False,
                )
            ],
            schema=ONE_TIME_DELETES_SCHEMA,
        )

        # Create merge upsert condition
        merge_attr = [
            "affected_table",
            "source_identifying_attributes",
            "source_table",
            "source_columns",
        ]
        merge_statement = reduce(
            and_,
            [
                F.col(f"source.{attr}") == F.col(f"updates.{attr}")
                for attr in merge_attr
            ],
        )

        # Do upsert
        (
            self.dt.alias("source")
            .merge(df_update.alias("updates"), merge_statement)
            .whenMatchedUpdate(
                set={
                    "created_at": "updates.created_at",
                    "when_to_delete": "updates.when_to_delete",
                    "deleted": "updates.deleted",
                }
            )
            .whenNotMatchedInsertAll()
            .execute()
        )
```

create_removal_request(*, affected_table, source_table, source_columns=None, source_identifying_attributes, when_to_delete=None)

Creates a personally identifiable information (PII) removal request.

This function generates a request to remove personally identifiable information (PII) from a specified affected table, using the source table and its identifying-attribute columns to locate the records. The request can optionally include a specific date and time for when the deletion should occur.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `affected_table` | `str` | The name of the affected table from which PII needs to be removed. | *required* |
| `source_table` | `str` | The name of the source table that contains the associated columns for identifying attributes. | *required* |
| `source_columns` | `Optional[List[str]]` | A list of column names in the source table that hold PII. | `None` |
| `source_identifying_attributes` | `List[Tuple[str, str]]` | A list of tuples representing the identifying attributes used to match the PII records in the affected table. Each tuple consists of a column name in the source table and the value identifying the PII records. | *required* |
| `when_to_delete` | `Optional[datetime]` | An optional datetime giving the specific date and time when the PII deletion should occur. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `affected_table` or `source_table` is not provided, or if `source_identifying_attributes` is empty. |

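A fuller call sketch exercising every keyword argument (the table names, column names, and seven-day grace period are illustrative assumptions):

```python
from datetime import datetime, timedelta

producer = Producer(spark)

# Hypothetical request: scrub two PII columns one week from now,
# matching the person through their id in the source table
producer.create_removal_request(
    affected_table="beta_live.world.adults_only",
    source_table="alpha_live.world.people",
    source_columns=["email", "phone_number"],
    source_identifying_attributes=[("id", "1")],
    when_to_delete=datetime.utcnow() + timedelta(days=7),
)
```
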
Source code in delta_utils/pii.py
```python
def create_removal_request(
    self,
    *,
    affected_table: str,
    source_table: str,
    source_columns: Optional[List[str]] = None,
    source_identifying_attributes: List[Tuple[str, str]],
    when_to_delete: Optional[datetime] = None,
):
    """
    Creates a personally identifiable information (PII) removal request.

    This function generates a request to remove personally identifiable information (PII) from a specified affected table
    by utilizing the source table and associated columns with identifying attributes. The request can optionally include
    a specific date and time for when the deletion should occur.

    Args:
        affected_table (str): The name of the affected table from which PII needs to be removed.
        source_table (str): The name of the source table that contains the associated columns for identifying attributes.
        source_columns (Optional[List[str]], optional): A list of column names in the source table that hold PII.
            Defaults to None.
        source_identifying_attributes (List[Tuple[str, str]]): A list of tuples representing the identifying attributes
            to match the PII records in the affected table. Each tuple consists of a column name in the source table
            and the value of the PII records.
        when_to_delete (Optional[datetime], optional): An optional datetime object representing the specific date and
            time when the PII deletion should occur. Defaults to None.

    Raises:
        ValueError: If the affected_table or source_table is not provided or if source_identifying_attributes is empty.
    """

    if source_columns:
        invalid_source_columns = ", ".join(
            col for col in source_columns if "." in col
        )
        if invalid_source_columns:
            raise ValueError(
                f"Can't use the columns: {invalid_source_columns}. Only root columns can be used."
            )
    df_update = self.spark.createDataFrame(
        [
            (
                str(uuid.uuid4()),
                datetime.now(),
                affected_table,
                source_table,
                source_columns,
                source_identifying_attributes,
                when_to_delete or datetime.utcnow(),
                False,
            )
        ],
        schema=ONE_TIME_DELETES_SCHEMA,
    )

    # Create merge upsert condition
    merge_attr = [
        "affected_table",
        "source_identifying_attributes",
        "source_table",
        "source_columns",
    ]
    merge_statement = reduce(
        and_,
        [
            F.col(f"source.{attr}") == F.col(f"updates.{attr}")
            for attr in merge_attr
        ],
    )

    # Do upsert
    (
        self.dt.alias("source")
        .merge(df_update.alias("updates"), merge_statement)
        .whenMatchedUpdate(
            set={
                "created_at": "updates.created_at",
                "when_to_delete": "updates.when_to_delete",
                "deleted": "updates.deleted",
            }
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
```

Using it together with lineage

```python
# Instantiate a Lineage object
lineage = Lineage(
    databricks_workspace_url=dbutils.secrets.get("DATABRICKS", "URL"),
    databricks_token=dbutils.secrets.get("DATABRICKS", "TOKEN"),
)

# Instantiate the Producer object
producer = Producer(spark)

source_table = "alpha_live.world.people"

# Get downstream tables for the alpha_live.world.people table
downstream_tables = lineage.downstream_tables(source_table)

for affected_table in downstream_tables:
    producer.create_removal_request(
        affected_table=affected_table,
        source_table=source_table,
        source_identifying_attributes=[("id", "1")],
    )
```
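
On the consuming side, the generated requests then surface through Consumer (a sketch assuming the downstream tables live in the beta_live catalog):

```python
consumer = Consumer(spark, "beta_live")

# Every downstream table now has a pending removal request to act on
consumer.get_removal_requests().display()
```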