Python高阶编程——闭包、装饰器、设计模式与并发编程

一、闭包(Closure):安全的状态管理机制

1.1 闭包的核心概念

全局变量的致命缺陷


def deposit(amount):
global balance
balance += amount

def withdraw(amount):
global balance
balance -= amount

当模块被导入时,balance直接被外部修改,导致数据篡改风险。实际案例中曾因此导致金融系统金额计算错误。

闭包的精确定义
在函数嵌套结构下,满足三个条件时形成闭包:

  1. 内部函数引用外部函数变量
  2. 外部函数返回内部函数
  3. 被引用的变量在闭包生命周期持续存在

关键字:

允许内部函数修改外部函数变量

语法:在内部函数中使用nonlocal 变量名 声明

1.2 闭包实现ATM案例

def account_create(initial_amount=0):
transaction_log = [] # 闭包扩展:交易记录

def atm(num, deposit=True):
nonlocal initial_amount
if deposit:
initial_amount += num
transaction_log.append(f"+{num}")
else:
if initial_amount >= num:
initial_amount -= num
transaction_log.append(f"-{num}")
else:
raise ValueError("余额不足")
print(f"操作成功,余额: {initial_amount}")
return initial_amount

def get_log(): # 闭包内嵌辅助函数
return transaction_log

return atm, get_log # 返回多个闭包函数

# 使用示例
atm_operation, get_history = account_create(1000)
atm_operation(500)
atm_operation(200, False)
print("交易记录:", get_history())

1.3 闭包的技术优势与注意事项

三层安全机制

  1. 变量保护层:initial_amount作为外层函数局部变量
  2. 访问控制层:仅通过闭包函数操作状态
  3. 操作审计层:内置交易日志追踪

缺点:

  1. 内部函数持续引用外部变量会导致内存占用
  2. 不当使用可能引起内存泄漏问题

内存管理警示



func = account_create()
print(sys.getsizeof(func)) # 典型闭包内存占用:200-500字节

# 避免循环引用导致的泄漏
class User:
def __init__(self):
self.account = account_create()

user = User()
del user # 必须显式解除引用

适用场景建议

  • ✅ 状态管理(如游戏角色属性)
  • ✅ 回调函数(如事件处理器)
  • ✅ 配置冻结(如机器学习超参数)
  • ❌ 大数据缓存(改用类实例)

二、装饰器(Decorator):无侵入的功能扩展

2.1 装饰器本质

设计哲学:开放封闭原则(OCP)

  • 对扩展开放:允许添加新功能
  • 对修改封闭:不改变原函数代码

原理:

通过闭包函数包裹目标函数,在调用目标函数前后执行额外操作

装饰器底层实现(一般写法)

虽然实现了功能,但调用方式不够直观,需要显式转换函数引用。

def decorator(func):
# 此处可执行初始化操作
cache = {}

def wrapper(*args, **kwargs):
# 前置处理
start = time.time()

# 执行原函数
result = func(*args, **kwargs)

# 后置处理
end = time.time()
print(f"执行耗时: {end - start:.4f}秒")
return result

return wrapper

装饰器糖写法:

保持目标函数调用方式不变(仍可直接调用sleep())

代码更简洁直观

符合”装饰”的语义理解

底层原理: 语法糖本质仍是闭包,@outer等价于sleep = outer(sleep)

def outer(func):
def inner():
print("我要睡觉了")
func()
print("起床了")
return inner

@outer
def sleep():
import random
import time
print("睡眠中~~~")
time.sleep(random.randint(1,5))

sleep()
def outer(func):
def inner():
print("我要睡觉了")
func()
print("起床了")
return inner

@outer
def sleep():

2.2 装饰器实战应用

基础计时装饰器

import time
import functools

def timer(func):
@functools.wraps(func) # 保留原函数元信息
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
duration = time.perf_counter() - start
print(f"{func.__name__}耗时: {duration:.6f}s")
return result
return wrapper

@timer
def complex_calculation(n):
return sum(i*i for i in range(n))

权限验证装饰器:python

import time
import functools

def auth_required(level):
def decorator(func):
def wrapper(user, *args, **kwargs):
if user.access_level < level:
raise PermissionError("权限不足")
return func(user, *args, **kwargs)
return wrapper
return decorator

@auth_required(level=3)
def delete_file(user, filename):
# 文件删除逻辑

2.3 装饰器高级技巧

多层装饰器执行顺序

@decorator1
@decorator2
def target_func():
pass

# 等效于:target_func = decorator1(decorator2(target_func))

类装饰器实现:python

class CacheDecorator:
def __init__(self, func):
self.func = func
self.cache = {}

def __call__(self, *args):
if args in self.cache:
return self.cache[args]
result = self.func(*args)
self.cache[args] = result
return result

@CacheDecorator
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)

三、设计模式

3.1 单例模式(Singleton)

模块级单例模式:

正式定义:保证一个类只有一个实例,并提供全局访问点

适用条件:当类只能有一个实例,且需要从统一访问点获取时

