目录
  1. 1. 一、collections —— 增强的数据结构
    1. 1.1. 1.1 namedtuple —— 轻量级数据类
    2. 1.2. 1.2 Counter —— 计数器
    3. 1.3. 1.3 deque —— 双端队列
    4. 1.4. 1.4 defaultdict —— 带默认值的字典
    5. 1.5. 1.5 OrderedDict —— 保序字典
    6. 1.6. 1.6 ChainMap —— 链式映射
  2. 2. 二、itertools —— 高效的迭代器工具
    1. 2.1. 2.1 无限迭代器
    2. 2.2. 2.2 链式与分组
    3. 2.3. 2.3 笛卡尔积与排列组合
    4. 2.4. 2.4 实用工具:islice, accumulate, takewhile, dropwhile
    5. 2.5. 2.5 zip_longest
  3. 3. 三、functools —— 高阶函数与函数工具
    1. 3.1. 3.1 lru_cache —— 函数结果缓存
    2. 3.2. 3.2 partial —— 部分应用
    3. 3.3. 3.3 reduce —— 归约
    4. 3.4. 3.4 wraps —— 保持被装饰函数的元信息
    5. 3.5. 3.5 cached_property (Python 3.8+)
    6. 3.6. 3.6 singledispatch —— 单分派泛型函数
  4. 4. 四、pathlib —— 面向对象的路径处理
    1. 4.1. 4.1 基本操作
    2. 4.2. 4.2 文件与目录操作
    3. 4.3. 4.3 os.path vs pathlib 对照
  5. 5. 五、concurrent.futures —— 并发执行
    1. 5.1. 5.1 ThreadPoolExecutor
    2. 5.2. 5.2 ProcessPoolExecutor
    3. 5.3. 5.3 GIL 的影响
    4. 5.4. 5.4 处理超时与取消
  6. 6. 六、subprocess —— 进程管理
    1. 6.1. 6.1 run —— 推荐的高级 API
    2. 6.2. 6.2 Popen —— 底层 API
    3. 6.3. 6.3 安全注意事项
  7. 7. 七、json / csv / sqlite3 —— 数据序列化与持久化
    1. 7.1. 7.1 json
    2. 7.2. 7.2 csv
    3. 7.3. 7.3 sqlite3
  8. 8. 八、logging —— 日志系统
    1. 8.1. 8.1 基本用法
    2. 8.2. 8.2 日志级别
    3. 8.3. 8.3 Logger / Handler / Formatter 层次结构
    4. 8.4. 8.4 使用字典配置(dictConfig)
  9. 9. 九、argparse —— 命令行参数解析
    1. 9.1. 9.1 基础用法
    2. 9.2. 9.2 子命令
  10. 10. 十、dataclasses —— 减少样板代码
    1. 10.1. 10.1 基本使用
    2. 10.2. 10.2 field 函数的配置
  11. 11. 十一、typing —— 类型提示
    1. 11.1. 11.1 基本类型注解
    2. 11.2. 11.2 泛型 —— TypeVar, Generic, Protocol
    3. 11.3. 11.3 Protocol —— 结构化子类型
  12. 12. 十二、常用工具模块速查
    1. 12.1. 12.1 os / os.path
    2. 12.2. 12.2 shutil —— 高级文件操作
    3. 12.3. 12.3 tempfile —— 临时文件
    4. 12.4. 12.4 hashlib —— 哈希摘要
    5. 12.5. 12.5 base64
    6. 12.6. 12.6 enum —— 枚举类型
  13. 13. 十三、总结
【Python系列】常用标准库

Python 的标准库是其「自带电池」哲学的最佳体现。从数据结构增强、函数式编程工具、文件路径处理,到并发编程、子进程管理、数据序列化、日志系统等,Python 标准库提供了生产级质量的模块,无需安装第三方依赖即可直接使用。本文系统梳理日常开发中最常用的十余个标准库模块,按照数据结构、函数工具、文件系统、并发、系统交互、数据格式、日志与配置、类型系统等维度组织,每个模块提供详细的 API 说明和实战示例。

一、collections —— 增强的数据结构

collections 模块提供了 Python 内置容器类型(list、dict、tuple、set)之外的专业数据结构,这些类型在特定场景下比内置类型更高效、语义更清晰。

1.1 namedtuple —— 轻量级数据类

namedtuple 创建的类兼具 tuple 的不可变性和对象式字段访问的便利性,内存开销远小于普通类。它适合表示坐标、RGB 颜色、数据库记录等轻量数据结构。

from collections import namedtuple

# 定义
Point = namedtuple('Point', ['x', 'y', 'z'])
# 或者用空格分隔的字符串
Point = namedtuple('Point', 'x y z')

# 创建实例
p = Point(1, 2, 3)
p = Point(x=1, y=2, z=3)

# 访问(字段名访问 + 索引访问均可)
print(p.x, p[0]) # 1 1
print(p.y, p[1]) # 2 2
print(p.z, p[2]) # 3 3

# 解包
x, y, z = p

# 内置方法
print(p._fields) # ('x', 'y', 'z')
print(p._asdict()) # {'x': 1, 'y': 2, 'z': 3}
p2 = p._replace(x=10) # 返回新实例,p 本身不变

# 使用默认值(Python 3.7+ 的 defaults 参数)
Point = namedtuple('Point', 'x y z', defaults=(0, 0))
p = Point(1) # Point(x=1, y=0, z=0)

# 从可迭代对象创建
p = Point._make([1, 2, 3])

生产环境中的典型用法:

# 表示 API 返回的记录
User = namedtuple('User', 'id name email created_at')
users = [User(1, 'Alice', 'a@example.com', '2024-01-01'),
User(2, 'Bob', 'b@example.com', '2024-01-02')]
print(users[0].name) # Alice

# 注意:如果字段数量固定但可能变化,考虑 typing.NamedTuple 以获得类型提示
from typing import NamedTuple
class User(NamedTuple):
id: int
name: str
email: str
created_at: str

1.2 Counter —— 计数器

Counter 是 dict 的子类,用于计数。它对可哈希对象的出现次数进行统计:

from collections import Counter

# 从可迭代对象创建
words = ['apple', 'banana', 'apple', 'orange', 'banana', 'apple']
c = Counter(words)
print(c) # Counter({'apple': 3, 'banana': 2, 'orange': 1})

# 从字符串创建(统计字符频率)
c = Counter('abracadabra')
print(c) # Counter({'a': 5, 'b': 2, 'r': 2, 'c': 1, 'd': 1})

