我想在 main 和它建立的线程间共享数据,在线程中执行逻辑,更新数据,主线程中读取判断。
1
smdbh OP 补充下,我当前是将 dataclass 的结构当参数传入 threading.Thread 的参数中,这个操作是否有问题?
|
2
djangovcps 83 天前
threading.Lock ?
|
3
qianchengv 83 天前
```python
import multiprocessing
import threading
import time
import unittest
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from threading import Lock

# Only the reusable pieces are star-exported.  The TestCase is still found by
# ``unittest.main()`` (which inspects the module directly), but it is kept out
# of ``from <module> import *`` so that another test module importing the
# helpers does not collect and re-run this multi-second suite.
__all__ = ["SharedData", "worker"]


@dataclass
class SharedData:
    """Integer counter shared between threads, guarded by a per-instance lock.

    Mutating an attribute of the instance does not rebind the instance, so
    every thread that was handed the object sees the same ``value``.
    """

    value: int = 0
    # One lock per instance.  Kept out of __init__, repr and equality: two
    # distinct lock objects never compare equal, so leaving the lock in
    # __eq__ would make SharedData(1) != SharedData(1).
    lock: Lock = field(
        default_factory=Lock, init=False, repr=False, compare=False
    )

    def increment(self) -> None:
        """Add one to ``value`` while holding the lock."""
        with self.lock:
            self.value += 1

    def get_value(self) -> int:
        """Return ``value``, read while holding the lock."""
        with self.lock:
            return self.value


def worker(data: SharedData, num_iterations: int) -> None:
    """Count to ``num_iterations`` locally, then add the total to ``data``.

    The counting loop touches only a local variable, so the lock is taken
    once per call rather than once per iteration.
    """
    local_sum = 0
    for _ in range(num_iterations):
        local_sum += 1
    # Single locked read-modify-write of the shared field.
    with data.lock:
        data.value += local_sum


class TestSharedDataThreadSafety(unittest.TestCase):
    """Concurrency checks for ``SharedData`` and ``worker``."""

    # Wall-clock duration of the stress test, in seconds.
    STRESS_SECONDS = 5

    def test_concurrent_increments(self):
        """Every worker's contribution must be present in the final total."""
        shared_data = SharedData()
        # Twice the CPU count, so there are more workers than cores.
        num_threads = multiprocessing.cpu_count() * 2
        num_iterations = 1000000 // num_threads

        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [
                executor.submit(worker, shared_data, num_iterations)
                for _ in range(num_threads)
            ]
            for future in futures:
                # result() re-raises any exception raised inside the worker.
                future.result()

        expected_value = num_threads * num_iterations
        actual_value = shared_data.get_value()
        self.assertEqual(
            actual_value,
            expected_value,
            f"Expected {expected_value}, but got {actual_value}",
        )

    def test_race_condition(self):
        """A check-then-act sequence done under the lock sees no interference."""
        shared_data = SharedData()
        race_detected = threading.Event()
        num_racers = 100

        def racer():
            # The whole read / pause / compare / write sequence stays inside
            # the lock; that is what keeps other racers from changing the
            # value between the read and the comparison.
            with shared_data.lock:
                initial_value = shared_data.value
                time.sleep(0.001)  # Simulate some work
                if initial_value == shared_data.value:
                    shared_data.value += 1
                else:
                    race_detected.set()

        threads = [threading.Thread(target=racer) for _ in range(num_racers)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        self.assertFalse(race_detected.is_set(), "Race condition detected")
        # With no race, each racer must have incremented exactly once.
        self.assertEqual(shared_data.get_value(), num_racers)

    def test_stress_test(self):
        """Spin one thread per CPU for STRESS_SECONDS; no update may be lost."""
        shared_data = SharedData()
        stop_flag = threading.Event()
        thread_totals = []

        def stress_worker():
            local_sum = 0
            while not stop_flag.is_set():
                local_sum += 1
            # Publish the local count once, after the busy loop has ended.
            with shared_data.lock:
                shared_data.value += local_sum
                thread_totals.append(local_sum)

        threads = [
            threading.Thread(target=stress_worker)
            for _ in range(multiprocessing.cpu_count())
        ]
        for t in threads:
            t.start()
        try:
            time.sleep(self.STRESS_SECONDS)
        finally:
            # Always release the workers, even if the sleep is interrupted;
            # otherwise the non-daemon threads would spin forever.
            stop_flag.set()
            for t in threads:
                t.join()

        final_value = shared_data.get_value()
        print(f"Stress test final value: {final_value}")
        self.assertEqual(final_value, sum(thread_totals))


if __name__ == '__main__':
    unittest.main()
4
ClericPy 83 天前
show me your code?
|
5
milkpuff 82 天前
修改 data.a, data 的 id 不会变。
|
6
UN2758 47 天前
@qianchengv 佬,你这大段的代码还不如贴一个 gist 链接呢,比这个方便阅读多了
|
7
UN2758 47 天前
多线程,要么加锁,要么用 queue 之类的库,至于你的问题二,地址会移动可能是因为对象大小改变导致的重新分配地址
|