Update slave to use separate challenge docker architecture.

This commit is contained in:
FiveMovesAhead 2025-06-04 11:42:37 +01:00
parent 89d86f9deb
commit 7ff38256f6
7 changed files with 154 additions and 101 deletions

View File

@ -1,7 +1,21 @@
# Set to any non-empty value (e.g. 1) to enable verbose (debug) logging; leave empty to disable
VERBOSE=1
POSTGRES_USER=postgres
POSTGRES_PASSWORD=mysecretpassword
POSTGRES_DB=postgres
UI_PORT=80
DB_PORT=5432
MASTER_PORT=5115
VERBOSE=
# IP address of the master. Defaults to 0.0.0.0
MASTER_IP=
# Directory for slave to download algorithms. Defaults to ./lib
ALGORITHMS_DIR=
# Directory for slave to store results. Defaults to ./results
RESULTS_DIR=
# Seconds for results to live. Defaults to 300s
TTL=
# Name of the slave. Defaults to randomly generated name
SLAVE_NAME=

View File

@ -0,0 +1,60 @@
version: "3.8"

# Shared settings for every challenge runtime container: mount the repo at /app
# and idle forever so the slave can `docker exec` tig-runtime inside it.
x-common: &common
  volumes:
    - ./:/app
  command: ["sleep", "infinity"]

# Extra settings for challenges that need GPU access (all NVIDIA devices).
x-common-gpu: &common-gpu
  deploy:
    resources:
      reservations:
        devices:
          - driver: nvidia
            count: all
            capabilities: [gpu]

services:
  slave:
    build:
      context: ./
      dockerfile: ./slave/Dockerfile
    volumes:
      # Host Docker socket, so the slave can exec into the sibling challenge containers.
      - /var/run/docker.sock:/var/run/docker.sock
      - ./:/app
    environment:
      - VERBOSE=${VERBOSE}
      - SLAVE_NAME=${SLAVE_NAME}
      - MASTER_PORT=${MASTER_PORT}
      - MASTER_IP=${MASTER_IP}
      - ALGORITHMS_DIR=${ALGORITHMS_DIR}
      - RESULTS_DIR=${RESULTS_DIR}
      - TTL=${TTL}
    # The slave image (ubuntu:24.04) installs only `python3`; there is no `python` binary.
    command: ["python3", "slave/main.py", "slave/config.json"]

  # Container names must match the challenge names the slave passes to `docker exec`.
  satisfiability:
    <<: *common
    image: ghcr.io/tig-foundation/tig-monorepo/satisfiability/runtime:0.0.1
    container_name: satisfiability

  vehicle_routing:
    <<: *common
    image: ghcr.io/tig-foundation/tig-monorepo/vehicle_routing/runtime:0.0.1
    container_name: vehicle_routing

  knapsack:
    <<: *common
    image: ghcr.io/tig-foundation/tig-monorepo/knapsack/runtime:0.0.1
    container_name: knapsack

  vector_search:
    # A mapping may contain only one `<<` key; merge several anchors via a list.
    <<: [*common, *common-gpu]
    image: ghcr.io/tig-foundation/tig-monorepo/vector_search/runtime:0.0.1
    container_name: vector_search

  hypergraph:
    <<: [*common, *common-gpu]
    image: ghcr.io/tig-foundation/tig-monorepo/hypergraph/runtime:0.0.1
    container_name: hypergraph

View File

@ -0,0 +1,14 @@
# Image for the TIG slave: Python 3 plus the Docker CLI. The slave talks to the
# host's Docker daemon through the mounted /var/run/docker.sock, so only the
# client is needed inside this image, not the engine.
FROM ubuntu:24.04
WORKDIR /app

# apt-get (not apt) is the stable interface for non-interactive use.
RUN apt-get update && apt-get install -y curl python3 python3-pip

COPY slave/requirements.txt requirements.txt
# Ubuntu 24.04 marks the system Python as externally managed, hence --break-system-packages.
RUN pip3 install -r requirements.txt --break-system-packages --no-cache-dir

