From 7ff38256f62591dd9ad0dfd062577c1570284b49 Mon Sep 17 00:00:00 2001
From: FiveMovesAhead
Date: Wed, 4 Jun 2025 11:42:37 +0100
Subject: [PATCH] Update slave to use separate challenge docker architecture.

---
 tig-benchmarker/.env                          |  16 +-
 ...-compose.yml => docker-compose-master.yml} |   0
 tig-benchmarker/docker-compose-slave.yml      |  60 +++++++
 tig-benchmarker/slave/Dockerfile              |  14 ++
 tig-benchmarker/{ => slave}/config.json       |   4 +-
 tig-benchmarker/{slave.py => slave/main.py}   | 157 +++++++-----------
 tig-benchmarker/slave/requirements.txt        |   4 +
 7 files changed, 154 insertions(+), 101 deletions(-)
 rename tig-benchmarker/{docker-compose.yml => docker-compose-master.yml} (100%)
 create mode 100644 tig-benchmarker/docker-compose-slave.yml
 create mode 100644 tig-benchmarker/slave/Dockerfile
 rename tig-benchmarker/{ => slave}/config.json (68%)
 rename tig-benchmarker/{slave.py => slave/main.py} (73%)
 create mode 100644 tig-benchmarker/slave/requirements.txt

diff --git a/tig-benchmarker/.env b/tig-benchmarker/.env
index 31520d19..c56faad8 100644
--- a/tig-benchmarker/.env
+++ b/tig-benchmarker/.env
@@ -1,7 +1,21 @@
+# Set to 1 to enable verbose logging
+VERBOSE=1
+
 POSTGRES_USER=postgres
 POSTGRES_PASSWORD=mysecretpassword
 POSTGRES_DB=postgres
 UI_PORT=80
 DB_PORT=5432
+
 MASTER_PORT=5115
-VERBOSE=
\ No newline at end of file
+# Defaults to 0.0.0.0
+MASTER_IP=
+
+# Directory for slave to download algorithms. Defaults to ./lib
+ALGORITHMS_DIR=
+# Directory for slave to store results. Defaults to ./results
+RESULTS_DIR=
+# Seconds for results to live. Defaults to 300s
+TTL=
+# Name of the slave. Defaults to randomly generated name
+SLAVE_NAME=
\ No newline at end of file
diff --git a/tig-benchmarker/docker-compose.yml b/tig-benchmarker/docker-compose-master.yml
similarity index 100%
rename from tig-benchmarker/docker-compose.yml
rename to tig-benchmarker/docker-compose-master.yml
diff --git a/tig-benchmarker/docker-compose-slave.yml b/tig-benchmarker/docker-compose-slave.yml
new file mode 100644
index 00000000..b737dd6c
--- /dev/null
+++ b/tig-benchmarker/docker-compose-slave.yml
@@ -0,0 +1,60 @@
+version: "3.8"
+
+x-common: &common
+  volumes:
+    - ./:/app
+  command: ["sleep", "infinity"]
+
+x-common-gpu: &common-gpu
+  deploy:
+    resources:
+      reservations:
+        devices:
+          - driver: nvidia
+            count: all
+            capabilities: [gpu]
+
+services:
+  slave:
+    build:
+      context: ./
+      dockerfile: ./slave/Dockerfile
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+      - ./:/app
+    environment:
+      - VERBOSE=${VERBOSE}
+      - SLAVE_NAME=${SLAVE_NAME}
+      - MASTER_PORT=${MASTER_PORT}
+      - MASTER_IP=${MASTER_IP}
+      - ALGORITHMS_DIR=${ALGORITHMS_DIR}
+      - RESULTS_DIR=${RESULTS_DIR}
+      - TTL=${TTL}
+    command: ["python", "slave/main.py", "slave/config.json"]
+
+  satisfiability:
+    <<: *common
+    image: ghcr.io/tig-foundation/tig-monorepo/satisfiability/runtime:0.0.1
+    container_name: satisfiability
+
+  vehicle_routing:
+    <<: *common
+    image: ghcr.io/tig-foundation/tig-monorepo/vehicle_routing/runtime:0.0.1
+    container_name: vehicle_routing
+
+  knapsack:
+    <<: *common
+    image: ghcr.io/tig-foundation/tig-monorepo/knapsack/runtime:0.0.1
+    container_name: knapsack
+
+  vector_search:
+    <<: *common
+    <<: *common-gpu
+    image: ghcr.io/tig-foundation/tig-monorepo/vector_search/runtime:0.0.1
+    container_name: vector_search
+
+  hypergraph:
+    <<: *common
+    <<: *common-gpu
+    image: ghcr.io/tig-foundation/tig-monorepo/hypergraph/runtime:0.0.1
+    container_name: hypergraph
\ No newline at end of file
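
Under the new architecture the slave container never runs tig-runtime itself: each challenge gets its own long-lived runtime container (kept alive by the shared "sleep infinity" command), and the slave reaches into it through the mounted /var/run/docker.sock. Below is a minimal Python sketch of that dispatch, assuming a running satisfiability container; the helper name and the extra_args placeholder are illustrative, and the real argument list is the one built in slave/main.py further down in this patch.

    import json
    import subprocess

    def exec_in_challenge_container(challenge, settings, rand_hash, nonce, extra_args=()):
        # The challenge container (e.g. "satisfiability") must already be running;
        # docker-compose-slave.yml keeps it alive with `sleep infinity`.
        cmd = [
            "docker", "exec", challenge,
            "tig-runtime",
            json.dumps(settings),
            rand_hash,
            str(nonce),
            *extra_args,  # library/output flags, as assembled in slave/main.py
        ]
        return subprocess.run(cmd, capture_output=True, text=True)

The GPU-backed services (vector_search, hypergraph) additionally reserve NVIDIA devices via the x-common-gpu anchor, so only those containers need a GPU on the host.
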
diff --git a/tig-benchmarker/slave/Dockerfile b/tig-benchmarker/slave/Dockerfile
new file mode 100644
index 00000000..06665ce3
--- /dev/null
+++ b/tig-benchmarker/slave/Dockerfile
@@ -0,0 +1,14 @@
+FROM ubuntu:24.04
+
+WORKDIR /app
+
+RUN apt update && apt install -y curl python3 python3-pip
+COPY slave/requirements.txt requirements.txt
+RUN pip3 install -r requirements.txt --break-system-packages --no-cache-dir
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    ca-certificates curl gnupg lsb-release \
+    && curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
+    && echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian $(lsb_release -cs) stable" > /etc/apt/sources.list.d/docker.list \
+    && apt-get update && apt-get install -y docker-ce \
+    && apt-get clean && rm -rf /var/lib/apt/lists/*
\ No newline at end of file
diff --git a/tig-benchmarker/config.json b/tig-benchmarker/slave/config.json
similarity index 68%
rename from tig-benchmarker/config.json
rename to tig-benchmarker/slave/config.json
index 2563afff..106c654b 100644
--- a/tig-benchmarker/config.json
+++ b/tig-benchmarker/slave/config.json
@@ -1,10 +1,10 @@
 {
     "max_workers": 100,
+    "max_cost": 8.0,
     "algorithms": [
        {
             "id_regex": ".*",
-            "cpu": 1.0,
-            "gpu": 0.0
+            "cost": 1.0
         }
     ]
 }
\ No newline at end of file
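
The slave config now expresses capacity as an abstract cost budget instead of explicit cpu/gpu shares: each algorithm entry is matched by id_regex, charges cost per nonce in flight, and the slave only starts new nonces while the running total stays within max_cost. A sketch of that admission rule, assuming a single global counter; the helper names are illustrative, and main.py does this bookkeeping inside its scheduler loop rather than through these functions.

    import re

    config = {
        "max_workers": 100,
        "max_cost": 8.0,
        "algorithms": [{"id_regex": ".*", "cost": 1.0}],
    }

    def cost_for(algorithm_id):
        # First entry whose id_regex matches wins; None means the batch is refused.
        for entry in config["algorithms"]:
            if re.match(entry["id_regex"], algorithm_id):
                return entry["cost"]
        return None

    total_cost = 0.0

    def try_start_nonce(algorithm_id):
        global total_cost
        cost = cost_for(algorithm_id)
        if cost is None or total_cost + cost > config["max_cost"]:
            return False      # over budget: leave the nonce queued for now
        total_cost += cost    # released again when the nonce finishes
        return True

With the shipped defaults (max_cost 8.0 and cost 1.0 for every algorithm) this caps the slave at eight nonces in flight at a time.
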
diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave/main.py
similarity index 73%
rename from tig-benchmarker/slave.py
rename to tig-benchmarker/slave/main.py
index ecbcfc51..c8ac0961 100644
--- a/tig-benchmarker/slave.py
+++ b/tig-benchmarker/slave/main.py
@@ -27,7 +27,7 @@ PROCESSING_BATCH_IDS = set()
 PROCESSING_NONCES = {}
 READY_BATCH_IDS = set()
 FINISHED_BATCH_IDS = {}
-TOTAL_USAGE = {"cpu": 0, "gpu": 0}
+TOTAL_COST = [0]
 if (CPU_ARCH := platform.machine().lower()) in ["x86_64", "amd64"]:
     CPU_ARCH = "amd64"
 elif CPU_ARCH in ["arm64", "aarch64"]:
@@ -36,29 +36,11 @@ else:
     print(f"Unsupported CPU architecture: {CPU_ARCH}")
     sys.exit(1)
 
-HAS_GPU = subprocess.run(["which", "nvidia-smi"], capture_output=True).returncode == 0
-if (VISIBLE_CPUS := os.environ.get("CPU_VISIBLE_CORES", None)) is None:
-    VISIBLE_CPUS = list(os.sched_getaffinity(0))
-else:
-    VISIBLE_CPUS = list(map(int, VISIBLE_CPUS.split(",")))
-    os.sched_setaffinity(0, VISIBLE_CPUS)
-
-if not HAS_GPU:
-    VISIBLE_GPUS = []
-elif (VISIBLE_GPUS := os.environ.get("CUDA_VISIBLE_DEVICES", None)) is None:
-    VISIBLE_GPUS = [
-        int(match.group(1))
-        for line in subprocess.check_output(["nvidia-smi", "-L"]).decode("utf-8").splitlines()
-        if (match := re.match(r'^GPU (\d+):', line)) is not None
-    ]
-else:
-    VISIBLE_GPUS = list(map(int, VISIBLE_GPUS.split(",")))
-
 def now():
     return int(time.time() * 1000)
 
-def download_library(downloads_folder, batch):
-    challenge_folder = f"{downloads_folder}/{batch['challenge']}"
+def download_library(algorithms_dir, batch):
+    challenge_folder = f"{algorithms_dir}/{batch['challenge']}"
     so_path = f"{challenge_folder}/{CPU_ARCH}/{batch['algorithm']}.so"
     ptx_path = f"{challenge_folder}/ptx/{batch['algorithm']}.ptx"
     if not os.path.exists(so_path):
@@ -73,21 +55,19 @@
     if not os.path.exists(ptx_path):
         return so_path, None
-    elif not HAS_GPU:
-        raise Exception(f"Algorithm {batch['algorithm']} requires GPU support, but GPU not found")
     else:
         return so_path, ptx_path
 
 
-def run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_path):
-    output_file = f"{output_path}/{batch['id']}/{nonce}.json"
+def run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir):
+    output_file = f"{results_dir}/{batch['id']}/{nonce}.json"
     if os.path.exists(output_file):
         logger.info(f"batch {batch['id']}, nonce {nonce}: already computed")
         return
 
     start = now()
     cmd = [
-        tig_runtime_path,
+        "docker", "exec", batch["challenge"],
+        "tig-runtime",
         json.dumps(batch["settings"]),
         batch["rand_hash"],
         str(nonce),
@@ -98,7 +78,6 @@ def run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_pa
     if ptx_path is not None:
         cmd += [
             "--ptx", ptx_path,
-            "--gpu", str(nonce % len(VISIBLE_GPUS)),
         ]
     logger.debug(f"computing batch: {' '.join(cmd)}")
     process = subprocess.Popen(
@@ -140,7 +119,7 @@ def run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_pa
     logger.debug(f"batch {batch['id']}, nonce {nonce} finished, took {now() - start}ms")
 
 
-def compute_merkle_root(batch, output_path):
+def compute_merkle_root(batch, results_dir):
     start = now()
     while True:
         if batch["id"] not in PROCESSING_BATCH_IDS:
@@ -149,7 +128,7 @@
 
         processing_nonces = set(
             n for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"])
-            if not os.path.exists(f"{output_path}/{batch['id']}/{n}.json")
+            if not os.path.exists(f"{results_dir}/{batch['id']}/{n}.json")
         )
         if len(processing_nonces) > 0:
             logger.debug(f"batch {batch['id']} still processing nonces: {processing_nonces}")
@@ -159,17 +138,17 @@
     hashes = []
     solution_nonces = []
     for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"]):
-        with open(f"{output_path}/{batch['id']}/{n}.json", "r") as f:
+        with open(f"{results_dir}/{batch['id']}/{n}.json", "r") as f:
             d = OutputData.from_dict(json.load(f))
             if len(d.solution) > 0:
                 solution_nonces.append(n)
             hashes.append(d.to_merkle_hash())
 
     merkle_tree = MerkleTree(hashes, batch["batch_size"])
-    with open(f"{output_path}/{batch['id']}/hashes.zlib", "wb") as f:
+    with open(f"{results_dir}/{batch['id']}/hashes.zlib", "wb") as f:
         hashes = [h.to_str() for h in hashes]
         f.write(zlib.compress(json.dumps(hashes).encode()))
-    with open(f"{output_path}/{batch['id']}/result.json", "w") as f:
+    with open(f"{results_dir}/{batch['id']}/result.json", "w") as f:
         result = {
             "solution_nonces": list(solution_nonces),
             "merkle_root": merkle_tree.calc_merkle_root().to_str(),
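
Every nonce writes its output to {RESULTS_DIR}/{batch_id}/{nonce}.json, and compute_merkle_root above simply waits until no nonce in the batch range is missing before it writes hashes.zlib and result.json. A small sketch of that completeness check under the same on-disk layout; the function name and example values are illustrative, not part of the patch.

    import os

    def missing_nonces(results_dir, batch):
        """Nonces in the batch's range that have no output file yet."""
        folder = os.path.join(results_dir, batch["id"])
        return [
            n
            for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"])
            if not os.path.exists(os.path.join(folder, f"{n}.json"))
        ]

    # An empty list means the batch is ready for hashing and submission.
    batch = {"id": "example_batch", "start_nonce": 0, "num_nonces": 8}
    print(missing_nonces("results", batch))
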
@@ -201,7 +180,7 @@ def purge_folders(output_path, ttl):
             FINISHED_BATCH_IDS.pop(batch_id)
 
 
-def send_results(headers, master_ip, master_port, output_path):
+def send_results(headers, master_ip, master_port, results_dir):
     try:
         batch_id = READY_BATCH_IDS.pop()
     except KeyError:
@@ -213,7 +192,7 @@ def send_results(headers, master_ip, master_port, output_path):
         logger.debug(f"batch {batch_id} submitted recently")
         return
 
-    output_folder = f"{output_path}/{batch_id}"
+    output_folder = f"{results_dir}/{batch_id}"
     with open(f"{output_folder}/batch.json") as f:
         batch = json.load(f)
 
@@ -291,7 +270,7 @@ def send_results(headers, master_ip, master_port, output_path):
         time.sleep(2)
 
 
-def process_batch(pool, tig_runtime_path, downloads_folder, config, output_path):
+def process_batch(pool, algorithms_dir, config, results_dir):
     try:
         batch_id = PENDING_BATCH_IDS.pop()
     except KeyError:
@@ -305,20 +284,19 @@ def process_batch(pool, tig_runtime_path, downloads_folder, config, output_path)
     ):
         return
 
-    if os.path.exists(f"{output_path}/{batch_id}/result.json"):
+    if os.path.exists(f"{results_dir}/{batch_id}/result.json"):
         logger.info(f"batch {batch_id} already processed")
         READY_BATCH_IDS.add(batch_id)
         return
-    PROCESSING_BATCH_IDS.add(batch_id)
-
-    with open(f"{output_path}/{batch_id}/batch.json") as f:
+
+    with open(f"{results_dir}/{batch_id}/batch.json") as f:
         batch = json.load(f)
 
-    so_path, ptx_path = download_library(downloads_folder, batch)
-    if ptx_path is not None and not HAS_GPU:
-        logger.error(f"Algorithm {batch['algorithm']} requires GPU support, but no GPUs are visible")
+    containers = set(subprocess.check_output(["docker", "ps", "--format", "{{.Names}}"], text=True).splitlines())
+    if batch["challenge"] not in containers:
+        logger.error(f"Error processing batch {batch_id}: Challenge container {batch['challenge']} not found. Did you start it with 'docker-compose up {batch['challenge']}'?")
         return
-    
+
     c = next(
         (
             x for x in config["algorithms"]
@@ -327,23 +305,24 @@ def process_batch(pool, tig_runtime_path, downloads_folder, config, output_path)
         None
     )
     if c is None:
-        logger.error(f"Algorithm {batch['settings']['algorithm_id']} does not match any regex in the config")
+        logger.error(f"Error processing batch {batch_id}: Algorithm {batch['settings']['algorithm_id']} does not match any regex in the config")
         return
-    
+
+    PROCESSING_BATCH_IDS.add(batch_id)
+    so_path, ptx_path = download_library(algorithms_dir, batch)
     logger.info(f"batch {batch['id']} started")
-    pool.submit(compute_merkle_root, batch, output_path)
+    pool.submit(compute_merkle_root, batch, results_dir)
     PROCESSING_NONCES[batch['id']] = {
         "batch": batch,
         "so_path": so_path,
         "ptx_path": ptx_path,
         "current_nonce": batch["start_nonce"],
-        "cpu": c["cpu"],
-        "gpu": c["gpu"],
+        "cost": c["cost"],
     }
 
 
-def process_nonces(pool, tig_runtime_path, output_path):
+def process_nonces(pool, config, results_dir):
     if len(PROCESSING_NONCES) == 0:
         logger.debug("No pending nonces")
         time.sleep(1)
@@ -354,8 +333,7 @@
         batch = job["batch"]
         so_path = job["so_path"]
         ptx_path = job["ptx_path"]
-        cpu = job["cpu"]
-        gpu = job["gpu"]
+        cost = job["cost"]
 
         if batch["id"] not in PROCESSING_BATCH_IDS:
             logger.info(f"batch {batch_id} stopped")
@@ -363,22 +341,19 @@
             break
 
         def process_nonce(nonce):
-            logger.debug(f"batch {batch_id}, nonce {nonce} started: (cpu {cpu}, gpu {gpu})")
+            logger.debug(f"batch {batch_id}, nonce {nonce} started: (cost {cost})")
             try:
-                run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_path)
+                run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir)
             except Exception as e:
                 logger.error(f"batch {batch_id}, nonce {nonce}, runtime error: {e}")
             finally:
-                TOTAL_USAGE["cpu"] -= cpu
-                TOTAL_USAGE["gpu"] -= gpu
+                TOTAL_COST[0] -= cost
 
         while (
             job["current_nonce"] < batch["start_nonce"] + batch["num_nonces"] and
-            TOTAL_USAGE["cpu"] + cpu <= len(VISIBLE_CPUS) and
-            TOTAL_USAGE["gpu"] + gpu <= len(VISIBLE_GPUS)
+            TOTAL_COST[0] + cost <= config["max_cost"]
         ):
-            TOTAL_USAGE["cpu"] += cpu
-            TOTAL_USAGE["gpu"] += gpu
+            TOTAL_COST[0] += cost
             pool.submit(process_nonce, job["current_nonce"])
             job["current_nonce"] += 1
 
@@ -388,7 +363,7 @@ def process_nonces(pool, tig_runtime_path, output_path):
     time.sleep(0.1)
 
 
-def poll_batches(headers, master_ip, master_port, output_path):
+def poll_batches(headers, master_ip, master_port, results_dir):
     get_batches_url = f"http://{master_ip}:{master_port}/get-batches"
     logger.info(f"fetching batches from {get_batches_url}")
     resp = requests.get(get_batches_url, headers=headers)
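
Because execution now happens inside per-challenge containers, process_batch above first asks Docker which containers are running and rejects the batch when the challenge's runtime container is absent. The same pre-check in isolation, assuming the Docker CLI and socket are available as the new slave Dockerfile and compose file arrange; the function name is illustrative.

    import subprocess

    def challenge_container_running(challenge):
        # `docker ps --format {{.Names}}` prints one running container name per line.
        names = subprocess.check_output(
            ["docker", "ps", "--format", "{{.Names}}"], text=True
        ).splitlines()
        return challenge in set(names)

    if not challenge_container_running("knapsack"):
        print("start the knapsack service from docker-compose-slave.yml first")
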
@@ -400,7 +375,7 @@ def poll_batches(headers, master_ip, master_port, output_path):
     logger.info(f"root batches: {root_batch_ids}")
     logger.info(f"proofs batches: {proofs_batch_ids}")
     for batch in batches:
-        output_folder = f"{output_path}/{batch['id']}"
+        output_folder = f"{results_dir}/{batch['id']}"
         os.makedirs(output_folder, exist_ok=True)
         with open(f"{output_folder}/batch.json", "w") as f:
             json.dump(batch, f)
@@ -426,19 +401,7 @@ def wrap_thread(func, *args):
             time.sleep(5)
 
 
-def main(
-    master_ip: str,
-    tig_runtime_path: str,
-    downloads_folder: str,
-    config_path: str,
-    slave_name: str,
-    master_port: int,
-    output_path: str,
-    ttl: int,
-):
-    if not os.path.exists(tig_runtime_path):
-        logger.error(f"tig-runtime not found at path: {tig_runtime_path}")
-        sys.exit(1)
+def main(config_path: str):
     if not os.path.exists(config_path):
         logger.error(f"Config file not found at path: {config_path}")
         sys.exit(1)
@@ -448,22 +411,28 @@ def main(
     except Exception as e:
         logger.error(f"Error loading config file: {e}")
         sys.exit(1)
+
+    slave_name = os.getenv("SLAVE_NAME") or randomname.get_name()
+    master_ip = os.getenv("MASTER_IP") or "0.0.0.0"
+    if (master_port := os.getenv("MASTER_PORT")) is None:
+        logger.error("MASTER_PORT environment variable not set")
+        sys.exit(1)
+    master_port = int(master_port)
+
+    algorithms_dir = os.getenv("ALGORITHMS_DIR") or "lib"
+    results_dir = os.getenv("RESULTS_DIR") or "results"
+    ttl = int(os.getenv("TTL") or 300)
 
     print(f"Starting slave with config:")
     print(f"  Slave Name: {slave_name}")
-    print(f"  Visible CPUs: {VISIBLE_CPUS}")
-    print(f"  Visible GPUs: {VISIBLE_GPUS}")
     print(f"  Master: {master_ip}:{master_port}")
     print(f"  CPU Architecture: {CPU_ARCH}")
-    print(f"  GPU Available: {HAS_GPU}")
-    print(f"  Runtime Path: {tig_runtime_path}")
-    print(f"  Downloads Folder: {downloads_folder}")
-    print(f"  Config: {json.dumps(config, indent=2)}")
-    print(f"  Output Path: {output_path}")
+    print(f"  Algorithms Dir: {algorithms_dir}")
+    print(f"  Results Dir: {results_dir}")
     print(f"  TTL: {ttl}")
-    print(f"  Verbose: {args.verbose}")
+    print(f"  Config: {json.dumps(config, indent=2)}")
 
-    os.makedirs(downloads_folder, exist_ok=True)
+    os.makedirs(algorithms_dir, exist_ok=True)
 
     headers = {
         "User-Agent": slave_name
@@ -472,44 +441,36 @@ def main(
     pool = ThreadPoolExecutor(max_workers=config["max_workers"])
     Thread(
         target=wrap_thread,
-        args=(process_batch, pool, tig_runtime_path, downloads_folder, config, output_path)
+        args=(process_batch, pool, algorithms_dir, config, results_dir)
     ).start()
     Thread(
         target=wrap_thread,
-        args=(process_nonces, pool, tig_runtime_path, output_path)
+        args=(process_nonces, pool, config, results_dir)
     ).start()
     Thread(
         target=wrap_thread,
-        args=(send_results, headers, master_ip, master_port, output_path)
+        args=(send_results, headers, master_ip, master_port, results_dir)
     ).start()
     Thread(
         target=wrap_thread,
-        args=(purge_folders, output_path, ttl)
+        args=(purge_folders, results_dir, ttl)
     ).start()
-    wrap_thread(poll_batches, headers, master_ip, master_port, output_path)
+    wrap_thread(poll_batches, headers, master_ip, master_port, results_dir)
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="TIG Slave Benchmarker")
     parser.add_argument("config", type=str, help="Path to config file")
-    parser.add_argument("--tig_runtime_path", type=str, default=shutil.which("tig-runtime"), help="Path to tig-runtime executable")
-    parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)")
-    parser.add_argument("--download", type=str, default="libs", help="Folder to download algorithm libraries to (default: libs)")
-    parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)")
-    parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)")
-    parser.add_argument("--verbose", action='store_true', help="Print debug logs")
-    parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)")
-    parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)")
     args = parser.parse_args()
 
     logging.basicConfig(
         format='%(levelname)s - [%(name)s] - %(message)s',
-        level=logging.DEBUG if args.verbose else logging.INFO
+        level=logging.DEBUG if os.getenv("VERBOSE") else logging.INFO
     )
 
-    main(args.master, args.tig_runtime_path, args.download, args.config, args.name, args.port, args.output, args.ttl)
+    main(args.config)
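
All runtime options now come from environment variables (set in .env and passed through docker-compose-slave.yml) rather than CLI flags; only the config path stays as a positional argument. For reference, a sketch of the same resolution with the documented defaults, collected into a dataclass that is not part of the patch.

    import os
    from dataclasses import dataclass

    @dataclass
    class SlaveEnv:
        slave_name: str
        master_ip: str
        master_port: int
        algorithms_dir: str
        results_dir: str
        ttl: int

    def load_env():
        port = os.getenv("MASTER_PORT")
        if port is None:
            raise SystemExit("MASTER_PORT environment variable not set")
        return SlaveEnv(
            slave_name=os.getenv("SLAVE_NAME") or "unnamed-slave",  # main.py falls back to randomname.get_name()
            master_ip=os.getenv("MASTER_IP") or "0.0.0.0",
            master_port=int(port),
            algorithms_dir=os.getenv("ALGORITHMS_DIR") or "lib",
            results_dir=os.getenv("RESULTS_DIR") or "results",
            ttl=int(os.getenv("TTL") or 300),
        )
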
diff --git a/tig-benchmarker/slave/requirements.txt b/tig-benchmarker/slave/requirements.txt
new file mode 100644
index 00000000..379fe9e9
--- /dev/null
+++ b/tig-benchmarker/slave/requirements.txt
@@ -0,0 +1,4 @@
+blake3
+dataclasses
+randomname
+requests
\ No newline at end of file
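
The slave's Python dependencies are limited to the four packages above; requests is what it uses to talk to the master. As a quick connectivity check before wiring up a new slave, one can poll the same endpoint poll_batches uses, passing the slave name as the User-Agent header. The address values below are placeholders, and the JSON-list response shape is assumed from how main.py iterates the batches.

    import requests

    MASTER_IP = "127.0.0.1"   # placeholder; use the values from .env
    MASTER_PORT = 5115

    resp = requests.get(
        f"http://{MASTER_IP}:{MASTER_PORT}/get-batches",
        headers={"User-Agent": "my-test-slave"},
    )
    resp.raise_for_status()
    print("status:", resp.status_code)
    print("batches in response:", len(resp.json()))
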