Replace ThreadPool with threads.

This commit is contained in:
FiveMovesAhead 2025-06-06 14:58:33 +01:00
parent 96d275f56b
commit 4d8a865a45
4 changed files with 54 additions and 56 deletions

View File

@ -23,4 +23,6 @@ RESULTS_DIR=./results
# Seconds for results to live
TTL=300
# Name of the slave. Defaults to randomly generated name
SLAVE_NAME=
SLAVE_NAME=
# How many worker threads to spawn in the slave container
NUM_WORKERS=8

View File

@ -23,6 +23,7 @@ services:
- MASTER_PORT=${MASTER_PORT}
- MASTER_IP=${MASTER_IP}
- TTL=${TTL}
- NUM_WORKERS=${NUM_WORKERS}
satisfiability:
<<: *common

View File

@ -1,5 +1,4 @@
{
"max_workers": 100,
"max_cost": 8.0,
"algorithms": [
{

View File

@ -15,7 +15,7 @@ import sys
import tarfile
import time
import zlib
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from glob import glob
from threading import Thread
from common.structs import OutputData, MerkleProof
@ -24,7 +24,6 @@ from common.merkle_tree import MerkleTree, MerkleHash
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
PENDING_BATCH_IDS = set()
PROCESSING_BATCH_IDS = {}
PROCESSING_NONCES = {}
READY_BATCH_IDS = set()
FINISHED_BATCH_IDS = {}
TOTAL_COST = [0]
@ -113,14 +112,14 @@ def run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir):
logger.debug(f"batch {batch['id']}, nonce {nonce} finished, took {now() - start}ms")
def compute_merkle_root(batch, results_dir):
start = now()
while True:
if batch["id"] not in PROCESSING_BATCH_IDS:
logger.info(f"batch {batch['id']} stopped")
return
def compute_merkle_roots(results_dir):
for batch_id in list(PROCESSING_BATCH_IDS):
job = PROCESSING_BATCH_IDS[batch_id]
batch = job["batch"]
start = job["start"]
q = job["q"]
num_processing = len(PROCESSING_BATCH_IDS[batch["id"]])
num_processing = q.qsize()
if num_processing > 0:
logger.debug(f"batch {batch['id']} still processing {num_processing} nonces")
time.sleep(1.5)
@ -161,6 +160,8 @@ def compute_merkle_root(batch, results_dir):
finally:
PROCESSING_BATCH_IDS.pop(batch["id"], None)
return
time.sleep(1)
def purge_folders(output_path, ttl):
@ -271,7 +272,7 @@ def send_results(headers, master_ip, master_port, results_dir):
time.sleep(2)
def process_batch(pool, algorithms_dir, config, results_dir):
def process_batch(algorithms_dir, config, results_dir):
try:
batch_id = PENDING_BATCH_IDS.pop()
except KeyError:
@ -309,61 +310,49 @@ def process_batch(pool, algorithms_dir, config, results_dir):
logger.error(f"Error processing batch {batch_id}: Algorithm {batch['settings']['algorithm_id']} does not match any regex in the config")
return
PROCESSING_BATCH_IDS[batch_id] = set(range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"]))
q = Queue()
for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"]):
q.put(n)
so_path, ptx_path = download_library(algorithms_dir, batch)
logger.info(f"batch {batch['id']} started")
pool.submit(compute_merkle_root, batch, results_dir)
PROCESSING_NONCES[batch['id']] = {
PROCESSING_BATCH_IDS[batch_id] = {
"batch": batch,
"so_path": so_path,
"ptx_path": ptx_path,
"current_nonce": batch["start_nonce"],
"q": q,
"finished": set(),
"cost": c["cost"],
"start": now(),
}
def process_nonces(pool, config, results_dir):
if len(PROCESSING_NONCES) == 0:
logger.debug("No pending nonces")
time.sleep(1)
return
for batch_id in list(PROCESSING_NONCES):
job = PROCESSING_NONCES[batch_id]
def process_nonces(config, results_dir):
for batch_id in list(PROCESSING_BATCH_IDS):
job = PROCESSING_BATCH_IDS[batch_id]
q = job["q"]
batch = job["batch"]
so_path = job["so_path"]
ptx_path = job["ptx_path"]
cost = job["cost"]
if batch["id"] not in PROCESSING_BATCH_IDS:
logger.info(f"batch {batch_id} stopped")
PROCESSING_NONCES.pop(batch_id, None)
break
def process_nonce(nonce):
logger.debug(f"batch {batch_id}, nonce {nonce} started: (cost {cost})")
if TOTAL_COST[0] + cost <= config["max_cost"]:
try:
run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir)
except Exception as e:
logger.error(f"batch {batch_id}, nonce {nonce}, runtime error: {e}")
finally:
TOTAL_COST[0] -= cost
if batch_id in PROCESSING_BATCH_IDS:
PROCESSING_BATCH_IDS[batch_id].remove(nonce)
while (
job["current_nonce"] < batch["start_nonce"] + batch["num_nonces"] and
TOTAL_COST[0] + cost <= config["max_cost"]
):
TOTAL_COST[0] += cost
pool.submit(process_nonce, job["current_nonce"])
job["current_nonce"] += 1
if job["current_nonce"] >= batch["start_nonce"] + batch["num_nonces"]:
PROCESSING_NONCES.pop(batch_id, None)
nonce = q.get_nowait()
break
except:
continue
else:
time.sleep(1)
return
time.sleep(0.1)
TOTAL_COST[0] += cost
logger.debug(f"batch {batch_id}, nonce {nonce} started: (cost {cost})")
try:
run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir)
except Exception as e:
logger.error(f"batch {batch_id}, nonce {nonce}, runtime error: {e}")
finally:
TOTAL_COST[0] -= cost
job["finished"].add(nonce)
def poll_batches(headers, master_ip, master_port, results_dir):
@ -425,7 +414,8 @@ def main():
algorithms_dir = "algorithms"
results_dir = "results"
ttl = int(os.getenv("TTL") or 300)
ttl = int(os.getenv("TTL"))
num_workers = int(os.getenv("NUM_WORKERS"))
print(f"Starting slave with config:")
print(f" Slave Name: {slave_name}")
@ -434,6 +424,7 @@ def main():
print(f" Algorithms Dir: {algorithms_dir}")
print(f" Results Dir: {results_dir}")
print(f" TTL: {ttl}")
print(f" WORKERS: {num_workers}")
print(f" Config: {json.dumps(config, indent=2)}")
os.makedirs(algorithms_dir, exist_ok=True)
@ -442,15 +433,20 @@ def main():
"User-Agent": slave_name
}
pool = ThreadPoolExecutor(max_workers=config["max_workers"])
Thread(
target=wrap_thread,
args=(process_batch, pool, algorithms_dir, config, results_dir)
args=(process_batch, algorithms_dir, config, results_dir)
).start()
for _ in range(num_workers):
Thread(
target=wrap_thread,
args=(process_nonces, config, results_dir)
).start()
Thread(
target=wrap_thread,
args=(process_nonces, pool, config, results_dir)
args=(compute_merkle_roots, results_dir)
).start()
Thread(