Skip to content

Clean

drop_all_parameters_null_columns(df)

This function drops all columns that contain only null values in the parameters column. Parameter `df`: a PySpark DataFrame.

Source code in delta_utils/clean.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def drop_all_parameters_null_columns(df: DataFrame) -> DataFrame:
    """
    Drop every column of ``df`` that holds no non-null value.

    The non-null count of every column is computed in a single aggregation,
    so the data is scanned once and all counts come from the same evaluation
    of ``df``. A DataFrame without rows has no non-null values at all, so
    every one of its columns is dropped.

    :param df: A PySpark DataFrame
    :return: ``df`` without the columns that contain only null values
    """
    columns = df.columns
    if not columns:
        # There are no columns to inspect or to drop.
        return df

    # F.count(col) ignores nulls, so a result of 0 means "only nulls".
    # Backtick-quoting keeps names containing dots from being parsed as
    # struct field access; embedded backticks are escaped by doubling.
    non_null_counts = df.select(
        [F.count(F.col("`{}`".format(c.replace("`", "``")))) for c in columns]
    ).collect()[0]

    # A global aggregation yields exactly one row; pair its values with the
    # column names by position so the aggregate's output names do not matter.
    to_drop = [
        name for name, non_null in zip(columns, non_null_counts) if non_null == 0
    ]

    return df.drop(*to_drop)

fix_invalid_column_names(df)

Replaces all characters that are invalid in Spark column names with ASCII numbers.

Parameters:

Name Type Description Default
df DataFrame

The dataframe with invalid column names

required

Returns:

Name Type Description
DataFrame DataFrame

Returns a dataframe that has no invalid column names

Source code in delta_utils/clean.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def fix_invalid_column_names(df: DataFrame) -> DataFrame:
    """
    Rename every column to the name returned by ``replace_invalid_column_char``.

    The replacement rule itself lives in ``replace_invalid_column_char``
    (documented as replacing invalid Spark characters with ASCII numbers).
    The new names are passed to ``check_duplicates`` before the rename so
    that two columns collapsing into the same name are reported; presumably
    that helper raises in that case (confirm against its definition).

    Args:
        df (DataFrame): The dataframe with invalid column names

    Returns:
        DataFrame: A dataframe with the same columns, in the same order,
            carrying the replaced names

    """
    new_names = [replace_invalid_column_char(column) for column in df.columns]
    check_duplicates(
        new_names,
        "Found duplicates columns when renaming invalid columns",
    )

    # Rename by position instead of building "`old` as `new`" SQL strings:
    # a backtick inside an old or new name would break the quoted expression.
    return df.toDF(*new_names)

flatten(df, nested_names=True)

Will take a nested dataframe and flatten it out.

Parameters:

Name Type Description Default
df DataFrame

The dataframe you want to flatten

required
nested_names bool

If you want nested names or not

True

Returns:

Name Type Description
DataFrame DataFrame

Returns a flatter dataframe

Source code in delta_utils/clean.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def flatten(df: DataFrame, nested_names=True) -> DataFrame:
    """
    Flatten a nested dataframe by selecting the expressions derived from its schema.

    The schema is passed through ``flatten_schema`` and then
    ``rename_flatten_schema``, which yields (expression, name) pairs.

    Args:
        df (DataFrame): The dataframe you want to flatten
        nested_names (bool): When true, every expression is aliased to the
            name paired with it; otherwise the expression is selected as-is

    Returns:
        DataFrame: Returns a flatter dataframe

    """
    expressions = []
    for source, target in rename_flatten_schema(flatten_schema(df.schema)):
        if nested_names:
            expressions.append(f"{source} as `{target}`")
        else:
            expressions.append(f"{source}")

    flattened = df.selectExpr(*expressions)
    # The check runs on the resulting column names, after the select.
    check_duplicates(flattened.columns, "Found duplicates columns when flattening")

    return flattened