From 4d8a865a456ddb082e7ddbfa6aa332c7779da188 Mon Sep 17 00:00:00 2001 From: FiveMovesAhead Date: Fri, 6 Jun 2025 14:58:33 +0100 Subject: [PATCH] Replace ThreadPool with threads. --- tig-benchmarker/.env | 4 +- tig-benchmarker/docker-compose-slave.yml | 1 + tig-benchmarker/slave/config.json | 1 - tig-benchmarker/slave/main.py | 104 +++++++++++------------ 4 files changed, 54 insertions(+), 56 deletions(-) diff --git a/tig-benchmarker/.env b/tig-benchmarker/.env index 19a17ee0..ed054789 100644 --- a/tig-benchmarker/.env +++ b/tig-benchmarker/.env @@ -23,4 +23,6 @@ RESULTS_DIR=./results # Seconds for results to live TTL=300 # Name of the slave. Defaults to randomly generated name -SLAVE_NAME= \ No newline at end of file +SLAVE_NAME= +# How many worker threads to spawn in the slave container +NUM_WORKERS=8 \ No newline at end of file diff --git a/tig-benchmarker/docker-compose-slave.yml b/tig-benchmarker/docker-compose-slave.yml index 838e6009..58143729 100644 --- a/tig-benchmarker/docker-compose-slave.yml +++ b/tig-benchmarker/docker-compose-slave.yml @@ -23,6 +23,7 @@ services: - MASTER_PORT=${MASTER_PORT} - MASTER_IP=${MASTER_IP} - TTL=${TTL} + - NUM_WORKERS=${NUM_WORKERS} satisfiability: <<: *common diff --git a/tig-benchmarker/slave/config.json b/tig-benchmarker/slave/config.json index 106c654b..b3e1f1da 100644 --- a/tig-benchmarker/slave/config.json +++ b/tig-benchmarker/slave/config.json @@ -1,5 +1,4 @@ { - "max_workers": 100, "max_cost": 8.0, "algorithms": [ { diff --git a/tig-benchmarker/slave/main.py b/tig-benchmarker/slave/main.py index e8e224ee..c74ed73e 100644 --- a/tig-benchmarker/slave/main.py +++ b/tig-benchmarker/slave/main.py @@ -15,7 +15,7 @@ import sys import tarfile import time import zlib -from concurrent.futures import ThreadPoolExecutor +from queue import Queue from glob import glob from threading import Thread from common.structs import OutputData, MerkleProof @@ -24,7 +24,6 @@ from common.merkle_tree import MerkleTree, MerkleHash logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) PENDING_BATCH_IDS = set() PROCESSING_BATCH_IDS = {} -PROCESSING_NONCES = {} READY_BATCH_IDS = set() FINISHED_BATCH_IDS = {} TOTAL_COST = [0] @@ -113,14 +112,14 @@ def run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir): logger.debug(f"batch {batch['id']}, nonce {nonce} finished, took {now() - start}ms") -def compute_merkle_root(batch, results_dir): - start = now() - while True: - if batch["id"] not in PROCESSING_BATCH_IDS: - logger.info(f"batch {batch['id']} stopped") - return +def compute_merkle_roots(results_dir): + for batch_id in list(PROCESSING_BATCH_IDS): + job = PROCESSING_BATCH_IDS[batch_id] + batch = job["batch"] + start = job["start"] + q = job["q"] - num_processing = len(PROCESSING_BATCH_IDS[batch["id"]]) + num_processing = q.qsize() if num_processing > 0: logger.debug(f"batch {batch['id']} still processing {num_processing} nonces") time.sleep(1.5) @@ -161,6 +160,8 @@ def compute_merkle_root(batch, results_dir): finally: PROCESSING_BATCH_IDS.pop(batch["id"], None) return + + time.sleep(1) def purge_folders(output_path, ttl): @@ -271,7 +272,7 @@ def send_results(headers, master_ip, master_port, results_dir): time.sleep(2) -def process_batch(pool, algorithms_dir, config, results_dir): +def process_batch(algorithms_dir, config, results_dir): try: batch_id = PENDING_BATCH_IDS.pop() except KeyError: @@ -309,61 +310,49 @@ def process_batch(pool, algorithms_dir, config, results_dir): logger.error(f"Error processing batch {batch_id}: Algorithm {batch['settings']['algorithm_id']} does not match any regex in the config") return - PROCESSING_BATCH_IDS[batch_id] = set(range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"])) + q = Queue() + for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"]): + q.put(n) so_path, ptx_path = download_library(algorithms_dir, batch) logger.info(f"batch {batch['id']} started") - pool.submit(compute_merkle_root, batch, results_dir) - - PROCESSING_NONCES[batch['id']] = { + PROCESSING_BATCH_IDS[batch_id] = { "batch": batch, "so_path": so_path, "ptx_path": ptx_path, - "current_nonce": batch["start_nonce"], + "q": q, + "finished": set(), "cost": c["cost"], + "start": now(), } -def process_nonces(pool, config, results_dir): - if len(PROCESSING_NONCES) == 0: - logger.debug("No pending nonces") - time.sleep(1) - return - - for batch_id in list(PROCESSING_NONCES): - job = PROCESSING_NONCES[batch_id] +def process_nonces(config, results_dir): + for batch_id in list(PROCESSING_BATCH_IDS): + job = PROCESSING_BATCH_IDS[batch_id] + q = job["q"] batch = job["batch"] so_path = job["so_path"] ptx_path = job["ptx_path"] cost = job["cost"] - - if batch["id"] not in PROCESSING_BATCH_IDS: - logger.info(f"batch {batch_id} stopped") - PROCESSING_NONCES.pop(batch_id, None) - break - - def process_nonce(nonce): - logger.debug(f"batch {batch_id}, nonce {nonce} started: (cost {cost})") + if TOTAL_COST[0] + cost <= config["max_cost"]: try: - run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir) - except Exception as e: - logger.error(f"batch {batch_id}, nonce {nonce}, runtime error: {e}") - finally: - TOTAL_COST[0] -= cost - if batch_id in PROCESSING_BATCH_IDS: - PROCESSING_BATCH_IDS[batch_id].remove(nonce) - - while ( - job["current_nonce"] < batch["start_nonce"] + batch["num_nonces"] and - TOTAL_COST[0] + cost <= config["max_cost"] - ): - TOTAL_COST[0] += cost - pool.submit(process_nonce, job["current_nonce"]) - job["current_nonce"] += 1 - - if job["current_nonce"] >= batch["start_nonce"] + batch["num_nonces"]: - PROCESSING_NONCES.pop(batch_id, None) + nonce = q.get_nowait() + break + except: + continue + else: + time.sleep(1) + return - time.sleep(0.1) + TOTAL_COST[0] += cost + logger.debug(f"batch {batch_id}, nonce {nonce} started: (cost {cost})") + try: + run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir) + except Exception as e: + logger.error(f"batch {batch_id}, nonce {nonce}, runtime error: {e}") + finally: + TOTAL_COST[0] -= cost + job["finished"].add(nonce) def poll_batches(headers, master_ip, master_port, results_dir): @@ -425,7 +414,8 @@ def main(): algorithms_dir = "algorithms" results_dir = "results" - ttl = int(os.getenv("TTL") or 300) + ttl = int(os.getenv("TTL")) + num_workers = int(os.getenv("NUM_WORKERS")) print(f"Starting slave with config:") print(f" Slave Name: {slave_name}") @@ -434,6 +424,7 @@ def main(): print(f" Algorithms Dir: {algorithms_dir}") print(f" Results Dir: {results_dir}") print(f" TTL: {ttl}") + print(f" WORKERS: {num_workers}") print(f" Config: {json.dumps(config, indent=2)}") os.makedirs(algorithms_dir, exist_ok=True) @@ -442,15 +433,20 @@ def main(): "User-Agent": slave_name } - pool = ThreadPoolExecutor(max_workers=config["max_workers"]) Thread( target=wrap_thread, - args=(process_batch, pool, algorithms_dir, config, results_dir) + args=(process_batch, algorithms_dir, config, results_dir) ).start() + for _ in range(num_workers): + Thread( + target=wrap_thread, + args=(process_nonces, config, results_dir) + ).start() + Thread( target=wrap_thread, - args=(process_nonces, pool, config, results_dir) + args=(compute_merkle_roots, results_dir) ).start() Thread(