# 从字典或关键字参数创建
c = Counter({'red': 4, 'blue': 2})
c = Counter(red=4, blue=2)

# ---- 常用方法 ----
# top k 高频元素
print(c.most_common(2)) # [('red', 4), ('blue', 2)]

# 访问不存在的 key 返回 0(而不是 KeyError)
print(c['green']) # 0

# 动态增加计数
c.update(['red', 'green'])
print(c['red']) # 5

# 不存在的元素可以用 del 或减法移除
c['green'] -= 1
print(c['green']) # 0(但不自动删除)
c += Counter() # 去掉非正值元素(技巧)

# Counter 支持算术运算
c1 = Counter(a=3, b=1)
c2 = Counter(a=1, b=2)
print(c1 + c2) # Counter({'a': 4, 'b': 3})
print(c1 - c2) # Counter({'a': 2})(b 被消除,因为 1-2 < 0)
print(c1 & c2) # Counter({'a': 1, 'b': 1}) # min(c1, c2)
print(c1 | c2) # Counter({'a': 3, 'b': 2}) # max

实际场景:日志分析中统计 IP 频率、文本词频统计等。

1.3 deque —— 双端队列

deque(double-ended queue)支持在两端以 O(1) 的时间复杂度插入和删除元素,特别适合实现队列、栈和滑动窗口:

from collections import deque

# 创建(可选 maxlen 限制最大长度)
dq = deque([1, 2, 3], maxlen=10)

# 两端操作 O(1)
dq.append(4) # 右端追加 → deque([1,2,3,4])
dq.appendleft(0) # 左端追加 → deque([0,1,2,3,4])
dq.pop() # 右端弹出 → 4
dq.popleft() # 左端弹出 → 0

# 当 append 超过 maxlen 时,另一端的元素自动被挤出
dq = deque(maxlen=3)
dq.extend([1, 2, 3, 4, 5])
print(dq) # deque([3, 4, 5]) —— 1 和 2 被自动挤出

# extend / extendleft
dq.extend([6, 7]) # deque([4, 5, 6, 7])
dq.extendleft([1, 2]) # deque([2, 1, 4, 5]) 注意:顺序反转

# rotate —— 循环移动
dq = deque([1, 2, 3, 4, 5])
dq.rotate(2) # deque([4, 5, 1, 2, 3]) 右移 2
dq.rotate(-1) # deque([5, 1, 2, 3, 4]) 左移 1

一个经典的滑动窗口实现(计算移动平均):

from collections import deque

def moving_average(iterable, window_size):
it = iter(iterable)
dq = deque(maxlen=window_size)
result = []
for x in it:
dq.append(x)
if len(dq) == window_size:
result.append(sum(dq) / window_size)
return result

import random
data = [random.randint(0, 100) for _ in range(20)]
print(moving_average(data, 5))

1.4 defaultdict —— 带默认值的字典

defaultdict 在访问不存在的 key 时,自动调用提供的工厂函数生成默认值,避免了反复的 if key in d 检查:

from collections import defaultdict

# 工厂函数在访问缺失键时调用
d = defaultdict(int) # int() 返回 0
d['a'] += 1
print(d['a']) # 1
print(d['b']) # 0(自动创建并返回 int() 的默认值)

d = defaultdict(list) # list() 返回 []
d['users'].append('Alice')
d['users'].append('Bob')
print(d) # {'users': ['Alice', 'Bob']}

d = defaultdict(set) # set() 返回 set()
d['tags'].add('python')
d['tags'].add('numpy')

d = defaultdict(lambda: 'unknown')
print(d['missing']) # 'unknown'

# 典型场景:按键分组
items = [('fruit', 'apple'), ('fruit', 'banana'), ('veg', 'carrot'), ('fruit', 'orange')]
groups = defaultdict(list)
for category, item in items:
groups[category].append(item)
print(dict(groups))
# {'fruit': ['apple', 'banana', 'orange'], 'veg': ['carrot']}

# 嵌套 defaultdict:创建自动生长的树结构
def nested_dict():
return defaultdict(nested_dict)

tree = nested_dict()
tree['company']['department']['team'] = 'AI'
print(tree['company']['department']['team']) # 'AI'

1.5 OrderedDict —— 保序字典

在 Python 3.7+ 中,普通 dict 已经保证插入顺序。但 OrderedDict 仍提供了一些 dict 没有的方法:

from collections import OrderedDict

od = OrderedDict()
od['a'] = 1
od['b'] = 2
od['c'] = 3

# 特有方法
od.move_to_end('a') # 把 'a' 移到最后
print(list(od.keys())) # ['b', 'c', 'a']
od.move_to_end('a', last=False) # 把 'a' 移到最前
print(list(od.keys())) # ['a', 'b', 'c']

# popitem 支持 LIFO(默认)和 FIFO
od.popitem() # LIFO: 弹出 ('c', 3)
od.popitem(last=False) # FIFO: 弹出 ('a', 1)

# OrderedDict 在比较时考虑顺序(普通 dict 在比较时忽略顺序)
od1 = OrderedDict([('a', 1), ('b', 2)])
od2 = OrderedDict([('b', 2), ('a', 1)])
print(od1 == od2) # False(顺序不同)
print(dict(od1) == dict(od2)) # True(dict 比较忽略顺序)

1.6 ChainMap —— 链式映射

ChainMap 将多个映射(字典)串联起来,形成一个逻辑视图。查找时按顺序依次搜索,但不复制数据:

from collections import ChainMap

# 典型场景:配置分层(命令行参数 > 环境变量 > 默认值)
defaults = {'host': 'localhost', 'port': 8080, 'debug': False}
env = {'host': 'prod.example.com', 'port': 443}
cli = {'debug': True}

config = ChainMap(cli, env, defaults)
print(config['host']) # 'prod.example.com'(优先使用 env)
print(config['port']) # 443(env 覆盖 defaults)
print(config['debug']) # True(cli 覆盖 defaults)

# maps 属性返回底层字典列表(可直接修改)
print(config.maps) # [{'debug': True}, {'host': '...'}, {...}]

# new_child 在链首添加新字典
config = config.new_child({'port': 9090})
print(config['port']) # 9090

# parents 返回去掉链首的 ChainMap
print(config.parents['port']) # 443

二、itertools —— 高效的迭代器工具

itertools 提供了一系列用于构建和组合迭代器的函数,分为三类:无限迭代器、终止于最短输入序列的迭代器、组合生成迭代器。这些函数全部用 C 实现,速度极快,且内存效率极高(惰性求值)。

2.1 无限迭代器

import itertools

