Python进阶篇(下) 学习笔记
创建模块( myCustomUtils.py )# 自定义模块 myCustomUtils.py# 定义变量# 定义函数"""两数相加"""# 定义类导入模块并且使用#导入整个模块(同目录下)# 使用print(myCustomUtils.VERSION) # 调用属性,输出:1.0print(myCustomUtils.add(1,2)) # 调用函数,输出:3cal = myCustomUti
面向对象编程(OOP)
类与对象
定义类
# 定义类
class Animal:
#类属性
name = "动物"
#类方法
def eat(self, food):
print(f'eat {food}')
创建对象(实例化)
aml = Animal()
构造函数__init__(初始化对象属性)
手动给对象加属性太繁琐,__init__是 Python 的构造函数(初始化方法),创建对象时自动调用,用于给实例属性赋值
核心规则:
__init__是特殊方法(双下划线包裹,又称 “魔法方法”);
创建对象时,传入的参数会传给__init__;
self是必选参数,代表当前实例,无需手动传值。
定义类
# 定义类
class Animal:
#类属性
name = "动物"
# 构造函数(self必须作为第一个参数)
def __init__(self, age):
self.age = age
创建对象(实例化)
dog = Animal(18)
封装
隐藏对象的私有属性 / 方法,只暴露必要的接口(比如通过方法访问 / 修改私有属性),避免外部随意修改数据。
核心规则:
在属性 / 方法名前加两个下划线__,使其成为私有(外部无法直接访问)。
#定义学生类
class Student:
#构造函数
def __init__(self, name, age):
#私有属性
self.__name = name
#公有属性
self.age = age
# 公有方法(获取私有)
def getName(self):
self.__printName()
return self.__name
#私有方法()
def __printName(self):
print("Name:", self.__name)
继承与多态
继承是 “子类继承父类的属性和方法”,子类可复用父类逻辑,也可重写 / 新增方法。
多态是 “同一方法,不同子类有不同实现”,调用时无需区分子类类型,统一调用即可。
核心概念
父类 / 超类:被继承的类(比如 Person);
子类 / 派生类:继承的类(比如 Student、Teacher);
重写:子类定义和父类同名的方法,覆盖父类逻辑;
super():子类中调用父类的方法(保留父类逻辑)。
# 父类:Person
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def eat(self):
print(f"{self.name}在吃饭")
def sleep(self):
print(f"{self.name}在睡觉")
# 子类:Student(继承Person,可以用逗号隔开多继承)
class Student(Person):
# 子类构造函数:调用父类构造函数,新增属性
def __init__(self, name, age, school):
# 方式1:调用父类__init__
super().__init__(name, age) # 自动匹配父类
# 方式2:Person.__init__(self, name, age) # 手动传self
self.school = school # 子类新增属性
# 重写父类方法(覆盖)
def eat(self):
print(f"{self.name}学生在学校食堂吃饭")
# 子类新增方法
def study(self):
print(f"{self.name}在{self.school}学习")
静态方法与类方法
| 方法类型 | 定义方式 | 第一个参数 | 访问属性 | 调用方式 |
|---|---|---|---|---|
| 实例方法 | def 方法名 (self, …): | self(实例) | 实例属性 + 类属性 | 对象。方法名 () |
| 类方法 | @classmethod | cls(类) | 仅类属性 | 类名。方法名 () / 对象。方法名 () |
| 静态方法 | @staticmethod | 无 | 无(需手动传) | 类名。方法名 () / 对象。方法名 () |
class Student:
# 类属性
total = 0
# 构造方法
def __init__(self, name):
self.name = name # 实例属性
Student.total += 1 # 每创建一个实例,总数+1
# 实例方法(操作实例属性)
def get_name(self):
return self.name
# 类方法(操作类属性,@classmethod + cls)
@classmethod
def get_total(cls):
return f"当前学生总数:{cls.total}"
# 静态方法(工具方法,@staticmethod,无参数)
@staticmethod
def is_adult(age):
return age >= 18
# 实例方法:通过对象调用
stu1 = Student("小明")
print(stu1.get_name()) # 小明
# 类方法:类/对象都能调用
print(Student.get_total()) # 当前学生总数:1
stu2 = Student("小红")
print(stu2.get_total()) # 当前学生总数:2
# 静态方法:类/对象都能调用
print(Student.is_adult(18)) # True
print(stu1.is_adult(17)) # False
魔术方法
Python 中的魔术方法(Magic Methods)是以双下划线 __ 开头和结尾的特殊方法,由 Python 解释器自动触发执行(无需手动调用),是实现面向对象核心特性(如封装、多态、运算符重载)的底层支撑。
基础初始化 / 销毁(对象生命周期)
| 魔术方法 | 触发时机 | 核心作用 | 示例 |
|---|---|---|---|
| __init__ | 创建对象时(类名()) | 初始化实例属性(构造函数) | def __init__(self, name): self.name = name |
| __new__ | 创建对象实例前(比__init__早) | 控制实例创建过程(如单例模式) | def __new__(cls): return super().__new__(cls) |
| __del__ | 对象被垃圾回收时 | 释放资源(极少用,Python 自动管理内存) | def __del__(self): print(f"{self.name}被销毁") |
字符串 / 表示形式
| 魔术方法 | 触发时机 | 核心作用 | 示例 |
|---|---|---|---|
| __str__ | print(obj) / str(obj) | 面向用户的友好字符串表示 | def __str__(self): return f"Person({self.name})" |
| __repr__ | repr(obj) / 终端直接输对象 | 面向开发者的精准字符串(可还原对象) | def __repr__(self): return f"Person(name=‘{self.name}’)" |
| __format__ | format(obj, 格式符) | 自定义格式化输出(如format(obj, “%Y”)) | def __format__(self, fmt): return self.name.format(fmt) |
注:若只重写__str__,__repr__会默认复用__str__;若只重写__repr__,print(obj)也会用__repr__。
运算符重载
实现对象的加减乘除、比较等运算。
- 算术运算符
| 魔术方法 | 触发时机 | 作用 | 示例 |
|---|---|---|---|
| __add__ | obj1 + obj2 | 加法 | def __add__(self, other): return self.num + other.num |
| __sub__ | obj1 - obj2 | 减法 | def __sub__(self, other): return self.num - other.num |
| __mul__ | obj1 * obj2 | 乘法 | def __mul__(self, other): return self.num * other.num |
| __truediv__ | obj1 / obj2 | 除法 | def __truediv__(self, other): return self.num / other.num |
| __mod__ | obj1 % obj2 | 取余 | def __mod__(self, other): return self.num % other.num |
- 比较运算符
| 魔术方法 | 触发时机 | 作用 | 示例 |
|---|---|---|---|
| __eq__ | obj1 == obj2 | 等于 | def __eq__(self, other): return self.name == other.name |
| __ne__ | obj1 != obj2 | 不等于 | def __ne__(self, other): return not self.__eq__(other) |
| __gt__ | obj1 > obj2 | 大于 | def __gt__(self, other): return self.age > other.age |
| __ge__ | obj1 >= obj2 | 大于等于 | def __ge__(self, other): return self.age >= other.age |
| __lt__ | obj1 < obj2 | 小于 | def __lt__(self, other): return self.age < other.age |
| __le__ | obj1 <= obj2 | 小于等于 | def __le__(self, other): return self.age <= other.age |
容器 / 序列操作(让对象像列表 / 字典)
| 魔术方法 | 触发时机 | 核心作用 | 示例 |
|---|---|---|---|
| __len__ | len(obj) | 自定义对象 “长度” | def __len__(self): return len(self.items) |
| __getitem__ | obj[key] / 切片obj[1:3] | 获取索引 / 键对应的值 | def __getitem__(self, key): return self.items[key] |
| __setitem__ | obj[key] = value | 设置索引 / 键对应的值 | def __setitem__(self, key, val): self.items[key] = val |
| __delitem__ | del obj[key] | 删除索引 / 键对应的值 | def __delitem__(self, key): del self.items[key] |
| __iter__ | for x in obj / iter(obj) | 让对象可迭代(返回迭代器) | def __iter__(self): return iter(self.items) |
| __next__ | next(iterator) | 迭代器的下一个值(配合__iter__) | def __next__(self): return next(self.iter) |
| __contains__ | x in obj | 自定义in成员判断 | def __contains__(self, x): return x in self.items |
调用行为(让对象像函数)
| 魔术方法 | 触发时机 | 核心作用 | 示例 |
|---|---|---|---|
| __call__ | obj()(对象当作函数调用) | 让对象支持调用行为 | def __call__(self, x): return x * 2 → obj(5) 返回10 |
属性访问控制(封装 / 权限控制)
精细控制对象属性的 “读 / 写 / 删”,替代简单的私有属性封装,更灵活。
| 魔术方法 | 触发时机 | 核心作用 | 示例 |
|---|---|---|---|
| __getattr__ | 访问不存在的属性(obj.xxx) | 动态处理属性访问(如兜底返回) | def __getattr__(self, name): return f"属性{name}不存在" |
| __setattr__ | 设置属性(obj.xxx = val) | 拦截属性赋值(如校验) | def __setattr__(self, name, val): if name == ‘age’ and val <0: raise ValueError; super().__setattr__(name, val) |
| __delattr__ | 删除属性(del obj.xxx) | 拦截属性删除 | def __delattr__(self, name): if name == ‘name’: raise PermissionError; super().__delattr__(name) |
| __getattribute__ | 访问任意属性(优先于__getattr__) | 所有属性访问的入口(慎用,易死循环) | def __getattribute__(self, name): return super().__getattribute__(name) |
上下文管理器(with 语句)
实现对象的with语句支持,自动管理资源(如文件、连接)。
| 魔术方法 | 触发时机 | 核心作用 | 示例 |
|---|---|---|---|
| __enter__ | with obj as f: 进入时 | 初始化资源(返回上下文对象) | def __enter__(self): self.file = open(‘test.txt’); return self.file |
| __exit__ | with 代码块结束时 | 释放资源(处理异常) | def __exit__(self, exc_type, exc_val, exc_tb): self.file.close() |
其他常用魔术方法
| 魔术方法 | 触发时机 | 核心作用 |
|---|---|---|
| __hash__ | hash(obj) | 自定义对象的哈希值(用于字典键 / 集合) |
| __bool__ | bool(obj) / 条件判断(if obj) | 自定义对象的 “真假”(默认非空为 True) |
| __copy__ | copy.copy(obj) | 浅拷贝对象 |
| __deepcopy__ | copy.deepcopy(obj) | 深拷贝对象 |
魔法方法例子
class Person:
def __init__(self, name):
self.name = name
def __add__(self,other):
return f"{self.name} and {other.name}"
p1 = Person("John")
p2 = Person("Jack")
print(p1 + p2) # 输出:John and Jack
模块与包
模块是包含 Python 定义和语句的.py 文件,核心作用是将代码按功能拆分,实现复用(比如把 “数据处理” 逻辑写在data_utils.py,其他文件可导入使用)。
模块(Module):代码复用的基础
导入方式
Python 提供多种导入方式,适配不同使用场景,优先级:精准导入 > 模块导入 > 通配符导入(通配符不推荐)。
| 导入方式 | 语法示例 | 作用说明 | 访问方式 |
|---|---|---|---|
| 导入整个模块 | import 模块名 | 加载整个模块,生成模块命名空间 | 模块名.函数/属性(如math.pi) |
| 导入模块并指定别名 | import 模块名 as 别名 | 简化模块名(避免冲突 / 简写) | 别名.函数/属性(如import numpy as np) |
| 导入模块中的指定成员 | from 模块名 import 函数/类/变量 | 仅加载指定成员,直接访问(无需模块名) | 函数/属性(如from math import pi) |
| 导入指定成员并指定别名 | from 模块名 import 成员 as 别名 | 避免成员名冲突 | 别名(如from math import pi as PI) |
| 导入模块所有成员(不推荐) | from 模块名 import * | 加载模块所有成员,易引发命名冲突 | 直接访问所有成员(如pi) |
导入并使用的例子:
# 方式1:导入整个模块
import math
print(math.pi) # 3.141592653589793
print(math.sqrt(16)) # 4.0
# 方式2:导入模块并指定别名
import math as m
print(m.pi) # 3.141592653589793
# 方式3:导入指定成员
from math import pi, sqrt
print(pi) # 3.141592653589793
print(sqrt(16)) # 4.0
# 方式4:导入指定成员并别名
from math import pi as PI
print(PI) # 3.141592653589793
# 方式5:通配符导入(不推荐,易冲突)
from math import *
print(cos(0)) # 1.0(cos是math的函数)
自定义模块
创建模块( myCustomUtils.py )
# 自定义模块 myCustomUtils.py
# 定义变量
VERSION = "1.0"
# 定义函数
def add(a, b):
"""两数相加"""
return a + b
# 定义类
class Calculator:
def multiply(self, a, b):
return a * b
导入模块并且使用
#导入整个模块(同目录下)
import myCustomUtils
# 使用
print(myCustomUtils.VERSION) # 调用属性,输出:1.0
print(myCustomUtils.add(1,2)) # 调用函数,输出:3
cal = myCustomUtils.Calculator() #实例化对象
print(cal.multiply(2,2)) # 调用方法,输出:4
包(Package):模块的集合
包是包含__init__.py文件的文件夹,核心作用是组织多个相关模块(比如把 “数据处理” 相关的read.py、write.py放在data/包下),避免模块名冲突。
包的基本结构
my_project/ # 项目根目录
├── main.py # 主程序文件
└── my_package/ # 自定义包(文件夹)
├── __init__.py # 包的初始化文件(必须有,空文件也可)
├── module1.py # 包内模块1
└── module2.py # 包内模块2
__init__.py文件
标识文件夹为 Python 包(无此文件则无法被导入);
初始化包的命名空间(可定义包级变量、导入子模块);
控制from 包 import *的导入范围(通过__all__)。
# 定义包级变量
PACKAGE_VERSION = "1.0"
# 导入子模块(让外部可直接导入包,访问子模块)
from . import module1, module2
# 定义__all__:控制from my_package import * 时导入的成员
__all__ = ["module1", "module2", "PACKAGE_VERSION"]
子模块
module1.py
def print_info1():
print("打印模块1")
module2.py
def print_info2():
print("打印模块2")
导入使用
# 导入包内模块
import my_package.module1
my_package.module1.print_info1() # 调用module1的func1函数
# 导入模块并别名
import my_package.module1 as m1
m1.print_info1()
# 从包中导入指定模块
from my_package import module2
module2.print_info2()
第三方包的安装(pip 工具)
pip 是 Python 的包管理工具
以安装 requests(网络请求) 包为例子:
安装包
# 安装指定包(最新版本)
pip install requests
# 安装指定版本
pip install requests==2.31.0
升级包
pip install --upgrade requests
卸载包
pip uninstall requests
查看已安装的包
pip list
高级特性
列表推导式与生成器表达式
列表推导式:生成完整列表,一次性加载到内存(数据量大时占内存)
生成器表达式:生成生成器对象,惰性生成数据(按需取值,不占内存)
- 列表推导式
# 基础版:生成新列表
[表达式 for 变量 in 可迭代对象]
# 进阶版:带条件过滤
[表达式 for 变量 in 可迭代对象 if 条件]
# 嵌套版:多层循环(慎用,可读性优先)
[表达式 for 变量1 in 可迭代对象1 for 变量2 in 可迭代对象2]
例子:
arr = [x+x for x in range(4)]
print(arr) # 输出: [0, 2, 4, 6]
- 生成器表达式
# 基础版
(表达式 for 变量 in 可迭代对象)
# 带条件版
(表达式 for 变量 in 可迭代对象 if 条件)
例子:
square_gen = (x*x for x in range(1, 11))
print(square_gen) # <generator object <genexpr> at 0x0000014BE1A2AA80>
# 按需取值(迭代时才计算)
for num in square_gen:
print(num, end=" ") # 输出: 1 4 9 16 25 36 49 64 81 100(逐个生成,不占内存)
迭代器与生成器
- 迭代器(Iterator)
可迭代对象(Iterable):能被for循环遍历的对象(列表、字符串、字典等),实现了__iter__方法;
迭代器(Iterator):可被next()函数调用并返回下一个值的对象,实现了__iter__和__next__方法;
转换:通过iter(可迭代对象)将可迭代对象转为迭代器。
例子:
# 1. 可迭代对象转迭代器
lst = [1,2,3]
it = iter(lst) # 转为迭代器
# 2. 用next()取值(逐个生成)
print(next(it)) # 1
print(next(it)) # 2
print(next(it)) # 3
# print(next(it)) # 遍历完后报错:StopIteration
# 3. for循环自动处理迭代器(捕获StopIteration)
it2 = iter(lst)
for x in it2:
print(x) # 1 2 3(无需手动调用next)
- 生成器(Generator)
生成器是特殊的迭代器,无需手动实现__iter__和__next__,通过以下两种方式创建:
生成器表达式(前文讲的()形式);
生成器函数:用yield关键字替代return的函数,调用时返回生成器对象。
生成器函数核心规则
函数内用yield返回值,每次调用next()时执行到yield暂停,下次从暂停处继续;
调用生成器函数时,函数不立即执行,仅返回生成器对象;
遍历完成后抛出StopIteration。
例子:
# 定义生成器函数(生成1-n的整数)
def my_generator(n):
i = 1
while i <= n:
yield i # 暂停并返回i,下次从i+=1继续
i += 1
# 调用函数→返回生成器对象(不执行函数体)
gen = my_generator(3)
# 遍历生成器
print(next(gen)) # 1(执行到yield 1暂停)
print(next(gen)) # 2(从i+=1继续,执行到yield 2暂停)
print(next(gen)) # 3(继续执行到yield 3暂停)
# print(next(gen)) # 遍历完报错:StopIteration
闭包(Closure):函数嵌套的高级特性
闭包是嵌套函数的特殊形式,满足以下 3 个条件:
外层函数定义了内层函数;
内层函数引用了外层函数的变量(非全局变量);
外层函数返回内层函数(而非执行)。
核心作用:
保存外层函数的变量状态(即使外层函数执行完毕,变量仍被内层函数引用);
实现 “数据封装”,避免使用全局变量
例子:
# 外层函数:定义计数器初始值
def counter(init=0):
# 内层函数:修改/访问外层变量
def add():
nonlocal init # 声明引用外层函数的变量(不可变类型需nonlocal)
init += 1
return init
# 返回内层函数(不执行)
return add
# 创建闭包实例(保存init=0的状态)
count1 = counter()
print(count1()) # 1(init从0→1)
print(count1()) # 2(init从1→2)
# 创建新的闭包实例(独立状态)
count2 = counter(10)
print(count2()) # 11(init从10→11)
print(count1()) # 3(count1的状态不受count2影响)
装饰器(Decorator):函数的 “增强插件”
装饰器是基于闭包实现的语法糖,核心作用是:在不修改原函数代码的前提下,为函数添加额外功能(如日志、计时、权限校验)。
基础装饰器语法规则:
1.定义装饰器(闭包形式)
def 装饰器函数(被装饰函数):
def 内层函数(*args, **kwargs):
# 执行原函数前的额外逻辑(如日志)
result = 被装饰函数(*args, **kwargs) # 调用原函数
# 执行原函数后的额外逻辑
return result
return 内层函数
2.使用装饰器(@语法糖)
@装饰器函数 # 等价于:原函数 = 装饰器函数(原函数)
def 原函数(参数):
函数体
使用例子:
def log_decorator(func):
def wrapper(*args, **kwargs):
# 记录调用信息
print(f"调用函数:{func.__name__}")
print(f"参数:args={args}, kwargs={kwargs}")
result = func(*args, **kwargs)
print(f"函数返回值:{result}")
return result
#返回内层函数,不要加括号
return wrapper
@log_decorator
def add(a, b):
return a + b
add(2, 3)
# 输出:
# 调用函数:add
# 参数:args=(2, 3), kwargs={}
# 函数返回值:5
collections 模块(实用数据结构扩展)
collections是 Python 内置的标准库,提供了比基础数据类型(列表 / 字典)更高效的专用数据结构
有九种分别为:
[‘ChainMap’, ‘Counter’, ‘OrderedDict’, ‘UserDict’, ‘UserList’, ‘UserString’, ‘defaultdict’, ‘deque’, ‘namedtuple’]
常用类型介绍: Counter、defaultdict、deque、namedtuple
Counter:高效计数工具
常用方法:
most_common(n):返回出现次数最多的 n 个元素(列表形式);
update(iter):更新计数(新增元素或累加次数);
elements():返回迭代器,包含所有元素(按次数重复)。
from collections import Counter
# 统计字符串
c = Counter("abracadabra")
print(c) # Counter({'a': 5, 'b': 2, 'r': 2, 'c': 1, 'd': 1})
print(c.most_common(2)) # [('a', 5), ('b', 2)]
c.update("aaaa") # 累加a的次数
print(c['a']) # 9
defaultdict:带默认值的字典
作用:
解决普通字典 d[key] 访问不存在的键时报 KeyError 的问题,可指定键不存在时的默认值类型。
默认值类型:
list(分组)、int(计数)、set(去重分组)、dict(嵌套字典)等。
#场景1:分组(默认值为列表)
dd_list = defaultdict(list)
for k, v in [("a", 1), ("b", 2), ("a", 3)]:
dd_list[k].append(v)
print(dd_list) # defaultdict(list, {'a': [1, 3], 'b': [2]})
# 场景2:计数(默认值为整数)
dd_int = defaultdict(int)
for num in [1,2,2,3,3,3]:
dd_int[num] += 2
print(dd_int) # defaultdict(<class 'int'>, {1: 2, 2: 4, 3: 6})
deque:双端队列
作用:
高效实现「头部 / 尾部」的增删操作,替代列表(列表头部增删 O (n),数据量大时效率低)。
常用方法:
append(x):尾部添加元素;
appendleft(x):头部添加元素;
pop():尾部删除元素;
popleft():头部删除元素;
maxlen:队列最大长度(初始化时指定)。
例子:
#导入双端队列
from collections import deque
dq = deque([1,2,3], maxlen=3)
print(dq) # deque([1, 2, 3], maxlen=3)
dq.append(4) #最大长度为3,尾部添加,头部1被丢弃
print(dq) # deque([2, 3, 4], maxlen=3)
namedtuple:命名元组
作用:
创建「可通过属性访问元素」的元组子类,替代轻量级类(无需定义 __init__),比普通元组更易读。
# 定义命名元组(坐标)
Point = namedtuple("Point", ["x", "y"]) # 等价于 Point = namedtuple("Point", "x y")
p = Point(1, 2)
print(p.x) # 1(通过属性访问,比p[0]更易读)
print(p.y) # 2
print(p) # Point(x=1, y=2)(友好的字符串表示)
# 场景:替代轻量级类(如学生信息)
Student = namedtuple("Student", "name age score")
stu = Student("小明", 18, 90)
print(stu.name, stu.score) # 小明 90
高级 IO
pathlib
pathlib 是 Python 3.4+ 引入的路径处理模块,替代传统的 os.path,以面向对象的方式操作路径,语法更简洁、可读性更高。
核心类
PurePath:纯路径(仅处理字符串,不涉及实际文件系统),分为 PureWindowsPath(Windows 路径)和 PurePosixPath(Unix/Linux/Mac 路径)。
Path:继承 PurePath,增加了与文件系统交互的方法(如判断文件是否存在、读写文件等),自动适配系统类型。
导入
from pathlib import Path
基础路径创建
# 获取当前工作目录
current_dir = Path.cwd()
# 获取用户主目录
home_dir = Path.home()
# 手动构造路径(推荐用 / 拼接,而非 +)
target_path = current_dir / "data" / "test.txt"
路径属性
print("绝对路径:", target_path.absolute()) # 绝对路径
print("文件名:", target_path.name) # 完整文件名(test.txt)
print("文件名(无后缀):", target_path.stem) # 无后缀(test)
print("文件后缀:", target_path.suffix) # 后缀(.txt)
print("父目录:", target_path.parent) # 父目录(data)
print("所有父目录:", list(target_path.parents)) # 所有上级目录
文件系统判断
print("是否存在:", target_path.exists()) # 是否存在
print("是否是文件:", target_path.is_file()) # 是否是文件
print("是否是目录:", target_path.is_dir()) # 是否是目录
目录/文件操作
# 创建目录(parents=True:自动创建上级目录,exist_ok=True:已存在不报错)
target_dir = current_dir / "data"
target_dir.mkdir(parents=True, exist_ok=True)
# 写入文件(替代 open + write)
target_path.write_text("Hello Pathlib!", encoding="utf-8")
# 读取文件(替代 open + read)
content = target_path.read_text(encoding="utf-8")
print("文件内容:", content)
# 遍历目录下的所有文件
for file in target_dir.glob("*.txt"): # 匹配所有 .txt 文件
print("遍历文件:", file)
重命名/删除
new_path = target_dir / "new_test.txt"
target_path.rename(new_path) # 重命名
new_path.unlink() # 删除文件
JSON 解析(内置 json 模块)
核心函数
| 函数 | 作用 |
|---|---|
| json.dumps() | 将 Python 对象序列化为 JSON 字符串 |
| json.dump() | 将 Python 对象写入文件(JSON 格式) |
| json.loads() | 将 JSON 字符串反序列化为 Python 对象 |
| json.load() | 从文件读取 JSON 内容并反序列化为对象 |
例子:
# 将 Python 对象序列化为 JSON 字符串
jsonData = json.dumps(data)
print(jsonData)
# 将 JSON 字符串反序列化为 Python 对象
classData = json.loads(jsonData)
print(classData)
输出结果:
{"name": "\u5f20\u4e09", "age": 25, "hobbies": ["\u7f16\u7a0b", "\u9605\u8bfb"], "is_student": false, "score": null}
{'name': '张三', 'age': 25, 'hobbies': ['编程', '阅读'], 'is_student': False, 'score': None}
日志模块(logging)
logging 是 Python 内置日志模块,替代 print 调试,支持多级别、多输出端(控制台 / 文件)、日志轮转等。
核心概念
日志级别(从低到高):DEBUG < INFO < WARNING < ERROR < CRITICAL,默认级别为 WARNING(仅输出 WARNING 及以上)。
Logger:日志记录器,负责产生日志。
Handler:日志处理器,负责输出日志(控制台 / 文件 / 邮件等)。
Formatter:日志格式器,定义日志输出格式。
例子:
import logging
# 1. 基础配置(快速使用)
logging.basicConfig(
level=logging.DEBUG, # 设置日志级别(DEBUG 会输出所有级别)
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", # 格式
datefmt="%Y-%m-%d %H:%M:%S", # 时间格式
filename="app.log", # 日志写入文件(不指定则输出到控制台)
filemode="a", # 写入模式(a:追加,w:覆盖)
encoding="utf-8" # 编码
)
# 2. 输出不同级别日志
logging.debug("调试信息:程序运行细节")
logging.info("普通信息:程序正常运行")
logging.warning("警告信息:可能存在问题")
logging.error("错误信息:程序出错但不终止")
logging.critical("严重错误:程序无法运行")
结果:
将日志写入到app.log文件中
网络编程
Socket 模块
socket是 Python 内置的底层网络模块,直接操作 TCP/UDP 协议,是所有网络编程的基础。
socket的api:
import socket
# 1. 创建Socket对象(AF_INET=IPv4,SOCK_STREAM=TCP,SOCK_DGRAM=UDP)
s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
# 2. 核心方法(服务端)
s.bind(('0.0.0.0', 8888)) # 绑定IP+端口
s.listen(5) # 监听(backlog=最大等待连接数)
conn, addr = s.accept() # 阻塞等待客户端连接(返回连接对象+客户端地址)
# 3. 核心方法(客户端)
s.connect(('127.0.0.1', 8888)) # 连接服务端
# 4. 数据收发
data = conn.recv(1024) # 接收数据(bufsize=单次最大接收字节数)
conn.send(b'hello') # 发送字节数据(需编码,Python3需bytes类型)
conn.sendall(b'hello') # 确保数据全部发送(循环调用send)
# 5. 关闭连接
conn.close() # 关闭客户端连接(服务端)
s.close() # 关闭监听Socket
TCP
服务端:
# tcp_server.py
import socket
def tcp_server():
# 1. 创建TCP Socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 2. 绑定地址(解决端口占用:SO_REUSEADDR)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('0.0.0.0', 8888))
# 3. 监听
server_socket.listen(5)
print("TCP服务端启动,监听端口8888...")
# 4. 等待客户端连接
conn, client_addr = server_socket.accept()
print(f"客户端{client_addr}已连接")
try:
# 5. 收发数据
while True:
# 接收客户端数据(阻塞)
data = conn.recv(1024)
if not data: # 客户端关闭连接时,recv返回空
print(f"客户端{client_addr}断开连接")
break
print(f"收到客户端消息:{data.decode('utf-8')}")
# 回复客户端
conn.send(f"已收到:{data.decode('utf-8')}".encode('utf-8'))
finally:
# 6. 关闭连接
conn.close()
server_socket.close()
if __name__ == '__main__':
tcp_server()
客户端:
# tcp_client.py
import socket
def tcp_client():
# 1. 创建TCP Socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 2. 连接服务端
client_socket.connect(('127.0.0.1', 8888))
print("已连接到TCP服务端")
try:
# 3. 发送数据
while True:
msg = input("请输入发送内容(输入q退出):")
if msg == 'q':
break
client_socket.send(msg.encode('utf-8'))
# 4. 接收服务端回复
data = client_socket.recv(1024)
print(f"服务端回复:{data.decode('utf-8')}")
finally:
# 5. 关闭连接
client_socket.close()
if __name__ == '__main__':
tcp_client()
UDP
服务端:
import socket
def udp_server():
# 1. 创建UDP Socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('0.0.0.0', 8889))
print("UDP服务端启动,监听端口8889...")
while True:
# 2. 接收数据(阻塞,返回数据+客户端地址)
data, client_addr = server_socket.recvfrom(1024)
print(f"收到客户端{client_addr}消息:{data.decode('utf-8')}")
# 3. 回复客户端
server_socket.sendto(f"UDP已收到:{data.decode('utf-8')}".encode('utf-8'), client_addr)
if __name__ == '__main__':
udp_server()
客户端:
import socket
def udp_client():
# 1. 创建UDP Socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
msg = input("请输入发送内容(输入q退出):")
if msg == 'q':
break
# 2. 发送数据(直接指定服务端地址)
client_socket.sendto(msg.encode('utf-8'), ('127.0.0.1', 8889))
# 3. 接收回复
data, server_addr = client_socket.recvfrom(1024)
print(f"服务端回复:{data.decode('utf-8')}")
client_socket.close()
if __name__ == '__main__':
udp_client()
多线程
Python 中的并发编程主要解决「提升程序执行效率」的问题,核心分为三类:多线程(threading)、多进程(multiprocessing)、协程(asyncio/aiohttp)。
线程:
线程(Thread)是进程内的最小执行单元,一个进程可以包含多个线程;
线程共享进程的内存空间(如全局变量、堆内存),因此线程间通信成本极低;
线程的创建 / 销毁开销远小于进程,是轻量级的并发方式。
GIL(全局解释器锁):
定义:
CPython 解释器的全局锁,同一时刻只有一个线程能执行 Python 字节码;
影响:
多线程无法利用多核 CPU 加速 CPU 密集型任务(如数学计算、数据处理),甚至因线程切换开销导致比单线程更慢;
多线程对 IO 密集型任务(如网络请求、文件读写、数据库操作)依然有效(因为 IO 等待时线程会释放 GIL,其他线程可执行);
结论:
Python 多线程的核心价值是「处理 IO 密集型任务」,CPU 密集型任务优先用多进程。
线程的生命周期
新建(创建 Thread 对象)→ 就绪(start() 后)→ 运行(CPU 调度执行 run())→ 阻塞(如 sleep/IO 等待)→ 终止(任务完成/异常)
常用属性与方法
线程创建:初始化参数
| 参数 | 作用 | 示例 |
|---|---|---|
| target | 指定线程要执行的函数(核心) | Thread(target=add_count) |
| args | 传给 target 函数的位置参数(元组形式) | Thread(target=add_count, args=(100,)) |
| kwargs | 传给 target 函数的关键字参数(字典形式) | Thread(target=add_count, kwargs={“num”:100}) |
| name | 线程名称(便于调试,默认格式 Thread-N) | Thread(name=“CountThread”, target=add_count) |
| daemon | 是否为守护线程(布尔值,默认 False) | Thread(target=add_count, daemon=True) |
线程操作
| 方法 | 作用 | 注意事项 |
|---|---|---|
| start() | 启动线程(将线程加入就绪队列,等待 CPU 调度,执行 run() 方法) | 不可重复调用(多次调用会抛 RuntimeError) 不是直接执行 target 函数 |
| run() | 线程的核心执行逻辑(默认执行 target 函数,可重写) | 手动调用 run() 是「同步执行」(不会创建新线程),仅用于自定义线程类时重写 |
| join(timeout=None) | 主线程阻塞,等待该线程执行完成;timeout 为超时时间(秒,浮点数) | Windows 下需确保在 if __name__ == “__main__” 内调用;超时后主线程不再等待 |
| is_alive() | 判断线程是否处于「活跃状态」(就绪 / 运行 / 阻塞,未终止) | 线程启动前 / 终止后返回 False,启动后返回 True |
线程标识
| 属性 / 方法 | 作用 | 示例 |
|---|---|---|
| name / setName() | 获取 / 设置线程名称(可读可写) | t.name = “NewName” → print(t.name) |
| ident | 获取线程的「Python 级 ID」(整数,线程终止后变为 None) | print(t.ident) |
| native_id | 获取「系统级线程 ID」(Python 3.8+,更贴近操作系统的线程标识) | print(t.native_id) |
| threading.get_ident() | 全局方法,获取当前线程的 ident(已过时,推荐用 current_thread().ident) | print(threading.get_ident()) |
守护线程
| 属性 / 方法 | 作用 | 注意事项 |
|---|---|---|
| daemon / setDaemon() | 获取 / 设置是否为守护线程(布尔值) | 必须在 start() 前设置,否则抛 RuntimeError;Windows 下主线程退出会立刻终止守护线程 |
| isDaemon() | 判断是否为守护线程(等同于 t.daemon) | 语义更清晰,老版本 Python 常用 |
threading 模块的全局工具方法
这类方法用于「全局管理线程」,比如获取当前线程、统计活跃线程数等:
| 方法 | 作用 | 示例 |
|---|---|---|
| threading.current_thread() | 获取当前正在执行的线程对象(最常用) | current_t = threading.current_thread() |
| threading.active_count() | 获取当前「活跃线程数」(包括主线程) | print(f"活跃线程数:{threading.active_count()}") |
| threading.enumerate() | 获取所有活跃线程的列表(可遍历调试) | for t in threading.enumerate(): print(t.name) |
| threading.main_thread() | 获取主线程对象(程序启动时默认创建的线程) | main_t = threading.main_thread() |
| threading.settrace(func) | 为所有线程设置「跟踪函数」(调试用,比如监控函数调用) | 进阶用法,一般用于调试工具 |
| threading.setprofile(func) | 为所有线程设置「剖析函数」(性能分析用) | 进阶用法,一般用于性能优化 |
线程的创建与启动
方式 1:直接实例化 Thread 类
import threading
import time
# 定义线程执行的函数(IO 密集型示例:模拟网络请求)
def task(name, delay):
print(f"线程 {name} 开始执行,延迟 {delay} 秒")
time.sleep(delay) # 模拟 IO 等待(如网络请求、文件读写)
print(f"线程 {name} 执行完成")
# 创建线程(target 指定执行函数,args 传递参数)
t1 = threading.Thread(target=task, args=("t1", 2))
t2 = threading.Thread(target=task, args=("t2", 3))
# 启动线程
start_time = time.time()
t1.start()
t2.start()
# 等待所有线程执行完成(主线程阻塞)
t1.join()
t2.join()
print(f"总耗时:{time.time() - start_time:.2f} 秒") # 约 3 秒(而非 2+3=5 秒)
方式 2:继承 Thread 类重写 run() 方法
import threading
import time
# 自定义线程类
class MyThread(threading.Thread):
def __init__(self, thread_name, delay):
super().__init__(name=thread_name) # 调用父类构造函数
self.delay = delay
# 重写 run() 方法:线程的核心执行逻辑
def run(self):
print(f"线程 {self.name} 启动,等待 {self.delay} 秒")
time.sleep(self.delay)
print(f"线程 {self.name} 完成")
start_time = time.time()
# 创建自定义线程对象
t1 = MyThread("Custom-Task-1", 2)
t2 = MyThread("Custom-Task-2", 3)
# 启动线程
t1.start()
t2.start()
# 等待完成
t1.join()
t2.join()
print(f"总耗时:{time.time() - start_time:.2f} 秒")
方式 3:线程池(ThreadPoolExecutor)
from concurrent.futures import ThreadPoolExecutor
import time
def io_task(task_name, delay):
print(f"任务 {task_name} 启动,等待 {delay} 秒")
time.sleep(delay)
print(f"任务 {task_name} 完成")
return f"{task_name} 结果" # 任务返回值
start_time = time.time()
# 创建线程池(指定最大线程数,建议:IO 密集型任务可设为 10-100,CPU 密集型=CPU核心数)
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="Pool-Thread") as executor:
# 提交任务(两种方式)
# 方式1:submit + result(获取单个任务结果)
future1 = executor.submit(io_task, "task1", 2)
future2 = executor.submit(io_task, "task2", 3)
# 获取任务结果(阻塞直到任务完成)
print("任务1结果:", future1.result())
print("任务2结果:", future2.result())
# 方式2:map(批量提交任务,按顺序获取结果)
# tasks = [("task3", 1), ("task4", 2)]
# results = executor.map(lambda x: io_task(x[0], x[1]), tasks)
# for res in results:
# print("批量任务结果:", res)
print(f"总耗时:{time.time() - start_time:.2f} 秒")
守护线程
守护线程是「后台线程」,主线程退出时,无论守护线程是否完成,都会被强制终止;
import threading
import time
def daemon_task():
while True:
print("守护线程运行中...")
time.sleep(1)
def normal_task():
print("非守护线程启动")
time.sleep(3)
print("非守护线程完成")
# 创建守护线程
t_daemon = threading.Thread(target=daemon_task, daemon=True)
# 创建非守护线程
t_normal = threading.Thread(target=normal_task)
t_daemon.start()
t_normal.start()
# 主线程等待非守护线程完成(3秒后),然后退出,守护线程被强制终止
print("主线程等待非守护线程...")
t_normal.join()
print("主线程退出")
输出:
守护线程运行中...
非守护线程启动
主线程等待非守护线程...
守护线程运行中...
守护线程运行中...
非守护线程完成
主线程退出
线程局部存储(TLS)
线程共享进程内存,若想让每个线程拥有「独立的变量副本」,需用 threading.local():
import threading
import time
# 创建线程局部存储对象
local_data = threading.local()
def task(thread_name):
# 每个线程的 local_data 是独立的
local_data.value = thread_name
print(f"线程 {thread_name} 初始化 value:{local_data.value}")
time.sleep(1)
# 验证:每个线程只能获取自己的 value
print(f"线程 {thread_name} 读取 value:{local_data.value}")
t1 = threading.Thread(target=task, args=("t1",))
t2 = threading.Thread(target=task, args=("t2",))
t1.start()
t2.start()
t1.join()
t2.join()
输出:
线程 t1 初始化 value:t1
线程 t2 初始化 value:t2
线程 t1 读取 value:t1
线程 t2 读取 value:t2
线程同步
线程共享内存,若多个线程同时修改同一变量,会导致「数据错乱」(竞态条件)。必须通过「同步机制」保护共享资源。
基础锁(Lock)
Lock 是互斥锁,同一时刻只有一个线程能获取锁,其他线程需等待(阻塞)
with lock:自动调用 acquire()(获取锁)和 release()(释放锁),即使代码抛出异常也会释放锁,推荐使用;
手动加锁:lock.acquire()(获取锁,阻塞)→ 执行代码 → lock.release()(释放锁),需注意异常处理。
可重入锁(RLock)
Lock 是「不可重入锁」:同一线程多次调用 acquire() 会导致死锁;RLock(可重入锁)允许同一线程多次获取锁(需对应次数释放)。
import threading
rlock = threading.RLock()
def nested_task():
# 第一次获取锁
with rlock:
print("内层代码执行")
def task():
# 第二次获取锁(同一线程,RLock 允许)
with rlock:
print("外层代码执行")
nested_task()
t = threading.Thread(target=task)
t.start()
t.join()
信号量(Semaphore)
Semaphore 允许指定数量的线程同时获取锁,用于控制「并发访问数」(如限制同时访问数据库的线程数)。
import threading
import time
# 信号量:最多 2 个线程同时执行
sem = threading.Semaphore(2)
def limited_task(thread_name):
with sem:
print(f"线程 {thread_name} 开始执行")
time.sleep(2) # 模拟任务耗时
print(f"线程 {thread_name} 执行完成")
# 创建 5 个线程,但同时只有 2 个执行
for i in range(5):
t = threading.Thread(target=limited_task, args=(f"t{i}",))
t.start()
输出:
线程 t0 开始执行
线程 t1 开始执行
线程 t1 执行完成
线程 t0 执行完成
线程 t2 开始执行
线程 t3 开始执行
线程 t3 执行完成
线程 t4 开始执行
线程 t2 执行完成
线程 t4 执行完成
事件(Event)
Event 是简单的线程通信机制,通过「信号标志」实现线程间的通知(如:线程 A 等待线程 B 完成某个操作后再执行)。
核心方法:
event.set():将标志设为 True,唤醒所有等待的线程;
event.clear():将标志设为 False;
event.wait(timeout=None):阻塞直到标志为 True;
event.is_set():判断标志是否为 True。
import threading
import time
# 创建事件对象
event = threading.Event()
def wait_task():
print("等待线程:等待信号...")
event.wait() # 阻塞直到 event.set()
print("等待线程:收到信号,开始执行")
def signal_task():
print("信号线程:执行初始化操作...")
time.sleep(3)
print("信号线程:发送信号")
event.set() # 发送信号
t1 = threading.Thread(target=wait_task)
t2 = threading.Thread(target=signal_task)
t1.start()
t2.start()
t1.join()
t2.join()
输出:
等待线程:等待信号...
信号线程:执行初始化操作...
信号线程:发送信号
等待线程:收到信号,开始执行
条件(Condition)
Condition 基于 RLock,支持「等待 - 通知」机制,适合更复杂的同步场景(如:生产者 - 消费者模型)。
import threading
import time
import random
# 共享队列
queue = []
# 创建条件对象
cond = threading.Condition()
# 生产者线程
def producer():
while True:
with cond:
# 队列满则等待
if len(queue) >= 5:
print("队列满,生产者等待...")
cond.wait() # 释放锁,等待通知
# 生产数据
item = random.randint(1, 100)
queue.append(item)
print(f"生产者生产:{item},队列长度:{len(queue)}")
# 通知消费者
cond.notify()
time.sleep(1)
# 消费者线程
def consumer():
while True:
with cond:
# 队列为空则等待
if len(queue) == 0:
print("队列空,消费者等待...")
cond.wait()
# 消费数据
item = queue.pop(0)
print(f"消费者消费:{item},队列长度:{len(queue)}")
# 通知生产者
cond.notify()
time.sleep(1.5)
if __name__ == "__main__":
t1 = threading.Thread(target=producer, daemon=True)
t2 = threading.Thread(target=consumer, daemon=True)
t1.start()
t2.start()
# 主线程等待 10 秒后退出
time.sleep(10)
print("主线程退出")
输出:
生产者生产:62,队列长度:1
消费者消费:62,队列长度:0
生产者生产:76,队列长度:1
消费者消费:76,队列长度:0
生产者生产:75,队列长度:1
消费者消费:75,队列长度:0
生产者生产:18,队列长度:1
生产者生产:4,队列长度:2
消费者消费:18,队列长度:1
生产者生产:95,队列长度:2
消费者消费:4,队列长度:1
生产者生产:76,队列长度:2
生产者生产:9,队列长度:3
消费者消费:95,队列长度:2
生产者生产:29,队列长度:3
消费者消费:76,队列长度:2
生产者生产:76,队列长度:3
主线程退出
异步编程
Python 异步编程是基于 asyncio 库和 async/await 语法的高并发编程范式,核心解决 I/O 密集型任务的高并发问题(如网络请求、数据库操作、文件读写)。
| 概念 | 通俗解释 | 核心作用 |
|---|---|---|
| 协程(Coroutine) | 可暂停 / 恢复的 “轻量级函数” | 异步任务的载体,通过 await 暂停执行,让出控制权 |
| 事件循环(Event Loop) | 单线程内的 “调度器” | 管理协程的执行顺序,监听 I/O 事件,切换可执行的协程 |
| Task(任务) | 封装协程的 “可调度对象” | 将协程加入事件循环,支持并发执行、取消、获取结果 |
| Future | 表示 “未来完成的结果” | 底层抽象,Task 是 Future 的子类,用于获取异步任务的结果 |
| 可等待对象(Awaitable) | 能被 await 的对象 | 包括协程、Task、Future,是异步编程的核心交互对象 |
核心方法:
入口:用 asyncio.run() 启动主协程;
并发:用 create_task() 创建任务,gather()/wait() 调度并发;
控制:用 wait_for() 控超时,Semaphore 控并发数;
同步:用 Lock 解决资源竞争,Event 实现协程通信;
调试:开启 debug=True,用 Task 实例方法排查任务状态。
快速使用
import asyncio # 标准库,原生
# 原生async语法定义协程
async def async_task(name, delay):
print(f"任务{name}开始")
await asyncio.sleep(delay) # asyncio.sleep是原生异步休眠(替代同步的time.sleep)
print(f"任务{name}结束")
# 原生asyncio.run启动事件循环(3.7+支持)
async def main():
# 原生方式创建/调度协程任务
task1 = asyncio.create_task(async_task("A", 2))
task2 = asyncio.create_task(async_task("B", 1))
await asyncio.gather(task1, task2)
if __name__ == "__main__":
asyncio.run(main()) # 原生入口
更多推荐

所有评论(0)