跳转至

并发任务时应该怎么做

首先,我们应该知道什么是线程安全和线程不安全?

在多线程编程中,线程安全(Thread Safety)是一个非常重要的概念。它指的是多个线程同时访问同一个资源(如全局变量、静态变量等)时,不会因为线程执行的顺序不同而导致数据出错,程序表现出非预期行为或者程序崩溃。

线程安全/ 线程不安全

当一个函数或者一个数据结构在多线程环境下被调用时,如果能够保证运行结果正确并且没有出现运行错误,则称该函数或数据结构是线程安全的。实现线程安全的方法通常包括:

  • 互斥锁(Mutex):通过锁机制确保某一时刻只有一个线程可以访问共享资源。
  • 原子操作:利用处理器提供的原子操作来进行无锁的线程安全编程。
  • 不可变对象:创建后其状态不能改变的对象自然是线程安全的。
  • 线程局部存储:每个线程有自己的数据副本,避免了共享。

线程不安全则意味着程序中的某些部分在多线程环境下执行时,其行为是不可预测的,可能导致数据损坏、程序崩溃或者出现错误的计算结果。线程不安全通常发生在以下几种情况:

  • 共享数据:多个线程同时修改同一个数据。
  • 有状态的实体:对象或者函数使用了实例字段或静态变量,而这些字段或变量被多个线程共享。
  • 适当同步的缺失:在访问共享资源时没有适当的同步机制(如锁)。

代码示例

线程不安全的情况通常出现在多个线程共享同一个资源(如变量、数据结构、文件等)时,如果对这个资源的访问没有进行适当的同步控制,多个线程可能会同时读写该资源,导致数据不一致或程序崩溃。以下是一个示例,演示在没有线程安全机制保护下的线程不安全情况:

假设我们有一个简单的计数器类 Counter,它有一个方法 increment,每次调用时会将计数器加一。

Python
import threading

class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1

counter = Counter()

def worker():
    for _ in range(100000):
        counter.increment()

threads = []
for i in range(10):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Final count: {counter.count}")

在这个例子中,我们启动了10个线程,每个线程都对同一个 Counter 对象的 count 属性进行递增操作。如果线程是安全的,最终计数应该是 10 * 100000 = 1000000

然而,由于 increment 方法在多个线程中并行执行,而没有任何同步控制,因此可能会出现以下线程不安全的问题:

竞态条件:多个线程同时读取和修改 self.count,可能导致丢失更新。例如,一个线程读取 self.count 的值为 42,另一个线程也读取 self.count 的值为 42,然后两个线程都将 self.count 增加到 43,导致丢失一次更新。 数据不一致:最终的计数值可能小于预期的 1000000,因为多个线程可能在没有完成前一个更新操作时就开始了下一个更新操作。 运行上述代码,结果可能会有所不同,但通常会小于 1000000

如何修复线程不安全

  • threading.Lock(),我们可以通过使用锁来修复上述线程不安全的问题。以下是修复后的版本:
Python
import threading

class Counter:
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.count += 1

counter = Counter()

def worker():
    for _ in range(100000):
        counter.increment()

threads = []
for i in range(10):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Final count: {counter.count}")

在这个修复版本中,increment 方法使用 with self.lock: 来确保 self.count += 1 操作是原子的,即不会被其他线程中断。这样可以保证每个线程都能正确地递增计数器,而不会导致数据不一致。

运行修复后的代码,最终计数值应该是 1000000

这个例子展示了如何使用锁来确保多个线程对共享资源的访问是线程安全的,从而避免竞态条件和数据不一致的问题。

  • threading.local 创建了一个用于存储线程本地数据的对象

threading.local() 能够保证线程安全是因为 threading.local 创建了一个用于存储线程本地数据的对象。这意味着在不同线程中访问 self.local 时,每个线程都会有自己独立的副本,互不干扰。具体来说:

线程本地存储:

threading.local() 创建的对象在每个线程中是独立的。即使多个线程同时访问同一个 self.local 对象,它们看到的也是各自线程中的版本,而不是共享的全局版本。 独立的数据副本:

每个线程对 self.local 进行读写操作时,都是在其私有的副本上进行操作。因此,一个线程中的操作不会影响到其他线程中的数据。这就确保了数据的独立性和线程安全。

以下是一些代码示例来解释这种机制:

Python
import threading

class ThreadLocalExample:
    def __init__(self):
        self.local = threading.local()

    def set_data(self, data):
        self.local.data = data

    def get_data(self):
        return getattr(self.local, 'data', None)

# 创建实例
example = ThreadLocalExample()

def thread_function(name, data):
    example.set_data(data)
    print(f"Thread {name}: {example.get_data()}")

# 创建多个线程
thread1 = threading.Thread(target=thread_function, args=("Thread-1", "Data-1"))
thread2 = threading.Thread(target=thread_function, args=("Thread-2", "Data-2"))

# 启动线程
thread1.start()
thread2.start()

# 等待线程完成
thread1.join()
thread2.join()

在上述示例中,thread1thread2 分别设置和获取 self.local.data。 由于 self.local 是线程本地存储对象,两个线程设置的 data 不会互相干扰,分别打印 Data-1Data-2