# count(start, step) —— 无限等差数列
counter = itertools.count(10, 2)
print([next(counter) for _ in range(5)]) # [10, 12, 14, 16, 18]

# cycle(iterable) —— 无限循环
colors = itertools.cycle(['r', 'g', 'b'])
print([next(colors) for _ in range(7)]) # ['r', 'g', 'b', 'r', 'g', 'b', 'r']

# repeat(obj, times=None) —— 无限/有限重复
print(list(itertools.repeat('x', 5))) # ['x', 'x', 'x', 'x', 'x']

2.2 链式与分组

# chain —— 串联多个迭代器
a = [1, 2, 3]
b = [4, 5]
c = [6, 7, 8]
print(list(itertools.chain(a, b, c))) # [1,2,3,4,5,6,7,8]

# chain.from_iterable —— 展开嵌套结构
nested = [[1, 2], [3, 4], [5]]
print(list(itertools.chain.from_iterable(nested))) # [1,2,3,4,5]

# groupby —— 按 key 函数分组(需要预先排序!)
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('C', 5)]
for key, group in itertools.groupby(data, key=lambda x: x[0]):
print(key, list(group))
# A [('A', 1), ('A', 2)]
# B [('B', 3), ('B', 4)]
# C [('C', 5)]

# 注意:groupby 只合并相邻的相同 key,通常需要先排序
unsorted = [('A', 2), ('B', 3), ('A', 1)]
for key, group in itertools.groupby(unsorted, lambda x: x[0]):
print(key, list(group))
# A [('A', 2)] ← 只分到一个!
# B [('B', 3)]
# A [('A', 1)] ← 另一个 A 组
# 正确用法:先排序
unsorted.sort(key=lambda x: x[0])
for key, group in itertools.groupby(unsorted, lambda x: x[0]):
print(key, list(group))
# A [('A', 1), ('A', 2)]
# B [('B', 3)]

2.3 笛卡尔积与排列组合

import itertools

# product(*iterables, repeat=1) —— 笛卡尔积
print(list(itertools.product([1, 2], ['A', 'B'])))
# [(1, 'A'), (1, 'B'), (2, 'A'), (2, 'B')]

# repeat 参数:与自身的笛卡尔积
print(list(itertools.product([1, 2], repeat=3)))
# [(1,1,1),(1,1,2),(1,2,1),(1,2,2),(2,1,1),(2,1,2),(2,2,1),(2,2,2)]

# permutations(iterable, r) —— 排列(有序,不放回)
print(list(itertools.permutations([1, 2, 3], 2)))
# [(1,2), (1,3), (2,1), (2,3), (3,1), (3,2)]

# combinations(iterable, r) —— 组合(无序,不放回)
print(list(itertools.combinations([1, 2, 3], 2)))
# [(1,2), (1,3), (2,3)]

# combinations_with_replacement —— 组合(无序,可放回)
print(list(itertools.combinations_with_replacement([1, 2, 3], 2)))
# [(1,1), (1,2), (1,3), (2,2), (2,3), (3,3)]

2.4 实用工具:islice, accumulate, takewhile, dropwhile

# islice —— 惰性切片(适用于任何迭代器,不限于序列)
from itertools import islice
gen = (x**2 for x in range(100))
print(list(islice(gen, 10, 20))) # 取第 10-19 个元素

# 使用 islice 可以安全地取无限迭代器的某一段
counter = itertools.count()
print(list(islice(counter, 5, 10))) # [5, 6, 7, 8, 9]

# accumulate —— 累积
import itertools, operator
a = [1, 2, 3, 4, 5]
print(list(itertools.accumulate(a))) # 求和累积 [1,3,6,10,15]
print(list(itertools.accumulate(a, operator.mul))) # 乘积累积 [1,2,6,24,120]
# 复杂例子:保留前缀最大值
print(list(itertools.accumulate([3,1,4,1,5,9], max))) # [3,3,4,4,5,9]

# takewhile / dropwhile —— 条件终止/条件起始
data = [1, 3, 5, 7, 2, 4, 6]
print(list(itertools.takewhile(lambda x: x < 6, data))) # [1, 3, 5] —— 遇到 7 即停止
print(list(itertools.dropwhile(lambda x: x < 6, data))) # [7, 2, 4, 6] —— 跳过前面满足条件的

# filterfalse —— 与 filter 相反
print(list(itertools.filterfalse(lambda x: x % 2, range(10)))) # [0,2,4,6,8]

# pairwise(Python 3.10+)—— 相邻对
print(list(itertools.pairwise([1, 2, 3, 4]))) # [(1,2), (2,3), (3,4)]

# starmap —— 对参数元组解包后应用函数
print(list(itertools.starmap(pow, [(2,5), (3,2), (10,3)]))) # [32, 9, 1000]

# tee —— 复制迭代器(注意:会缓存数据,可能大量消耗内存)
it = iter([1, 2, 3, 4])
it1, it2 = itertools.tee(it, 2)
print(list(it1)) # [1, 2, 3, 4]
print(list(it2)) # [1, 2, 3, 4]

2.5 zip_longest

from itertools import zip_longest

# zip 在最短序列结束后停止;zip_longest 在最长序列结束后才停止
a = [1, 2, 3]
b = ['a', 'b']
print(list(zip(a, b))) # [(1,'a'), (2,'b')]
print(list(zip_longest(a, b, fillvalue='MISSING'))) # [(1,'a'), (2,'b'), (3,'MISSING')]

三、functools —— 高阶函数与函数工具

functools 是函数式编程的瑞士军刀,提供了对函数进行装饰、包装、部分应用和缓存的能力。

3.1 lru_cache —— 函数结果缓存

lru_cache 是性能优化的利器。它记录函数的输入和输出,当相同参数再次调用时直接返回缓存结果,跳过计算:

from functools import lru_cache
import time

@lru_cache(maxsize=128) # 最多缓存 128 个结果
def fibonacci(n):
"""不使用缓存,fib(35) 需要数秒;使用后瞬间完成"""
if n <= 1:
return n
return fibonacci(n - 1) + fibonacci(n - 2)

start = time.perf_counter()
print(fibonacci(200)) # 几乎瞬时
print(time.perf_counter() - start)

# 检查缓存信息
print(fibonacci.cache_info()) # CacheInfo(hits=198, misses=201, maxsize=128, currsize=128)
fibonacci.cache_clear() # 清空缓存

# 无限制缓存:maxsize=None(注意内存泄漏风险)
@lru_cache(maxsize=None)
def compute_expensive(x):
pass