# Install the Docker CLI from Docker's *Ubuntu* repository: the base image is
# Ubuntu, so `lsb_release -cs` yields an Ubuntu codename that the Debian
# repository does not publish. The architecture is detected rather than
# hard-coded so the image also builds on arm64 hosts.
RUN apt-get update && apt-get install -y --no-install-recommends \
        ca-certificates curl gnupg lsb-release \
    && curl -fsSL https://download.docker.com/linux/ubuntu/gpg \
        | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
    && echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" \
        > /etc/apt/sources.list.d/docker.list \
    && apt-get update && apt-get install -y --no-install-recommends docker-ce-cli \
    && apt-get clean && rm -rf /var/lib/apt/lists/*

View File

@ -1,10 +1,10 @@
{
"max_workers": 100,
"max_cost": 8.0,
"algorithms": [
{
"id_regex": ".*",
"cpu": 1.0,
"gpu": 0.0
"cost": 1.0
}
]
}

View File

@ -27,7 +27,7 @@ PROCESSING_BATCH_IDS = set()
PROCESSING_NONCES = {}
READY_BATCH_IDS = set()
FINISHED_BATCH_IDS = {}
TOTAL_USAGE = {"cpu": 0, "gpu": 0}
TOTAL_COST = [0]
if (CPU_ARCH := platform.machine().lower()) in ["x86_64", "amd64"]:
CPU_ARCH = "amd64"
elif CPU_ARCH in ["arm64", "aarch64"]:
@ -36,29 +36,11 @@ else:
print(f"Unsupported CPU architecture: {CPU_ARCH}")
sys.exit(1)
HAS_GPU = subprocess.run(["which", "nvidia-smi"], capture_output=True).returncode == 0
if (VISIBLE_CPUS := os.environ.get("CPU_VISIBLE_CORES", None)) is None:
VISIBLE_CPUS = list(os.sched_getaffinity(0))
else:
VISIBLE_CPUS = list(map(int, VISIBLE_CPUS.split(",")))
os.sched_setaffinity(0, VISIBLE_CPUS)
if not HAS_GPU:
VISIBLE_GPUS = []
elif (VISIBLE_GPUS := os.environ.get("CUDA_VISIBLE_DEVICES", None)) is None:
VISIBLE_GPUS = [
int(match.group(1))
for line in subprocess.check_output(["nvidia-smi", "-L"]).decode("utf-8").splitlines()
if (match := re.match(r'^GPU (\d+):', line)) is not None
]
else:
VISIBLE_GPUS = list(map(int, VISIBLE_GPUS.split(",")))
def now():
return int(time.time() * 1000)
def download_library(downloads_folder, batch):
challenge_folder = f"{downloads_folder}/{batch['challenge']}"
def download_library(algorithms_dir, batch):
challenge_folder = f"{algorithms_dir}/{batch['challenge']}"
so_path = f"{challenge_folder}/{CPU_ARCH}/{batch['algorithm']}.so"
ptx_path = f"{challenge_folder}/ptx/{batch['algorithm']}.ptx"
if not os.path.exists(so_path):
@ -73,21 +55,19 @@ def download_library(downloads_folder, batch):
if not os.path.exists(ptx_path):
return so_path, None
elif not HAS_GPU:
raise Exception(f"Algorithm {batch['algorithm']} requires GPU support, but GPU not found")
else:
return so_path, ptx_path
def run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_path):
output_file = f"{output_path}/{batch['id']}/{nonce}.json"
def run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir):
output_file = f"{results_dir}/{batch['id']}/{nonce}.json"
if os.path.exists(output_file):
logger.info(f"batch {batch['id']}, nonce {nonce}: already computed")
return
start = now()
cmd = [
tig_runtime_path,
"docker", "exec", batch["challenge"], "tig-runtime",
json.dumps(batch["settings"]),
batch["rand_hash"],
str(nonce),
@ -98,7 +78,6 @@ def run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_pa
if ptx_path is not None:
cmd += [
"--ptx", ptx_path,
"--gpu", str(nonce % len(VISIBLE_GPUS)),
]
logger.debug(f"computing batch: {' '.join(cmd)}")
process = subprocess.Popen(
@ -140,7 +119,7 @@ def run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_pa
logger.debug(f"batch {batch['id']}, nonce {nonce} finished, took {now() - start}ms")
def compute_merkle_root(batch, output_path):
def compute_merkle_root(batch, results_dir):
start = now()
while True:
if batch["id"] not in PROCESSING_BATCH_IDS:
@ -149,7 +128,7 @@ def compute_merkle_root(batch, output_path):
processing_nonces = set(
n for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"])
if not os.path.exists(f"{output_path}/{batch['id']}/{n}.json")
if not os.path.exists(f"{results_dir}/{batch['id']}/{n}.json")
)
if len(processing_nonces) > 0:
logger.debug(f"batch {batch['id']} still processing nonces: {processing_nonces}")
@ -159,17 +138,17 @@ def compute_merkle_root(batch, output_path):
hashes = []
solution_nonces = []
for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"]):
with open(f"{output_path}/{batch['id']}/{n}.json", "r") as f:
with open(f"{results_dir}/{batch['id']}/{n}.json", "r") as f:
d = OutputData.from_dict(json.load(f))
if len(d.solution) > 0:
solution_nonces.append(n)
hashes.append(d.to_merkle_hash())
merkle_tree = MerkleTree(hashes, batch["batch_size"])
with open(f"{output_path}/{batch['id']}/hashes.zlib", "wb") as f:
with open(f"{results_dir}/{batch['id']}/hashes.zlib", "wb") as f:
hashes = [h.to_str() for h in hashes]
f.write(zlib.compress(json.dumps(hashes).encode()))
with open(f"{output_path}/{batch['id']}/result.json", "w") as f:
with open(f"{results_dir}/{batch['id']}/result.json", "w") as f:
result = {
"solution_nonces": list(solution_nonces),
"merkle_root": merkle_tree.calc_merkle_root().to_str(),
@ -201,7 +180,7 @@ def purge_folders(output_path, ttl):
FINISHED_BATCH_IDS.pop(batch_id)
def send_results(headers, master_ip, master_port, output_path):
def send_results(headers, master_ip, master_port, results_dir):
try:
batch_id = READY_BATCH_IDS.pop()
except KeyError:
@ -213,7 +192,7 @@ def send_results(headers, master_ip, master_port, output_path):
logger.debug(f"batch {batch_id} submitted recently")
return
output_folder = f"{output_path}/{batch_id}"
output_folder = f"{results_dir}/{batch_id}"
with open(f"{output_folder}/batch.json") as f:
batch = json.load(f)
@ -291,7 +270,7 @@ def send_results(headers, master_ip, master_port, output_path):
time.sleep(2)
def process_batch(pool, tig_runtime_path, downloads_folder, config, output_path):
def process_batch(pool, algorithms_dir, config, results_dir):
try:
batch_id = PENDING_BATCH_IDS.pop()
except KeyError:
@ -305,20 +284,19 @@ def process_batch(pool, tig_runtime_path, downloads_folder, config, output_path)
):
return
if os.path.exists(f"{output_path}/{batch_id}/result.json"):
if os.path.exists(f"{results_dir}/{batch_id}/result.json"):
logger.info(f"batch {batch_id} already processed")
READY_BATCH_IDS.add(batch_id)
return
PROCESSING_BATCH_IDS.add(batch_id)
with open(f"{output_path}/{batch_id}/batch.json") as f:
with open(f"{results_dir}/{batch_id}/batch.json") as f:
batch = json.load(f)
so_path, ptx_path = download_library(downloads_folder, batch)
if ptx_path is not None and not HAS_GPU:
logger.error(f"Algorithm {batch['algorithm']} requires GPU support, but no GPUs are visible")
containers = set(subprocess.check_output(["docker", "ps", "--format", "{{.Names}}"], text=True).splitlines())
if batch["challenge"] not in containers:
logger.error(f"Error processing batch {batch_id}: Challenge container {batch['challenge']} not found. Did you start it with 'docker-compose up {batch['challenge']}'?")
return
c = next(
(
x for x in config["algorithms"]
@ -327,23 +305,24 @@ def process_batch(pool, tig_runtime_path, downloads_folder, config, output_path)
None
)
if c is None:
logger.error(f"Algorithm {batch['settings']['algorithm_id']} does not match any regex in the config")
logger.error(f"Error processing batch {batch_id}: Algorithm {batch['settings']['algorithm_id']} does not match any regex in the config")
return
PROCESSING_BATCH_IDS.add(batch_id)
so_path, ptx_path = download_library(algorithms_dir, batch)
logger.info(f"batch {batch['id']} started")
pool.submit(compute_merkle_root, batch, output_path)
pool.submit(compute_merkle_root, batch, results_dir)
PROCESSING_NONCES[batch['id']] = {
"batch": batch,
"so_path": so_path,
"ptx_path": ptx_path,
"current_nonce": batch["start_nonce"],
"cpu": c["cpu"],
"gpu": c["gpu"],
"cost": c["cost"],
}
def process_nonces(pool, tig_runtime_path, output_path):
def process_nonces(pool, config, results_dir):
if len(PROCESSING_NONCES) == 0:
logger.debug("No pending nonces")
time.sleep(1)
@ -354,8 +333,7 @@ def process_nonces(pool, tig_runtime_path, output_path):
batch = job["batch"]
so_path = job["so_path"]
ptx_path = job["ptx_path"]
cpu = job["cpu"]
gpu = job["gpu"]
cost = job["cost"]
if batch["id"] not in PROCESSING_BATCH_IDS:
logger.info(f"batch {batch_id} stopped")
@ -363,22 +341,19 @@ def process_nonces(pool, tig_runtime_path, output_path):
break
def process_nonce(nonce):
logger.debug(f"batch {batch_id}, nonce {nonce} started: (cpu {cpu}, gpu {gpu})")
logger.debug(f"batch {batch_id}, nonce {nonce} started: (cost {cost})")
try:
run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_path)
run_tig_runtime(nonce, batch, so_path, ptx_path, results_dir)
except Exception as e:
logger.error(f"batch {batch_id}, nonce {nonce}, runtime error: {e}")
finally:
TOTAL_USAGE["cpu"] -= cpu
TOTAL_USAGE["gpu"] -= gpu
TOTAL_COST[0] -= cost
while (
job["current_nonce"] < batch["start_nonce"] + batch["num_nonces"] and
TOTAL_USAGE["cpu"] + cpu <= len(VISIBLE_CPUS) and
TOTAL_USAGE["gpu"] + gpu <= len(VISIBLE_GPUS)
TOTAL_COST[0] + cost <= config["max_cost"]
):
TOTAL_USAGE["cpu"] += cpu
TOTAL_USAGE["gpu"] += gpu
TOTAL_COST[0] += cost
pool.submit(process_nonce, job["current_nonce"])
job["current_nonce"] += 1
@ -388,7 +363,7 @@ def process_nonces(pool, tig_runtime_path, output_path):
time.sleep(0.1)
def poll_batches(headers, master_ip, master_port, output_path):
def poll_batches(headers, master_ip, master_port, results_dir):
get_batches_url = f"http://{master_ip}:{master_port}/get-batches"
logger.info(f"fetching batches from {get_batches_url}")
resp = requests.get(get_batches_url, headers=headers)
@ -400,7 +375,7 @@ def poll_batches(headers, master_ip, master_port, output_path):
logger.info(f"root batches: {root_batch_ids}")
logger.info(f"proofs batches: {proofs_batch_ids}")
for batch in batches:
output_folder = f"{output_path}/{batch['id']}"
output_folder = f"{results_dir}/{batch['id']}"
os.makedirs(output_folder, exist_ok=True)
with open(f"{output_folder}/batch.json", "w") as f:
json.dump(batch, f)
@ -426,19 +401,7 @@ def wrap_thread(func, *args):
time.sleep(5)
def main(
master_ip: str,
tig_runtime_path: str,
downloads_folder: str,
config_path: str,
slave_name: str,
master_port: int,
output_path: str,
ttl: int,
):
if not os.path.exists(tig_runtime_path):
logger.error(f"tig-runtime not found at path: {tig_runtime_path}")
sys.exit(1)
def main(config_path: str):
if not os.path.exists(config_path):
logger.error(f"Config file not found at path: {config_path}")
sys.exit(1)
@ -448,22 +411,28 @@ def main(
except Exception as e:
logger.error(f"Error loading config file: {e}")
sys.exit(1)
slave_name = os.getenv("SLAVE_NAME") or randomname.get_name()
master_ip = os.getenv("MASTER_IP") or "0.0.0.0"
if (master_port := os.getenv("MASTER_PORT")) is None:
logger.error("MASTER_PORT environment variable not set")
sys.exit(1)
master_port = int(master_port)
algorithms_dir = os.getenv("ALGORITHMS_DIR") or "lib"
results_dir = os.getenv("RESULTS_DIR") or "results"
ttl = int(os.getenv("TTL") or 300)
print(f"Starting slave with config:")
print(f" Slave Name: {slave_name}")
print(f" Visible CPUs: {VISIBLE_CPUS}")
print(f" Visible GPUs: {VISIBLE_GPUS}")
print(f" Master: {master_ip}:{master_port}")
print(f" CPU Architecture: {CPU_ARCH}")
print(f" GPU Available: {HAS_GPU}")
print(f" Runtime Path: {tig_runtime_path}")
print(f" Downloads Folder: {downloads_folder}")
print(f" Config: {json.dumps(config, indent=2)}")
print(f" Output Path: {output_path}")
print(f" Algorithms Dir: {algorithms_dir}")
print(f" Results Dir: {results_dir}")
print(f" TTL: {ttl}")
print(f" Verbose: {args.verbose}")
print(f" Config: {json.dumps(config, indent=2)}")
os.makedirs(downloads_folder, exist_ok=True)
os.makedirs(algorithms_dir, exist_ok=True)
headers = {
"User-Agent": slave_name
@ -472,44 +441,36 @@ def main(
pool = ThreadPoolExecutor(max_workers=config["max_workers"])
Thread(
target=wrap_thread,
args=(process_batch, pool, tig_runtime_path, downloads_folder, config, output_path)
args=(process_batch, pool, algorithms_dir, config, results_dir)
).start()
Thread(
target=wrap_thread,
args=(process_nonces, pool, tig_runtime_path, output_path)
args=(process_nonces, pool, config, results_dir)
).start()
Thread(
target=wrap_thread,
args=(send_results, headers, master_ip, master_port, output_path)
args=(send_results, headers, master_ip, master_port, results_dir)
).start()
Thread(
target=wrap_thread,
args=(purge_folders, output_path, ttl)
args=(purge_folders, results_dir, ttl)
).start()
wrap_thread(poll_batches, headers, master_ip, master_port, output_path)
wrap_thread(poll_batches, headers, master_ip, master_port, results_dir)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TIG Slave Benchmarker")
parser.add_argument("config", type=str, help="Path to config file")
parser.add_argument("--tig_runtime_path", type=str, default=shutil.which("tig-runtime"), help="Path to tig-runtime executable")
parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)")
parser.add_argument("--download", type=str, default="libs", help="Folder to download algorithm libraries to (default: libs)")
parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)")
parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)")
parser.add_argument("--verbose", action='store_true', help="Print debug logs")
parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)")
parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)")
args = parser.parse_args()
logging.basicConfig(
format='%(levelname)s - [%(name)s] - %(message)s',
level=logging.DEBUG if args.verbose else logging.INFO
level=logging.DEBUG if os.getenv("VERBOSE") else logging.INFO
)
main(args.master, args.tig_runtime_path, args.download, args.config, args.name, args.port, args.output, args.ttl)
main(args.config)

View File

@ -0,0 +1,4 @@
blake3
dataclasses
randomname
requests