LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

30天学会Python编程:15.Python并发编程

admin
2025年7月17日 21:51 本文热度 9

并发编程是现代软件开发中不可或缺的技能,Python提供了多种并发编程模型。本指南将系统性地介绍Python中的各种并发技术,包括多线程、多进程、协程以及相关工具库的使用。

1. 并发编程基础概念

1.1 并发与并行

并发(Concurrency):指系统能够处理多个任务的能力,这些任务在时间上重叠,但不一定同时执行。在单核CPU上,通过时间片轮转实现并发。

并行(Parallelism):指系统能够同时执行多个任务,需要多核CPU支持。

1.2 Python中的并发模型

Python提供了三种主要的并发编程模型:

  1. 多线程:适合I/O密集型任务
  2. 多进程:适合CPU密集型任务
  3. 协程(异步IO):适合高并发的I/O操作


1.3 GIL全局解释器锁

全局解释器锁(GIL) 是Python解释器中的一个机制,它确保任何时候只有一个线程执行Python字节码。这对并发编程有重要影响:

  • 优点:简化了CPython实现,使对象模型(包括关键的内建类型如字典)隐式地避免了并发访问的竞争条件
  • 缺点:限制了多线程在CPU密集型任务中的性能

GIL的影响

  • 对纯Python代码的多线程性能有显著限制
  • I/O操作(文件、网络)会释放GIL,因此I/O密集型任务仍可受益于多线程
  • 可以通过多进程或C扩展绕过GIL限制

2. 多线程编程

2.1 threading模块基础

Python的threading模块提供了高级的线程接口。与低级的_thread模块相比,它提供了更多功能和更好的抽象。

创建线程的基本方法

import threading
import time

defworker(num):
    print(f"Worker {num} 开始执行")
    time.sleep(1)
    print(f"Worker {num} 执行完成")

