From d6e0694666139aa4cbeb6a6284ebb37d81d596ca Mon Sep 17 00:00:00 2001 From: adtullock <78578014+adtullock@users.noreply.github.com> Date: Tue, 20 Dec 2022 19:33:42 -0800 Subject: [PATCH] New features + code restructure --- spark/class_DataSummarizer | 371 +++++++++++++++++++++++++++++-------- 1 file changed, 289 insertions(+), 82 deletions(-) diff --git a/spark/class_DataSummarizer b/spark/class_DataSummarizer index bcdb39c..7655a3e 100644 --- a/spark/class_DataSummarizer +++ b/spark/class_DataSummarizer @@ -1,13 +1,18 @@ # Databricks notebook source # DBTITLE 1,Package Imports -from pyspark.sql.types import BooleanType, FloatType, LongType, StringType +import pyspark.sql.functions as f +from pyspark.sql.types import * # COMMAND ---------- # DBTITLE 1,Class Definition class SparkDataSummary: - def __init__(self, df): - self.df = df + def __init__(self, df, dtypeChanges=None): + self.df = self._convert_ts_to_string(df) + self._summaryTypes = ('bool','numeric','string') + + if dtypeChanges is not None: + self.udpate_dtypes(dtypeChanges) # Get typed dataframes self._boolean_df = self._typed_df("bool") @@ -19,27 +24,213 @@ class SparkDataSummary: self._null_counts = self._get_null_counts() self._bool_counts = self._get_bool_counts() - def _typed_df(self, dtype): - """Subset a dataframe by column data types + self._summary = {"bool":None,"numeric":None,"string":None} + + #----- PUBLIC METHODS -----# + def distinct_value_summary(self): + """Display a distinct value counts summary for numeric and string columns + """ + distinct = lambda x: df.select(x).distinct().count() + + df_columns = self._string_df.columns.copy() + df_columns.extend(self._numeric_df.columns) + df = self.df.select(df_columns) + + summary_df = df.summary('count') + row_template = summary_df.collect()[0].asDict() + + new_rows = [] + new_row = row_template.copy() + new_row.update({'summary':'distinct'}) + __ = [new_row.update({c:str(distinct(c))}) for c in df_columns] + new_rows.append(new_row) + + summary_df = summary_df.unionByName(spark.createDataFrame(new_rows)) + + print("Distinct Values Summary:") + display(summary_df) + + return None + + def full_summary(self, showSummary=True): + """Create and display all available summary data""" + self.summarize('numeric', showSummary) + self.summarize('string', showSummary) + self.summarize('bool', showSummary) + + if showSummary: + self.distinct_value_summary() + + return None + + def getSummary(self, dtype): + assert dtype in self._summaryTypes, f"Invalid dtype: accepts {self._summaryTypes}" + return self._summary.get(dtype) + + def summarize(self, dtype='numeric', showSummary=True): + """Create and display a profile for all columns of the specified 'dtype' + :param dtype (str): Accepts these values: "bool", "numeric", "string" """ - if dtype == "bool": - fields = [f.name for f in self.df.schema if f.dataType == BooleanType()] - elif dtype == "numeric": - fields = [f.name for f in self.df.schema if f.dataType in (FloatType(), LongType())] - elif dtype == "string": - fields = [f.name for f in self.df.schema if f.dataType == StringType()] + assert dtype in self._summaryTypes, f"Invalid dtype: accepts {self._summaryTypes}" + + if self._summary.get(dtype) is None: + df = { + "bool":self._boolean_df, + "numeric":self._numeric_df, + "string":self._string_df + }[dtype.lower()] + + row_template = df.summary('count').collect()[0].asDict() + + if len(row_template.keys()) < 2: + # Manually make a row template + row_template = self._make_row_template(df.columns) + + # Get df columns + columns = [x for x in row_template.keys()][1:] + + new_rows = [] + + if dtype == 'bool': + # Add rows for true/false counts + new_rows.append(self._bool_counts.get("ALL")) + new_rows.append(self._bool_counts.get("T")) + new_rows.append(self._bool_counts.get("F")) + + if dtype == 'string': + # Add string column metadata + new_row = self._create_new_row('distinct', row_template, rowType='distinct') + new_rows.append(new_row) + + # Add row for null counts + new_row = self._create_new_row('nulls', row_template) + new_rows.append(new_row) + + # Add row for null % + new_row = self._create_new_row('null%', row_template, rowType='percent') + new_rows.append(new_row) + + if dtype == 'bool': + summary_df = spark.createDataFrame(new_rows).select(list(row_template.keys())) + elif dtype in ('numeric'): + summary_df = df.summary() + summary_df = summary_df.unionByName(spark.createDataFrame(new_rows)) + elif dtype == 'string': + summary_df = df.summary('count') + summary_df = summary_df.unionByName(spark.createDataFrame(new_rows)) + + if showSummary: + print(f"{dtype.title()} Data Summary:") - return self.df.select(fields) + self._summary.update({dtype:summary_df}) - def _get_null_counts(self): - """Get the columnwise null count + if showSummary: + display(self._summary.get(dtype)) + + return None + + def udpate_dtypes(self, colTypes:str, returnSchema=True): + """Cast columns as different types + :param colTypes (str): Accepts (strictly) comma-delimited string of column-type associations + e.g.: "col1 int,col2 bool" will attempt to cast col1 and col2 as integer and boolean types, respectively. + + :param returnSchema (boolean): Returns new dataframe schema, if True """ - null_counter = lambda c: self.df.where(f"{c} is null").count() - null_counts = {} - for name in [x.name for x in self.df.schema]: - null_counts.update({name:null_counter(name)}) + for ct in colTypes.split(","): + column, dtype = ct.split(' ') + sparkType = self._get_spark_data_type(dtype) + try: + self.df = self.df.withColumn(column, f.col(column).cast(sparkType)) + except Exception as e: + print(f"Unable to cast column {column} of type {dtype} to {sparkType} \n{e}") + + # Reassemble the typed dataframes + self._boolean_df = self._typed_df("bool") + self._numeric_df = self._typed_df("numeric") + self._string_df = self._typed_df("string") + self._bool_counts = self._get_bool_counts() + + # Reset data summary + self._summary = {"bool":None,"numeric":None,"string":None} + + if returnSchema: + retValue = self.df.schema + else: + retValue = None + + return retValue + + def write_all_summary_data(self, dbName:str, tableName:str, profileDB='data_profile'): + """Write all summary data to the database + :param dbName (str): Database name of table being profiled + :param tableName (str): Name of the table being profiled + :param profileDB (str): Database to write profile data to + """ + # Collect metadata rows + for d in self._summaryTypes: + self.write_summary_to_table(d, dbName, tableName, profileDB) + + return None + + def write_summary_to_table(self, dtype:str, dbName:str, tableName:str, profileDB='data_profile'): + """Write summary data to the database + :param dtype (str): Data type summary to save + :param dbName (str): Database name of table being profiled + :param tableName (str): Name of the table being profiled + :param profileDB (str): Database to write profile data to + """ + assert dtype.lower() in self._summaryTypes, f"Invalid dtype: accepts {self._summaryTypes}" + summary_data = self._summary.get(dtype) + + if len(summary_data.columns) < 2: + print(f"No {dtype} columns found in {tableName}. Skipping . . .") + else: + print(f"Attempting to write {dtype} summary for {tableName} . . .") - return null_counts + # Collect metadata rows + new_rows_list = [] + for c in summary_data.columns[1:]: + row_values = [dbName,tableName,c] + _ = [row_values.append(x.asDict().get(c)) for x in summary_data.select(c).collect()] + new_rows_list.append(row_values) + + columns = ['SourceDatabase','SourceTable','ColumnName'] + _ = [columns.append(x.asDict().get('summary').title()) for x in summary_data.select("summary").collect()] + + summary_df = spark.createDataFrame(new_rows_list, columns).withColumn("__date", f.current_date()) + + # Write summary to database + self._summary_writer(summary_df, dtype, profileDB) + return None + + #----- PRIVATE METHODS -----# + @classmethod + def _convert_ts_to_string(cls, df): + """Helper function to convert timestamp columns into string columns""" + # Cast timestamp columns as string values + for column in [c[0] for c in df.dtypes if c[1] == 'timestamp']: + df = df.withColumn(column, f.col(column).cast(StringType())) + + return df + + def _create_new_row(self, title:str, rowTemplate:dict, rowType:str='count'): + """Helper function to create a new row entry + :param title: Title for the metadata row + :param rowTemplate: Template containing the columns to compute calculations for + :param rowType: Accepts values in (count, percent, distinct) + """ + columns = [x for x in rowTemplate.keys()][1:] + new_row_dict = rowTemplate.copy() + new_row_dict.update({'summary':title}) + if rowType == 'count': + __ = [new_row_dict.update({c:str(self._null_counts.get(c))}) for c in columns] + elif rowType == 'percent': + __ = [new_row_dict.update({c:str(100*(self._null_counts.get(c)/self.row_count))}) for c in columns] + elif rowType == 'distinct': + distinct = lambda x: self._string_df.select(x).distinct().count() + __ = [new_row_dict.update({c:str(distinct(c))}) for c in columns] + + return new_row_dict def _get_bool_counts(self): """Get the columnwise count for boolean values @@ -59,72 +250,88 @@ class SparkDataSummary: value_counts.update({name:str(value_count)}) bool_counts = {'T':true_counts, 'F':false_counts, 'ALL':value_counts} - return bool_counts + return bool_counts - def summary(self, dtype='numeric'): - """Display a profile for all columns of the specified 'dtype' - :param dtype (str): Accepts these values: "bool", "numeric", "string" + def _get_null_counts(self): + """Get the columnwise null count """ - def create_new_row(self, title, rowTemplate, rowType='count'): - columns = [x for x in rowTemplate.keys()][1:] - new_row_dict = rowTemplate.copy() - new_row_dict.update({'summary':title}) - if rowType == 'count': - __ = [new_row_dict.update({c:str(self._null_counts.get(c))}) for c in columns] - elif rowType == 'percent': - __ = [new_row_dict.update({c:str(100*(self._null_counts.get(c)/self.row_count))}) for c in columns] - elif rowType == 'distinct': - distinct = lambda x: self._string_df.select(x).distinct().count() - __ = [new_row_dict.update({c:str(distinct(c))}) for c in columns] - - return new_row_dict - - df = { - "bool":self._boolean_df, - "numeric":self._numeric_df, - "string":self._string_df - }[dtype] - - row_template = df.summary('count').collect()[0].asDict() - - if len(row_template.keys()) < 2: - # Manually make a row template - row_template = {'summary':""} - __ = [row_template.update({c:""}) for c in df.columns] - - # Get df columns - columns = [x for x in row_template.keys()][1:] - - new_rows = [] - - if dtype == 'bool': - # Add rows for true/false counts - new_rows.append(self._bool_counts.get("ALL")) - new_rows.append(self._bool_counts.get("T")) - new_rows.append(self._bool_counts.get("F")) - - if dtype == 'string': - # Add string column metadata - new_row = create_new_row(self, 'distinct', row_template, rowType='distinct') - new_rows.append(new_row) - - # Add row for null counts - new_row = create_new_row(self, 'nulls', row_template) - new_rows.append(new_row) + null_counter = lambda c: self.df.where(f"{c} is null").count() + null_counts = {} + for name in [x.name for x in self.df.schema]: + null_counts.update({name:null_counter(name)}) + + return null_counts + + @classmethod + def _get_spark_data_type(cls, typeName:str): + """Identify a column's equivalent Spark data type""" + binaryType = lambda x: any([t in x.lower() for t in ['bit']]) + boolType = lambda x: any([t in x.lower() for t in ['bool']]) + decimalType = lambda x: any([t in x.lower() for t in ['decimal','numeric']]) + doubleType = lambda x: any([t in x.lower() for t in ['double','money']]) + floatType = lambda x: any([t in x.lower() for t in ['float']]) + intType = lambda x: any([t in x.lower() for t in ['int','long']]) + strType = lambda x: any([t in x.lower() for t in ['char','date','string','time']]) + + if binaryType(typeName): + dataType = BinaryType() + elif boolType(typeName): + dataType = BooleanType() + elif decimalType(typeName): + dataType = DecimalType() + elif doubleType(typeName): + dataType = DoubleType() + elif floatType(typeName): + dataType = FloatType() + elif intType(typeName): + dataType = LongType() + elif strType(typeName): + dataType = StringType() + else: + dataType = StringType() + + return dataType + + @classmethod + def _make_row_template(cls, columns): + """Helper function to manually create a row template + :param columns (list): Column names to include in the summary""" + template = {'summary':""} + __ = [template.update({c:""}) for c in columns] - # Add row for null % - new_row = create_new_row(self, 'null%', row_template, rowType='percent') - new_rows.append(new_row) + return template + + @classmethod + def _summary_writer(cls, df, dtype, profileDB): + """Helper function to handle writing summaries to the database + :param df (str): Dataframe carrying the profile data + :param dtype (str): Data type summary to save + :param profileDB (str): Database to write profile data to + """ + if not spark.catalog.databaseExists(profileDB): + print(f"Creating database: {profileDB}") + spark.sql(f"CREATE DATABASE {profileDB}") + + spark.catalog.setCurrentDatabase(profileDB) + table = dtype.lower() + "_data" - if dtype == 'bool': - summary_df = spark.createDataFrame(new_rows).select(list(row_template.keys())) - elif dtype in ('numeric'): - summary_df = df.summary() - summary_df = summary_df.unionByName(spark.createDataFrame(new_rows)) - elif dtype == 'string': - summary_df = df.summary('count') - summary_df = summary_df.unionByName(spark.createDataFrame(new_rows)) + df.write.saveAsTable(table, mode='append', partitionBy=['__date','SourceDatabase','SourceTable']) + print("Write successful!") + return None + + def _typed_df(self, dtype): + """Subset a dataframe by column data types + """ + assert dtype.lower() in self._summaryTypes, f"Invalid dtype: accepts {self._summaryTypes}" + getType = lambda t: str(t).split("Type")[0].lower() - display(summary_df) + dtypes = { + 'bool':('boolean'), + 'numeric':('binary', 'decimal', 'double', 'float', 'integer', 'long'), + 'string':('string', 'timestamp') + }[dtype.lower()] + + # Dataframe filtered to typed columns + fields = [f.name for f in self.df.schema if getType(f.dataType) in dtypes] - return None + return self.df.select(fields)