# -*- coding: utf-8 -*-
# @Time : 2022/3/17 17:36
# @Author : Leng Yang
# @FileName: categorical_process.py
# @Software: PyCharm


import pandas as pd
import numpy as np
from sklearn import metrics
from scipy.stats import chi2_contingency, chi2


class CategorySelfDescribe(object):
    """
    Descriptive statistics for a single categorical column.
    """

    def __init__(self):
        pass

    @staticmethod
    def category_describe(data: pd.Series) -> pd.DataFrame:
        """
        Describe the category names contained in the column and the number of distinct categories.
        :param data: input data, as a pd.Series
        :return: pd.DataFrame containing the list of category names and the number of distinct categories
        Examples
        --------
        >>> data1 = pd.DataFrame({'天气': ['晴', '晴', '阴', '雨'], '温度': ['高', '高', '高', '低']})
        >>> CategorySelfDescribe.category_describe(data1['天气'])
          categories  types
        0  [晴, 阴, 雨]      3
        """
        # pd.DataFrame.append was removed in pandas 2.0; build the frame directly instead
        return pd.DataFrame([{'categories': data.unique(), 'types': len(data.unique())}])

    @staticmethod
    def category_frequency(data: pd.Series) -> pd.DataFrame:
        """
        Frequency table.
        :param data: input data, as a pd.Series
        :return: pd.DataFrame, the frequency table
        Examples
        --------
        >>> data1 = pd.DataFrame({'天气': ['晴', '晴', '阴', '雨', '雨', '雨', '阴', '晴', '晴', '雨', '晴', '阴', '阴', '雨'],
        ...                       '温度': ['高', '高', '高', '低', '低', '低', '低', '低', '低', '低', '低', '低', '高', '低']})
        >>> CategorySelfDescribe.category_frequency(data1['天气'])
          unique_values  count  frequency
        0             晴      5   0.357143
        1             雨      5   0.357143
        2             阴      4   0.285714
        """
        df_freq = data.value_counts(ascending=False).rename_axis('unique_values').reset_index(name='count')
        df_freq['frequency'] = df_freq['count'] / len(data)
        return df_freq


class CategorySelfAnalyse(object):
    """
    Statistical analysis of a single categorical column.
    """

    def __init__(self):
        pass

    @staticmethod
    def entropy(data: pd.Series) -> float:
        """
        Compute the information entropy: Ent(D) = -sum(p_k * log2(p_k))
        :param data: input data, as a pd.Series
        :return: float, the information entropy
        """
        # pd.value_counts(...) is deprecated; call value_counts() on the Series itself
        prob = data.value_counts() / len(data)
        return float(-(prob * np.log2(prob)).sum())
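
    @staticmethod
    def _entropy_demo() -> None:
        # Hedged usage sketch (a hypothetical helper, not part of the original
        # file): a two-category column split 50/50 carries exactly 1 bit of
        # entropy, so the call below should return 1.0.
        data = pd.Series(['a', 'a', 'b', 'b'])
        assert abs(CategorySelfAnalyse.entropy(data) - 1.0) < 1e-12

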
class CategoryMutualDescribe(object):
    """
    Descriptive statistics for two different categorical columns.
    """

    def __init__(self):
        pass

    @staticmethod
    def crosstab(row_data: pd.Series, col_data: pd.Series) -> pd.DataFrame:
        """
        Contingency-table analysis of two different categorical columns.
        :param row_data: categorical data 1; its categories form the rows of the contingency table
        :param col_data: categorical data 2; its categories form the columns of the contingency table
        :return: pd.DataFrame, the contingency table
        Examples
        --------
        >>> data1 = pd.DataFrame({'天气': ['晴', '晴', '阴', '雨'], '温度': ['高', '高', '高', '低']})
        >>> CategoryMutualDescribe.crosstab(data1['天气'], data1['温度'])
        温度  低  高
        天气
        晴   0  2
        阴   0  1
        雨   1  0
        """
        return pd.crosstab(row_data, col_data)


class MutualCategoricalAnalyse(object):
    """
    Statistical analysis of two categorical columns.
    """

    def __init__(self):
        pass

    @staticmethod
    def info_gain(df: pd.DataFrame, attr_col: str, data_col: str) -> float:
        """
        Compute the information gain of splitting dataset D by attribute A:
        Gain(D, A) = Ent(D) - Ent(D|A)
        :param df: input data, as a DataFrame
        :param attr_col: column name of the attribute (feature) A
        :param data_col: column name of the dataset (label) D
        :return: float, the information gain
        """
        # Entropy of the label within each attribute group
        e1 = df.groupby(attr_col).apply(lambda x: CategorySelfAnalyse.entropy(x[data_col]))
        p1 = df[attr_col].value_counts() / len(df[attr_col])  # p(x)
        e2 = (e1 * p1).sum()  # conditional entropy Ent(D|A)
        return CategorySelfAnalyse.entropy(df[data_col]) - e2
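
    @staticmethod
    def _info_gain_demo() -> None:
        # Hedged sketch (a hypothetical helper, not in the original file): when
        # the attribute determines the label exactly, Ent(D|A) = 0, so the gain
        # equals Ent(D), which is 1 bit for a 50/50 label split.
        df = pd.DataFrame({'A': ['x', 'x', 'y', 'y'], 'D': ['p', 'p', 'q', 'q']})
        assert abs(MutualCategoricalAnalyse.info_gain(df, 'A', 'D') - 1.0) < 1e-12
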
    @staticmethod
    def normalized_mutual_information(data1: pd.Series, data2: pd.Series) -> float:
        """
        Mutual Information between two clusterings. The Mutual Information is a
        measure of the similarity between two labels of the same data.
        Normalized Mutual Information (NMI) is a normalization of the Mutual
        Information (MI) score to scale the results between 0 (no mutual
        information) and 1 (perfect correlation).
        :param data1: categorical data 1
        :param data2: categorical data 2
        :return: nmi: float, score between 0.0 and 1.0; 1.0 stands for a perfectly complete labeling
        """
        return metrics.normalized_mutual_info_score(data1, data2)
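
    @staticmethod
    def _nmi_demo() -> None:
        # Hedged sketch (a hypothetical helper, not in the original file): NMI
        # is invariant to renaming labels, so two identical partitions with
        # different label names should still score 1.0.
        data1 = pd.Series(['a', 'a', 'b', 'b'])
        data2 = pd.Series(['x', 'x', 'y', 'y'])
        assert abs(MutualCategoricalAnalyse.normalized_mutual_information(data1, data2) - 1.0) < 1e-9
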
    @staticmethod
    def chi2_independence(data1: pd.Series, data2: pd.Series, alpha=0.05) -> pd.DataFrame:
        """
        Chi-square test of independence.
        :param alpha: significance level, used to determine the critical value
        :param data1: categorical data 1
        :param data2: categorical data 2
        :return: pd.DataFrame with the following columns:
            g: the chi-square statistic
            p: the p-value; comparing it with alpha also gives a hypothesis test, reject the null hypothesis when p < alpha
            dof: degrees of freedom
            re: decision flag, 1 means reject the null hypothesis, 0 means fail to reject it
            expctd: array of expected frequencies, with the same shape as the contingency table
        """
        data = CategoryMutualDescribe.crosstab(data1, data2)
        g, p, dof, expctd = chi2_contingency(data)
        if dof == 0:
            raise ValueError('The degrees of freedom should be at least 1')
        # Upper-tail critical value of the chi-square distribution at level alpha
        cv = chi2.isf(alpha, dof)
        # pd.DataFrame.append was removed in pandas 2.0; build the row directly
        result = pd.DataFrame([{'g': g, 'p': p, 'dof': dof, 'expctd': expctd}])
        # Reject the null hypothesis of independence when the statistic exceeds
        # the critical value (equivalently, when p < alpha)
        result.loc[0, 're'] = 1 if g > cv else 0
        return result
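
    @staticmethod
    def _chi2_demo() -> pd.DataFrame:
        # Hedged usage sketch (hypothetical helper and data, not in the original
        # file): test whether 天气 (weather) and 温度 (temperature) are independent.
        # With such a small sample the test is underpowered, so expect re == 0.
        df = pd.DataFrame({'天气': ['晴', '晴', '阴', '雨', '雨', '雨', '阴', '晴'],
                           '温度': ['高', '高', '高', '低', '低', '低', '低', '低']})
        return MutualCategoricalAnalyse.chi2_independence(df['天气'], df['温度'])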
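

if __name__ == '__main__':
    # Smoke test for the hypothetical demo helpers added above.
    CategorySelfAnalyse._entropy_demo()
    MutualCategoricalAnalyse._info_gain_demo()
    MutualCategoricalAnalyse._nmi_demo()
    print(MutualCategoricalAnalyse._chi2_demo())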