缓存机制在算法竞赛和回溯/DP 场景中极其实用,等同于自动化的 memoization。

3.2 partial —— 部分应用

partial 预先填入一个函数的部分参数,返回一个接受了其余参数的新函数。这在回调和多参数固定中非常有用:

from functools import partial

# 预填部分参数
def power(base, exponent):
return base ** exponent

square = partial(power, exponent=2)
cube = partial(power, exponent=3)
print(square(5)) # 25
print(cube(5)) # 125

# 典型场景:多线程/多进程中的目标函数
import concurrent.futures

def process(url, timeout, retries):
# ...实际的网络请求逻辑...
return len(url)

with concurrent.futures.ThreadPoolExecutor() as executor:
process_with_args = partial(process, timeout=10, retries=3)
results = executor.map(process_with_args, ['http://a.com', 'http://b.com'])

# partialmethod —— 用于类方法的部分应用
from functools import partialmethod

class Server:
def request(self, method, path, **kwargs):
print(f"{method} {path} {kwargs}")

# 预定义 GET / POST 方法
get = partialmethod(request, 'GET')
post = partialmethod(request, 'POST')

s = Server()
s.get('/users') # GET /users {}
s.post('/login', body='{...}') # POST /login {'body': '{...}'}

3.3 reduce —— 归约

from functools import reduce
import operator

# 从 functools 导入(Python 3 中 reduce 已从内置函数中移除)
data = [1, 2, 3, 4, 5]
print(reduce(operator.add, data)) # 15
print(reduce(operator.mul, data)) # 120
print(reduce(lambda a, b: a * b, data, 1)) # 120(带初始值)

# reduce 逐步展开:
# 第1步: 1 * 2 = 2
# 第2步: 2 * 3 = 6
# 第3步: 6 * 4 = 24
# 第4步: 24 * 5 = 120

# 自定义 reduce:将嵌套列表 flatten
def flatten_one_level(acc, item):
if isinstance(item, list):
acc.extend(item)
else:
acc.append(item)
return acc

nested = [[1, 2], 3, [4, 5], 6]
print(reduce(flatten_one_level, nested, [])) # [1, 2, 3, 4, 5, 6]

3.4 wraps —— 保持被装饰函数的元信息

编写装饰器时,如果不使用 wraps,被装饰函数的 __name____doc____module__ 等元信息会被替换为装饰器内部函数的:

from functools import wraps

def timing(func):
@wraps(func) # 保持 func 的 __name__ 和 __doc__
def wrapper(*args, **kwargs):
import time
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} took {elapsed:.4f}s")
return result
return wrapper

@timing
def slow_sum(n):
"""Sum numbers from 1 to n."""
return sum(range(1, n + 1))

print(slow_sum.__name__) # 'slow_sum'(而不是 'wrapper')
print(slow_sum.__doc__) # 'Sum numbers from 1 to n.'(而不是 None)

3.5 cached_property (Python 3.8+)

将方法结果缓存为实例属性,在初次计算后后续访问直接返回缓存值:

from functools import cached_property

class Dataset:
def __init__(self, filepath):
self.filepath = filepath

@cached_property
def data(self):
"""只读取和解析一次,结果缓存在 self.__dict__ 中"""
print("Loading and parsing data...")
import json
with open(self.filepath) as f:
return json.load(f)

ds = Dataset('data.json')
print(ds.data) # 触发读取,打印 "Loading..."
print(ds.data) # 直接返回缓存,不读取文件

cached_propertyproperty + lru_cache 的区别在于它直接修改 __dict__,在实例作用域缓存,可被实例属性覆盖。

3.6 singledispatch —— 单分派泛型函数

from functools import singledispatch

@singledispatch
def serialize(obj):
"""默认行为:不支持的类型"""
raise TypeError(f"Unsupported type: {type(obj)}")

@serialize.register(int)
def _(obj):
return str(obj)

@serialize.register(float)
def _(obj):
return f"{obj:.6f}"

@serialize.register(list)
def _(obj):
return '[' + ', '.join(serialize(x) for x in obj) + ']'

@serialize.register(dict)
def _(obj):
return '{' + ', '.join(f'{k}: {serialize(v)}' for k,v in obj.items()) + '}'

print(serialize(42)) # "42"
print(serialize([1, 2.5, 3])) # "[1, 2.500000, 3]"

四、pathlib —— 面向对象的路径处理

pathlib 从 Python 3.4 起引入,提供了 Path 类来替代 os.path 的字符串操作。Path 对象重载了 / 运算符用于路径拼接,天然跨平台(Windows 使用 \,Unix 使用 /)。

4.1 基本操作

from pathlib import Path

# 创建路径对象
p = Path('/home/user/Documents/report.pdf')
print(p.name) # 'report.pdf'
print(p.stem) # 'report'
print(p.suffix) # '.pdf'
print(p.suffixes) # ['.pdf']
print(p.parent) # Path('/home/user/Documents')
print(p.parts) # ('/', 'home', 'user', 'Documents', 'report.pdf')
print(p.anchor) # '/'
print(p.as_uri()) # 'file:///home/user/Documents/report.pdf'

# 路径拼接(/ 运算符重载)
data_dir = Path('/data')
train_dir = data_dir / 'train' / 'images'
print(train_dir) # Path('/data/train/images')

# with_suffix 和 with_stem(Python 3.9+)
print(p.with_suffix('.txt')) # /home/user/Documents/report.txt
print(p.with_stem('summary')) # /home/user/Documents/summary.pdf

# 获取当前文件/目录
cwd = Path.cwd()
home = Path.home()

4.2 文件与目录操作

# ---- 检查 ----
p = Path('some_file.txt')
print(p.exists()) # 是否存在
print(p.is_file()) # 是否是普通文件
print(p.is_dir()) # 是否是目录
print(p.is_symlink()) # 是否是符号链接

# 文件元信息
if p.exists():
stat = p.stat()
print(stat.st_size) # 文件大小(字节)
print(stat.st_mtime) # 最后修改时间(Unix 时间戳)
from datetime import datetime
print(datetime.fromtimestamp(stat.st_mtime))

# ---- 创建目录 ----
dir_path = Path('a/b/c')
dir_path.mkdir(parents=True, exist_ok=True) # 等价 mkdir -p

# ---- 读写文件 ----
content = p.read_text(encoding='utf-8') # 读文本
lines = p.read_text().splitlines()

p.write_text('Hello, World!', encoding='utf-8') # 写文本

data = p.read_bytes() # 读二进制

# ---- 遍历目录 ----
for item in Path('.').iterdir(): # 不递归
print(item)

