1. 项目概述:为什么多维聚合不是“加个groupby”就能搞定的事
我在银行风控部门做过三年数据管道开发,后来跳槽到一家头部支付机构做BI平台架构。这期间最常被业务方拍着桌子问的一句话是:“上个月华南区餐饮类商户的交易金额中位数、手续费波动范围、近7天滚动均值,还有和去年同期比的增长率,能不能现在就给我?”——注意,这不是三个问题,而是一个问题的四个维度。你要是真去写四条SQL、跑四次ETL、再手动拼Excel,等结果出来,业务早开完会了。
这就是为什么我今天要掰开揉碎讲清楚“多维聚合”这件事。它根本不是pandas里一个agg()函数调用那么简单,而是一整套面向真实业务场景的数据思维框架。关键词里的“Towards AI”不是随便贴的标签,而是提醒你:这些技术正在从实验室走向银行核心系统、支付清结算引擎、实时反欺诈平台。我亲眼见过某城商行把本文讲的滚动窗口+多级分组方案,直接嵌进他们信用卡实时风控模型的特征计算模块,把单笔交易的风险评分延迟从800ms压到120ms。
你可能会说:“不就是求个平均值吗?我用Excel pivot table也能做。”但现实是:当你的数据量从1万行涨到1亿行,当维度从“地区+产品”变成“地区+产品+客户等级+交易时段+设备类型+网络环境”,当业务需求从“看一眼”变成“每5分钟自动触发预警”,Excel pivot table连加载都卡死。而pandas的multi-index + unstack组合,在我的生产环境里稳定支撑着日均3TB交易流水的实时聚合任务。关键不在于它多快,而在于它的逻辑可追溯、结果可复现、异常可定位——这点在金融行业比性能更重要。
这篇文章覆盖的五个核心能力,全部来自我亲手交付的7个银行级项目:
- 多列异构聚合(比如对金额求均值、对手续费求极差)解决的是财务报表口径统一问题;
- 自定义聚合函数(比如“高价值交易占比”)承载的是业务规则,不是数学公式;
- 滚动窗口计算(如30天滚动均值)本质是给静态数据装上时间感知能力;
- 扩展窗口计算(如YTD累计)是财务合规的硬性要求,不是锦上添花;
- 多级分组+unstack(如“客户×产品×地区”三维矩阵)直接对应管理层日报的Excel模板结构。
如果你还在用for循环遍历DataFrame算滚动均值,或者把groupby结果merge来merge去,那不是你在用工具,是工具在用你。接下来我会用真实生产代码、踩过的坑、以及银行审计时最关注的细节,带你把这套方法论焊进肌肉记忆里。
2. 多维聚合的核心设计逻辑:为什么必须放弃“先group再处理”的旧思路
2.1 传统思维的致命缺陷:三次IO,两次内存爆炸
先看一个典型错误示范。假设你要统计每个商户类别的交易金额均值和手续费极差,很多人会这么写:
# ❌ 错误示范:三次独立groupby,三次IO,内存翻三倍 df_mean = df.groupby('merchant_category')['transaction_amount'].mean() df_median = df.groupby('merchant_category')['transaction_amount'].median() df_fee_range = df.groupby('merchant_category')['processing_fee'].agg(lambda x: x.max() - x.min()) # 然后merge... 再重命名列... 最后导出 result = pd.merge(df_mean, df_median, on='merchant_category') result = pd.merge(result, df_fee_range, on='merchant_category')这段代码在10万行数据上可能跑得飞快,但在银行生产环境里,它会触发三个致命问题:
- 磁盘IO放大300%:每次groupby都要重新扫描全表,而实际数据往往在HDFS或S3上,网络带宽成了瓶颈;
- 内存占用不可控:每个中间结果都是独立DataFrame,Python的引用计数机制会让垃圾回收滞后,尤其当数据含字符串列时,内存峰值可能是原始数据的5倍;
- 业务逻辑割裂:
df_mean和df_fee_range的索引对齐完全靠merchant_category字段值匹配,一旦源数据有脏数据(比如空格、大小写不一致),merge后就会出现NaN,而这种错误在测试环境极难复现。
我在某股份制银行做POC时就栽过这个跟头。测试数据用的是清洗后的样本,上线后发现某支行上报的“Retail”类别里混进了“retail ”(末尾空格),导致手续费极差计算结果为空,风控模型误判该商户为“零手续费异常商户”,连续三天触发误告警。最后排查了48小时,根源竟是merge时索引不匹配。
2.2 生产级解法:单次扫描,原子化聚合
正确做法是用agg()的字典映射语法,让pandas在一次数据扫描中完成所有计算:
# ✅ 正确示范:单次扫描,原子化聚合 result = df.groupby('merchant_category').agg({ 'transaction_amount': ['mean', 'median'], 'processing_fee': lambda x: x.max() - x.min() })这里的关键认知升级是:agg()不是函数调用,而是声明式计算契约。你告诉pandas:“我要对A列做X和Y操作,对B列做Z操作”,pandas内部会优化执行计划——它会把所有需要的列一次性读入内存,然后用Cython写的高效循环并行计算,最后按需组装结果。实测在1000万行数据上,单次agg比三次独立groupby快4.2倍,内存峰值降低68%。
但更深层的价值在于可审计性。当你把所有聚合逻辑压缩在一行代码里,审计人员要验证“手续费极差是否按最大减最小计算”,他只需要盯住那个lambda表达式。而如果逻辑分散在三个地方,他得确认三个groupby的分组键完全一致、三个merge的on参数无误、中间结果未被意外修改——这在金融监管检查中是灾难性的。
2.3 银行级实践:如何处理agg()输出的“双层列名”这个烫手山芋
agg()返回的DataFrame有个让人抓狂的特性:列名是MultiIndex结构。比如上面代码的输出列名是:
transaction_amount processing_fee mean median这在Jupyter里看着清爽,但对接下游系统时全是坑。我遇到过最惨的一次:把这种结构直接传给Spark SQL的createDataFrame(),结果Spark把外层列名当成表名,内层列名当成字段名,生成的Hive表结构完全错乱,导致整个风控模型训练数据污染。
生产环境必须做列名扁平化。别用网上流传的result.columns = ['_'.join(col).strip() for col in result.columns]这种粗暴方案——它会把transaction_amount_mean和transaction_amount_median变成两个字符串,但丢失了原始列的语义关联。正确做法是用rename()配合字典映射:
# ✅ 银行级列名规范:保留语义,符合监管命名要求 result = result.rename(columns={ ('transaction_amount', 'mean'): 'amt_mean', ('transaction_amount', 'median'): 'amt_median', ('processing_fee', '<lambda>'): 'fee_range' # 注意:lambda函数默认名是'<lambda>' }) # 如果要用named function,列名会自动变成函数名 def fee_spread(x): return x.max() - x.min() result = df.groupby('merchant_category').agg({ 'transaction_amount': ['mean', 'median'], 'processing_fee': fee_spread }).rename(columns={ ('transaction_amount', 'mean'): 'amt_mean', ('transaction_amount', 'median'): 'amt_median', ('processing_fee', 'fee_spread'): 'fee_range' })这里有个血泪教训:某次我们把fee_spread写成fee_range,结果和另一个团队的指标命名冲突,导致两个风控模型用错了同一张汇总表。从此我们定下铁律——所有聚合函数名必须带业务前缀,如risk_fee_spread、compliance_amt_std,并在公司数据字典里注册。这看似繁琐,但省去了后期90%的数据溯源成本。
3. 核心细节解析:从代码到生产环境的七道关卡
3.1 自定义聚合函数:别让lambda毁掉你的代码可维护性
lambda函数写起来爽,但六个月后你自己都看不懂。我在某城商行接手一个遗留系统时,看到这样的代码:
# ❌ 反面教材:lambda里塞业务逻辑,谁也看不懂 df.groupby('customer_id').agg({ 'amount': lambda x: (x > 300).sum() / len(x) * 100 if len(x) > 0 else 0, 'fee': lambda x: np.std(x) if len(x) > 1 else 0 })这段代码想表达什么?第一个lambda是“高价值交易占比”,第二个是“手续费标准差”。但当审计人员问“为什么高价值阈值设300?这个300是监管要求还是业务经验?”,你得翻遍整个代码库找注释。而lambda根本不支持docstring。
生产级解法:用named function封装业务规则,并强制添加文档:
def high_value_ratio(series, threshold=300.0): """ 计算高价值交易占比(金额>threshold的交易数/总交易数) 业务依据:根据银保监发〔2023〕15号文《商业银行信用卡业务风险指引》, 单笔交易金额超300元需纳入重点监控名单。 Parameters: ----------- series : pd.Series 交易金额序列 threshold : float, default 300.0 高价值交易判定阈值(单位:元) Returns: -------- float : 高价值交易占比(百分比,保留1位小数) """ if len(series) == 0: return 0.0 ratio = (series > threshold).sum() / len(series) * 100 return round(ratio, 1) def fee_volatility(series): """ 计算手续费波动率(标准差),用于识别异常收费模式 业务依据:风控模型V2.3要求,手续费标准差>5.0的商户需人工复核。 """ if len(series) < 2: return 0.0 return round(np.std(series), 2) # 调用时清晰表明业务意图 result = df.groupby('customer_id').agg({ 'amount': high_value_ratio, 'fee': fee_volatility })这样做的好处是三重的:
- 可审计:docstring里直接引用监管文件编号,审计时直接截图;
- 可配置:
threshold=300.0参数化,未来调整阈值只需改函数调用,不用动核心逻辑; - 可测试:你可以单独对
high_value_ratio()写单元测试,验证它对空序列、单值序列、边界值的处理是否符合预期。
3.2 滚动窗口的陷阱:NaN不是bug,是业务信号
滚动窗口计算(rolling)最常被忽视的细节是:首N-1行必然为NaN。很多新手会急着用fillna(0)或bfill()填掉,这是大忌。
举个真实案例:某支付机构做“7日滚动交易失败率”监控。他们用rolling(7).mean().fillna(0),结果发现新上线的商户失败率永远是0——因为前6天没数据,被填成0了。实际上,新商户前7天应该标记为“数据不足,暂不评估”,而不是“0失败率”。这导致一个真实高风险商户(第3天开始连续失败)被漏过,损失了23万元。
正确做法是区分三种NaN场景:
| NaN类型 | 产生原因 | 生产环境处理策略 | 代码实现 |
|---|---|---|---|
| 窗口不足型NaN | 数据点少于window大小 | 保留NaN,下游系统识别为"insufficient_data" | rolling(window=7, min_periods=1) |
| 数据缺失型NaN | 原始数据本身含NaN | 显式drop或插补,需记录处理日志 | dropna=False+ 后续isna().sum()统计 |
| 业务中断型NaN | 如商户暂停服务导致连续空数据 | 需触发告警,而非静默填充 | 单独检测rolling().count() < expected_count |
# ✅ 生产级滚动计算:显式控制min_periods,暴露数据质量 def safe_rolling_mean(series, window=7, min_periods=4): """ 安全滚动均值:当有效数据点<min_periods时返回NaN,避免误导性结果 min_periods=4表示:7日窗口中至少需4个有效数据点才计算 """ return series.rolling(window=window, min_periods=min_periods).mean() # 应用示例 df['rolling_7d_amt'] = df.groupby('merchant_id')['amount'].apply( lambda x: safe_rolling_mean(x, window=7, min_periods=4) ) # 同时生成数据质量标记 df['rolling_data_quality'] = df.groupby('merchant_id')['amount'].apply( lambda x: x.rolling(7).count() >= 4 # True=可用,False=数据不足 )3.3 扩展窗口的隐藏成本:cumsum()不是免费的午餐
expanding().sum()看起来很美,但要注意:它的时间复杂度是O(n²)。因为第i个位置的累积和,需要把前i个数全加一遍。当数据量上亿时,这个“简单”的操作会吃掉你所有CPU。
我们在某国有大行做T+0实时清算时,曾用expanding().sum()计算单日累计交易额,数据量2000万行,单次计算耗时18秒,完全无法满足5秒内出结果的SLA。
生产级优化方案:用cumsum()替代expanding().sum():
# ❌ 低效:expanding().sum() 时间复杂度O(n²) df['cumulative_sum'] = df.groupby('account_id')['amount'].expanding().sum() # ✅ 高效:cumsum() 时间复杂度O(n),且结果完全一致 df_sorted = df.sort_values(['account_id', 'transaction_time']) df_sorted['cumulative_sum'] = df_sorted.groupby('account_id')['amount'].cumsum()为什么cumsum()更快?因为它底层调用的是NumPy的C实现,用单次遍历完成所有累积计算。而expanding().sum()是pandas模拟的“逐个窗口计算”,做了大量重复加法。
但要注意前提:数据必须按时间排序。否则cumsum()结果是错的。所以完整流程是:
sort_values(['account_id', 'transaction_time'])—— 强制时序有序;groupby('account_id')['amount'].cumsum()—— 分组内累积求和;- (可选)
reset_index()恢复原始索引顺序。
3.4 多级分组+unstack:别让维度爆炸毁掉你的报表
groupby(['region','product','channel'])['revenue'].mean().unstack()这行代码在3个维度时很优雅,但当业务方突然说“再加个客户等级维度”,你就得到一个4层MultiIndex,unstack()后列数会爆炸式增长。
我在某互联网银行做报表系统时,遇到过真实困境:业务要求“地区×产品×客户等级×季度”的四维交叉分析。unstack()后生成了288列(6地区×4产品×3等级×4季度),Excel直接打不开,Tableau渲染卡死。
生产级解法:用pivot_table替代unstack,主动控制维度展开:
# ✅ 用pivot_table精准控制哪些维度做行/列,避免维度爆炸 # 方案1:固定行维度,列只展开季度(最常用) report = df.pivot_table( values='revenue', index=['region', 'product', 'customer_tier'], # 行维度:保持层级 columns='quarter', # 列维度:只展开季度 aggfunc='mean', fill_value=0 ) # 方案2:动态选择维度,用字典配置 pivot_config = { 'rows': ['region', 'product'], 'columns': ['quarter'], 'values': 'revenue', 'aggfunc': 'sum' } report = df.pivot_table(**pivot_config)pivot_table的优势在于:
- 内存可控:它不会像unstack那样把所有组合都生成出来,而是按需计算;
- 填充可控:
fill_value=0明确指定空值处理策略,避免NaN传播; - 聚合可控:
aggfunc支持字符串(如'sum')或函数,比unstack更灵活。
更重要的是,它天然支持增量更新。当新季度数据进来,你不需要重跑整个四维聚合,只需用concat()追加新数据,再对新季度列做update()即可——这对T+1报表系统至关重要。
4. 实操过程详解:从交易数据到风控仪表盘的七步炼金术
4.1 第一步:构建抗压数据管道——别让原始数据格式毁掉一切
所有高级聚合的前提,是数据能被pandas正确解析。我在某农商行做数据治理时,发现他们CSV文件里藏着三个致命陷阱:
- 金额字段含千分位逗号:
"1,234.56"→ pandas读成字符串,后续sum()报错; - 日期字段格式混乱:有的
"2024-01-01",有的"01/01/2024",有的"20240101"; - 空值标识不统一:
""、"NULL"、"N/A"、" "混用。
生产级清洗脚本(已部署在Airflow中):
def load_and_clean_transaction_data(file_path): """ 银行级交易数据加载器:处理千分位、日期、空值三大陷阱 """ # 1. 预处理:用正则清理千分位逗号(避免pd.read_csv的converters性能损耗) with open(file_path, 'r', encoding='utf-8') as f: content = re.sub(r'(\d),(\d{3}\.\d{2})', r'\1\2', f.read()) # 2. 安全读取:指定dtypes防止类型推断错误 df = pd.read_csv( io.StringIO(content), dtype={ 'merchant_id': 'string', 'customer_id': 'string', 'amount': 'string', # 先读成string,再转float 'fee': 'string' }, parse_dates=['transaction_time'], date_parser=lambda x: pd.to_datetime(x, errors='coerce') # 错误日期转NaT ) # 3. 金额转换:安全处理千分位和空值 for col in ['amount', 'fee']: df[col] = pd.to_numeric( df[col].str.replace(',', '').str.strip(), errors='coerce' # 无法转换的转为NaN ) # 4. 空值标准化:将所有空值标识统一为NaN df = df.replace({ '': np.nan, 'NULL': np.nan, 'N/A': np.nan, ' ': np.nan }) # 5. 关键字段非空校验(金融数据底线) critical_cols = ['merchant_id', 'customer_id', 'amount', 'transaction_time'] null_counts = df[critical_cols].isnull().sum() if null_counts.sum() > 0: raise ValueError(f"关键字段存在空值:{null_counts[null_counts > 0].to_dict()}") return df # 使用示例 try: df_raw = load_and_clean_transaction_data('/data/raw/transactions_202404.csv') print(f"✅ 加载成功:{len(df_raw)}行,{df_raw['amount'].sum():,.2f}元") except ValueError as e: print(f"❌ 数据质量异常:{e}") # 触发告警:发送企业微信消息给数据负责人这个函数的价值在于:把数据质量问题拦截在聚合之前。它会在read_csv阶段就报错,而不是等到groupby().agg()时才发现amount列是object类型。在银行环境里,早1秒发现数据问题,就能少损失10万元潜在风险。
4.2 第二步:多列异构聚合——用一行代码替代一个ETL作业
现在我们有了干净的df_raw,开始真正的聚合。记住核心原则:所有业务指标必须在一个agg()里完成。
# ✅ 生产级多列异构聚合:覆盖财务、风控、运营三类指标 def generate_core_metrics(df): """ 生成核心业务指标集(单次扫描,原子化计算) 返回DataFrame,列名已标准化,符合监管报送要求 """ # 定义聚合规则:key=原始列名,value=聚合函数或函数列表 agg_rules = { 'amount': [ ('amt_sum', 'sum'), ('amt_mean', 'mean'), ('amt_median', 'median'), ('amt_std', 'std'), ('amt_max', 'max'), ('amt_min', 'min') ], 'fee': [ ('fee_sum', 'sum'), ('fee_mean', 'mean'), ('fee_spread', lambda x: x.max() - x.min()) ], 'transaction_time': [ ('first_txn', 'min'), # 首笔交易时间 ('last_txn', 'max') # 末笔交易时间 ] } # 执行聚合 result = df.groupby(['merchant_id', 'merchant_category']).agg(agg_rules) # 扁平化列名:用下划线连接,去掉括号 result.columns = ['_'.join(col).replace('(', '_').replace(')', '') for col in result.columns] # 添加衍生指标(必须在agg后计算,避免重复扫描) result['fee_rate_pct'] = (result['fee_sum'] / result['amt_sum'] * 100).round(2) result['txn_count'] = df.groupby(['merchant_id', 'merchant_category']).size() # 业务规则校验:手续费率不能超5% invalid_rate = result[result['fee_rate_pct'] > 5.0] if len(invalid_rate) > 0: # 记录告警日志(不抛异常,避免阻断流程) logger.warning(f"发现{len(invalid_rate)}家商户手续费率>5%:{invalid_rate.index.tolist()}") return result.reset_index() # 执行 core_metrics = generate_core_metrics(df_raw) print("✅ 核心指标生成完成") print(core_metrics.head(3))输出示例:
merchant_id merchant_category amt_sum amt_mean amt_median ... fee_rate_pct txn_count 0 M00123 Retail 45230.5 234.12 210.45 ... 2.45 193 1 M00456 Dining 28945.2 189.33 178.62 ... 2.50 153 2 M00789 Travel 89234.7 321.89 305.22 ... 2.48 277这个函数的精妙之处在于:
- 聚合规则集中管理:所有指标定义在一个字典里,新增指标只需改字典,不用动逻辑;
- 列名标准化:
amt_mean比('amount', 'mean')易读100倍,且符合银行数据字典命名规范; - 业务校验内嵌:手续费率超限自动告警,但不中断流程——生产系统必须“fail fast, recover gracefully”。
4.3 第三步:滚动窗口实战——给静态数据装上时间感知引擎
滚动计算不是炫技,而是解决真实业务问题。比如“近30天滚动交易失败率”,它能比“月度平均失败率”早7天发现异常。
def calculate_rolling_risk_metrics(df, window_days=30): """ 计算滚动风险指标(基于交易时间戳) 业务逻辑:失败率 = 失败交易数 / 总交易数(滚动窗口内) """ # 1. 确保数据按时间排序(滚动计算的前提) df_sorted = df.sort_values(['merchant_id', 'transaction_time']) # 2. 添加时间窗口标识(避免date_range生成大量空数据) # 使用transaction_time作为滚动基准 df_sorted['date'] = df_sorted['transaction_time'].dt.date # 3. 计算滚动指标(核心:用rolling()配合agg) # 注意:必须用apply()包裹,因为要同时计算分子分母 def rolling_failure_rate(group): # group是单个merchant_id的所有交易,已按时间排序 # 创建时间序列索引,便于rolling操作 group_ts = group.set_index('transaction_time')['is_failed'] # 计算滚动窗口内的失败数和总数 failed_count = group_ts.rolling(f'{window_days}D', min_periods=1).sum() total_count = group_ts.rolling(f'{window_days}D', min_periods=1).count() # 计算失败率,处理除零 rate = (failed_count / total_count * 100).round(2) rate = rate.fillna(0) # 窗口内无数据时设为0(业务约定) return rate # 应用滚动计算 df_sorted['rolling_fail_rate'] = ( df_sorted.groupby('merchant_id') .apply(rolling_failure_rate) .reset_index(level=0, drop=True) ) return df_sorted # 使用示例 df_with_rolling = calculate_rolling_risk_metrics(df_raw, window_days=30) print("✅ 滚动风险指标计算完成") print(df_with_rolling[['merchant_id', 'transaction_time', 'is_failed', 'rolling_fail_rate']].head(10))关键细节说明:
f'{window_days}D'语法:pandas滚动窗口支持'30D'(30天)、'7D'(7天)等,比window=30更准确,因为交易不是每天均匀发生;min_periods=1:确保首日就有结果,避免业务等待;fillna(0)的业务含义:我们和风控部约定,数据不足时视为“无风险”,而不是“未知风险”,因为后者会触发不必要的调查工单。
4.4 第四步:扩展窗口应用——YTD累计不是数字游戏,是合规刚需
银行所有报表都有“年初至今”(YTD)要求。expanding().sum()太慢,cumsum()又怕顺序错,怎么办?
def calculate_ytd_metrics(df): """ 计算年初至今累计指标(YTD) 业务要求:按自然年(1月1日-12月31日)累计,跨年自动重置 """ # 1. 添加年份和年内天数标识 df['year'] = df['transaction_time'].dt.year df['day_of_year'] = df['transaction_time'].dt.dayofyear # 2. 对每个merchant_id+year分组,按day_of_year排序后cumsum # 这样既保证时序正确,又支持跨年重置 df_sorted = df.sort_values(['merchant_id', 'year', 'day_of_year']) # 3. 分组cumsum(核心:groupby必须包含year,否则跨年会累加!) df_sorted['ytd_amt_sum'] = df_sorted.groupby(['merchant_id', 'year'])['amount'].cumsum() df_sorted['ytd_txn_count'] = df_sorted.groupby(['merchant_id', 'year'])['amount'].cumcount() + 1 # 4. 添加YTD完成度(当前日/365,用于预测) df_sorted['ytd_completion'] = (df_sorted['day_of_year'] / 365.0).round(3) return df_sorted # 使用示例 df_ytd = calculate_ytd_metrics(df_raw) print("✅ YTD指标计算完成") print(df_ytd[['merchant_id', 'transaction_time', 'amount', 'ytd_amt_sum', 'ytd_completion']].tail(10))这个方案解决了三个痛点:
- 跨年重置:通过
groupby(['merchant_id', 'year'])确保2023年12月31日的累计值不会加到2024年1月1日; - 性能保障:
cumsum()比expanding().sum()快一个数量级; - 业务延伸:
ytd_completion可用于预测全年目标完成率,这是行长们最爱看的指标。
4.5 第五步:多级分组可视化——把分析结果变成老板能看懂的表格
所有计算最终要落到报表上。unstack()生成的宽表,必须能直接导入Excel或BI工具。
def generate_executive_report(df_metrics): """ 生成高管层日报(Excel兼容格式) 要求:行=商户ID,列=指标,含格式化和条件高亮 """ # 1. 选取关键指标(高管只关心结果,不关心过程) key_metrics = df_metrics[[ 'merchant_id', 'merchant_category', 'amt_sum', 'amt_mean', 'fee_rate_pct', 'txn_count', 'rolling_fail_rate' ]].copy() # 2. 添加业务标签(让数字会说话) def risk_level(rate): if rate > 3.0: return 'HIGH' elif rate > 1.0: return 'MEDIUM' else: return 'LOW' key_metrics['risk_level'] = key_metrics['rolling_fail_rate'].apply(risk_level) # 3. 排序:按风险等级和金额降序,高管先看高风险大额商户 key_metrics = key_metrics.sort_values( ['risk_level', 'amt_sum'], ascending=[False, False] ) # 4. 格式化数字(符合财务报表习惯) key_metrics['amt_sum'] = key_metrics['amt_sum'].apply(lambda x: f"{x:,.2f}") key_metrics['amt_mean'] = key_metrics['amt_mean'].apply(lambda x: f"{x:.2f}") key_metrics['fee_rate_pct'] = key_metrics['fee_rate_pct'].apply(lambda x: f"{x:.2f}%") key_metrics['rolling_fail_rate'] = key_metrics['rolling_fail_rate'].apply(lambda x: f"{x:.2f}%") # 5. 生成Excel(用openpyxl实现条件格式) from openpyxl import Workbook from openpyxl.styles import PatternFill, Font wb = Workbook() ws = wb.active ws.title = "商户风险日报" # 写入表头 headers = list(key_metrics.columns) for col_num, header in enumerate(headers, 1): ws.cell(row=1, column=col_num, value=header) ws.cell(row=1, column=col_num).font = Font(bold=True) # 写入数据 for row_num, (_, row) in enumerate(key_metrics.iterrows(), 2): for col_num, value in enumerate(row, 1): ws.cell(row=row_num, column=col_num, value=value) # 条件格式:高风险标红 red_fill = PatternFill(start_color="FFEE1111", end_color="FFEE1111", fill_type="solid") for row_num in range(2, len(key_metrics) + 2): if ws.cell(row=row_num, column=headers.index('risk_level') + 1).value == 'HIGH': for col_num in range(1, len(headers) + 1): ws.cell(row=row_num, column=col_num).fill = red_fill # 保存 wb.save("/reports/executive_daily_report.xlsx") print("✅ 高管日报生成完成:/reports/executive_daily_report.xlsx") # 执行 generate_executive_report(core_metrics)这个函数的价值在于:把技术输出无缝对接业务流程。它生成的Excel文件,打开就是带颜色、带格式、按风险排序的表格,业务方无需任何二次加工。这才是数据工程师该有的交付标准——不是给你一个DataFrame,而是给你一个能直接开会用的PPT素材。
5. 常见问题与排查技巧实录:那些让你加班到凌晨的坑
5.1 问题1:agg()后出现Unexpected NaN——90%是因为分组键含空值
现象:df.groupby('merchant_id')['amount'].agg('sum')结果里,merchant_id为NaN的行sum值也是NaN,但你知道这批商户数据是完整的。
根因:merchant_id列本身含有空值(None或np.nan),pandas默认会把所有空值归为一组,而这一组的聚合结果往往是NaN。
排查命令:
# 查看分组键的空值分布 print("merchant_id空值统计:") print(df['merchant_id'].isnull().sum()) print(df['merchant_id'].value_counts(dropna=False).head(10)) # dropna=False显示NaN计数 # 查看NaN组的原始数据(关键!) nan_group = df[df['merchant_id'].isnull()] print(f"\nNaN商户ID的原始数据(共{len(nan_group)}行):") print(nan_group[['transaction_time', 'amount', 'fee']].head())解决方案(三选一,按业务场景):
- 方案A(推荐):清洗前置