A Complete Guide to Python Data Statistics: From Basics to Practice
Table of Contents
- 1. Data Statistics Fundamentals and Environment Setup
- 1.1 The Python Data Science Ecosystem
- 1.2 Environment Setup and Installation
- 2. Data Acquisition and Loading
- 2.1 Loading Data from Different Sources
- 2.2 Viewing Basic Dataset Information
- 3. Data Cleaning and Preprocessing
- 3.1 Handling Missing Values
- 3.2 Data Transformation and Encoding
- 4. Descriptive Statistical Analysis
- 4.1 Computing Basic Statistics
- 4.2 Advanced Statistical Analysis
1. Data Statistics Fundamentals and Environment Setup
1.1 The Python Data Science Ecosystem
Python's strength in data statistics comes largely from its rich ecosystem of libraries:
# Core data analysis libraries
import pandas as pd
import numpy as np

# Data visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

# Statistical analysis libraries
from scipy import stats
import statsmodels.api as sm
from statsmodels.formula.api import ols

# Machine learning libraries
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

# Other utilities
import warnings
warnings.filterwarnings('ignore')
1.2 Environment Setup and Installation
# Install the required packages with conda or pip
"""
pip install pandas numpy matplotlib seaborn plotly
pip install scipy statsmodels scikit-learn
pip install jupyter notebook  # interactive environment
"""

# Configure matplotlib to render CJK labels and minus signs correctly
plt.rcParams['font.sans-serif'] = ['SimHei']   # font that can display Chinese labels
plt.rcParams['axes.unicode_minus'] = False     # render the minus sign correctly

# Plotting style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
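Before moving on, it can help to confirm that the core packages imported correctly. The following is a minimal verification sketch, assuming the packages listed above are installed; adjust the list to match your own environment:

# Quick environment check: print the installed version of each core package
import matplotlib
import scipy
import sklearn
import statsmodels

for name, module in [("pandas", pd), ("numpy", np), ("matplotlib", matplotlib),
                     ("scipy", scipy), ("scikit-learn", sklearn), ("statsmodels", statsmodels)]:
    print(f"{name}: {module.__version__}")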
2. Data Acquisition and Loading
2.1 Loading Data from Different Sources
import pandas as pd
import numpy as np
import sqlite3
import requests
import json

class DataLoader:
    def __init__(self):
        self.data_sources = {}

    def load_csv(self, file_path, **kwargs):
        """Load a CSV file"""
        try:
            df = pd.read_csv(file_path, **kwargs)
            self.data_sources['csv'] = df
            print(f"Loaded CSV file, shape: {df.shape}")
            return df
        except Exception as e:
            print(f"Failed to load CSV file: {e}")
            return None

    def load_excel(self, file_path, sheet_name=0):
        """Load an Excel file"""
        try:
            df = pd.read_excel(file_path, sheet_name=sheet_name)
            self.data_sources['excel'] = df
            print(f"Loaded Excel file, shape: {df.shape}")
            return df
        except Exception as e:
            print(f"Failed to load Excel file: {e}")
            return None

    def load_sql(self, query, db_path):
        """Load data from a SQLite database"""
        try:
            conn = sqlite3.connect(db_path)
            df = pd.read_sql_query(query, conn)
            conn.close()
            self.data_sources['sql'] = df
            print(f"Loaded data from SQL, shape: {df.shape}")
            return df
        except Exception as e:
            print(f"Failed to load data from SQL: {e}")
            return None

    def load_api(self, url, params=None):
        """Load data from an API endpoint"""
        try:
            response = requests.get(url, params=params)
            if response.status_code == 200:
                data = response.json()
                df = pd.DataFrame(data)
                self.data_sources['api'] = df
                print(f"Loaded data from API, shape: {df.shape}")
                return df
            else:
                print(f"API request failed, status code: {response.status_code}")
                return None
        except Exception as e:
            print(f"Failed to load data from API: {e}")
            return None

# Usage example
loader = DataLoader()

# Load a sample dataset
from sklearn.datasets import load_iris

iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_df['target'] = iris.target
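Reusing the loader instance created above, the sketch below shows how the different loaders might be called. The file names, database path, table name, and API URL are placeholders for illustration only, not real resources:

# Hypothetical local files -- replace the paths with your own data
sales_df = loader.load_csv("sales.csv", encoding="utf-8")
report_df = loader.load_excel("report.xlsx", sheet_name="Sheet1")

# Hypothetical SQLite database and query
orders_df = loader.load_sql("SELECT * FROM orders", "analytics.db")

# Hypothetical JSON API returning a list of records
api_df = loader.load_api("https://example.com/api/records", params={"limit": 100})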
2.2 Viewing Basic Dataset Information
def explore_data(df, sample_size=5):
    """Explore the basic characteristics of a dataset"""
    print("=" * 50)
    print("Dataset overview")
    print("=" * 50)

    # Shape
    print(f"Shape: {df.shape}")
    print(f"Rows: {df.shape[0]}")
    print(f"Columns: {df.shape[1]}")

    # Data types
    print("\nData types:")
    print(df.dtypes)

    # Preview
    print(f"\nFirst {sample_size} rows:")
    print(df.head(sample_size))
    print(f"\nLast {sample_size} rows:")
    print(df.tail(sample_size))

    # Summary statistics
    print("\nSummary statistics for numeric columns:")
    print(df.describe())

    # Missing values
    print("\nMissing value statistics:")
    missing_info = pd.DataFrame({
        'missing_count': df.isnull().sum(),
        'missing_percentage': df.isnull().sum() / len(df) * 100
    })
    print(missing_info)

    # Unique values of categorical columns
    print("\nUnique values of categorical columns:")
    categorical_cols = df.select_dtypes(include=['object']).columns
    for col in categorical_cols:
        print(f"{col}: {df[col].nunique()} unique values")

    return {
        'shape': df.shape,
        'dtypes': df.dtypes,
        'missing_info': missing_info
    }

# Apply to the iris dataset
info = explore_data(iris_df)
3. Data Cleaning and Preprocessing
3.1 Handling Missing Values
class DataCleaner:
    def __init__(self, df):
        self.df = df.copy()
        self.cleaning_log = []

    def detect_missing_values(self):
        """Detect missing values"""
        missing_stats = pd.DataFrame({
            'missing_count': self.df.isnull().sum(),
            'missing_percentage': (self.df.isnull().sum() / len(self.df)) * 100,
            'data_type': self.df.dtypes
        })

        # Columns with a high missing rate
        high_missing_cols = missing_stats[missing_stats['missing_percentage'] > 50].index.tolist()

        self.cleaning_log.append({
            'step': 'missing value detection',
            'details': f"found {len(high_missing_cols)} columns with >50% missing values"
        })

        return missing_stats, high_missing_cols

    def handle_missing_values(self, strategy='auto', custom_strategy=None):
        """Handle missing values"""
        df_clean = self.df.copy()
        missing_stats, high_missing_cols = self.detect_missing_values()

        # Drop columns with a high missing rate
        if high_missing_cols:
            df_clean = df_clean.drop(columns=high_missing_cols)
            self.cleaning_log.append({
                'step': 'drop high-missing columns',
                'details': f"dropped columns: {high_missing_cols}"
            })

        # Fill the remaining missing values
        for col in df_clean.columns:
            if df_clean[col].isnull().sum() > 0:
                method = None
                if strategy == 'auto':
                    # Choose a strategy automatically
                    if df_clean[col].dtype in ['float64', 'int64']:
                        # Fill numeric columns with the median
                        fill_value = df_clean[col].median()
                        df_clean[col] = df_clean[col].fillna(fill_value)
                        method = f"median fill ({fill_value})"
                    else:
                        # Fill categorical columns with the mode
                        fill_value = df_clean[col].mode()[0] if not df_clean[col].mode().empty else 'Unknown'
                        df_clean[col] = df_clean[col].fillna(fill_value)
                        method = f"mode fill ({fill_value})"
                elif strategy == 'custom' and custom_strategy:
                    # Custom per-column strategy
                    if col in custom_strategy:
                        fill_value = custom_strategy[col]
                        df_clean[col] = df_clean[col].fillna(fill_value)
                        method = f"custom fill ({fill_value})"

                if method is not None:
                    self.cleaning_log.append({
                        'step': 'missing value imputation',
                        'column': col,
                        'method': method,
                        'filled_count': int(self.df[col].isnull().sum())
                    })

        self.df = df_clean
        return df_clean

    def remove_duplicates(self):
        """Remove duplicate rows"""
        initial_count = len(self.df)
        self.df = self.df.drop_duplicates()
        removed_count = initial_count - len(self.df)

        self.cleaning_log.append({
            'step': 'remove duplicates',
            'removed_count': removed_count,
            'remaining_count': len(self.df)
        })

        return self.df

    def handle_outliers(self, method='iqr', threshold=3):
        """Handle outliers"""
        df_clean = self.df.copy()
        numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
        outliers_info = {}

        for col in numeric_cols:
            if method == 'iqr':
                # IQR rule
                Q1 = df_clean[col].quantile(0.25)
                Q3 = df_clean[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR

                outliers = df_clean[(df_clean[col] < lower_bound) | (df_clean[col] > upper_bound)]
                outlier_count = len(outliers)

                # Winsorize: clip values to the IQR bounds
                df_clean[col] = np.where(df_clean[col] < lower_bound, lower_bound, df_clean[col])
                df_clean[col] = np.where(df_clean[col] > upper_bound, upper_bound, df_clean[col])
            elif method == 'zscore':
                # Z-score rule
                z_scores = np.abs(stats.zscore(df_clean[col]))
                outlier_count = len(df_clean[z_scores > threshold])

                # Replace flagged values with the robust median
                median = df_clean[col].median()
                df_clean[col] = np.where(z_scores > threshold, median, df_clean[col])

            outliers_info[col] = outlier_count

        self.cleaning_log.append({
            'step': 'outlier handling',
            'method': method,
            'outliers_info': outliers_info
        })

        self.df = df_clean
        return df_clean

    def get_cleaning_report(self):
        """Print a cleaning report"""
        print("Data cleaning report")
        print("=" * 30)
        for log in self.cleaning_log:
            print(f"{log['step']}:")
            for key, value in log.items():
                if key != 'step':
                    print(f"  {key}: {value}")
            print()

# Usage example
# Build a test dataset with missing values and outliers
np.random.seed(42)
test_data = pd.DataFrame({
    'A': np.random.normal(0, 1, 100),
    'B': np.random.normal(10, 2, 100),
    'C': np.random.choice(['X', 'Y', 'Z'], 100),
    'D': np.random.exponential(2, 100)
})

# Inject missing values and outliers
test_data.loc[10:15, 'A'] = np.nan
test_data.loc[20:25, 'B'] = np.nan
test_data.loc[5, 'A'] = 100   # outlier
test_data.loc[6, 'B'] = 100   # outlier

cleaner = DataCleaner(test_data)
cleaned_data = cleaner.handle_missing_values()
cleaned_data = cleaner.remove_duplicates()
cleaned_data = cleaner.handle_outliers()
cleaner.get_cleaning_report()
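handle_missing_values also accepts a per-column custom strategy via the custom_strategy dictionary. The sketch below shows how such a call might look on the same test data; the fill values are illustrative assumptions, not recommendations:

# Re-create the cleaner and fill specific columns with chosen values
custom_cleaner = DataCleaner(test_data)
custom_filled = custom_cleaner.handle_missing_values(
    strategy='custom',
    custom_strategy={'A': 0.0, 'B': test_data['B'].mean()}  # illustrative fill values
)
custom_cleaner.get_cleaning_report()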
3.2 Data Transformation and Encoding
class DataTransformer:
    def __init__(self, df):
        self.df = df.copy()
        self.transformation_log = []

    def encode_categorical(self, columns=None, method='onehot'):
        """Encode categorical variables"""
        df_encoded = self.df.copy()

        if columns is None:
            categorical_cols = df_encoded.select_dtypes(include=['object']).columns
        else:
            categorical_cols = columns

        for col in categorical_cols:
            encoding_type = None
            if method == 'onehot':
                # One-hot encoding
                dummies = pd.get_dummies(df_encoded[col], prefix=col)
                df_encoded = pd.concat([df_encoded, dummies], axis=1)
                df_encoded.drop(col, axis=1, inplace=True)
                encoding_type = "one-hot encoding"
            elif method == 'label':
                # Label encoding
                from sklearn.preprocessing import LabelEncoder
                le = LabelEncoder()
                df_encoded[col] = le.fit_transform(df_encoded[col])
                encoding_type = "label encoding"
            elif method == 'target':
                # Target encoding (requires a target variable)
                if 'target' in df_encoded.columns:
                    target_mean = df_encoded.groupby(col)['target'].mean()
                    df_encoded[col] = df_encoded[col].map(target_mean)
                    encoding_type = "target encoding"

            if encoding_type is not None:
                self.transformation_log.append({
                    'step': 'categorical encoding',
                    'column': col,
                    'method': encoding_type
                })

        self.df = df_encoded
        return df_encoded

    def scale_numerical(self, columns=None, method='standard'):
        """Scale numeric variables"""
        from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

        df_scaled = self.df.copy()

        if columns is None:
            numerical_cols = df_scaled.select_dtypes(include=[np.number]).columns
        else:
            numerical_cols = columns

        scaler = None
        if method == 'standard':
            scaler = StandardScaler()
            scaling_type = "standardization (z-score)"
        elif method == 'minmax':
            scaler = MinMaxScaler()
            scaling_type = "min-max scaling"
        elif method == 'robust':
            scaler = RobustScaler()
            scaling_type = "robust scaling"

        if scaler:
            df_scaled[numerical_cols] = scaler.fit_transform(df_scaled[numerical_cols])
            self.transformation_log.append({
                'step': 'numeric scaling',
                'columns': list(numerical_cols),
                'method': scaling_type
            })

        self.df = df_scaled
        return df_scaled, scaler

    def create_features(self):
        """Feature engineering"""
        df_featured = self.df.copy()
        numerical_cols = df_featured.select_dtypes(include=[np.number]).columns

        # Polynomial (interaction) features
        from sklearn.preprocessing import PolynomialFeatures
        if len(numerical_cols) >= 2:
            poly = PolynomialFeatures(degree=2, include_bias=False, interaction_only=True)
            poly_features = poly.fit_transform(df_featured[numerical_cols[:2]])  # first two numeric columns
            poly_feature_names = poly.get_feature_names_out(numerical_cols[:2])
            poly_df = pd.DataFrame(poly_features, columns=poly_feature_names, index=df_featured.index)
            df_featured = pd.concat([df_featured, poly_df], axis=1)

            self.transformation_log.append({
                'step': 'feature engineering',
                'type': 'polynomial features',
                'features_created': list(poly_feature_names)
            })

        # Simple statistical features
        for col in numerical_cols:
            df_featured[f'{col}_zscore'] = stats.zscore(df_featured[col])
            df_featured[f'{col}_rank'] = df_featured[col].rank()

        self.transformation_log.append({
            'step': 'feature engineering',
            'type': 'statistical features',
            'features_created': [f'{col}_zscore' for col in numerical_cols] +
                                [f'{col}_rank' for col in numerical_cols]
        })

        self.df = df_featured
        return df_featured

# Usage example
transformer = DataTransformer(iris_df)
transformed_data, scaler = transformer.scale_numerical(method='standard')
transformer.create_features()
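As a concrete illustration of encode_categorical, the sketch below maps the numeric iris target to species names (using iris.target_names, which load_iris provides) and then one-hot encodes the resulting text column. The iris_named and encoder names are introduced here for illustration only:

# Build a copy of the iris frame with a text species column
iris_named = iris_df.copy()
iris_named['species'] = iris_named['target'].map(
    dict(enumerate(iris.target_names))  # {0: 'setosa', 1: 'versicolor', 2: 'virginica'}
)

encoder = DataTransformer(iris_named.drop(columns=['target']))
encoded_iris = encoder.encode_categorical(columns=['species'], method='onehot')
print(encoded_iris.columns.tolist())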
4. Descriptive Statistical Analysis
4.1 Computing Basic Statistics
class DescriptiveStatistics:
    def __init__(self, df):
        self.df = df
        self.numerical_cols = df.select_dtypes(include=[np.number]).columns
        self.categorical_cols = df.select_dtypes(include=['object']).columns

    def basic_stats(self):
        """Compute basic statistics for numeric columns"""
        stats_summary = {}
        for col in self.numerical_cols:
            data = self.df[col].dropna()
            stats_summary[col] = {
                'count': len(data),
                'mean': np.mean(data),
                'median': np.median(data),
                'std': np.std(data),
                'variance': np.var(data),
                'min': np.min(data),
                'max': np.max(data),
                'range': np.max(data) - np.min(data),
                'q1': np.percentile(data, 25),
                'q3': np.percentile(data, 75),
                'iqr': np.percentile(data, 75) - np.percentile(data, 25),
                'skewness': stats.skew(data),
                'kurtosis': stats.kurtosis(data),
                'cv': (np.std(data) / np.mean(data)) * 100 if np.mean(data) != 0 else np.inf
            }
        return pd.DataFrame(stats_summary).T

    def categorical_stats(self):
        """Statistics for categorical variables"""
        cat_stats = {}
        for col in self.categorical_cols:
            data = self.df[col].dropna()
            value_counts = data.value_counts()
            cat_stats[col] = {
                'count': len(data),
                'unique_count': len(value_counts),
                'mode': value_counts.index[0] if len(value_counts) > 0 else None,
                'mode_frequency': value_counts.iloc[0] if len(value_counts) > 0 else 0,
                'mode_percentage': (value_counts.iloc[0] / len(data)) * 100 if len(value_counts) > 0 else 0,
                'entropy': stats.entropy(value_counts)  # information entropy
            }
        return pd.DataFrame(cat_stats).T

    def distribution_test(self):
        """Distribution (normality) tests"""
        distribution_results = {}
        for col in self.numerical_cols:
            data = self.df[col].dropna()

            shapiro_stat, shapiro_p = stats.shapiro(data) if len(data) < 5000 else (np.nan, np.nan)
            normaltest_stat, normaltest_p = stats.normaltest(data)

            distribution_results[col] = {
                'shapiro_stat': shapiro_stat,
                'shapiro_p': shapiro_p,
                'normaltest_stat': normaltest_stat,
                'normaltest_p': normaltest_p,
                'is_normal_shapiro': shapiro_p > 0.05 if not np.isnan(shapiro_p) else None,
                'is_normal_normaltest': normaltest_p > 0.05
            }
        return pd.DataFrame(distribution_results).T

    def correlation_analysis(self):
        """Correlation analysis with three correlation coefficients"""
        pearson_corr = self.df[self.numerical_cols].corr(method='pearson')
        spearman_corr = self.df[self.numerical_cols].corr(method='spearman')
        kendall_corr = self.df[self.numerical_cols].corr(method='kendall')

        return {
            'pearson': pearson_corr,
            'spearman': spearman_corr,
            'kendall': kendall_corr
        }

    def generate_report(self):
        """Generate the full descriptive statistics report"""
        print("Descriptive statistics report")
        print("=" * 50)

        # Basic statistics
        print("\n1. Basic statistics for numeric variables:")
        basic_stats_df = self.basic_stats()
        print(basic_stats_df.round(4))

        # Categorical statistics
        cat_stats_df = None
        if len(self.categorical_cols) > 0:
            print("\n2. Categorical variable statistics:")
            cat_stats_df = self.categorical_stats()
            print(cat_stats_df.round(4))

        # Distribution tests
        print("\n3. Distribution test results:")
        dist_test_df = self.distribution_test()
        print(dist_test_df.round(4))

        # Correlation analysis
        print("\n4. Pearson correlation matrix:")
        corr_results = self.correlation_analysis()
        print(corr_results['pearson'].round(4))

        return {
            'basic_stats': basic_stats_df,
            'categorical_stats': cat_stats_df,
            'distribution_test': dist_test_df,
            'correlation': corr_results
        }

# Usage example
desc_stats = DescriptiveStatistics(iris_df)
report = desc_stats.generate_report()
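The matrices returned by correlation_analysis can be mined further. The snippet below is a small sketch (assuming a reasonably recent pandas with the sort_values key parameter) that ranks feature pairs from the Pearson matrix in the report above by absolute correlation:

# Rank feature pairs by absolute Pearson correlation (upper triangle only, no self-correlations)
pearson = report['correlation']['pearson']
pairs = (
    pearson.where(np.triu(np.ones(pearson.shape, dtype=bool), k=1))
    .stack()
    .sort_values(key=np.abs, ascending=False)
)
print(pairs.head(5))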
4.2 Advanced Statistical Analysis
class AdvancedStatistics:
    def __init__(self, df):
        self.df = df
        self.numerical_cols = df.select_dtypes(include=[np.number]).columns

    def outlier_detection(self, method='multiple'):
        """Outlier detection with several rules"""
        outlier_results = {}
        for col in self.numerical_cols:
            data = self.df[col].dropna()
            outliers = {}

            # IQR rule
            Q1 = np.percentile(data, 25)
            Q3 = np.percentile(data, 75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            iqr_outliers = data[(data < lower_bound) | (data > upper_bound)]
            outliers['iqr'] = {
                'count': len(iqr_outliers),
                'percentage': (len(iqr_outliers) / len(data)) * 100,
                'values': iqr_outliers.tolist()
            }

            # Z-score rule
            z_scores = np.abs(stats.zscore(data))
            zscore_outliers = data[z_scores > 3]
            outliers['zscore'] = {
                'count': len(zscore_outliers),
                'percentage': (len(zscore_outliers) / len(data)) * 100,
                'values': zscore_outliers.tolist()
            }

            # Modified z-score rule (more robust to outliers)
            median = np.median(data)
            mad = stats.median_abs_deviation(data)
            modified_z_scores = 0.6745 * (data - median) / mad
            mod_z_outliers = data[np.abs(modified_z_scores) > 3.5]
            outliers['modified_zscore'] = {
                'count': len(mod_z_outliers),
                'percentage': (len(mod_z_outliers) / len(data)) * 100,
                'values': mod_z_outliers.tolist()
            }

            outlier_results[col] = outliers
        return outlier_results

    def normality_tests(self):
        """A battery of normality tests"""
        normality_results = {}
        for col in self.numerical_cols:
            data = self.df[col].dropna()
            tests = {}

            # Shapiro-Wilk test (suitable for small samples)
            if len(data) < 5000:
                shapiro_stat, shapiro_p = stats.shapiro(data)
                tests['shapiro_wilk'] = {
                    'statistic': shapiro_stat,
                    'p_value': shapiro_p,
                    'is_normal': shapiro_p > 0.05
                }

            # D'Agostino's K^2 test
            k2_stat, k2_p = stats.normaltest(data)
            tests['dagostino'] = {
                'statistic': k2_stat,
                'p_value': k2_p,
                'is_normal': k2_p > 0.05
            }

            # Anderson-Darling test
            anderson_result = stats.anderson(data, dist='norm')
            tests['anderson_darling'] = {
                'statistic': anderson_result.statistic,
                'critical_values': anderson_result.critical_values,
                'significance_level': anderson_result.significance_level,
                'is_normal': anderson_result.statistic < anderson_result.critical_values[2]  # 5% significance level
            }

            # Kolmogorov-Smirnov test
            ks_stat, ks_p = stats.kstest(data, 'norm', args=(np.mean(data), np.std(data)))
            tests['kolmogorov_smirnov'] = {
                'statistic': ks_stat,
                'p_value': ks_p,
                'is_normal': ks_p > 0.05
            }

            normality_results[col] = tests
        return normality_results

    def confidence_intervals(self, confidence=0.95):
        """Confidence intervals for the mean"""
        ci_results = {}
        for col in self.numerical_cols:
            data = self.df[col].dropna()
            n = len(data)
            mean = np.mean(data)
            std_err = stats.sem(data)

            # t-distribution confidence interval
            ci = stats.t.interval(confidence, n - 1, loc=mean, scale=std_err)

            # Bootstrap confidence interval
            bootstrap_ci = self._bootstrap_ci(data, confidence=confidence)

            ci_results[col] = {
                'sample_size': n,
                'mean': mean,
                'std_error': std_err,
                f'ci_{confidence}': ci,
                'bootstrap_ci': bootstrap_ci,
                'ci_width': ci[1] - ci[0]
            }
        return ci_results

    def _bootstrap_ci(self, data, n_bootstrap=1000, confidence=0.95):
        """Bootstrap confidence interval for the mean"""
        bootstrap_means = []
        for _ in range(n_bootstrap):
            bootstrap_sample = np.random.choice(data, size=len(data), replace=True)
            bootstrap_means.append(np.mean(bootstrap_sample))

        alpha = (1 - confidence) / 2
        lower = np.percentile(bootstrap_means, alpha * 100)
        upper = np.percentile(bootstrap_means, (1 - alpha) * 100)
        return (lower, upper)

    def generate_advanced_report(self):
        """Generate the advanced statistics report"""
        print("Advanced statistical analysis report")
        print("=" * 50)

        # Outlier detection
        print("\n1. Outlier detection results:")
        outlier_results = self.outlier_detection()
        for col, methods in outlier_results.items():
            print(f"\n{col}:")
            for method, result in methods.items():
                print(f"  {method}: {result['count']} outliers ({result['percentage']:.2f}%)")

        # Normality tests
        print("\n2. Normality test results:")
        normality_results = self.normality_tests()
        for col, tests in normality_results.items():
            print(f"\n{col}:")
            for test_name, result in tests.items():
                is_normal = result.get('is_normal', False)
                status = "normal" if is_normal else "not normal"
                print(f"  {test_name}: p={result.get('p_value', 0):.4f} ({status})")

        # Confidence intervals
        print("\n3. Confidence interval analysis:")
        ci_results = self.confidence_intervals()
        for col, result in ci_results.items():
            print(f"\n{col}:")
            print(f"  mean: {result['mean']:.4f}")
            print(f"  95% CI: [{result['ci_0.95'][0]:.4f}, {result['ci_0.95'][1]:.4f}]")
            print(f"  Bootstrap CI: [{result['bootstrap_ci'][0]:.4f}, {result['bootstrap_ci'][1]:.4f}]")

        return {
            'outliers': outlier_results,
            'normality': normality_results,
            'confidence_intervals': ci_results
        }

# Usage example
advanced_stats = AdvancedStatistics(iris_df)
advanced_report = advanced_stats.generate_advanced_report()
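The 0.6745 constant in the modified z-score above scales the median absolute deviation so that the score is roughly comparable to a standard z-score under normality. A standalone sketch of that calculation, with the helper function modified_zscore introduced here for illustration:

def modified_zscore(values, threshold=3.5):
    """Return modified z-scores and a boolean outlier mask.

    Uses the median and MAD, scaled by 0.6745 so the score is roughly
    comparable to a standard z-score for normally distributed data.
    """
    values = np.asarray(values, dtype=float)
    median = np.median(values)
    mad = stats.median_abs_deviation(values)
    if mad == 0:
        # Degenerate case: more than half the values are identical
        return np.zeros_like(values), np.zeros(len(values), dtype=bool)
    scores = 0.6745 * (values - median) / mad
    return scores, np.abs(scores) > threshold

# Example on one iris column
scores, mask = modified_zscore(iris_df['sepal width (cm)'])
print(f"Detected {mask.sum()} potential outliers")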
This concludes A Complete Guide to Python Data Statistics: From Basics to Practice. For more material on Python data statistics, see the other related articles on 编程客栈 (www.devze.com)!