# glob 和 rglob(递归)
for py_file in Path('.').rglob('*.py'): # 递归找到所有 .py 文件
print(py_file)

for py_file in Path('.').glob('**/*.py'): # 等价写法
print(py_file)

# 按模式匹配(更复杂的 glob)
for f in Path('.').rglob('data/**/*.csv'):
print(f)

# ---- 删除 ----
p.unlink() # 删除文件
p.unlink(missing_ok=True) # Python 3.8+:文件不存在也不报错
dir_path.rmdir() # 删除空目录
# 删非空目录需用 shutil
import shutil
shutil.rmtree(dir_path)

4.3 os.path vs pathlib 对照

操作 os.path pathlib
拼接 os.path.join('a', 'b') Path('a') / 'b'
当前目录 os.getcwd() Path.cwd()
是否存在 os.path.exists(p) Path(p).exists()
是否文件 os.path.isfile(p) Path(p).is_file()
是否目录 os.path.isdir(p) Path(p).is_dir()
文件名 os.path.basename(p) Path(p).name
父目录 os.path.dirname(p) Path(p).parent
扩展名 os.path.splitext(p)[1] Path(p).suffix
解绝对路径 os.path.abspath(p) Path(p).resolve()

路径遍历的实际例子:

# 统计每个扩展名的文件数量
from pathlib import Path
from collections import Counter

c = Counter(p.suffix for p in Path('/usr/lib').rglob('*') if p.is_file())
print(c.most_common(5))

五、concurrent.futures —— 并发执行

concurrent.futures 提供了高层次的异步执行接口,通过 ThreadPoolExecutor(线程池)和 ProcessPoolExecutor(进程池)统一了多线程和多进程的编程模型。

5.1 ThreadPoolExecutor

适用于 I/O 密集型任务(网络请求、文件读写、数据库操作等),但由于 GIL 的存在,CPU 密集型任务不能通过多线程加速:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def download(url):
"""模拟 I/O 密集型任务"""
time.sleep(0.5) # 模拟网络延迟
return f"Downloaded {url}"

urls = ['http://example.com/a', 'http://example.com/b',
'http://example.com/c', 'http://example.com/d']

# 方式 1:map —— 保持顺序
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(download, urls))
for url, result in zip(urls, results):
print(f"{url}{result}")

# 方式 2:submit + as_completed —— 按完成顺序处理
with ThreadPoolExecutor(max_workers=4) as executor:
future_to_url = {executor.submit(download, url): url for url in urls}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
print(f"{url}{result}")
except Exception as exc:
print(f"{url} 生成异常: {exc}")

# 方式 3:submit + Future.result(按提交顺序)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(download, url) for url in urls]
for future, url in zip(futures, urls):
print(f"{url}{future.result()}")

5.2 ProcessPoolExecutor

适用于 CPU 密集型任务(数值计算、图像处理等)。每个进程有独立的 Python 解释器和 GIL,真正实现了并行:

from concurrent.futures import ProcessPoolExecutor
import math

def cpu_intensive(n):
"""计算素数的个数 — CPU 密集型"""
count = 0
for num in range(2, n):
is_prime = True
for i in range(2, int(math.sqrt(num)) + 1):
if num % i == 0:
is_prime = False
break
if is_prime:
count += 1
return count

# 用进程池并行处理
numbers = [10000, 15000, 20000, 25000, 30000, 35000]

with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(cpu_intensive, numbers)
for n, count in zip(numbers, results):
print(f"n={n}: {count} primes")

5.3 GIL 的影响

Python 的全局解释器锁(GIL)确保同一时刻只有一个线程执行 Python 字节码。这意味着:

  • I/O 操作会释放 GIL,因此多线程在 I/O 密集场景下可以显著加速。
  • CPU 密集的纯 Python 代码持有 GIL,多线程无法并行,甚至可能因上下文切换变慢。应该使用 ProcessPoolExecutor
  • NumPy / TensorFlow 等 C 扩展在执行 C 代码时会释放 GIL,因此 NumPy 多线程操作确实可以受益。
# 验证 GIL 影响
import time
import threading

def countdown(n):
while n > 0:
n -= 1

# 单线程串行
start = time.perf_counter()
countdown(5_000_000)
countdown(5_000_000)
print(f"串行: {time.perf_counter() - start:.3f}s")