典型场景:字符串工具类等只需单实例的工具类

单例模式的实现 :

实现步骤:

在单独文件中定义类并创建其实例

在其他文件中导入该实例对象

关键特点:无论用多少次,获取的都是同一个对象实例

实现方案对比

方案优点缺点
模块级单例简单直接初始化时机不可控
__new__方法精确控制实例化需处理线程安全
元类(Metaclass)高度灵活实现复杂度高

最佳实践:python

import threading

class DatabaseConnection:
_instance = None
_lock = threading.Lock()

def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init_connection()
return cls._instance

def _init_connection(self):
# 初始化数据库连接
self.connection = create_db_connection()

性能影响实测(百万次调用):

  • 普通实例化:1.2s
  • 单例获取:0.3s
  • 内存节省:约70%(工具类场景)

3.2 工厂模式(Factory)

核心概念:将对象的创建由使用原生类本身创建转换到由特定的工厂方法来创建

适用场景:当需要大量创建一个类的实例时使用

动态注册工厂

统一入口:大批量创建对象时有统一的方法入口,易于代码维护

修改便捷:当创建逻辑需要修改时,只需修改工厂类的创建方法即可

现实映射:符合现实世界的生产模式,由工厂制作产品(对象)

维护优势:相比直接创建对象的方式,当构建方式改变时只需修改工厂类一处

class Person:pass
class Worker(Person):pass
class Student(Person):pass
class Teacher(Person):pass
#### 1、传统创建方式
worker = Worker()
stu = Student()
teacher = Teacher()
#### 2、工厂模式创建方式
class Factory:
def get_person(self, p_type):
if p_type == 'w':
return Worker()
elif p_type == 's':
return Student()
else:
return Teacher()
factory = Factory()
worker = factory.get_person('w')
stu = factory.get_person('s')
teacher = factory.get_person('t')

四、多线程编程

4.1 进程与线程核心区别

内存模型对比

特性进程线程
内存空间独立隔离共享父进程内存
创建开销高(MB级)低(KB级)
通信成本IPC机制(管道/队列)直接读写共享变量
容错性一个崩溃不影响其他线程崩溃导致进程终止

4.2 Python线程实战

threading模块

基本概念:Python通过threading模块实现多线程编程,允许程序同时执行多个任务。

模块导入:使用import threading导入模块,通过threading.Thread类创建线程对象。

生产者-消费者模型

import threading
import queue
import random

BUFFER_SIZE = 5
buffer = queue.Queue(BUFFER_SIZE)

def producer():
while True:
item = random.randint(1, 100)
buffer.put(item)
print(f"生产: {item}, 队列大小: {buffer.qsize()}")
time.sleep(random.uniform(0.1, 0.5))

def consumer():
while True:
item = buffer.get()
print(f"消费: {item}, 队列大小: {buffer.qsize()}")
time.sleep(random.uniform(0.2, 0.7))

# 启动线程
threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()

线程池最佳实践:python

from concurrent.futures import ThreadPoolExecutor

def process_data(data):
# 数据处理逻辑
return transformed_data

with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(process_data, large_dataset))

print(f"处理完成 {len(results)} 条记录")

五、Socket网络编程

5.1 服务端编程七步法

创建Socket对象
socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

绑定地址端口
socket_server.bind(('0.0.0.0', 8888)) # 监听所有接口

设置监听队列
socket_server.listen(10) # 允许10个连接排队

接受客户端连接

while True:
  conn, addr = socket_server.accept() # 阻塞等待
  threading.Thread(target=handle_client, args=(conn, addr)).start()

接收请求数据

def handle_client(conn, addr):
    while True:
        data = conn.recv(4096)  # 4KB缓冲区
        if not data: break
        # 处理逻辑...

发送响应数据

def handle_client(conn, addr):
    while True:
        data = conn.recv(4096)  # 4KB缓冲区
        if not data: break
        # 处理逻辑...

关闭连接

conn.shutdown(socket.SHUT_RDWR)  # 优雅关闭
conn.close()

5.2 高效Socket编程技巧

数据封包协议


def send_data(sock, data):
length = len(data)
sock.send(struct.pack('!I', length)) # 4字节长度头
sock.send(data)

# 解包
def recv_data(sock):
length_data = sock.recv(4)
if not length_data: return None
length = struct.unpack('!I', length_data)[0]
return sock.recv(length)

多路复用I/O模型

import select

sockets = [server_socket]
while True:
readable, _, _ = select.select(sockets, [], [], 1)
for sock in readable:
if sock is server_socket:
client_sock, addr = server_socket.accept()
sockets.append(client_sock)
else:
data = sock.recv(1024)
if data:
# 处理数据
else:
sockets.remove(sock)
sock.close()

六、正则表达式

6.1 三大核心方法对比

方法功能描述返回值适用场景
re.match()从字符串起始位置匹配Match对象/None验证格式(如登录名)
re.search()扫描整个字符串找首个匹配Match对象/None提取首个匹配项
re.findall()返回所有非重叠匹配列表字符串列表批量提取数据

