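Pandas is one of the core libraries of the Python data science ecosystem. It is built on top of NumPy, uses DataFrame and Series as its basic data structures, and adds labeled data manipulation. It covers the whole workflow: data import/export, cleaning, transformation, aggregation, window computations, time series, and visualization. This article focuses on Pandas' intermediate and advanced features: MultiIndex (hierarchical indexing), window functions, reshaping, the categorical dtype, method chaining, high-performance eval/query, database interoperability, time series, plotting integration, Styler formatting, and production-grade data processing pipelines.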
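1. Pandas Basics at a Glance
1.1 Core Data Structures
Pandas has two core data structures:
Series: a labeled one-dimensional array. You can think of it as an ndarray with an index.
DataFrame: a labeled two-dimensional table. Each column can have a different dtype.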
```python
import pandas as pd
import numpy as np

s = pd.Series([10, 20, 30, 40], index=['a', 'b', 'c', 'd'], name='value')
print(s.index)    # the labels 'a'..'d'
print(s.values)   # the underlying NumPy array: [10 20 30 40]
print(s['b'])     # label-based access: 20

df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'score': [92.5, 85.0, 88.5],
})
print(df.shape)    # (3, 3)
print(df.columns)  # column labels
print(df.dtypes)   # one dtype per column
```
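1.2 Understanding Index
Index is the label axis behind every Pandas data structure. It is not a plain NumPy array. It is immutable, it supports set operations, and its labels must be hashable. The Index object itself is not hashable.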
```python
idx1 = pd.Index(['a', 'b', 'c', 'd'])
idx2 = pd.Index(['c', 'd', 'e', 'f'])
print(idx1.intersection(idx2))          # labels 'c', 'd'
print(idx1.union(idx2))                 # labels 'a' .. 'f'
print(idx1.difference(idx2))            # labels 'a', 'b'
print(idx1.symmetric_difference(idx2))  # labels 'a', 'b', 'e', 'f'
```
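Here is a quick check of the immutability claim, as a minimal sketch. Item assignment on an Index raises `TypeError`. Relabeling goes through methods such as `Series.rename`, which return a new object and leave the original untouched.

```python
import pandas as pd

idx = pd.Index(['a', 'b', 'c', 'd'])

# Index objects are immutable: item assignment raises TypeError
try:
    idx[0] = 'z'
    mutated = True
except TypeError:
    mutated = False

# "Changing" labels means building a new object
s = pd.Series([1, 2, 3, 4], index=idx)
s2 = s.rename(index={'a': 'z'})   # new Series with a new Index

print(mutated)          # False
print(list(s2.index))   # ['z', 'b', 'c', 'd']
print(list(s.index))    # ['a', 'b', 'c', 'd'] (original unchanged)
```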
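2. MultiIndex: Hierarchical Indexing
MultiIndex (hierarchical indexing) is Pandas' core mechanism for higher-dimensional data. It allows several index levels on a single axis, so a two-dimensional DataFrame can express multi-dimensional data.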
2.1 Creating a MultiIndex
```python
tuples = [('2023', 'Q1'), ('2023', 'Q2'), ('2023', 'Q3'), ('2023', 'Q4'),
          ('2024', 'Q1'), ('2024', 'Q2')]
# 1) From a list of tuples
mi = pd.MultiIndex.from_tuples(tuples, names=['year', 'quarter'])

# 2) From the Cartesian product of several iterables
mi = pd.MultiIndex.from_product(
    [['2023', '2024'], ['Q1', 'Q2', 'Q3', 'Q4']],
    names=['year', 'quarter'],
)

# 3) From parallel arrays, one array per level
mi = pd.MultiIndex.from_arrays(
    [['2023'] * 4 + ['2024'] * 4, ['Q1', 'Q2', 'Q3', 'Q4'] * 2],
    names=['year', 'quarter'],
)

# 4) From a DataFrame, whose column names become the level names
df_idx = pd.DataFrame({'year': ['2023'] * 4 + ['2024'] * 4,
                       'quarter': ['Q1', 'Q2', 'Q3', 'Q4'] * 2})
mi = pd.MultiIndex.from_frame(df_idx)

data = np.random.randn(8, 3)
df = pd.DataFrame(data, index=mi, columns=['revenue', 'cost', 'profit'])
print(df)
```
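2.2 Slicing and Lookup on a MultiIndex
```python
print(df.loc['2023'])                       # all quarters of 2023 (outer level dropped)
print(df.loc[('2023', 'Q1')])               # a single row, returned as a Series
print(df.loc['2023':'2024'])                # range over the outer level (needs a sorted index)
print(df.loc[('2023', ['Q1', 'Q3']), :])    # several inner labels
print(df.loc[(slice(None), 'Q1'), :])       # every year, Q1 only

# IndexSlice gives a more readable syntax for the same selections
idx = pd.IndexSlice
print(df.loc[idx['2023', :], :])
print(df.loc[idx[:, 'Q1'], 'revenue'])
print(df.loc[idx['2023':'2024', ['Q1', 'Q4']], ['revenue', 'profit']])
```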
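2.3 get_level_values and xs
```python
print(df.index.get_level_values(0))        # outer level, by position
print(df.index.get_level_values('year'))   # the same level, by name
print(df.xs('Q2', level='quarter'))        # cross-section: Q2 of every year
print(df.xs('2024', level='year'))
# drop_level=True (the default) removes the selected level from the result
result = df.xs('Q1', level='quarter', drop_level=True)
```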
2.4 Manipulating and Resetting a MultiIndex
```python
df_flat = df.reset_index()                              # levels become ordinary columns
df_reindexed = df_flat.set_index(['year', 'quarter'])   # and back again
df_swapped = df.swaplevel('year', 'quarter')            # swap two levels by name
df_swapped = df.swaplevel(0, 1)                         # or by position
df_sorted = df.sort_index(level='quarter', ascending=False)
df_reordered = df.reorder_levels(['quarter', 'year'])
```
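2.5 MultiIndex in Aggregation
Grouping by several keys produces a row MultiIndex. Passing a dict of aggregation lists also produces MultiIndex columns, which are often flattened afterwards:
```python
df = pd.DataFrame({
    'region': ['East', 'East', 'West', 'West', 'East', 'West'],
    'product': ['A', 'B', 'A', 'B', 'A', 'B'],
    'sales': [100, 150, 200, 120, 180, 90],
    'year': [2023, 2023, 2023, 2023, 2024, 2024],
})
# Rows: MultiIndex (region, year); columns: MultiIndex such as ('sales', 'sum')
pivot = df.groupby(['region', 'year']).agg({'sales': ['sum', 'mean', 'std']})
print(pivot.columns)
# Flatten the column tuples into names like 'sales_sum'
pivot.columns = ['_'.join(col) for col in pivot.columns]
```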
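Named aggregation avoids the flattening step entirely. Each keyword becomes an output column defined by an `(input column, function)` pair. The output names `total`, `avg`, and `n_products` below are arbitrary choices for illustration.

```python
import pandas as pd

df = pd.DataFrame({
    'region': ['East', 'East', 'West', 'West', 'East', 'West'],
    'product': ['A', 'B', 'A', 'B', 'A', 'B'],
    'sales': [100, 150, 200, 120, 180, 90],
    'year': [2023, 2023, 2023, 2023, 2024, 2024],
})

# Named aggregation: output_name=(input_column, aggregation)
summary = df.groupby(['region', 'year']).agg(
    total=('sales', 'sum'),
    avg=('sales', 'mean'),
    n_products=('product', 'nunique'),
)
print(summary)   # flat column names, row MultiIndex (region, year)
```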
2.6 Simulating Panel Data (3-D Data)
A MultiIndex can "fold" three- or higher-dimensional data into a two-dimensional DataFrame:
```python
dates = pd.date_range('2024-01-01', periods=5, freq='D')
assets = ['AAPL', 'GOOGL', 'MSFT']
features = ['open', 'high', 'low', 'close', 'volume']
# Rows: dates; columns: asset x feature (3 x 5 = 15 columns)
mi_columns = pd.MultiIndex.from_product([assets, features], names=['asset', 'feature'])
df_panel = pd.DataFrame(np.random.randn(5, 15), index=dates, columns=mi_columns)

print(df_panel.loc[:, ('AAPL', 'close')])             # one asset, one feature
print(df_panel.xs('close', level='feature', axis=1))  # the close of every asset
```
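3. Window Functions: rolling / expanding / ewm
Window functions are a core tool for time series analysis. Pandas provides three window mechanisms: fixed-size windows (rolling), cumulative windows (expanding), and exponentially weighted windows (ewm).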
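3.1 rolling: Sliding Windows
```python
import pandas as pd
import numpy as np

np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=252, freq='B')   # 252 business days
returns = pd.Series(np.random.randn(252) * 0.01, index=dates, name='daily_return')

ma20 = returns.rolling(window=20).mean()   # 20-day moving average
vol20 = returns.rolling(window=20).std()   # 20-day rolling volatility

# Drawdown is defined on a price/NAV path, so build one from the returns first
nav = (1 + returns).cumprod()
rolling_max_drawdown = nav.rolling(window=60).apply(
    lambda x: (1 - x / np.maximum.accumulate(x)).max(), raw=True
)

df = pd.DataFrame({
    'a': np.random.randn(252),
    'b': np.random.randn(252),
})
rolling_corr = df['a'].rolling(60).corr(df['b'])   # 60-row rolling correlation

# Several statistics at once; the rolling kurtosis method is named 'kurt'
result = returns.rolling(60).agg(['mean', 'std', 'skew', 'kurt'])
```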
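3.2 Key rolling Parameters
```python
# min_periods: emit a value once at least 10 observations are in the window
ma = returns.rolling(window=20, min_periods=10).mean()
# center=True: label each window at its center instead of its right edge
ma_center = returns.rolling(window=20, center=True).mean()
# win_type: weighted windows (requires SciPy)
ma_triang = returns.rolling(window=20, win_type='triang').mean()
ma_gauss = returns.rolling(window=20, win_type='gaussian').mean(std=5)  # the Gaussian window needs std
```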
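On a DatetimeIndex, `window` can also be an offset string such as `'3D'`. The window then covers a time span rather than a fixed number of rows, which matters when observations are irregularly spaced. A minimal sketch with made-up data:

```python
import pandas as pd

# Hypothetical, irregularly spaced observations (Jan 3 and Jan 4 are missing)
ts = pd.Series(
    [1.0, 2.0, 3.0, 4.0],
    index=pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-05', '2024-01-06']),
)

# Offset window: all rows in (t - 3 days, t]; needs a monotonic DatetimeIndex
by_time = ts.rolling('3D').sum()
# Count window: the last 3 rows, however far apart in time they are
by_count = ts.rolling(3).sum()

print(by_time.tolist())    # [1.0, 3.0, 3.0, 7.0]
print(by_count.tolist())   # [nan, nan, 6.0, 9.0]
```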
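3.3 expanding: Expanding Windows
An expanding window starts at the first observation and grows until it covers all the data. It is useful for cumulative statistics "up to the current point":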
```python
cum_mean = returns.expanding().mean()
cum_max = returns.expanding().max()
cum_min = returns.expanding().min()
cum_sum = returns.expanding().sum()
# Annualized Sharpe ratio up to each date (0 while the std is 0 or undefined)
cum_sharpe = returns.expanding().apply(
    lambda x: np.sqrt(252) * x.mean() / x.std() if x.std() > 0 else 0
)
```
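Some expanding statistics are just cumulative operations. The sketch below, with made-up values, checks that `expanding().max()` equals `cummax()` and that `expanding().mean()` equals the cumulative sum divided by the running count.

```python
import numpy as np
import pandas as pd

s = pd.Series([3.0, 1.0, 4.0, 1.0, 5.0])

running_max = s.expanding().max()
running_mean = s.expanding().mean()

# The same results written as cumulative operations
manual_max = s.cummax()
manual_mean = s.cumsum() / np.arange(1, len(s) + 1)

print(running_max.tolist())   # [3.0, 3.0, 4.0, 4.0, 5.0]
```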
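3.4 ewm: Exponentially Weighted Windows
ewm gives higher weight to recent data, and the weights decay exponentially. It is a standard tool in financial time series, for example for estimating volatility: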
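```python
ewma = returns.ewm(alpha=2 / 21).mean()           # smoothing factor given directly
ewma_span = returns.ewm(span=20).mean()           # span=20 is equivalent to alpha = 2 / (20 + 1)
ewma_halflife = returns.ewm(halflife=14).mean()   # weights halve every 14 observations
ewm_vol = returns.ewm(span=20).std()              # EWM volatility
ewm_corr = df['a'].ewm(span=60).corr(df['b'])     # EWM correlation
```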
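To make `span` concrete, this minimal sketch checks that `ewm(span=..., adjust=False)` matches the textbook recursion with alpha = 2 / (span + 1). The data are made up. The default `adjust=True` instead uses a normalized weighted average, so its early values differ.

```python
import numpy as np
import pandas as pd

x = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0])
span = 3
alpha = 2 / (span + 1)   # span=3 -> alpha=0.5

# adjust=False recursion: y_0 = x_0, y_t = (1 - alpha) * y_{t-1} + alpha * x_t
manual = [x.iloc[0]]
for v in x.iloc[1:]:
    manual.append((1 - alpha) * manual[-1] + alpha * v)

pandas_ewm = x.ewm(span=span, adjust=False).mean()
print(manual)   # [1.0, 1.5, 2.25, 3.125, 4.0625]
```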
4. Reshaping: pivot / melt / stack / unstack / crosstab
4.1 pivot: Long to Wide
pivot reshapes the data according to the index, columns, and values you specify:
```python
df = pd.DataFrame({
    'date': ['2024-01-01', '2024-01-01', '2024-01-02', '2024-01-02'],
    'city': ['Beijing', 'Shanghai', 'Beijing', 'Shanghai'],
    'temperature': [1.5, 8.2, 2.1, 9.5],
    'humidity': [30, 65, 28, 70],
})
# One row per date, one column per city, temperatures in the cells
pivoted = df.pivot(index='date', columns='city', values='temperature')
print(pivoted)
# Without values=, every remaining column is pivoted, giving MultiIndex columns
pivoted_all = df.pivot(index='date', columns='city')
```
4.2 pivot_table: Pivot Tables with Aggregation
When index/column combinations are not unique, pivot raises an error. pivot_table resolves the duplicates by aggregating them:
```python
df = pd.DataFrame({
    'date': ['2024-01-01', '2024-01-01', '2024-01-01', '2024-01-02'],
    'city': ['Beijing', 'Shanghai', 'Beijing', 'Beijing'],
    'temperature': [1.5, 8.2, 2.1, 3.5],
    'humidity': [30, 65, 28, 25],
})
pt = pd.pivot_table(df, index='date', columns='city', values='temperature',
                    aggfunc='mean', fill_value=0)
print(pt)
# Different aggregations per value column, plus row/column totals (margins)
pt_multi = pd.pivot_table(df, index='date', columns='city',
                          values=['temperature', 'humidity'],
                          aggfunc={'temperature': ['mean', 'min', 'max'], 'humidity': 'mean'},
                          margins=True, margins_name='Total')
```
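You can see directly why pivot_table is needed. In this sketch, which reuses the duplicated data above without humidity, pivot raises a `ValueError` because (2024-01-01, Beijing) appears twice. pivot_table averages the two readings instead.

```python
import pandas as pd

df = pd.DataFrame({
    'date': ['2024-01-01', '2024-01-01', '2024-01-01', '2024-01-02'],
    'city': ['Beijing', 'Shanghai', 'Beijing', 'Beijing'],
    'temperature': [1.5, 8.2, 2.1, 3.5],
})

try:
    df.pivot(index='date', columns='city', values='temperature')
    pivot_failed = False
except ValueError:
    # Two values for one (date, city) cell: pivot cannot choose between them
    pivot_failed = True

pt = df.pivot_table(index='date', columns='city', values='temperature', aggfunc='mean')
print(pt.loc['2024-01-01', 'Beijing'])   # mean of 1.5 and 2.1
```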
4.3 melt: Wide to Long
melt is the inverse of pivot. It "melts" several columns into two: a variable-name column and a value column:
```python
df_wide = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'math': [85, 90, 78],
    'physics': [92, 85, 88],
    'chemistry': [88, 82, 91],
})
# id_vars stay as identifiers; each value_vars column becomes (subject, score) rows
df_long = pd.melt(df_wide, id_vars=['name'],
                  value_vars=['math', 'physics', 'chemistry'],
                  var_name='subject', value_name='score')
print(df_long)   # 9 rows: 3 students x 3 subjects
```
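Because melt and pivot are inverses, a long table can be turned back into the original wide one. This sketch shows the round trip. `rename_axis` drops the leftover `subject` name on the column axis, and `reindex` restores the original column order, since pivot sorts the new columns alphabetically.

```python
import pandas as pd

df_wide = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'math': [85, 90, 78],
    'physics': [92, 85, 88],
    'chemistry': [88, 82, 91],
})
df_long = df_wide.melt(id_vars='name', var_name='subject', value_name='score')

wide_again = (
    df_long
    .pivot(index='name', columns='subject', values='score')
    .reset_index()
    .rename_axis(columns=None)
    .reindex(columns=['name', 'math', 'physics', 'chemistry'])
)
print(wide_again)
```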
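4.4 stack / unstack
These two operations move axis levels between the row index and the columns. unstack pivots an index level into the columns, and stack moves a column level back into the index: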
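```python
mi = pd.MultiIndex.from_product([['Alice', 'Bob'], ['Math', 'Physics']],
                                names=['student', 'subject'])
df = pd.DataFrame({'score': [85, 92, 90, 85]}, index=mi)
print(df)

# Move the 'subject' index level into the columns
df_unstacked = df.unstack(level='subject')
print(df_unstacked)
df_unstacked = df.unstack(level='subject', fill_value=0)   # fill missing combinations with 0

# Move it back into the index. pandas >= 2.1 may emit a FutureWarning about
# the new stack implementation; pass future_stack=True to opt in.
df_stacked = df_unstacked.stack(level='subject')

# Levels can also be referenced by position
df_unstacked = df.unstack(level=1)
df_stacked = df_unstacked.stack(level=1)
```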
4.5 crosstab: Cross Tabulation
crosstab is a convenience function for computing group frequencies and proportions:
```python
df = pd.DataFrame({
    'gender': ['M', 'F', 'M', 'F', 'M', 'F', 'M', 'F', 'M', 'M'],
    'age_group': ['18-25', '18-25', '26-35', '26-35', '26-35',
                  '36+', '36+', '18-25', '36+', '26-35'],
    'purchased': [1, 1, 0, 1, 1, 0, 0, 1, 1, 0],
})
ct = pd.crosstab(df['gender'], df['age_group'])   # counts
print(ct)
# Row proportions: each row sums to 1
ct_pct = pd.crosstab(df['gender'], df['age_group'], normalize='index')
print(ct_pct)
# Mean of 'purchased' in each cell, with row/column totals
ct_agg = pd.crosstab(df['gender'], df['age_group'], values=df['purchased'],
                     aggfunc='mean', margins=True)
print(ct_agg)
```
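A plain count crosstab is equivalent to counting group sizes and unstacking one key. The sketch below checks this on the same data. `fill_value=0` matters in general for combinations that never occur, although every combination happens to occur here.

```python
import pandas as pd

df = pd.DataFrame({
    'gender': ['M', 'F', 'M', 'F', 'M', 'F', 'M', 'F', 'M', 'M'],
    'age_group': ['18-25', '18-25', '26-35', '26-35', '26-35',
                  '36+', '36+', '18-25', '36+', '26-35'],
})

ct = pd.crosstab(df['gender'], df['age_group'])
# Same table via groupby: count rows per (gender, age_group), then spread age_group into columns
via_groupby = df.groupby(['gender', 'age_group']).size().unstack(fill_value=0)
print(ct)
```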
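5. Categorical: The Category Dtype
For columns with a small set of frequently repeated values (gender, city, grade, and so on), the category dtype can greatly reduce memory use and speed up some operations.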
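5.1 Creating Categorical Data
```python
df['gender'] = df['gender'].astype('category')   # convert an existing column

# An ordered categorical with an explicit category order
s = pd.Series(pd.Categorical(['low', 'medium', 'high', 'medium', 'low'],
                             categories=['low', 'medium', 'high'], ordered=True))
print(s.cat.categories)  # 'low', 'medium', 'high'
print(s.cat.ordered)     # True
print(s.cat.codes)       # integer codes 0, 1, 2, 1, 0
```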
5.2 Memory Savings
```python
import numpy as np

n = 10_000_000
labels = np.random.choice(['cat', 'dog', 'bird', 'fish', 'hamster'], n)
s_str = pd.Series(labels)
print(f"object dtype: {s_str.memory_usage(deep=True) / 1024**2:.1f} MB")
# category stores each distinct value once, plus a small integer code per row
s_cat = s_str.astype('category')
print(f"category dtype: {s_cat.memory_usage(deep=True) / 1024**2:.1f} MB")
```
5.3 Working with Categories
```python
# reorder_categories must list every existing category
s_cat = s_cat.cat.reorder_categories(['hamster', 'bird', 'cat', 'dog', 'fish'])
s_cat = s_cat.cat.add_categories(['rabbit'])
s_cat = s_cat.cat.remove_unused_categories()   # drops 'rabbit' again, since no row uses it
s_cat = s_cat.cat.rename_categories({'cat': 'feline', 'dog': 'canine'})
s_cat = s_cat.cat.as_ordered()
s_cat = s_cat.cat.as_unordered()

df = pd.DataFrame({'grade': pd.Categorical(['A', 'B', 'A', 'A'], categories=['A', 'B', 'C', 'D']),
                   'score': [95, 80, 92, 88]})
# observed=False keeps the unused categories C and D in the result (with NaN means)
print(df.groupby('grade', observed=False)['score'].mean())
```
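An ordered categorical compares, sorts, and takes its min/max according to the category order rather than alphabetically. Here is a small sketch with made-up values:

```python
import pandas as pd

s = pd.Series(pd.Categorical(['medium', 'low', 'high', 'medium'],
                             categories=['low', 'medium', 'high'], ordered=True))

print(s.sort_values().tolist())   # ['low', 'medium', 'medium', 'high'], not alphabetical
print((s > 'low').tolist())       # comparison uses the category order
print(s.max())                    # 'high'
```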
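6. Method Chaining: pipe / assign / query
Method chaining avoids cluttering the namespace with intermediate variables and keeps data processing pipelines clear and readable. The core tools are pipe, assign, query, and where/mask.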
6.1 assign: Adding Columns on the Fly
```python
import pandas as pd
import numpy as np

df = pd.DataFrame({'x': np.random.randn(100), 'y': np.random.randn(100)})
result = (
    df
    .assign(z=lambda d: d['x'] + d['y'])
    .assign(magnitude=lambda d: np.sqrt(d['x']**2 + d['y']**2))
    # np.select returns the label of the first condition that holds in each row
    .assign(quadrant=lambda d: np.select(
        [(d['x'] >= 0) & (d['y'] >= 0),
         (d['x'] < 0) & (d['y'] >= 0),
         (d['x'] < 0) & (d['y'] < 0),
         (d['x'] >= 0) & (d['y'] < 0)],
        ['Q1', 'Q2', 'Q3', 'Q4'],
        default='axis'))
)
```
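Separate `assign` calls are not strictly needed when one new column depends on another. Keyword arguments are evaluated in order, so a later lambda can refer to a column created earlier in the same call. `assign` also returns a new DataFrame and leaves the original unchanged. A minimal sketch:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'x': [3.0, -1.0], 'y': [4.0, 2.0]})

out = df.assign(
    r=lambda d: np.hypot(d['x'], d['y']),   # distance from the origin
    r_sq=lambda d: d['r'] ** 2,             # uses 'r', created just above in the same call
)
print(out)
```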
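6.2 pipe: Calling Custom Functions in a Chain
When the built-in methods are not enough and you need your own function, pipe lets you inject any function into the chain: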
```python
def standardize(df, columns):
    """Standardize the given columns to z-scores."""
    df = df.copy()
    for col in columns:
        df[col] = (df[col] - df[col].mean()) / df[col].std()
    return df

def remove_outliers(df, columns, n_std=3):
    """Drop rows whose absolute value exceeds n_std (assumes the columns are already standardized)."""
    mask = pd.Series(True, index=df.index)
    for col in columns:
        mask &= (np.abs(df[col]) <= n_std)
    return df[mask]

def add_interaction_terms(df, col1, col2):
    """Add the interaction term col1 * col2 as '<col1>_<col2>_interact'."""
    return df.assign(**{f'{col1}_{col2}_interact': df[col1] * df[col2]})

processed = (
    pd.DataFrame({'a': np.random.randn(1000),
                  'b': np.random.randn(1000),
                  'c': np.random.randn(1000)})
    .pipe(standardize, ['a', 'b', 'c'])
    .pipe(remove_outliers, ['a', 'b', 'c'], n_std=3)
    .pipe(add_interaction_terms, 'a', 'b')
    .assign(result=lambda d: d['a'] + d['b'] - d['a_b_interact'])
)
```
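The first argument of pipe, func, receives the DataFrame, and any further *args and **kwargs are forwarded to func. If the function expects the DataFrame as a keyword argument rather than as its first positional argument, pass a (callable, data_keyword) tuple instead. pipe then supplies the DataFrame under that keyword: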
```python
# (standardize, 'df'): pipe calls standardize(df=<the DataFrame>, columns=[...])
result = (
    processed
    .pipe((standardize, 'df'), columns=['a', 'b'])
    .pipe(remove_outliers, ['a', 'b'])
)
```
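The tuple form is most useful when a function takes the DataFrame somewhere other than first. In this sketch, `clip_columns` is a hypothetical helper whose data parameter comes second. `(clip_columns, 'data')` tells pipe to pass the DataFrame as `data=...`, and the remaining positional and keyword arguments are forwarded unchanged.

```python
import pandas as pd

def clip_columns(limit, data, columns):
    """Hypothetical helper: the DataFrame is NOT the first parameter."""
    return data.assign(**{c: data[c].clip(-limit, limit) for c in columns})

df = pd.DataFrame({'a': [-5.0, 0.5, 7.0], 'b': [1.0, -2.0, 3.0]})

# Equivalent to clip_columns(2.0, data=df, columns=['a', 'b'])
clipped = df.pipe((clip_columns, 'data'), 2.0, columns=['a', 'b'])
print(clipped)
```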
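6.3 query: Filtering Rows with String Expressions
query filters rows using a string expression whose syntax resembles a SQL WHERE clause. In some cases it reads more easily than boolean indexing: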
```python
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'age': [25, 17, 35, 28, 16],
    'score': [92, 78, 88, 95, 82],
    'grade': ['A', 'B', 'B', 'A', 'C'],
})
adults = df.query('age >= 18')
result = df.query('age >= 18 and score > 85')

# Reference local variables with @
min_age = 21
min_score = 80
result = df.query('age >= @min_age and score >= @min_score')

# String accessor methods need the Python engine
result = df.query('name.str.startswith("A")', engine='python')
result = df.query('grade in ["A", "B"]')

# query also works on a DataFrame with a non-default index
df_indexed = df.set_index('name')
result = df_indexed.query('age > 20')
```
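This minimal sketch shows that a query string selects the same rows as the equivalent boolean mask. It also shows that an index level can be referred to by its name inside the expression.

```python
import pandas as pd

df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'age': [25, 17, 35, 28, 16],
    'grade': ['A', 'B', 'B', 'A', 'C'],
})
min_age = 18

q = df.query('age >= @min_age and grade in ["A", "B"]')
m = df[(df['age'] >= min_age) & (df['grade'].isin(['A', 'B']))]

# The index name 'name' can be used like a column inside the expression
by_index = df.set_index('name').query('name == "Alice"')
print(q)
```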
6.4 A Complete Chaining Example: A Data Pipeline
```python
def load_and_clean(filepath):
    """Production-style load-and-clean pipeline (expects date, amount, account_id and type columns)."""
    return (
        pd.read_csv(filepath, parse_dates=['date'])
        .rename(columns=str.lower)
        .dropna(subset=['date', 'amount'])
        .assign(
            amount=lambda d: pd.to_numeric(d['amount'], errors='coerce'),   # unparsable -> NaN
            year=lambda d: d['date'].dt.year,
            month=lambda d: d['date'].dt.month,
        )
        .query('amount > 0')   # also drops the NaN produced by errors='coerce'
        .drop_duplicates(subset=['date', 'account_id'])
        .astype({'account_id': 'category', 'type': 'category'})
        .sort_values('date')
        .reset_index(drop=True)
    )

cleaned = load_and_clean('transactions.csv')
```
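7. High-Performance Evaluation with eval / query
7.1 DataFrame.eval
When the numexpr library is installed, DataFrame.eval uses it to evaluate expressions, which avoids allocating intermediate arrays. The benefit shows up mainly on large frames and on expressions that involve several columns. For small frames the parsing overhead can make it slower than plain operators.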
```python
df = pd.DataFrame(np.random.randn(1_000_000, 4), columns=['a', 'b', 'c', 'd'])

# %timeit is an IPython/Jupyter magic command
%timeit df['a'] + df['b'] * df['c'] / (df['d'] + 1)
%timeit df.eval('a + b * c / (d + 1)')

df.eval('ratio = a / (b + 0.001)', inplace=True)   # add a new column
# Multi-line expression: one assignment per line; later lines can use earlier results
df.eval('''
x = a + b
y = c - d
z = x / (y + 1)
''', inplace=True)
```
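eval changes how an expression is computed, not what it computes. This sketch uses random data from a seeded generator to check that the string expression matches the plain pandas expression. It also shows that without `inplace=True`, an assignment expression returns a new DataFrame and leaves the original as it was.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame(rng.standard_normal((10_000, 4)), columns=['a', 'b', 'c', 'd'])

plain = df['a'] + df['b'] * df['c'] / (df['d'] + 1)
via_eval = df.eval('a + b * c / (d + 1)')   # same values, evaluated from the string

# Assignment without inplace=True: returns a copy with the new column
df_new = df.eval('e = a + b')
```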
7.2 Local Variables and the @ Prefix
```python
threshold = 0.5
df.eval('a > @threshold')                        # returns a boolean Series
df.eval('flag = a > @threshold', inplace=True)   # stores it as a column
```
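7.3 Operator Reference

| Operator | Meaning |
| --- | --- |
| `+ - * / ** %` | Arithmetic, as in Python |
| `== != < <= > >=` | Comparison |
| `& \| ~` | Logical and, or, not |
| `and or not` | Word forms of the logical operators |
| `sin cos sqrt abs log exp` | Math functions |
| `str.contains` | String methods (typically require `engine='python'`) |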
8. Database Interoperability: read_sql / to_sql
8.1 SQLAlchemy Integration
Pandas connects to a wide range of databases through a SQLAlchemy Engine object:
```python
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:password@localhost:5432/mydb')

df = pd.read_sql_table('users', engine)   # read a whole table (requires SQLAlchemy)
df = pd.read_sql_query('''
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    WHERE year = 2024
    GROUP BY department
    HAVING AVG(salary) > 50000
    ORDER BY avg_salary DESC
''', engine)
# read_sql dispatches to read_sql_table or read_sql_query
df = pd.read_sql('SELECT * FROM users WHERE active = 1', engine)
```
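8.2 Writing to a Database
```python
from sqlalchemy.types import Integer, String, Numeric

# chunksize batches the inserts; method='multi' puts several rows into each INSERT statement
df.to_sql('processed_data', engine, if_exists='replace', index=False,
          chunksize=1000, method='multi')

# Explicit column types; Numeric(12, 2) stores exactly two decimal places
df.to_sql('table_name', engine,
          dtype={'id': Integer, 'name': String(100), 'amount': Numeric(precision=12, scale=2)},
          if_exists='append', index=False)
```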
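8.3 Reading Large Queries in Chunks
```python
chunks = []
# SQL string literals take single quotes; double quotes denote identifiers in standard SQL
for chunk in pd.read_sql_query(
        "SELECT * FROM huge_table WHERE date >= '2024-01-01'",
        engine, chunksize=10000):
    processed_chunk = chunk.groupby('category').agg({'amount': 'sum'})   # partial sums per chunk
    chunks.append(processed_chunk)

# Combine the partial sums into the final totals
final_result = pd.concat(chunks).groupby('category').sum()
```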
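The same round trip can be tried without a database server. pandas also accepts a plain `sqlite3` connection, and this sketch uses a throwaway in-memory database with made-up data. It writes a table with `to_sql`, runs a parameterized query (sqlite3 uses `?` placeholders, so values are bound rather than pasted into the SQL text), and reads the result in chunks.

```python
import sqlite3

import pandas as pd

con = sqlite3.connect(':memory:')

sales = pd.DataFrame({
    'category': ['a', 'b', 'a', 'c'],
    'amount': [10.0, 20.0, 5.0, 7.5],
    'date': ['2023-12-31', '2024-01-02', '2024-01-03', '2024-02-01'],   # ISO strings compare correctly
})
sales.to_sql('sales', con, index=False)

totals = pd.read_sql_query(
    'SELECT category, SUM(amount) AS total FROM sales '
    'WHERE date >= ? GROUP BY category ORDER BY category',
    con, params=('2024-01-01',),
)

# chunksize turns the result into an iterator of DataFrames
chunks = list(pd.read_sql_query('SELECT * FROM sales', con, chunksize=2))
con.close()
print(totals)
```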
9. Time Series: resample and Date Handling
9.1 Generating Date Ranges
```python
dates = pd.date_range('2024-01-01', '2024-12-31', freq='D')          # daily
biz_dates = pd.date_range('2024-01-01', periods=252, freq='B')       # business days
hours = pd.date_range('2024-01-01', periods=24 * 7, freq='h')        # hourly ('H' before pandas 2.2)
custom_freq = pd.date_range('2024-01-01', periods=10, freq='2W-WED') # every second Wednesday
```
9.2 resample: Changing Frequency
resample is the core tool for time series aggregation. It converts a series from one frequency to another:
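```python
idx = pd.date_range('2024-01-01', periods=10000, freq='min')   # minute data ('T' is deprecated since pandas 2.2)
ts = pd.Series(np.random.randn(10000).cumsum(), index=idx, name='value')

# Downsample: minutes -> hours, several statistics at once
hourly = ts.resample('h').agg(['mean', 'min', 'max', 'std', 'first', 'last'])
# Upsample: minutes -> 30 seconds, filling the new points by linear interpolation
seconds = ts.resample('30s').interpolate(method='linear')
# Open/high/low/close bars per hour
ohlc = ts.resample('h').ohlc()
```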
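9.3 Custom Resampling Rules
```python
ts_utc = ts.tz_localize('UTC')   # attach a time zone

# label: which bin edge names the bin; closed: which edge belongs to the bin
ts.resample('h', label='right').mean()
ts.resample('h', label='left').mean()     # default for hourly bins
ts.resample('h', closed='right').mean()
ts.resample('h', closed='left').mean()    # default for hourly bins

# Custom aggregation: a weighted mean with weights rising linearly from 1 to 2 within each bin
def weighted_mean(x):
    return np.average(x, weights=np.linspace(1, 2, len(x)))

ts.resample('h').apply(weighted_mean)
```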
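The effect of `closed` and `label` is easiest to see on a tiny series. This sketch uses nine one-minute values 0..8. With the defaults, each 3-minute bin is `[start, end)` and is named by its start. With `closed='right', label='right'`, each bin is `(start, end]` and is named by its end, so the first value lands in a bin of its own.

```python
import pandas as pd

idx = pd.date_range('2000-01-01', periods=9, freq='min')
s = pd.Series(range(9), index=idx)

# Defaults for minute bins: closed='left', label='left'
left = s.resample('3min').sum()      # [00:00, 00:03) -> 0+1+2, ...
# Right-closed, right-labeled bins: (23:57, 00:00], (00:00, 00:03], ...
right = s.resample('3min', label='right', closed='right').sum()

print(left.tolist())    # [3, 12, 21]
print(right.tolist())   # [0, 6, 15, 15]
```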
9.4 Datetime Attributes and Offsets
```python
df = pd.DataFrame({'date': pd.date_range('2024-01-01', periods=100)})
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['weekday'] = df['date'].dt.weekday            # Monday = 0, Sunday = 6
df['day_name'] = df['date'].dt.day_name()
df['quarter'] = df['date'].dt.quarter
df['is_quarter_end'] = df['date'].dt.is_quarter_end
df['days_in_month'] = df['date'].dt.days_in_month
df['next_month'] = df['date'] + pd.DateOffset(months=1)   # calendar-aware month shift
df['prev_biz_day'] = df['date'] - pd.offsets.BDay(1)      # previous business day
df['days_since'] = (df['date'] - pd.Timestamp('2024-01-01')).dt.days
```
9.5 shift / diff / pct_change: Lags and Changes
```python
ts = pd.Series(np.random.randn(100).cumsum() + 100)
ts_lag1 = ts.shift(1)      # previous value (lag 1)
ts_lag5 = ts.shift(5)
ts_lead1 = ts.shift(-1)    # next value (lead 1)
ts_diff1 = ts.diff(1)      # ts - ts.shift(1)
ts_diff5 = ts.diff(5)
ts_pct = ts.pct_change()   # ts / ts.shift(1) - 1
ts_pct5 = ts.pct_change(5)
```
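diff and pct_change are shorthands built on shift. This sketch with made-up prices checks the identities noted in the comments above.

```python
import numpy as np
import pandas as pd

s = pd.Series([100.0, 110.0, 99.0])

diff = s.diff()          # s - s.shift(1): [NaN, 10.0, -11.0]
pct = s.pct_change()     # s / s.shift(1) - 1: [NaN, 0.10, -0.10]
print(pct.tolist())
```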
10. Plotting Integration: plot
Pandas has a built-in plotting interface based on Matplotlib. The DataFrame.plot and Series.plot methods quickly produce a wide range of charts:
```python
import matplotlib.pyplot as plt

df = pd.DataFrame({
    'sales': np.random.randint(100, 300, 100),
    'cost': np.random.randint(50, 150, 100),
    'marketing_spend': np.random.randint(10, 50, 100),
})
df.plot(figsize=(12, 6), title='Sales & Cost Over Time')   # line chart, one line per column
plt.show()
df.plot(subplots=True, layout=(3, 1), figsize=(10, 8), sharex=True)   # one subplot per column
df.iloc[:10].plot.bar(figsize=(12, 5), title='First 10 Days')
df.iloc[:10].plot.barh(figsize=(12, 5))
df.iloc[:10].plot.bar(stacked=True)
df.plot.hist(alpha=0.5, bins=20, figsize=(10, 6))
df.plot.box(figsize=(8, 6))
df.plot.density(figsize=(10, 6))   # kernel density estimate (requires SciPy)
df.plot.scatter(x='sales', y='cost', c='marketing_spend', colormap='viridis',
                figsize=(8, 6), title='Cost vs Sales')
df.plot.area(figsize=(10, 6), alpha=0.6, stacked=False)
df_agg = df.sum()
df_agg.plot.pie(autopct='%1.1f%%', figsize=(8, 8))
```
With a time series index:
```python
ts = pd.Series(np.random.randn(252).cumsum(),
               index=pd.date_range('2024-01-01', periods=252, freq='B'))
ts.plot(figsize=(14, 5), title='Cumulative Return')   # the DatetimeIndex becomes a date x-axis
```
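11. Styler: Formatting DataFrame Output
The Styler class applies rich formatting to a DataFrame's HTML output (in Jupyter Notebook or on the web). This includes conditional colors, gradients, bar charts, and tooltips.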
11.1 Basic Formatting
```python
df = pd.DataFrame({
    'product': ['Widget A', 'Widget B', 'Widget C', 'Widget D'],
    'revenue': [1250000, 890000, 2150000, 560000],
    'growth_pct': [0.125, -0.032, 0.215, 0.058],
    'margin': [0.45, 0.38, 0.52, 0.41],
})
styled = (
    df.style
    .format({
        'revenue': '¥{:,.0f}',     # thousands separator, no decimals
        'growth_pct': '{:+.2%}',   # signed percentage
        'margin': '{:.1%}',
    })
    .hide(axis='index')            # hide the row index (pandas >= 1.4)
    .set_caption('2024 Q2 Product Sales Report')
)
```
11.2 Conditional Background and Text Colors
```python
import seaborn as sns

styled = df.style.background_gradient(subset=['revenue'], cmap='Blues')
# cmap must be a Matplotlib colormap (or its name), so ask seaborn for a colormap object
styled = df.style.background_gradient(subset=['growth_pct'],
                                      cmap=sns.diverging_palette(10, 240, as_cmap=True),
                                      vmin=-0.2, vmax=0.3)
styled = df.style.text_gradient(subset=['margin'], cmap='RdYlGn')

# Element-wise styling: Styler.map (named applymap before pandas 2.1)
def color_negative_red(val):
    color = 'red' if val < 0 else 'green'
    return f'color: {color}'

styled = df.style.map(color_negative_red, subset=['growth_pct'])

# Column-wise styling: the function receives a whole column and returns one CSS string per cell
def highlight_max(s):
    is_max = s == s.max()
    return ['background-color: yellow' if v else '' for v in is_max]

styled = df.style.apply(highlight_max, subset=['revenue', 'growth_pct', 'margin'])
```
11.3 Built-in Bar Charts
```python
styled = df.style.bar(subset=['revenue'], color='#5fba7d')
# Two colors: the first for negative values, the second for positive values
styled = df.style.bar(subset=['growth_pct'], color=['#d65f5f', '#5fba7d'], align='mid')
```
11.4 Custom Multi-Condition Coloring
```python
def traffic_light(val):
    """Traffic light: red / amber / green."""
    if val > 0.15:
        color = '#4CAF50'   # green
    elif val > 0:
        color = '#FFC107'   # amber
    else:
        color = '#F44336'   # red
    return f'background-color: {color}; color: white'

styled = df.style.map(traffic_light, subset=['growth_pct'])   # applymap before pandas 2.1
```
11.5 Exporting Styles
```python
html = styled.to_html()                             # HTML string
styled.to_excel('report.xlsx', engine='openpyxl')   # Excel file (requires openpyxl)
```
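12. Production-Grade Data Pipelines
Real-world data processing usually involves several steps: loading, cleaning, transformation, feature engineering, aggregation, and output. A good pipeline design should be readable, testable, reusable, and performant.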
12.1 Design Pattern: A Functional Pipeline
```python
from typing import Callable, List

import numpy as np
import pandas as pd

Step = Callable[[pd.DataFrame], pd.DataFrame]

def pipeline(steps: List[Step]) -> Step:
    """Compose several processing steps into a single pipeline function."""
    def run(df: pd.DataFrame) -> pd.DataFrame:
        for step in steps:
            df = step(df)
        return df
    return run

# Each step returns a new DataFrame instead of mutating its input
def parse_dates(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(date=pd.to_datetime(df['date'], errors='coerce'))

def validate_amount(df: pd.DataFrame) -> pd.DataFrame:
    df = df.assign(amount=pd.to_numeric(df['amount'], errors='coerce'))
    return df[df['amount'] > 0]   # also drops NaN amounts

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(
        year=df['date'].dt.year,
        month=df['date'].dt.month,
        day_of_week=df['date'].dt.dayofweek,
        amount_log=np.log1p(df['amount']),   # log(amount + 1)
    )

def clean_strings(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()   # avoid writing into a slice of the caller's data
    for col in df.select_dtypes(include='object').columns:
        df[col] = df[col].str.strip().str.lower()
    return df

processing_pipeline = pipeline([
    parse_dates,
    validate_amount,
    add_features,
    clean_strings,
])

df_raw = pd.read_csv('raw_data.csv')
df_clean = processing_pipeline(df_raw)
```
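The same composition can be written with `functools.reduce` and `DataFrame.pipe`, which some readers find more idiomatic than an explicit loop. This is a minimal sketch. `compose`, `drop_non_positive`, and `add_tax` are hypothetical names used only for illustration.

```python
from functools import reduce
from typing import Callable

import numpy as np
import pandas as pd

Step = Callable[[pd.DataFrame], pd.DataFrame]

def compose(*steps: Step) -> Step:
    """Chain steps left to right; each step receives the previous step's output."""
    def run(df: pd.DataFrame) -> pd.DataFrame:
        return reduce(lambda acc, step: acc.pipe(step), steps, df)
    return run

# Hypothetical steps, for illustration only
def drop_non_positive(df: pd.DataFrame) -> pd.DataFrame:
    return df[df['amount'] > 0]

def add_tax(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(with_tax=df['amount'] * 1.1)

run = compose(drop_non_positive, add_tax)
raw = pd.DataFrame({'amount': [10.0, -3.0, 5.0]})
out = run(raw)
print(out)
```

Because each step is a plain function from DataFrame to DataFrame, it can be unit-tested on its own with a tiny hand-built frame.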
12.2 A Pipeline with Logging
```python
import logging
from functools import wraps

import pandas as pd

logging.basicConfig(level=logging.INFO)   # without configuration, INFO messages are not shown
logger = logging.getLogger(__name__)

def log_step(func):
    """Decorator: log the DataFrame's shape before and after a step."""
    @wraps(func)
    def wrapper(df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
        logger.info(f"[{func.__name__}] input: {df.shape}")
        result = func(df, *args, **kwargs)
        logger.info(f"[{func.__name__}] output: {result.shape}")
        return result
    return wrapper

@log_step
def remove_duplicates(df):
    return df.drop_duplicates(subset=['date', 'user_id'])

@log_step
def filter_recent(df, days=90):
    cutoff = pd.Timestamp.now() - pd.Timedelta(days=days)
    return df[df['date'] >= cutoff]
```
12.3 Chunked Processing for Large Data
When the data does not fit in memory, use the chunksize parameter to read and process it in chunks:
```python
def process_in_chunks(filepath: str, chunksize: int = 100_000):
    """Process a very large CSV file chunk by chunk (reuses the steps from 12.1)."""
    chunk_results = []
    for chunk in pd.read_csv(filepath, chunksize=chunksize, parse_dates=['date']):
        processed = (
            chunk
            .pipe(validate_amount)
            .pipe(add_features)
            .pipe(clean_strings)
        )
        # Partial aggregates per chunk; sums and counts can be combined afterwards
        summary = (processed
                   .groupby(['year', 'month', 'category'])
                   .agg({'amount': ['sum', 'count']}))
        chunk_results.append(summary)
    # The same group can appear in several chunks, so aggregate once more
    final = pd.concat(chunk_results).groupby(level=[0, 1, 2]).sum()
    return final
```
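Chunked aggregation is only correct for statistics that can be merged across chunks. Sums and counts combine by adding. A mean must be rebuilt as total sum divided by total count rather than by averaging per-chunk means. This sketch checks that on a tiny in-memory CSV of made-up data, read through `io.StringIO`.

```python
import io

import pandas as pd

csv_text = "category,amount\na,1\nb,2\na,3\nc,4\nb,5\n"   # hypothetical data

partials = []
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    partials.append(chunk.groupby('category')['amount'].agg(['sum', 'count']))

# Re-aggregate the partial results: sums and counts simply add up
chunked = pd.concat(partials).groupby(level=0).sum()
chunked_mean = chunked['sum'] / chunked['count']   # correct mean per category

full = pd.read_csv(io.StringIO(csv_text)).groupby('category')['amount'].agg(['sum', 'count'])
print(chunked)
```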
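12.4 Scaling Pandas with Dask
When the data exceeds single-machine memory (for example, CSV files of 10 GB or more) and chunked processing becomes unwieldy, consider Dask DataFrame. Its API is very close to Pandas', but it evaluates lazily and can run in parallel or distributed: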
```python
import dask.dataframe as dd

ddf = dd.read_csv('huge_dataset/*.csv', parse_dates=['timestamp'])   # one glob, many files
ddf['year'] = ddf['timestamp'].dt.year
ddf_clean = ddf[ddf['amount'] > 0]
grouped = ddf_clean.groupby(['year', 'category'])['amount'].agg(['sum', 'mean'])
result = grouped.compute()   # nothing is computed until .compute()
```
12.5 Memory Optimization Tips
```python
import numpy as np
import pandas as pd

def optimize_dtypes(df: pd.DataFrame, cat_ratio: float = 0.5) -> pd.DataFrame:
    """Downcast column dtypes to reduce memory; returns a new DataFrame."""
    df = df.copy()
    for col in df.columns:
        s = df[col]
        if s.dtype == 'object':
            # Low-cardinality string columns -> category
            if len(s) and s.nunique() / len(s) < cat_ratio:
                df[col] = s.astype('category')
        elif pd.api.types.is_integer_dtype(s):
            # Smallest unsigned type for non-negative data, otherwise smallest signed type
            downcast = 'unsigned' if s.min() >= 0 else 'integer'
            df[col] = pd.to_numeric(s, downcast=downcast)
        elif pd.api.types.is_float_dtype(s):
            # May downcast float64 to float32 (lower precision)
            df[col] = pd.to_numeric(s, downcast='float')
    return df

df = pd.DataFrame({'id': range(1_000_000), 'value': np.random.random(1_000_000)})
print(f"Before: {df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
df_optimized = optimize_dtypes(df)
print(f"After: {df_optimized.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
```
13. Advanced Examples
13.1 merge / join Patterns
```python
orders = pd.DataFrame({
    'order_id': [1, 2, 3, 4],
    'user_id': [1, 2, 1, 5],
    'amount': [100, 200, 150, 300],
})
users = pd.DataFrame({
    'user_id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'vip': [True, False, True],
})
merged = pd.merge(orders, users, on='user_id', how='inner')   # matching user_id only (drops order 4)
merged = pd.merge(orders, users, on='user_id', how='left')    # every order; name/vip are NaN for user 5

# Multi-key join (left and right stand for any two DataFrames sharing key1 and key2):
# pd.merge(left, right, on=['key1', 'key2'], how='outer')

# join aligns on the index
orders_indexed = orders.set_index('user_id')
users_indexed = users.set_index('user_id')
result = orders_indexed.join(users_indexed, how='inner')

# validate raises a MergeError if the relationship is not many-to-one
pd.merge(orders, users, on='user_id', validate='m:1')
```
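To audit a join, `indicator=True` adds a `_merge` column that records where each row came from (`both`, `left_only`, or `right_only`). Filtering on it gives an anti-join. This sketch reuses the orders/users data above without the `vip` column.

```python
import pandas as pd

orders = pd.DataFrame({'order_id': [1, 2, 3, 4], 'user_id': [1, 2, 1, 5],
                       'amount': [100, 200, 150, 300]})
users = pd.DataFrame({'user_id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie']})

audit = pd.merge(orders, users, on='user_id', how='outer', indicator=True)

# Anti-join: orders whose user_id has no match in users
orphans = audit.loc[audit['_merge'] == 'left_only', ['order_id', 'user_id']]
# Users without any order
inactive = audit.loc[audit['_merge'] == 'right_only', 'name']
print(orphans)
```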
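13.2 groupby + transform
transform differs from agg. agg returns a reduced result with one row per group, while transform returns a result with the same shape as the input, broadcasting each group's value back to its rows: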
```python
df = pd.DataFrame({
    'department': ['HR', 'HR', 'Tech', 'Tech', 'Tech', 'Sales', 'Sales'],
    'employee': ['A', 'B', 'C', 'D', 'E', 'F', 'G'],
    'salary': [5000, 6000, 8000, 9000, 10000, 5500, 6500],
})
df['dept_avg'] = df.groupby('department')['salary'].transform('mean')   # department mean on every row
df['salary_zscore'] = df.groupby('department')['salary'].transform(
    lambda x: (x - x.mean()) / x.std()
)
df['dept_rank'] = df.groupby('department')['salary'].rank(ascending=False)   # 1 = highest in the department
```
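The shape difference is what makes transform useful for row-level comparisons. This minimal sketch reuses the salary data above. `agg` yields one value per department, while `transform` yields one value per employee that can be compared with the row directly.

```python
import pandas as pd

df = pd.DataFrame({
    'department': ['HR', 'HR', 'Tech', 'Tech', 'Tech', 'Sales', 'Sales'],
    'salary': [5000, 6000, 8000, 9000, 10000, 5500, 6500],
})
g = df.groupby('department')['salary']

per_group = g.agg('mean')        # 3 rows, one per department
per_row = g.transform('mean')    # 7 rows, aligned with df

# Employees paid above their own department's average
above_avg = df[df['salary'] > per_row]
print(above_avg)
```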
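13.3 Binning with cut / qcut
```python
ages = np.random.randint(0, 90, 100)
bins = [0, 18, 35, 50, 65, 100]
labels = ['minor', 'young adult', 'middle-aged', 'late middle age', 'senior']
# right=False gives left-closed bins [0, 18), [18, 35), ...;
# with the default right=True, age 0 would fall outside (0, 18] and become NaN
age_cat = pd.cut(ages, bins=bins, labels=labels, right=False)
# qcut bins by quantile, so each bin holds roughly the same number of rows
age_quartile = pd.qcut(ages, q=4, labels=['Q1', 'Q2', 'Q3', 'Q4'])
print(pd.Series(age_cat).value_counts(sort=False))   # top-level pd.value_counts is deprecated since pandas 2.1
```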
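Bin edges are where cut surprises people most. This sketch uses a few hand-picked ages to show both settings. With left-closed bins, 18 counts as an adult. With the default right-closed bins, 0 falls outside the first interval. Passing `include_lowest=True` is the other common fix for that.

```python
import pandas as pd

ages = [0, 17, 18, 64, 65]
edges = [0, 18, 65, 100]
names = ['minor', 'adult', 'senior']

left_closed = pd.cut(ages, bins=edges, labels=names, right=False)   # [0,18), [18,65), [65,100)
right_closed = pd.cut(ages, bins=edges, labels=names)               # (0,18], (18,65], (65,100]

print(list(left_closed))    # ['minor', 'minor', 'adult', 'adult', 'senior']
print(list(right_closed))   # 0 -> NaN, 18 -> 'minor', 65 -> 'adult'
```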
13.4 Expanding List Columns with explode
```python
df = pd.DataFrame({
    'user': ['Alice', 'Bob'],
    'tags': [['python', 'ml'], ['java', 'spring', 'k8s']],
})
exploded = df.explode('tags')   # one row per list element; the original index labels repeat
print(exploded)
```
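In raw data, list-like values often arrive as delimited strings. This sketch uses made-up tags. It splits the strings into lists first, then explodes them, and uses `ignore_index=True` to get a fresh 0..n-1 index instead of repeated labels.

```python
import pandas as pd

df = pd.DataFrame({'user': ['Alice', 'Bob'],
                   'tags': ['python,ml', 'java,spring,k8s']})

exploded = (
    df
    .assign(tags=lambda d: d['tags'].str.split(','))   # 'a,b' -> ['a', 'b']
    .explode('tags', ignore_index=True)                # one row per tag, new RangeIndex
)
print(exploded)
```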
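14. Summary
Pandas is far more than reading a CSV and calling groupby. After working through this article, you should be familiar with the following expert-level tools:
MultiIndex: represent high-dimensional data in a two-dimensional DataFrame with hierarchical indexes, and slice it flexibly with xs and IndexSlice.
Window functions: rolling (fixed windows), expanding (cumulative windows), and ewm (exponentially weighted windows), the core of time series feature engineering.
Reshaping: pivot/pivot_table to widen, melt to lengthen, stack/unstack to move levels between axes, and crosstab for cross-tabulation.
Categorical dtype: the category dtype can greatly reduce the memory used by low-cardinality string columns (by up to about 98%). It also supports ordered categories and grouping that keeps empty categories (observed=False).
Method chaining: combining pipe, assign, and query makes data pipelines declarative and free of intermediate variables.
eval / query: faster expression evaluation with numexpr (when installed), avoiding intermediate array allocations.
Database interoperability: connect to many databases through SQLAlchemy with read_sql / to_sql.
Time series: resample for changing frequency, shift/diff/pct_change for lag features, and rich datetime attribute extraction.
Plotting: DataFrame.plot produces line, bar, scatter, histogram, density, and other charts with a single call.
Styler: fine-grained formatting of HTML tables, including conditional coloring, bar charts, and gradient backgrounds.
Production pipelines: functional pipeline design, chunked processing, dtype optimization, and scaling out with Dask.
With these tools, you can express most data cleaning and feature engineering tasks as declarative Pandas pipelines instead of scattered exploratory code. The result is code that is considerably easier to maintain and often faster to run.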