# 双线程 —— 实际可能更慢!
start = time.perf_counter()
t1 = threading.Thread(target=countdown, args=(5_000_000,))
t2 = threading.Thread(target=countdown, args=(5_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"并行: {time.perf_counter() - start:.3f}s")
# 串行: ~0.2s
# 并行: ~0.2s(并未更快,可能因 GIL 竞争而略慢)

5.4 处理超时与取消

from concurrent.futures import ThreadPoolExecutor, TimeoutError

def potentially_slow(n):
import time
time.sleep(n)
return f"Done after {n}s"

with ThreadPoolExecutor() as executor:
future = executor.submit(potentially_slow, 10)
try:
result = future.result(timeout=2) # 最多等 2 秒
except TimeoutError:
print("超时了,取消任务")
future.cancel() # 尝试取消(已经运行的任务无法取消)

六、subprocess —— 进程管理

subprocess 模块替代了旧式的 os.systemos.spawn* 系列函数,提供了完整的子进程创建、I/O 管理和状态控制能力。

6.1 run —— 推荐的高级 API

自 Python 3.5 起,subprocess.run 是运行子进程的推荐方式:

import subprocess

# 简单命令执行
result = subprocess.run(['ls', '-la', '/tmp'], capture_output=True, text=True)
print(result.returncode) # 0 表示成功
print(result.stdout) # 命令的标准输出
print(result.stderr) # 标准错误(如果有)

# capture_output=True 等价于 stdout=PIPE, stderr=PIPE
# text=True 等价于 universal_newlines=True(输出解码为字符串)

# 检查返回码
result = subprocess.run(['git', 'status'], capture_output=True, text=True)
if result.returncode == 0:
print(result.stdout)
else:
print(f"失败: {result.stderr}")

# check=True:非零退出码时抛出 CalledProcessError
try:
subprocess.run(['false'], check=True)
except subprocess.CalledProcessError as e:
print(f"命令失败,返回码: {e.returncode}")

# 超时控制
try:
subprocess.run(['sleep', '10'], timeout=2)
except subprocess.TimeoutExpired:
print("命令超时")

# 传递输入
result = subprocess.run(['grep', 'error'],
input='line1: ok\nline2: error: file not found\nline3: ok',
capture_output=True, text=True)
print(result.stdout) # line2: error: file not found

# 设置环境变量
result = subprocess.run(['echo', '$HOME'],
capture_output=True, text=True,
env={'HOME': '/custom/home', **dict(os.environ)}) # 需要 os

6.2 Popen —— 底层 API

当需要与子进程交互(如流式读取输出)时,使用 Popen

import subprocess

# 逐行读取长时间运行的命令输出
proc = subprocess.Popen(['ping', '-c', '5', 'google.com'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)

# 实时读取
for line in proc.stdout:
print(f"[实时] {line.strip()}")

proc.wait()
print(f"退出码: {proc.returncode}")

# 管道连接多个进程
# 等价于:ls -la | grep '.py'
p1 = subprocess.Popen(['ls', '-la'], stdout=subprocess.PIPE, text=True)
p2 = subprocess.Popen(['grep', '.py'], stdin=p1.stdout,
stdout=subprocess.PIPE, text=True)
p1.stdout.close() # 允许 p1 在 p2 退出时收到 SIGPIPE
output, _ = p2.communicate()
print(output)

# 后台进程
proc = subprocess.Popen(['python', 'long_running_script.py'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
print(f"子进程 PID: {proc.pid}")
# proc.kill() # 强制终止
# proc.terminate() # 温和终止(SIGTERM)

6.3 安全注意事项

永远不要使用 shell=True 拼接用户输入,这会导致命令注入漏洞:

# 危险:用户输入可以注入恶意命令
user_file = "; rm -rf /"
subprocess.run(f"ls {user_file}", shell=True) # 灾难!

# 安全:用列表传递参数
subprocess.run(['ls', user_file]) # 即使 user_file 包含特殊字符,也只作为参数

七、json / csv / sqlite3 —— 数据序列化与持久化

7.1 json

import json

# ---- 基本序列化与反序列化 ----
data = {'name': 'Alice', 'age': 30, 'skills': ['Python', 'C++'], 'active': True}

# Python → JSON 字符串
json_str = json.dumps(data, indent=2, ensure_ascii=False)
print(json_str)
# {
# "name": "Alice",
# "age": 30,
# "skills": ["Python", "C++"],
# "active": true
# }

# JSON 字符串 → Python
parsed = json.loads(json_str)
print(parsed['name']) # Alice

# ---- 文件操作 ----
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)

with open('data.json', 'r', encoding='utf-8') as f:
loaded = json.load(f)

# ---- 自定义序列化 ----
from datetime import datetime, date
import json

class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (datetime, date)):
return obj.isoformat()
if hasattr(obj, '__dict__'):
return obj.__dict__
return super().default(obj)

data = {'created_at': datetime.now(), 'event': 'start'}
json_str = json.dumps(data, cls=CustomEncoder)
print(json_str)

# 反序列化的自定义处理
def custom_decoder(dct):
"""将 ISO 日期字符串转换回 datetime"""
for key, value in dct.items():
if isinstance(value, str) and 'T' in value:
try:
dct[key] = datetime.fromisoformat(value)
except (ValueError, TypeError):
pass
return dct

parsed = json.loads(json_str, object_hook=custom_decoder)
print(type(parsed['created_at'])) # <class 'datetime.datetime'>

7.2 csv

import csv

# ---- 读取 CSV ----
with open('data.csv', 'r', encoding='utf-8') as f:
reader = csv.reader(f)
header = next(reader) # 跳过表头
for row in reader:
print(row)

# DictReader —— 每行作为字典
with open('data.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
print(row['name'], row['age'])

# ---- 写入 CSV ----
rows = [
['name', 'age', 'city'],
['Alice', '28', 'Beijing'],
['Bob', '32', 'Shanghai'],
]
with open('output.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(rows)

# DictWriter
with open('output.csv', 'w', newline='', encoding='utf-8') as f:
fieldnames = ['name', 'age', 'city']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerow({'name': 'Alice', 'age': '28', 'city': 'Beijing'})

# ---- 处理特殊格式 ----
# 制表符分隔(TSV)
reader = csv.reader(f, delimiter='\t')

# 引号处理
reader = csv.reader(f, quotechar='"', quoting=csv.QUOTE_MINIMAL)

7.3 sqlite3

SQLite 是零配置、自包含的嵌入式关系数据库,Python 标准库自带 sqlite3 模块。它适合单用户场景、原型开发、桌面应用和移动端:

import sqlite3

# 连接(如果文件不存在会自动创建)
conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE,
created_at TEXT DEFAULT (datetime('now'))
)
''')

# 插入(推荐使用参数化查询,防止 SQL 注入)
cursor.execute('INSERT INTO users (name, email) VALUES (?, ?)', ('Alice', 'alice@example.com'))
cursor.execute('INSERT INTO users (name, email) VALUES (?, ?)', ('Bob', 'bob@example.com'))

# 批量插入
users = [('Charlie', 'charlie@example.com'), ('Diana', 'diana@example.com')]
cursor.executemany('INSERT INTO users (name, email) VALUES (?, ?)', users)

conn.commit()

# 查询
cursor.execute('SELECT id, name, email FROM users WHERE name LIKE ?', ('A%',))
for row in cursor.fetchall():
print(row) # (1, 'Alice', 'alice@example.com')

# 获取单行
cursor.execute('SELECT * FROM users WHERE id = ?', (1,))
user = cursor.fetchone()
print(user)

# 使用行工厂:获得字典式访问
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('SELECT * FROM users')
for row in cursor:
print(dict(row)) # {'id': 1, 'name': 'Alice', ...}

# 事务管理
conn.execute('BEGIN')
try:
cursor.execute('UPDATE users SET email = ? WHERE id = ?', ('new@example.com', 1))
conn.commit()
except Exception:
conn.rollback()
raise

# 使用上下文管理器自动提交/回滚
with conn:
conn.execute('DELETE FROM users WHERE id = ?', (3,))

# 最后关闭
cursor.close()
conn.close()

八、logging —— 日志系统

Python 的 logging 模块提供了完整的日志框架,包含 Logger(记录器)、Handler(处理器)、Formatter(格式化器)和 Filter(过滤器)四大组件。

8.1 基本用法

import logging

# 快速配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler(), # 同时输出到控制台
]
)

logger = logging.getLogger(__name__)
logger.debug('调试信息') # 不输出(因为级别是 INFO)
logger.info('程序启动') # 输出
logger.warning('警告信息') # 输出
logger.error('错误信息') # 输出
logger.critical('严重错误') # 输出

8.2 日志级别

日志级别从低到高:

级别 数值 用途
DEBUG 10 详细诊断信息
INFO 20 程序正常运行信息
WARNING 30 潜在问题提醒
ERROR 40 因严重问题未能完成某功能
CRITICAL 50 整个程序无法继续运行

8.3 Logger / Handler / Formatter 层次结构

import logging

# 创建 logger(使用模块名,利用命名空间层次)
logger = logging.getLogger('myapp.database')
logger.setLevel(logging.DEBUG)

# 创建 handler(决定日志去向)
file_handler = logging.FileHandler('database.log')
file_handler.setLevel(logging.WARNING) # 文件只记录 WARNING 及以上

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG) # 控制台记录 DEBUG 及以上

# 创建 formatter(决定日志格式)
detailed_fmt = logging.Formatter(
'%(asctime)s [%(levelname)-8s] %(name)s:%(lineno)d - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
simple_fmt = logging.Formatter('[%(levelname)-8s] %(message)s')

file_handler.setFormatter(detailed_fmt)
console_handler.setFormatter(simple_fmt)

# 添加 handler 到 logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)

# 避免重复日志(logger 会传播到 root logger)
logger.propagate = False

# 使用
logger.debug('连接池大小: 10')
logger.error('数据库连接失败: timeout')

8.4 使用字典配置(dictConfig)

对于生产项目,推荐使用字典式配置,可以将配置存储为 JSON 或 YAML 文件:

import logging.config

LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '%(asctime)s [%(levelname)s] %(name)s:%(lineno)d: %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S',
},
'simple': {
'format': '[%(levelname)s] %(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG',
'formatter': 'simple',
},
'file': {
'class': 'logging.handlers.RotatingFileHandler',
'level': 'INFO',
'formatter': 'verbose',
'filename': 'app.log',
'maxBytes': 10 * 1024 * 1024, # 10 MB
'backupCount': 5,
},
},
'loggers': {
'myapp': {
'handlers': ['console', 'file'],
'level': 'DEBUG',
'propagate': False,
},
'myapp.database': {
'level': 'WARNING',
},
},
'root': {
'handlers': ['console'],
'level': 'WARNING',
},
}

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('myapp')
logger.info('应用启动') # 会同时输出到控制台和文件

九、argparse —— 命令行参数解析

argparse 让 Python 脚本支持专业的命令行接口,包括位置参数、可选参数、子命令、类型验证和帮助信息。

9.1 基础用法

import argparse

parser = argparse.ArgumentParser(
description='处理图像文件并生成缩略图',
epilog='示例: python thumbnail.py -i input.jpg -s 200 --quality 85'
)

# 位置参数(必填)
parser.add_argument('input_file', help='输入图像文件路径')

# 可选参数
parser.add_argument('-o', '--output',
default='output.jpg',
help='输出文件路径(默认: output.jpg)')
parser.add_argument('-s', '--size',
type=int, default=128,
help='缩略图尺寸(像素)')
parser.add_argument('-q', '--quality',
type=int, default=85,
choices=range(1, 101),
metavar='1-100',
help='JPEG 质量 (1-100)')
parser.add_argument('--grayscale',
action='store_true',
help='转换为灰度图')
parser.add_argument('-v', '--verbose',
action='count', default=0,
help='详细输出(-v 详细, -vv 更详细)')

args = parser.parse_args()

# 使用解析后的参数
print(f"输入: {args.input_file}")
print(f"输出: {args.output}, 尺寸: {args.size}px, 质量: {args.quality}")
print(f"灰度模式: {args.grayscale}")
print(f"详细程度: {args.verbose}")

9.2 子命令

类似 git commitgit push 的嵌套命令结构:

import argparse

parser = argparse.ArgumentParser(description='数据管理工具')
subparsers = parser.add_subparsers(dest='command', required=True)

# 子命令:import
import_parser = subparsers.add_parser('import', help='导入数据')
import_parser.add_argument('source', help='数据源路径')
import_parser.add_argument('--format', choices=['csv', 'json', 'xml'], default='csv')

# 子命令:export
export_parser = subparsers.add_parser('export', help='导出数据')
export_parser.add_argument('target', help='导出目标路径')
export_parser.add_argument('--compress', action='store_true', help='压缩输出')

# 子命令:stats
stats_parser = subparsers.add_parser('stats', help='显示统计信息')
stats_parser.add_argument('--columns', nargs='+', help='只统计指定列')

args = parser.parse_args()

if args.command == 'import':
print(f"导入 {args.source},格式 {args.format}")
elif args.command == 'export':
print(f"导出到 {args.target},压缩={'是' if args.compress else '否'}")
elif args.command == 'stats':
cols = args.columns or ['all']
print(f"统计列: {cols}")

十、dataclasses —— 减少样板代码

dataclass 装饰器(Python 3.7+)自动生成 __init____repr____eq__ 等方法,省去大量重复代码。它与 namedtuple 的主要区别:可变,可设置默认值,支持继承和方法定义。

10.1 基本使用

from dataclasses import dataclass
from typing import List

@dataclass
class User:
name: str
age: int
email: str = ''
tags: List[str] = None # 可变默认值需要特别处理

def __post_init__(self):
"""在 __init__ 之后调用,用于复杂验证和默认值处理"""
if self.tags is None:
self.tags = []

# 自动生成的 __init__ / __repr__ / __eq__
u1 = User('Alice', 28, 'alice@example.com')
u2 = User('Alice', 28, 'alice@example.com')
print(u1) # User(name='Alice', age=28, email='alice@example.com', tags=[])
print(u1 == u2) # True(自动按字段比较)

# 不可变数据类
@dataclass(frozen=True)
class Point:
x: float
y: float

p = Point(1.0, 2.0)
# p.x = 3.0 # FrozenInstanceError!

10.2 field 函数的配置

from dataclasses import dataclass, field

@dataclass
class Task:
title: str
priority: int = field(default=1, metadata={'min': 0, 'max': 10})
tags: list = field(default_factory=list) # 可变默认值的安全写法
created_at: str = field(init=False) # 不在 __init__ 中

def __post_init__(self):
import datetime
self.created_at = datetime.datetime.now().isoformat()

十一、typing —— 类型提示

11.1 基本类型注解

from typing import List, Dict, Set, Tuple, Optional, Union, Any, Callable

# 基本类型
def greet(name: str) -> str:
return f"Hello, {name}"

# 容器类型
def process(items: List[int]) -> Dict[str, int]:
return {"count": len(items), "sum": sum(items)}

# Optional 即 Union[X, None]
def find_user(user_id: int) -> Optional[Dict[str, str]]:
if user_id == 1:
return {"name": "Alice"}
return None

# Union —— 几种可能类型之一
def parse(value: str) -> Union[int, float, str]:
try:
return int(value)
except ValueError:
try:
return float(value)
except ValueError:
return value

# Callable
def apply(func: Callable[[int, int], int], a: int, b: int) -> int:
return func(a, b)

print(apply(lambda x, y: x + y, 3, 4)) # 7

11.2 泛型 —— TypeVar, Generic, Protocol

from typing import TypeVar, Generic, Protocol

# TypeVar:类型变量,用于泛型函数和类
T = TypeVar('T')

def first(items: List[T]) -> T:
return items[0]

print(first([1, 2, 3])) # 类型推断为 int
print(first(['a', 'b'])) # 类型推断为 str

# 约束 TypeVar
Number = TypeVar('Number', int, float)

def add(a: Number, b: Number) -> Number:
return a + b # 仅接受 int 或 float

# Generic 类
K = TypeVar('K')
V = TypeVar('V')

class Pair(Generic[K, V]):
def __init__(self, key: K, value: V):
self.key = key
self.value = value

pair: Pair[str, int] = Pair("age", 28)

11.3 Protocol —— 结构化子类型

Protocol 用于定义接口,而无需显式继承(类似 Go 的 interface):

from typing import Protocol

class SupportsClose(Protocol):
def close(self) -> None: ...

class FileReader:
def close(self) -> None:
print("Closing file")

class SocketConnection:
def close(self) -> None:
print("Closing socket")

def cleanup(resource: SupportsClose) -> None:
resource.close()

# FileReader 和 SocketConnection 都没有显式实现 SupportsClose
# 但只要它们有 close 方法,就可传入 cleanup
cleanup(FileReader())
cleanup(SocketConnection())

十二、常用工具模块速查

12.1 os / os.path

import os

os.getcwd() # 当前工作目录
os.chdir('/path') # 切换目录
os.listdir('.') # 列出目录内容
os.makedirs('a/b/c', exist_ok=True) # 递归创建目录
os.rename('old', 'new') # 重命名
os.remove('file.txt') # 删除文件
os.rmdir('empty_dir') # 删除空目录
os.environ # 环境变量字典
os.environ.get('HOME') # 获取特定环境变量
os.path.join('a', 'b', 'c') # 路径拼接
os.path.exists('path') # 是否存在
os.path.isfile('p') # 是否文件
os.path.isdir('p') # 是否目录
os.path.abspath('p') # 绝对路径
os.path.basename('/a/b/c.txt') # 'c.txt'
os.path.dirname('/a/b/c.txt') # '/a/b'
os.path.split('/a/b/c.txt') # ('/a/b', 'c.txt')
os.path.splitext('f.txt') # ('f', '.txt')

12.2 shutil —— 高级文件操作

import shutil

shutil.copy('src', 'dst') # 复制文件(保留权限)
shutil.copy2('src', 'dst') # 复制文件(保留元数据)
shutil.copytree('src_dir', 'dst_dir') # 递归复制目录
shutil.rmtree('dir') # 递归删除目录
shutil.move('src', 'dst') # 移动文件/目录
shutil.disk_usage('/') # 磁盘使用情况
shutil.make_archive('backup', 'zip', 'src_dir') # 创建压缩包

12.3 tempfile —— 临时文件

import tempfile

# 创建临时文件(自动删除)
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=True) as f:
f.write('Some data')
print(f.name) # 临时文件路径

# 创建临时目录
with tempfile.TemporaryDirectory() as tmpdir:
print(tmpdir) # /tmp/tmpxxx
# ... 使用 tmpdir ...
# 退出上下文后目录自动删除

12.4 hashlib —— 哈希摘要

import hashlib

# MD5
md5 = hashlib.md5(b'hello').hexdigest()

# SHA-256
sha = hashlib.sha256(b'hello').hexdigest()

# 流式更新(大文件)
hasher = hashlib.sha256()
with open('large_file.bin', 'rb') as f:
while chunk := f.read(8192):
hasher.update(chunk)
digest = hasher.hexdigest()

# 可用算法列表
print(hashlib.algorithms_available)

12.5 base64

import base64

# 编码
encoded = base64.b64encode(b'hello world') # b'aGVsbG8gd29ybGQ='
print(encoded.decode())

# 解码
decoded = base64.b64decode(encoded)
print(decoded) # b'hello world'

# URL 安全编码(替换 +/ 为 -_)
encoded_url = base64.urlsafe_b64encode(b'\xfb\xff')
print(encoded_url) # b'-_8='

12.6 enum —— 枚举类型

from enum import Enum, IntEnum, auto

class Color(Enum):
RED = 1
GREEN = 2
BLUE = 3

print(Color.RED) # Color.RED
print(Color.RED.name) # 'RED'
print(Color.RED.value) # 1

# 从值构造
print(Color(2)) # Color.GREEN

# auto —— 自动编号
class Status(IntEnum):
PENDING = auto() # 1
RUNNING = auto() # 2
SUCCESS = auto() # 3

# IntEnum 可以当整数使用
if Status.SUCCESS == 3:
print("成功")

十三、总结

Python 标准库覆盖了绝大多数日常开发需求,熟练掌握这些模块可以:

  1. 减少第三方依赖:大多数场景下 collections + itertools + functools 的组合已足够强大。
  2. 写出地道的 Pythonpathlib 替代字符串路径拼接,dataclass 替代手动 __init__lru_cache 替代手写缓存。
  3. 正确处理并发:理解 ThreadPoolExecutor vs ProcessPoolExecutor 的选择依据(I/O 密集 vs CPU 密集)。
  4. 安全的子进程调用:使用 subprocess.run 的列表参数形式,避免 shell=True 注入风险。
  5. 结构化日志:利用 Logger/Handler/Formatter 层次结构,配合 dictConfig 实现灵活的生产环境日志管理。
  6. 类型驱动开发:通过 typing 模块配合 mypy / pyright 实现静态类型检查,减少运行时错误。

标准库是 Python 生态的基石,也是理解 Python 哲学(简单优于复杂、显式优于隐式)的最佳入口。

参考资料

文章作者: Leo·Cheung
文章链接: http://tufusi.com/2021/09/15/%E3%80%90Python%E7%B3%BB%E5%88%97%E3%80%91%E5%B8%B8%E7%94%A8%E6%A0%87%E5%87%86%E5%BA%93/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ONE·PIECE
打赏
  • 微信
  • 支付宝

评论