# -*- coding: utf-8 -*-
# @Time    : 2022/3/17 17:36
# @Author  : Leng Yang
# @FileName: categorical_process.py
# @Software: PyCharm

import pandas as pd
import numpy as np
from sklearn import metrics
from scipy.stats import chi2_contingency, chi2


def test():
    pass


class CategorySelfDescribe(object):
    """Descriptive statistics for a single categorical column."""

    def __init__(self):
        pass

    @staticmethod
    def category_describe(data: pd.Series) -> pd.DataFrame:
        """
        Describe the distinct category values of a column and how many there are.

        :param data: input data as pd.Series
        :return: pd.DataFrame with a single row containing 'categories'
                 (array of unique values) and 'types' (number of unique values)

        Examples
        --------
        >>> df = pd.DataFrame({'天气': ['晴', '晴', '阴', '雨'],
        ...                    '温度': ['高', '高', '高', '低']})
        >>> CategorySelfDescribe.category_describe(df['天气'])
          categories  types
        0  [晴, 阴, 雨]      3
        """
        # Build the one-row frame directly: DataFrame.append was removed
        # in pandas 2.0.
        uniques = data.unique()
        return pd.DataFrame([{'categories': uniques, 'types': len(uniques)}])

    @staticmethod
    def category_frequency(data: pd.Series) -> pd.DataFrame:
        """
        Frequency table: absolute count and relative frequency per category.

        :param data: input data as pd.Series
        :return: pd.DataFrame with columns 'unique_values', 'count', 'frequency'

        Examples
        --------
        >>> df = pd.DataFrame({'天气': ['晴', '晴', '阴', '雨', '雨', '雨', '阴',
        ...                             '晴', '晴', '雨', '晴', '阴', '阴', '雨']})
        >>> CategorySelfDescribe.category_frequency(df['天气'])
          unique_values  count  frequency
        0             晴      5   0.357143
        1             雨      5   0.357143
        2             阴      4   0.285714
        """
        df_freq = (data.value_counts(ascending=False)
                       .rename_axis('unique_values')
                       .reset_index(name='count'))
        df_freq['frequency'] = df_freq['count'] / len(data)
        return df_freq


class CategorySelfAnalyse(object):
    """Statistical analysis of a single categorical column."""

    def __init__(self):
        pass

    @staticmethod
    def entropy(data: pd.Series) -> float:
        """
        Compute the Shannon entropy (base 2) of a categorical series.

        :param data: input data as pd.Series
        :return: float, entropy in bits
        """
        # Series.value_counts replaces the removed pd.value_counts top-level API.
        prob = data.value_counts() / len(data)
        return float(-(prob * np.log2(prob)).sum())


class CategoryMutualDescribe(object):
    """Descriptive statistics for a pair of categorical columns."""

    def __init__(self):
        pass

    @staticmethod
    def crosstab(row_data: pd.Series, col_data: pd.Series) -> pd.DataFrame:
        """
        Contingency table of two categorical columns.

        :param row_data: categorical data 1, its categories become the rows
        :param col_data: categorical data 2, its categories become the columns
        :return: pd.DataFrame, the contingency table

        Examples
        --------
        >>> df = pd.DataFrame({'天气': ['晴', '晴', '阴', '雨'],
        ...                    '温度': ['高', '高', '高', '低']})
        >>> CategoryMutualDescribe.crosstab(df['天气'], df['温度'])
        温度  低  高
        天气
        晴   0  2
        雨   1  0
        阴   0  1
        """
        return pd.crosstab(row_data, col_data)


class MutualCategoricalAnalyse(object):
    """Statistical analysis of a pair of categorical columns."""

    def __init__(self):
        pass

    @staticmethod
    def info_gain(df: pd.DataFrame, attr_col: str, data_col: str) -> float:
        """
        Information gain: Gain(D, A) = Ent(D) - Ent(D|A),
        i.e. the entropy reduction of dataset D when split by attribute A.

        :param df: input data as pd.DataFrame
        :param attr_col: column name of the splitting attribute A
        :param data_col: column name of the target data D
        :return: float, information gain
        """
        # Conditional entropy Ent(D|A): entropy of D within each group of A,
        # weighted by p(A).
        group_entropy = df.groupby(attr_col).apply(
            lambda g: CategorySelfAnalyse.entropy(g[data_col]))
        attr_prob = df[attr_col].value_counts() / len(df[attr_col])
        conditional_entropy = (group_entropy * attr_prob).sum()
        return CategorySelfAnalyse.entropy(df[data_col]) - conditional_entropy

    @staticmethod
    def normalized_mutual_information(data1: pd.Series, data2: pd.Series) -> float:
        """
        Mutual Information between two clusterings.

        The Mutual Information is a measure of the similarity between two labels
        of the same data. Normalized Mutual Information (NMI) scales the MI score
        between 0 (no mutual information) and 1 (perfect correlation).

        :param data1: categorical data 1
        :param data2: categorical data 2
        :return: nmi : float, score between 0.0 and 1.0;
                 1.0 stands for perfectly complete labeling
        """
        return metrics.normalized_mutual_info_score(data1, data2)

    @staticmethod
    def chi2_independence(data1: pd.Series, data2: pd.Series, alpha=0.05) -> pd.DataFrame:
        """
        Chi-square test of independence between two categorical columns.

        :param alpha: significance level used to pick the critical value
        :param data1: categorical data 1
        :param data2: categorical data 2
        :return: pd.DataFrame with columns:
                 g: chi-square statistic
                 p: p-value; p < alpha rejects the null hypothesis
                 dof: degrees of freedom
                 expctd: expected frequencies, same shape as the contingency table
                 re: decision flag, 1 = reject null hypothesis (dependent),
                     0 = accept null hypothesis (independent)
        :raises ValueError: if the degrees of freedom are 0
        """
        table = CategoryMutualDescribe.crosstab(data1, data2)
        g, p, dof, expctd = chi2_contingency(table)
        # Guard before building any result (originally raised only after the
        # row was already appended).
        if dof == 0:
            raise ValueError('自由度应该大于等于1')
        # One-sided chi-square test at level alpha with exactly `dof` degrees
        # of freedom; the previous alpha/2 halving and dof-1 were incorrect.
        cv = chi2.isf(alpha, dof)  # critical value
        return pd.DataFrame([{'g': g, 'p': p, 'dof': dof, 'expctd': expctd,
                              're': 1 if g > cv else 0}])