装饰器(@decorator)
基本概念
装饰器是一个函数,它接受另一个函数作为输入,并返回一个新的函数。在不修改原函数代码的情况下,给函数添加额外功能。
# 一个简单的装饰器
def my_decorator(func):
def wrapper():
print("函数执行前")
func()
print("函数执行后")
return wrapper
# 使用 @ 符号应用装饰器
@my_decorator
def say_hello():
print("Hello!")
say_hello()
# 输出:
# 函数执行前
# Hello!
# 函数执行后
@ 符号的本质
# 下面两种写法等价:
# 写法 1:使用 @ 符号(推荐)
@my_decorator
def say_hello():
print("Hello!")
# 写法 2:不使用 @ 符号
def say_hello():
print("Hello!")
say_hello = my_decorator(say_hello) # 等价于 @my_decorator
理解:@decorator 等价于 函数 = decorator(函数)
带参数的装饰器
def repeat(times):
"""返回一个装饰器,让函数执行指定次数"""
def decorator(func):
def wrapper(*args, **kwargs):
for _ in range(times):
func(*args, **kwargs)
return wrapper
return decorator
@repeat(3)
def greet(name):
print(f"Hello, {name}!")
greet("Alice")
# 输出:
# Hello, Alice!
# Hello, Alice!
# Hello, Alice!
functools.wraps - 保留函数信息
from functools import wraps
def my_decorator(func):
@wraps(func) # 保留原函数的 __name__ 和 __doc__
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
@my_decorator
def say_hello():
"""这是 say_hello 的文档"""
print("Hello!")
print(say_hello.__name__) # say_hello(而不是 wrapper)
解包语法(Unpacking)
列表/元组解包
# 基础解包
coordinates = [1, 2, 3]
x, y, z = coordinates
print(x, y, z) # 1 2 3
# 等价于:
# x = coordinates[0]
# y = coordinates[1]
# z = coordinates[2]
* 操作符 - 收集剩余元素
# 收集剩余元素到列表
numbers = [1, 2, 3, 4, 5]
first, *middle, last = numbers
print(first) # 1
print(middle) # [2, 3, 4]
print(last) # 5
# 其他用法
a, *rest = [1, 2, 3, 4, 5]
print(a) # 1
print(rest) # [2, 3, 4, 5]
*a, last = [1, 2, 3, 4, 5]
print(a) # [1, 2, 3, 4]
print(last) # 5
字典解包
# ** 用于解包字典
person = {"name": "Alice", "age": 25}
def greet(name, age):
print(f"{name} is {age} years old")
greet(**person) # 等价于 greet(name="Alice", age=25)
# 合并字典
defaults = {"host": "localhost", "port": 8080}
config = {"port": 9000, "debug": True}
merged = {**defaults, **config}
print(merged) # {'host': 'localhost', 'port': 9000, 'debug': True}
实际用法:函数返回字典直接解包传参
** 不仅可以在调用处解包已有字典,还可以直接解包函数的返回值:
def build_payload(record):
return {
"thread_id": record.thread_id,
"status": record.status,
"metadata": record.metadata or {},
}
# 解包返回值作为关键字参数
store.put(record.run_id, **build_payload(record))
# 等同于 store.put(record.run_id, thread_id=..., status=..., metadata=...)
好处是新增字段只需改 build_payload() 一处。
函数参数中的解包
# *args:收集所有位置参数到元组
def print_all(*args):
for arg in args:
print(arg)
print_all(1, 2, 3, 4) # 打印 1 2 3 4
# **kwargs:收集所有关键字参数到字典
def print_kwargs(**kwargs):
for key, value in kwargs.items():
print(f"{key} = {value}")
print_kwargs(name="Alice", age=25)
# 输出:
# name = Alice
# age = 25
with 语法
基本概念
with 语句用于简化资源管理,确保资源在使用后被正确释放。最常见的使用场景是文件操作:
# 传统方式
f = open('file.txt', 'r')
try:
content = f.read()
finally:
f.close()
# with 语句方式
with open('file.txt', 'r') as f:
content = f.read()
# 离开 with 块后,文件自动关闭
工作原理
with 语句的核心是上下文管理器(Context Manager)。任何实现了 __enter__() 和 __exit__() 方法的对象都可以用作上下文管理器:
class MyContextManager:
def __enter__(self):
print("进入上下文")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print("退出上下文")
# 返回 True 表示吞掉异常,False/None 表示传播异常
return False
with MyContextManager() as cm:
print("在上下文内部")
with 语句的执行流程:
- 调用
__enter__()方法 - 将返回值赋给
as后面的变量 - 执行
with块中的代码 - 离开
with块时,调用__exit__()方法 - 如果发生异常,
__exit__()接收异常信息(exc_type,exc_val,exc_tb)
contextlib 工具
Python 提供了 contextlib 模块来简化上下文管理器的创建:
使用 contextmanager 装饰器:
from contextlib import contextmanager
@contextmanager
def file_manager(filename, mode):
f = open(filename, mode)
try:
yield f
finally:
f.close()
with file_manager('file.txt', 'r') as f:
content = f.read()
yield 的工作原理:
yield之前的代码 =__enter__方法(准备资源)yield的值 = 传递给as变量yield之后的代码 =__exit__方法(清理资源)
使用 ExitStack 管理多个上下文:
from contextlib import ExitStack
with ExitStack() as stack:
files = [stack.enter_context(open(fname)) for fname in filenames]
# 所有文件会在 ExitStack 退出时自动关闭
suppress() - 忽略特定异常:
from contextlib import suppress
# 等价于 try-except-pass
with suppress(FileNotFoundError):
os.remove('nonexistent.txt')
常见使用场景
- 文件操作(自动关闭)
- 数据库连接管理
- 线程锁/进程锁管理
- 临时修改配置/环境变量
- 性能计时/性能分析
yield 语法
生成器基础
yield 是 Python 的生成器(Generator)的核心。生成器是一种特殊的迭代器,可以懒生成数据,节省内存:
# 列表 - 一次性创建所有元素
def create_list(n):
result = []
for i in range(n):
result.append(i * i)
return result
# 生成器 - 按需生成元素
def create_generator(n):
for i in range(n):
yield i * i
# 使用
gen = create_generator(5)
for item in gen:
print(item) # 0, 1, 4, 9, 16
yield vs return
| 特性 | return | yield |
|---|---|---|
| 函数类型 | 普通函数 | 生成器函数 |
| 返回值 | 单个值 | 可产生多个值 |
| 状态保持 | 函数结束后状态丢失 | 暂停并保存状态 |
| 内存使用 | 一次性加载所有数据 | 按需生成,节省内存 |
生成器的执行流程
def simple_generator():
print("开始执行")
yield 1
print("继续执行")
yield 2
print("最后执行")
yield 3
gen = simple_generator()
print("创建生成器,尚未执行")
print(next(gen)) # 开始执行 -> 1
print(next(gen)) # 继续执行 -> 2
print(next(gen)) # 最后执行 -> 3
print(next(gen)) # StopIteration 异常
send() 方法 - 双向通信
生成器可以通过 send() 方法接收外部值:
def accumulator():
total = 0
while True:
value = yield total
if value is not None:
total += value
gen = accumulator()
print(next(gen)) # 启动生成器,输出 0
print(gen.send(10)) # 发送 10,输出 10
print(gen.send(5)) # 发送 5,输出 15
yield from - 委托给子生成器
yield from 用于简化嵌套生成器的迭代:
def chain(*iterables):
for it in iterables:
for i in it:
yield i
# 使用 yield from 简化
def chain_v2(*iterables):
for it in iterables:
yield from it
# 使用
list(chain([1, 2], [3, 4])) # [1, 2, 3, 4]
实际应用场景
- 处理大型文件(逐行读取)
- 流式数据处理
- 无限序列生成
- 协程(Coroutine)实现
- 管道式数据处理
异步编程(async/await)
核心概念
Python 的异步编程基于 asyncio 模块,使用 async/await 语法:
import asyncio
async def fetch_data(url):
print(f"开始获取 {url}")
await asyncio.sleep(1) # 模拟 IO 操作
print(f"获取完成 {url}")
return f"data from {url}"
async def main():
# 顺序执行
result1 = await fetch_data("url1")
result2 = await fetch_data("url2")
# 并发执行
results = await asyncio.gather(
fetch_data("url1"),
fetch_data("url2"),
fetch_data("url3")
)
asyncio.run(main())
关键术语
| 术语 | 说明 |
|---|---|
| Coroutine(协程) | 使用 async def 定义的函数,调用后返回协程对象 |
| Event Loop(事件循环) | 异步编程的核心,负责调度和执行协程 |
| Task(任务) | 包装协程对象,用于并发执行 |
| await | 暂停协程执行,等待可等待对象完成 |
| asyncio.gather() | 并发运行多个协程,等待全部完成 |
| asyncio.create_task() | 创建任务,后台运行 |
任务调度模式
import asyncio
async def worker(name, delay):
for i in range(3):
await asyncio.sleep(delay)
print(f"{name} 工作 {i+1}")
async def main():
# 方式 1: 使用 gather
await asyncio.gather(
worker("A", 1),
worker("B", 2),
worker("C", 0.5)
)
# 方式 2: 创建任务
task1 = asyncio.create_task(worker("X", 1))
task2 = asyncio.create_task(worker("Y", 2))
await task1
await task2
asyncio.run(main())
超时控制
import asyncio
async def long_running_task():
await asyncio.sleep(10)
return "完成"
async def main():
try:
result = await asyncio.wait_for(
long_running_task(),
timeout=5.0
)
except asyncio.TimeoutError:
print("任务超时")
asyncio.run(main())
异步上下文管理器
结合 with 和 async/await:
class AsyncConnection:
async def __aenter__(self):
print("异步获取连接")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("异步释放连接")
await asyncio.sleep(1)
async def main():
async with AsyncConnection() as conn:
print("使用连接")
asyncio.run(main())
异步迭代器
结合 yield 和 async/await:
class AsyncRange:
def __init__(self, limit):
self.limit = limit
def __aiter__(self):
self.current = 0
return self
async def __anext__(self):
if self.current >= self.limit:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.current += 1
return self.current - 1
async def main():
async for i in AsyncRange(5):
print(i)
asyncio.run(main())
实际应用场景
- 并发网络请求(API 调用、爬虫)
- WebSocket 通信
- 数据库异步驱动(如
asyncpg,aiomysql) - 高并发服务器(如
aiohttp,FastAPI) - 文件 IO(
aiofiles)
最佳实践
- 避免阻塞事件循环:不要在 async 函数中使用
time.sleep(),用asyncio.sleep() - CPU 密集型任务:使用
asyncio.to_thread()或进程池 - 错误处理:在
gather中使用return_exceptions=True - 优雅关闭:使用
asyncio.create_task()和适当的取消逻辑 - 超时控制:始终为网络请求设置超时
@dataclass 数据类
自动生成 __init__、__repr__、__eq__ 等样板方法:
from dataclasses import dataclass, field
@dataclass
class RunRecord:
run_id: str
status: str
metadata: dict = field(default_factory=dict)
task: object | None = field(default=None, repr=False)
frozen=True:不可变数据类
@dataclass(frozen=True)
class Config:
host: str
port: int = 8080
- 创建后不能修改字段值(赋值会抛
FrozenInstanceError) - 自动变为可哈希(可以当 dict 的 key 或放入 set)
- 适合做”参数捆绑包”——防止在传递过程中被意外篡改
field(default_factory=…)
可变类型(list、dict)不能用 = {} 做默认值(所有实例会共享同一个对象),必须用 field(default_factory=dict) 让每个实例创建独立的字典。
异步上下文管理器进阶
AsyncExitStack:动态管理多个异步上下文
当需要同时管理多个异步资源时,嵌套 async with 会变得很深:
# 嵌套写法(难维护)
async with make_stream_bridge(config) as bridge:
async with make_checkpointer(config) as cp:
async with make_store(config) as store:
...
用 AsyncExitStack 可以扁平化:
from contextlib import AsyncExitStack
async with AsyncExitStack() as stack:
bridge = await stack.enter_async_context(make_stream_bridge(config))
cp = await stack.enter_async_context(make_checkpointer(config))
store = await stack.enter_async_context(make_store(config))
# 退出时按后进先出顺序关闭所有资源
@abc.abstractmethod 抽象基类
定义接口规范,子类必须实现所有抽象方法:
import abc
class DataStore(abc.ABC):
@abc.abstractmethod
async def get(self, key: str):
"""Retrieve a value."""
@abc.abstractmethod
async def put(self, key: str, value: str):
"""Store a value."""
class RedisStore(DataStore):
async def get(self, key: str): ...
async def put(self, key: str, value: str): ...
如果子类没有实现某个抽象方法,实例化时会抛 TypeError。
TypeVar 和泛型
from typing import TypeVar, Callable
T = TypeVar("T")
def factory(attr: str) -> Callable[[], T]:
def getter() -> T:
return getattr(some_object, attr, None)
return getter
# T 会根据使用场景被推断为具体类型
get_db: Callable[[], Database] = factory("db")
get_cache: Callable[[], Cache] = factory("cache")
一个工厂函数生成多种类型的依赖注入函数——避免重复代码。
ContextVar 上下文变量
类似线程局部存储,但是面向 asyncio 的——每个协程有独立的值:
from contextvars import ContextVar
_current_user: ContextVar[str | None] = ContextVar("current_user", default=None)
# 在请求中间件中设置
token = _current_user.set("user-123")
# 在任意异步代码中读取
user = _current_user.get()
# 请求结束时重置
_current_user.reset(token)
这在异步 Web 框架中非常有用——无需通过函数参数层层传递,就能在任意深度的协程调用链中访问当前请求的上下文。
lru_cache 缓存
from functools import lru_cache
@lru_cache(maxsize=128)
def compute_expensive(x: int) -> int:
return x * x
- 对相同输入自动返回缓存结果
maxsize=128最多缓存 128 个不同输入的结果- 要求参数是可哈希的(否则会 TypeError)
总结
| 特性 | 核心用途 | 关键方法/语法 |
|---|---|---|
| 装饰器 | 不修改函数添加功能 | @decorator, @wraps, *args/**kwargs |
| 解包 | 拆解可迭代对象 | a, b, c = [...], *rest, **dict |
| with | 资源管理 | __enter__, __exit__, contextmanager |
| yield | 懒生成/迭代器 | yield, send(), yield from |
| async/await | 并发 IO | async def, await, asyncio.gather, create_task |
| @dataclass | 自动生成样板方法 | @dataclass, field(default_factory=...) |
| @abc.abstractmethod | 定义接口规范 | @abc.abstractmethod, abc.ABC |
| TypeVar | 泛型/类型安全 | TypeVar, Callable[[], T] |
| ContextVar | 异步上下文变量 | ContextVar, .set(), .get(), .reset() |
| lru_cache | 函数结果缓存 | @lru_cache(maxsize=128) |
这些特性涵盖了 Python 中”控制流”和”代码组织”的重要工具:
- 装饰器和解包是语法糖,让代码更简洁
- with确保资源正确清理
- yield实现懒生成和协程
- async/await实现异步并发
- @dataclass和**@abc.abstractmethod**帮你更好的组织代码结构
- TypeVar、ContextVar、lru_cache解决特定场景的问题
理解它们的工作原理可以帮助你编写更高效、更优雅的 Python 代码。