# 创建线程
threads = []
for i inrange(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# 等待所有线程完成
for t in threads:
    t.join()

关键点

  • Thread
    类表示一个执行线程
  • target
    参数指定线程要执行的函数
  • args
    参数传递给目标函数的参数元组
  • start()
    方法启动线程
  • join()
    方法等待线程结束

2.2 线程同步

当多个线程需要访问共享资源时,必须使用同步机制来避免竞态条件。

2.2.1 锁(Lock)

最基本的同步原语,一次只允许一个线程访问共享资源。

import threading

classCounter:
    def__init__(self):
        self.value = 0
        self.lock = threading.Lock()
    
    defincrement(self):
        withself.lock:  # 自动获取和释放锁
            self.value += 1

counter = Counter()

defincrement_worker():
    for _ inrange(100000):
        counter.increment()

threads = [threading.Thread(target=increment_worker) for _ inrange(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"最终计数: {counter.value}")  # 应为200000

锁的使用

  • 总是使用with语句管理锁,确保锁会被释放
  • 避免嵌套锁,容易导致死锁
  • 锁的范围应尽可能小,只保护必要的代码段

2.2.2 其他同步原语

  • RLock(可重入锁):允许同一个线程多次获取锁
  • Semaphore:限制同时访问资源的线程数量
  • Event:线程间通信的简单方式
  • Condition:复杂的线程通信机制

2.3 线程池

concurrent.futures.ThreadPoolExecutor提供了方便的线程池接口。

from concurrent.futures import ThreadPoolExecutor
import urllib.request

URLS = ['http://example.com''http://example.org']

defload_url(url, timeout=60):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with ThreadPoolExecutor(max_workers=5as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url} 页面长度为 {len(data)}")
        except Exception as e:
            print(f"{url} 获取失败: {e}")

线程池的优点

  • 避免频繁创建销毁线程的开销
  • 可以限制并发线程数量
  • 提供了Future对象,便于获取结果和处理异常

3. 多进程编程

3.1 multiprocessing模块

multiprocessing模块提供了类似于threading模块的API,但使用进程而非线程。

from multiprocessing import Process
import os

deftask(name):
    print(f"子进程 {name} (PID: {os.getpid()}) 执行中...")
    result = sum(i*i for i inrange(1000000))
    print(f"子进程 {name} 完成")

if __name__ == '__main__':
    processes = []
    for i inrange(4):  # 4核CPU常用
        p = Process(target=task, args=(i,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

多进程特点

  • 每个进程有独立的内存空间
  • 不受GIL限制,适合CPU密集型任务
  • 进程创建和切换开销比线程大
  • 进程间通信比线程间通信复杂

3.2 进程间通信(IPC)

3.2.1 Queue

multiprocessing.Queue是进程安全的队列实现。

from multiprocessing import Process, Queue

defproducer(q):
    for item in ['A''B''C']:
        q.put(item)
        print(f"生产: {item}")

defconsumer(q):
    whileTrue:
        item = q.get()
        if item isNone:  # 终止信号
            break
        print(f"消费: {item}")

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    q.put(None)  # 发送终止信号
    p2.join()

3.2.2 Pipe

multiprocessing.Pipe提供了一对连接对象,用于双向通信。

from multiprocessing import Process, Pipe

def worker(conn):
    conn.send("Hello from worker")
    print("Worker received:", conn.recv())
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    print("Parent received:", parent_conn.recv())
    parent_conn.send("Hello from parent")
    p.join()

3.3 进程池

multiprocessing.Pool提供了进程池功能。

from multiprocessing import Pool

def cpu_intensive(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with Pool(4as pool:  # 4个工作进程
        results = pool.map(cpu_intensive, range(1000010010))
        print(results)

进程池方法

  • map(func, iterable):并行处理可迭代对象
  • apply(func, args):同步执行函数
  • apply_async(func, args):异步执行函数
  • imap(func, iterable):惰性版本的map

4. 异步编程(asyncio)

4.1 协程基础

协程是轻量级的用户态线程,由事件循环调度。

import asyncio

asyncdeffetch_data(url):
    print(f"开始获取 {url}")
    await asyncio.sleep(2)  # 模拟IO操作
    print(f"完成获取 {url}")
    returnf"{url} 的数据"

asyncdefmain():
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    
    results = await asyncio.gather(task1, task2)
    print(results)

asyncio.run(main())

关键概念

  • async def:定义协程函数
  • await:挂起协程,等待结果
  • asyncio.run():运行协程
  • create_task():调度协程执行
  • gather():并发运行多个协程

4.2 异步IO操作

使用aiohttp进行异步HTTP请求:

import aiohttp
import asyncio

asyncdeffetch_page(url):
    asyncwith aiohttp.ClientSession() as session:
        asyncwith session.get(url) as response:
            returnawait response.text()

asyncdefmain():
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net"
    ]
    tasks = [fetch_page(url) for url in urls]
    pages = await asyncio.gather(*tasks)
    print(f"获取了 {len(pages)} 个页面")

asyncio.run(main())

异步编程要点

  • 避免在协程中使用阻塞调用
  • 使用专为asyncio设计的库(aiohttp, asyncpg等)
  • 合理控制并发数量
  • 注意异常处理

5. 并发编程举例

5.1 并发Web爬虫

import aiohttp
import asyncio
from urllib.parse import urljoin
from bs4 import BeautifulSoup

asyncdefcrawl(start_url, max_depth=2):
    visited = set()
    queue = [(start_url, 0)]
    
    asyncwith aiohttp.ClientSession() as session:
        while queue:
            url, depth = queue.pop(0)
            if url in visited or depth > max_depth:
                continue
                
            try:
                print(f"抓取: {url}")
                asyncwith session.get(url) as response:
                    html = await response.text()
                    visited.add(url)
                    
                    if depth < max_depth:
                        soup = BeautifulSoup(html, 'html.parser')
                        for link in soup.find_all('a', href=True):
                            next_url = urljoin(url, link['href'])
                            if next_url notin visited:
                                queue.append((next_url, depth + 1))
            except Exception as e:
                print(f"抓取失败 {url}{e}")

asyncio.run(crawl("http://example.com"))

5.2 实时数据处理管道

import threading
import queue
import random
import time

classDataPipeline:
    def__init__(self):
        self.raw_data_queue = queue.Queue()
        self.processed_data = []
        self.lock = threading.Lock()
    
    defdata_source(self):
        whileTrue:
            data = random.randint(1100)
            self.raw_data_queue.put(data)
            time.sleep(0.1)
    
    defdata_processor(self):
        whileTrue:
            data = self.raw_data_queue.get()
            time.sleep(0.2)
            result = data * 2
            
            withself.lock:
                self.processed_data.append(result)
                print(f"处理数据: {data} -> {result} (队列大小: {self.raw_data_queue.qsize()})")
    
    defstart(self):
        threads = [
            threading.Thread(target=self.data_source, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True)
        ]
        
        for t in threads:
            t.start()
        
        try:
            whileTrue:
                time.sleep(1)
                withself.lock:
                    print(f"当前处理结果数: {len(self.processed_data)}")
        except KeyboardInterrupt:
            print("停止管道")

if __name__ == '__main__':
    pipeline = DataPipeline()
    pipeline.start()

6. 并发编程知识图谱



7. 并发编程实践

  1. 任务类型分析

    • CPU密集型:使用多进程
    • I/O密集型:使用多线程或协程
    • 高并发I/O:优先考虑协程
  2. 资源管理

    • 合理控制并发数量
    • 使用连接池管理资源
    • 及时释放资源(文件、网络连接等)
  3. 错误处理

    • 为每个任务添加超时机制
    • 记录详细的错误日志
    • 实现重试机制
  4. 性能优化

    • 避免不必要的锁
    • 减少进程/线程间通信
    • 使用批量处理代替频繁的小操作
  5. 调试技巧

    • 使用threading.current_thread().name标识线程
    • 使用multiprocessing.current_process().name标识进程
    • 使用日志记录执行流程

8. 常见问题

  1. 死锁问题

    • 现象:程序挂起,无响应
    • 解决:按固定顺序获取锁,使用带超时的锁
  2. 资源竞争

    • 现象:数据不一致或程序崩溃
    • 解决:使用适当的同步原语保护共享资源
  3. 性能瓶颈

    • 现象:增加并发数但性能不提升
    • 解决:分析系统资源使用情况,可能是I/O或外部服务限制
  4. 内存泄漏

    • 现象:内存使用持续增长
    • 解决:确保资源正确释放,使用内存分析工具
  5. 协程阻塞

    • 现象:异步程序响应慢
    • 解决:检查是否有同步阻塞调用,使用loop.run_in_executor处理阻塞操作

9. 进阶学习方向

  1. 分布式任务队列

    • Celery
    • RQ(Redis Queue)
  2. Actor模型

    • Pykka
    • Thespian
  3. 高性能并发框架

    • Tornado
    • Sanic
    • FastAPI(支持异步)
  4. 并行计算

    • Dask
    • Ray
    • PySpark
  5. 微服务与并发

    • 服务网格中的并发控制
    • 分布式锁
    • 限流与熔断

通过系统学习和实践这些并发编程技术,将能够构建高性能、高并发的Python应用程序。


阅读原文:原文链接


该文章在 2025/7/18 10:50:58 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2025 ClickSun All Rights Reserved