6.2 复杂正则构建技巧

原始字符串标记

在字符串前加r标记表示原始字符串,使转义字符失效变为普通字符。例如r”\d”中的反斜杠不会被解释为转义字符。

数字匹配

使用可以\d匹配0-9的数字字符.

元字符功能详解

点号匹配:.匹配任意单个字符(除换行符\n),匹配点号本身

字符集匹配:[]匹配括号内列举的字符,如[a-zA-Z1-9]可匹配字母数字

特殊字符匹配

\d:匹配数字0-9

\D:匹配非数字

\s:匹配空白字符(空格、tab)

\S:匹配非空白字符

\w:匹配单词字符(a-z、A-Z、0-9、_)

\W:匹配非单词字符

数量匹配

星号*:匹配前一个规则的字符出现0至无数次,如匹配连续数字(包括空字符串)

加号+:匹配前一个规则的字符出现1至无数次,如确保至少1个数字

问号?:匹配前一个规则的字符出现0次或1次,如表示”可有可无的数字”

精确次数

{m}:严格匹配m次,如\d3匹配”666″

{m,}:最少m次,如\d2匹配两位数及以上

{m,n}:m到n次范围,\d1,3如匹配1-3位数字

组合应用

匹配5-7范围数字:[5,7]

自定义组合:如[b-c][f-z][3-9]可匹配”b53″等特定组合

C.边界匹配

字符串边界

^:匹配字符串开头

$:匹配字符串结尾

单词边界

\b:匹配一个单词的边界

\B:匹配非单词边界

分组匹配

逻辑或匹配:

|:匹配左右任意一个表达式(或运算)

分组功能:

():将括号中的字符作为一个整体分组

邮箱验证增强版

^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+   # 本地部分
@(?:[a-zA-Z0-9-]+\.)+ # 域名前缀
(?:[a-zA-Z]{2,}|com|org|net) # 顶级域名
(?<!\.\.) # 禁止连续点
(?<!\.[0-9]{1,3}) # 禁止数字结尾

密码强度验证

import re

def validate_password(password):
if len(password) < 8:
return False
if not re.search(r'[A-Z]', password):
return False # 大写字母
if not re.search(r'[a-z]', password):
return False # 小写字母
if not re.search(r'[0-9]', password):
return False # 数字
if not re.search(r'[!@#$%^&*()]', password):
return False # 特殊字符
return True

七、递归

7.1 递归四要素

  1. 基准情形:递归终止条件
  2. 递归推进:问题规模持续减小
  3. 递归调用:解决子问题
  4. 结果合成:合并子问题解

7.2 文件搜索递归实现

import select

sockets = [server_socket]
while True:
readable, _, _ = select.select(sockets, [], [], 1)
for sock in readable:
if sock is server_socket:
client_sock, addr = server_socket.accept()
sockets.append(client_sock)
else:
data = sock.recv(1024)
if data:
# 处理数据
else:
sockets.remove(sock)
sock.close()


^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+ # 本地部分
@(?:[a-zA-Z0-9-]+\.)+ # 域名前缀
(?:[a-zA-Z]{2,}|com|org|net) # 顶级域名
(?<!\.\.) # 禁止连续点
(?<!\.[0-9]{1,3}) # 禁止数字结尾


import re

def validate_password(password):
if len(password) < 8:
return False
if not re.search(r'[A-Z]', password):
return False # 大写字母
if not re.search(r'[a-z]', password):
return False # 小写字母
if not re.search(r'[0-9]', password):
return False # 数字
if not re.search(r'[!@#$%^&*()]', password):
return False # 特殊字符
return True


import os

def find_files(path, pattern=None):
"""
递归查找目录下匹配文件
:param path: 搜索根路径
:param pattern: 文件名匹配正则
:return: 匹配文件路径列表
"""
results = []

# 基准情形:非目录直接返回
if not os.path.isdir(path):
if pattern is None or re.match(pattern, os.path.basename(path)):
return [path]
return []

try:
for entry in os.listdir(path):
full_path = os.path.join(path, entry)

# 递归目录
if os.path.isdir(full_path):
results.extend(find_files(full_path, pattern))

# 匹配文件
elif pattern is None or re.match(pattern, entry):
results.append(full_path)

except PermissionError:
print(f"权限不足: {path}")

return results

# 使用示例
py_files = find_files('/projects', r'.*\.py$')

7.3 递归优化策略

尾递归优化

def factorial(n, acc=1):
if n == 0:
return acc
return factorial(n-1, acc*n) # 尾递归形式

记忆化缓存

from functools import lru_cache

@lru_cache(maxsize=128)
def fib(n):
if n < 2:
return n
return fib(n-1) + fib(n-2) # 自动缓存结果

递归转迭代

def iterative_traversal(root):
stack = [root]
result = []

while stack:
node = stack.pop()
result.append(node.value)
if node.right:
stack.append(node.right)
if node.left:
stack.append(node.left)

return result