with構文でバックグランドのmultiprocessing
· 約10分
with構文でbackgroundでmultiprocessingを起動したい
コード
import multiprocessing
import time
import random
import os
import signal
import logging
from typing import List, Dict, Any, Callable, Optional, TypeVar, Generic, Union, Tuple
import uuid
from threading import Lock, Event
from string import ascii_uppercase
from datetime import datetime
from contextlib import contextmanager
from dataclasses import dataclass
# ロガー設定
logger = logging.getLogger(__name__)
@dataclass
class TaskResult:
"""タスク実行結果を格納するデータクラス"""
task_id: str
success: bool
result: Any = None
error: Exception = None
processing_time: float = 0.0
worker_id: str = ""
class TaskTimeoutError(Exception):
"""タスク実行がタイムアウトした場合の例外"""
pass
@contextmanager
def timeout_handler(seconds: int, task_id: str):
"""指定秒数後にTimeoutErrorを発生させるコンテキストマネージャー"""
def handle_timeout(signum, frame):
raise TaskTimeoutError(f"Task {task_id} timed out after {seconds} seconds")
original_handler = signal.getsignal(signal.SIGALRM)
try:
signal.signal(signal.SIGALRM, handle_timeout)
signal.alarm(seconds)
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, original_handler)
def process_in_lock(worker_id: str) -> float:
"""
ロック内で実行する処理
Args:
worker_id: ワーカーID
Returns:
処理時間
"""
logger.debug(f"[{worker_id}] Start Processing...")
start_time = time.time()
t = random.uniform(0.1, 0.5)
time.sleep(t)
elapsed = time.time() - start_time
logger.debug(f"[{worker_id}] done! processing time: {elapsed:.2f} sec")
return elapsed
def process_outside_lock(
timeout: Optional[int] = None,
) -> Tuple[bool, Union[float, Exception]]:
"""
ロック外で実行する処理(オプションでタイムアウト指定可能)
Args:
timeout: 処理タイムアウト秒数(Noneの場合はタイムアウトなし)
Returns:
(成功フラグ, 処理時間または例外)
"""
task_id = uuid.uuid4().hex[:8]
start_time = time.time()
try:
if timeout:
with timeout_handler(timeout, task_id):
t = random.uniform(1, 5)
time.sleep(t)
else:
t = random.uniform(1, 5)
time.sleep(t)
elapsed = time.time() - start_time
return True, elapsed
except Exception as e:
return False, e
def worker_function(
task_queue: multiprocessing.Queue,
result_queue: multiprocessing.Queue,
lock: Lock,
stop_event: Event,
wait_event: Event,
worker_id: str,
max_retries: int = 3,
task_timeout: Optional[int] = None,
):
"""
ワーカー関数:タスクキューからタスクを取得して処理し、結果を結果キューに格納
Args:
task_queue: タスクキュー
result_queue: 結果キュー
lock: タスク取得用共有ロック
stop_event: 停止シグナル
wait_event: 待機 シグナル
worker_id: ワーカーID
max_retries: 最大リトライ回数
task_timeout: タスク実行タイムアウト秒数
"""
logger.info(f"Worker {worker_id} started")
try:
while not stop_event.is_set():
task = None
# タスク取得
with lock:
if not task_queue.empty():
try:
task = task_queue.get(block=False)
lock_time = process_in_lock(worker_id)
except Exception as e:
logger.error(f"[{worker_id}] Error getting task: {e}")
continue
elif wait_event.is_set():
logger.info(
f"[{worker_id}] No tasks available and wait flag set, stopping worker."
)
break
# タスク処理
if task is not None:
task_id = str(task)
logger.info(f"[{worker_id}] Processing task: {task_id}")
retries = 0
while retries <= max_retries:
try:
success, result = process_outside_lock(task_timeout)
if success:
processing_time = result
logger.info(
f"[{worker_id}] Task {task_id} completed in {processing_time:.2f}s"
)
result_queue.put(
TaskResult(
task_id=task_id,
success=True,
result=task,
processing_time=processing_time,
worker_id=worker_id,
)
)
break
else:
error = result
logger.warning(
f"[{worker_id}] Task {task_id} failed: {error}"
)
retries += 1
if retries > max_retries:
logger.error(
f"[{worker_id}] Task {task_id} failed after {max_retries} retries"
)
result_queue.put(
TaskResult(
task_id=task_id,
success=False,
error=error,
worker_id=worker_id,
)
)
else:
logger.info(
f"[{worker_id}] Retrying task {task_id} (attempt {retries}/{max_retries})"
)
time.sleep(0.5 * retries) # バックオフ待機
except Exception as e:
logger.exception(
f"[{worker_id}] Unexpected error processing task {task_id}: {e}"
)
result_queue.put(
TaskResult(
task_id=task_id,
success=False,
error=e,
worker_id=worker_id,
)
)
break
else:
logger.debug(f"[{worker_id}] No task to process, waiting...")
time.sleep(2) # ポーリング間隔を短くして応答性を向上
except Exception as e:
logger.exception(f"[{worker_id}] Worker crashed with error: {e}")
finally:
logger.info(f"[{worker_id}] Worker shutting down")
T = TypeVar("T")
R = TypeVar("R")
class BackgroundWorker(Generic[T, R]):
"""
バックグラウンドでタスクを処理するマルチプロセスワーカーシステム
"""
def __init__(
self,
func: Callable,
tasks: Optional[multiprocessing.Queue] = None,
results: Optional[multiprocessing.Queue] = None,
manager: Optional[multiprocessing.Manager] = None, # type: ignore
lock: Optional[Lock] = None,
num_workers: int = 2,
wait_time_for_join: float = 1.2,
max_retries: int = 3,
task_timeout: Optional[int] = None,
log_level: int = logging.INFO,
):
"""
初期化
Args:
func: ワーカー関数
tasks: タスクキュー(Noneの場合は新規作成)
results: 結果キュー(Noneの場合は新規作成)
manager: マルチプロセスマネージャ(Noneの場合は新規作成)
lock: 共有ロック(Noneの場合は新規作成)
num_workers: ワーカープロセス数
wait_time_for_join: join時の待機時間
max_retries: タスク実行の最大リトライ回数
task_timeout: タスク実行タイムアウト秒数
log_level: ロギングレベル
"""
# ロギング設定
self._setup_logging(log_level)
# 基本設定
self.func = func
self.num_workers = num_workers
self.wait_time_for_join = wait_time_for_join
self.max_retries = max_retries
self.task_timeout = task_timeout
self.processes: List[multiprocessing.Process] = []
self._start_time = datetime.now()
# マルチプロセス関連オブジェクト初期化
self.manager = manager if manager else multiprocessing.Manager()
self.tasks = tasks if tasks is not None else multiprocessing.Queue()
self.results = results if results is not None else multiprocessing.Queue()
self.lock = lock if lock is not None else self.manager.Lock()
# メトリクス
self._metrics = {
"tasks_submitted": 0,
"tasks_completed": 0,
"tasks_failed": 0,
"avg_processing_time": 0.0,
"total_processing_time": 0.0,
}
logger.info(f"BackgroundWorker initialized with {num_workers} workers")
def _setup_logging(self, log_level: int):
"""ロギング設定"""
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(log_level)
def __enter__(self):
"""コンテキストマネージャー開始処理"""
self.stop_event = self.manager.Event()
self.wait_event = self.manager.Event()
self._start_time = datetime.now()
# ワーカープロセス起動
for i in range(self.num_workers):
if i < len(ascii_uppercase):
worker_id = ascii_uppercase[i]
else:
worker_id = f"W{i}"
worker = multiprocessing.Process(
target=self.func,
args=(
self.tasks,
self.results,
self.lock,
self.stop_event,
self.wait_event,
worker_id,
self.max_retries,
self.task_timeout,
),
name=f"Worker-{worker_id}",
daemon=True, # デーモンプロセスに設定して、メインプロセス終了時に強制終了
)
worker.start()
self.processes.append(worker)
logger.info(f"Started worker process {worker_id} (PID: {worker.pid})")
return self
def __exit__(self, exc_type, exc_value, traceback):
"""コンテキストマネージャー終了処理"""
logger.info("BackgroundWorker context exiting")
self.wait() # キューのタスクが全て処理されるまで待機
self.join() # ワーカープロセスの終了を待機
self.collect_results() # 残りの結果を収集
self.report_metrics() # メトリクスレポート出力
# ファイナライズ
self.manager.shutdown()
logger.info(
f"BackgroundWorker shutdown complete after {datetime.now() - self._start_time}"
)
def join(self, timeout: Optional[float] = None):
"""
全ワーカープロセスの終了を待機
Args:
timeout: 各プロセスのjoin時のタイムアウト(秒)
"""
logger.info("Joining worker processes...")
remaining_processes = len(self.processes)
timeout = timeout or self.wait_time_for_join
while self.processes:
new_processes = []
for p in self.processes:
if p.is_alive():
p.join(timeout=timeout)
if p.is_alive():
new_processes.append(p)
else:
logger.info(f"Worker {p.name} (PID: {p.pid}) terminated")
else:
p.join()
logger.info(f"Worker {p.name} (PID: {p.pid}) already terminated")
# プロセス数が減っていなければタイムアウトを増やす
if len(new_processes) == remaining_processes:
timeout *= 1.5
remaining_processes = len(new_processes)
self.processes = new_processes
if new_processes:
logger.info(
f"Still waiting for {len(new_processes)} worker processes to terminate"
)
time.sleep(0.5)
def add_task(self, task: T) -> None:
"""
タスクキューにタスクを追加
Args:
task: 追加するタスク
"""
self.tasks.put(task)
self._metrics["tasks_submitted"] += 1
logger.debug(f"Added task to queue: {task}")
def pop_task(self) -> Optional[T]:
"""
タスクキューからタス クを取得
Returns:
取得したタスク、キューが空の場合はNone
"""
if not self.tasks.empty():
return self.tasks.get()
return None
def stop(self):
"""ワーカープロセスに停止シグナルを送信"""
logger.info("Sending stop signal to all workers")
self.stop_event.set()
def wait(self):
"""全てのタスクが処理されるまで待機するシグナルを送信"""
logger.info("Sending wait signal to workers")
self.wait_event.set()
def collect_results(self) -> List[TaskResult]:
"""
結果キューから全ての結果を収集
Returns:
TaskResultのリスト
"""
results = []
while not self.results.empty():
try:
result = self.results.get(block=False)
results.append(result)
# メトリクス更新
if result.success:
self._metrics["tasks_completed"] += 1
self._metrics["total_processing_time"] += result.processing_time
else:
self._metrics["tasks_failed"] += 1
except Exception as e:
logger.error(f"Error collecting results: {e}")
# 平均処理時間計算
completed_tasks = self._metrics["tasks_completed"]
if completed_tasks > 0:
self._metrics["avg_processing_time"] = (
self._metrics["total_processing_time"] / completed_tasks
)
return results
def report_metrics(self):
"""現在のメトリクス情報をログに出力"""
uptime = datetime.now() - self._start_time
logger.info("=== Worker Performance Metrics ===")
logger.info(f"Total uptime: {uptime}")
logger.info(f"Tasks submitted: {self._metrics['tasks_submitted']}")
logger.info(f"Tasks completed: {self._metrics['tasks_completed']}")
logger.info(f"Tasks failed: {self._metrics['tasks_failed']}")
completion_rate = 0
if self._metrics["tasks_submitted"] > 0:
completion_rate = (
self._metrics["tasks_completed"]
/ self._metrics["tasks_submitted"]
* 100
)
logger.info(f"Completion rate: {completion_rate:.1f}%")
logger.info(
f"Average processing time: {self._metrics['avg_processing_time']:.2f}s"
)
@property
def metrics(self) -> Dict[str, Any]:
"""現在のメトリクス情報を取得"""
# 最新の結果を収集してメトリクス更新
self.collect_results()
return self._metrics.copy()
if __name__ == "__main__":
# ロギングの設定
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
num_workers = 4
num_tasks = 20
wait_time_between_tasks = 5
logger.info(f"Starting BackgroundWorker demo with {num_workers} workers")
try:
with BackgroundWorker(
func=worker_function,
num_workers=num_workers,
task_timeout=10, # 10秒でタイムアウト
max_retries=2, # 最大2回リトライ
log_level=logging.INFO,
) as worker:
# タスク追加
logger.info(f"Adding {num_tasks} tasks to the queue")
for i in range(num_tasks):
worker.add_task(i)
time.sleep(wait_time_between_tasks) # 一部のタスク間に遅延を入れる
# タスク完了まで待機
logger.info("Waiting for tasks to complete")
worker.wait()
# 結果収集
results = worker.collect_results()
logger.info(f"Collected {len(results)} results")
# メトリクス表示
metrics = worker.metrics
logger.info(
f"Final completion rate: {metrics['tasks_completed'] / num_tasks * 100:.1f}%"
)
except KeyboardInterrupt:
logger.warning("Process interrupted by user")
except Exception as e:
logger.exception(f"An error occurred: {e}")
finally:
logger.info("Demo complete")
出力
2025-03-31 23:18:07,400 - __main__ - INFO - Starting BackgroundWorker demo with 4 workers
2025-03-31 23:18:07,417 - __main__ - INFO - BackgroundWorker initialized with 4 workers
2025-03-31 23:18:07,417 - __main__ - INFO - BackgroundWorker initialized with 4 workers
2025-03-31 23:18:07,420 - __main__ - INFO - Started worker process A (PID: 2670819)
2025-03-31 23:18:07,420 - __main__ - INFO - Started worker process A (PID: 2670819)
2025-03-31 23:18:07,421 - __main__ - INFO - Started worker process B (PID: 2670821)
2025-03-31 23:18:07,421 - __main__ - INFO - Started worker process B (PID: 2670821)
2025-03-31 23:18:07,421 - __main__ - INFO - Worker A started
2025-03-31 23:18:07,421 - __main__ - INFO - Worker A started
2025-03-31 23:18:07,422 - __main__ - INFO - Started worker process C (PID: 2670825)
2025-03-31 23:18:07,422 - __main__ - INFO - Started worker process C (PID: 2670825)
2025-03-31 23:18:07,422 - __main__ - INFO - Worker B started
2025-03-31 23:18:07,422 - __main__ - INFO - Worker B started
2025-03-31 23:18:07,422 - __main__ - INFO - Started worker process D (PID: 2670830)
2025-03-31 23:18:07,422 - __main__ - INFO - Started worker process D (PID: 2670830)
2025-03-31 23:18:07,423 - __main__ - INFO - Adding 20 tasks to the queue
2025-03-31 23:18:07,423 - __main__ - INFO - Adding 20 tasks to the queue
2025-03-31 23:18:07,423 - __main__ - INFO - Worker C started
2025-03-31 23:18:07,423 - __main__ - INFO - Worker C started
2025-03-31 23:18:07,424 - __main__ - INFO - Worker D started
2025-03-31 23:18:07,424 - __main__ - INFO - Worker D started
2025-03-31 23:18:07,620 - __main__ - INFO - [C] Processing task: 0
2025-03-31 23:18:07,620 - __main__ - INFO - [C] Processing task: 0
2025-03-31 23:18:11,229 - __main__ - INFO - [C] Task 0 completed in 3.61s
2025-03-31 23:18:11,229 - __main__ - INFO - [C] Task 0 completed in 3.61s
2025-03-31 23:18:13,366 - __main__ - INFO - [C] Processing task: 1
2025-03-31 23:18:13,366 - __main__ - INFO - [C] Processing task: 1
2025-03-31 23:18:15,170 - __main__ - INFO - [C] Task 1 completed in 1.80s
2025-03-31 23:18:15,170 - __main__ - INFO - [C] Task 1 completed in 1.80s
2025-03-31 23:18:17,891 - __main__ - INFO - [A] Processing task: 2
2025-03-31 23:18:17,891 - __main__ - INFO - [A] Processing task: 2
...