Update slave to execute tig-runtime directly.

This commit is contained in:
FiveMovesAhead 2025-05-20 09:21:12 +01:00
parent ef77a27c7d
commit 9c1d871c8a
2 changed files with 139 additions and 45 deletions

View File

@ -0,0 +1,11 @@
{
"cpus": 8,
"gpus": 0,
"algorithms": [
{
"id_regex": ".*",
"cpu_cost": 1.0,
"gpu_cost": 0.0
}
]
}

View File

@ -7,6 +7,7 @@ import os
import platform
import logging
import randomname
import re
import requests
import shutil
import subprocess
@ -23,6 +24,7 @@ PENDING_BATCH_IDS = set()
PROCESSING_BATCH_IDS = set()
READY_BATCH_IDS = set()
FINISHED_BATCH_IDS = {}
RUNTIME_THREADS = []
if (CPU_ARCH := platform.machine().lower()) in ["x86_64", "amd64"]:
CPU_ARCH = "amd64"
elif CPU_ARCH in ["arm64", "aarch64"]:
@ -56,43 +58,48 @@ def download_library(downloads_folder, batch):
return so_path, ptx_path
def run_tig_worker(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path, num_workers, output_path):
def run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_path):
start = now()
output_file = f"{output_path}/{batch['id']}/{nonce}.json"
cmd = [
tig_worker_path,
tig_runtime_path,
json.dumps(batch["settings"]),
batch["rand_hash"],
str(batch["start_nonce"]),
str(batch["num_nonces"]),
str(batch["batch_size"]),
str(nonce),
so_path,
"--fuel", str(batch["runtime_config"]["max_fuel"]),
"--workers", str(num_workers),
"--output", f"{output_path}/{batch['id']}",
"--output", output_file,
]
if ptx_path is not None:
cmd += ["--ptx", ptx_path]
logger.info(f"computing batch: {' '.join(cmd)}")
logger.debug(f"computing batch: {' '.join(cmd)}")
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
while True:
ret = process.poll()
if ret is not None:
if ret != 0:
PROCESSING_BATCH_IDS.remove(batch["id"])
raise Exception(f"tig-worker failed with return code {ret}")
# exit codes:
# 0 - success
# 1 - runtime error
# 85 - no solution
# 86 - invalid solution
# 87 - out of fuel
if (ret == 1 or ret == 87) and not os.path.exists(output_file):
d = OutputData(
nonce=nonce,
runtime_signature=0,
fuel_consumed=(ret == 87) and (batch["runtime_config"]["max_fuel"] + 1),
solution={},
cpu_arch=CPU_ARCH,
)
with open(output_file, "wb") as f:
json.dump(d.to_dict(), f)
stdout, stderr = process.communicate()
result = json.loads(stdout.decode())
logger.info(f"computing batch {batch['id']} took {now() - start}ms")
logger.debug(f"batch {batch['id']} result: {result}")
with open(f"{output_path}/{batch['id']}/result.json", "w") as f:
json.dump(result, f)
assert os.path.exists(output_file), f"Exit code {ret}. Output file does not exist"
if ret == 1:
logger.error(f"batch {batch['id']}, nonce {nonce}, runtime error: {process.stderr.read().decode()}")
PROCESSING_BATCH_IDS.remove(batch["id"])
READY_BATCH_IDS.add(batch["id"])
break
elif batch["id"] not in PROCESSING_BATCH_IDS:
@ -102,6 +109,39 @@ def run_tig_worker(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path,
time.sleep(0.1)
if not all(
os.path.exists(f"{output_path}/{batch['id']}/{n}.json")
for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"])
):
return
compute_time = now() - start
hashes = []
solution_nonces = []
for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"]):
with open(f"{output_path}/{batch['id']}/{n}.json", "r") as f:
d = OutputData.from_dict(json.load(f))
if len(d.solution) > 0:
solution_nonces.append(n)
hashes.append(d.to_merkle_hash())
merkle_tree = MerkleTree(hashes, batch["batch_size"])
with open(f"{output_path}/{batch['id']}/hashes.zlib", "wb") as f:
hashes = [h.to_str() for h in hashes]
f.write(zlib.compress(json.dumps(hashes).encode()))
with open(f"{output_path}/{batch['id']}/result.json", "w") as f:
result = {
"solution_nonces": list(solution_nonces),
"merkle_root": merkle_tree.calc_merkle_root().to_str(),
}
logger.debug(f"batch {batch['id']} result: {result}")
json.dump(result, f)
merkle_root_time = now() - start - compute_time
logger.info(f"batch {batch['id']}, compute_time: {compute_time}ms, merkle_root_time: {merkle_root_time}ms")
PROCESSING_BATCH_IDS.remove(batch["id"])
READY_BATCH_IDS.add(batch["id"])
def purge_folders(output_path, ttl):
n = now()
@ -121,7 +161,7 @@ def purge_folders(output_path, ttl):
FINISHED_BATCH_IDS.pop(batch_id)
def send_results(headers, master_ip, master_port, tig_worker_path, downloads_folder, num_workers, output_path):
def send_results(headers, master_ip, master_port, output_path):
try:
batch_id = READY_BATCH_IDS.pop()
except KeyError:
@ -139,7 +179,10 @@ def send_results(headers, master_ip, master_port, tig_worker_path, downloads_fol
if (
not os.path.exists(f"{output_folder}/result.json")
or not os.path.exists(f"{output_folder}/data.zlib")
or not all(
os.path.exists(f"{output_folder}/{n}.json")
for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"])
)
or not os.path.exists(f"{output_folder}/hashes.zlib")
):
if os.path.exists(f"{output_folder}/result.json"):
@ -178,8 +221,10 @@ def send_results(headers, master_ip, master_port, tig_worker_path, downloads_fol
else:
with open(f"{output_folder}/hashes.zlib", "rb") as f:
hashes = json.loads(zlib.decompress(f.read()).decode())
with open(f"{output_folder}/data.zlib", "rb") as f:
leafs = json.loads(zlib.decompress(f.read()).decode())
leafs = []
for n in batch["sampled_nonces"]:
with open(f"{output_folder}/{n}.json", "r") as f:
leafs.append(json.load(f))
merkle_tree = MerkleTree(
[MerkleHash.from_str(x) for x in hashes],
@ -188,10 +233,10 @@ def send_results(headers, master_ip, master_port, tig_worker_path, downloads_fol
proofs_to_submit = [
dict(
leaf=leafs[n - batch["start_nonce"]],
leaf=leaf,
branch=merkle_tree.calc_merkle_branch(branch_idx=n - batch["start_nonce"]).to_str()
)
for n in batch["sampled_nonces"]
for n, leaf in zip(batch["sampled_nonces"], leafs)
]
submit_url = f"http://{master_ip}:{master_port}/submit-batch-proofs/{batch_id}"
@ -209,7 +254,7 @@ def send_results(headers, master_ip, master_port, tig_worker_path, downloads_fol
time.sleep(2)
def process_batch(tig_worker_path, tig_runtime_path, downloads_folder, num_workers, output_path):
def process_batch(tig_runtime_path, downloads_folder, config, output_path):
try:
batch_id = PENDING_BATCH_IDS.pop()
except KeyError:
@ -233,11 +278,41 @@ def process_batch(tig_worker_path, tig_runtime_path, downloads_folder, num_worke
batch = json.load(f)
so_path, ptx_path = download_library(downloads_folder, batch)
c = next(
(
x for x in config["algorithms"]
if re.match(x["id_regex"], batch["settings"]["algorithm_id"])
),
None
)
if c is None:
logger.error(f"Algorithm {batch['settings']['algorithm_id']} does not match any regex in the config")
return
Thread(
target=run_tig_worker,
args=(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path, num_workers, output_path)
).start()
nonce = batch["start_nonce"]
while nonce < batch["start_nonce"] + batch["num_nonces"]:
if batch["id"] not in PROCESSING_BATCH_IDS:
logger.info(f"batch {batch['id']} stopped")
break
RUNTIME_THREADS[:] = [x for x in RUNTIME_THREADS if not x[2].is_alive()]
total_cpu_cost = sum(x[0] for x in RUNTIME_THREADS)
total_gpu_cost = sum(x[1] for x in RUNTIME_THREADS)
if (
total_cpu_cost + c["cpu_cost"] <= config["cpus"] and
total_gpu_cost + c["gpu_cost"] <= config["gpus"]
):
logger.debug(f"Starting batch {batch['id']}, nonce {nonce}, cpu_cost {c['cpu_cost']}, gpu_cost {c['gpu_cost']}")
t = Thread(
target=run_tig_runtime,
args=(nonce, tig_runtime_path, batch, so_path, ptx_path, output_path)
)
t.start()
RUNTIME_THREADS.append((c["cpu_cost"], c["gpu_cost"], t))
nonce += 1
time.sleep(0.1)
def poll_batches(headers, master_ip, master_port, output_path):
@ -280,31 +355,40 @@ def wrap_thread(func, *args):
def main(
master_ip: str,
tig_worker_path: str,
tig_runtime_path: str,
downloads_folder: str,
num_workers: int,
config_path: str,
slave_name: str,
master_port: int,
output_path: str,
ttl: int,
):
if not os.path.exists(tig_runtime_path):
logger.error(f"tig-runtime not found at path: {tig_runtime_path}")
sys.exit(1)
if not os.path.exists(config_path):
logger.error(f"Config file not found at path: {config_path}")
sys.exit(1)
try:
with open(config_path) as f:
config = json.load(f)
except Exception as e:
logger.error(f"Error loading config file: {e}")
sys.exit(1)
print(f"Starting slave with config:")
print(f" Slave Name: {slave_name}")
print(f" Master IP: {master_ip}")
print(f" Master Port: {master_port}")
print(f" Worker Path: {tig_worker_path}")
print(f" CPU Architecture: {CPU_ARCH}")
print(f" GPU Available: {HAS_GPU}")
print(f" Runtime Path: {tig_runtime_path}")
print(f" Downloads Folder: {downloads_folder}")
print(f" Number of Workers: {num_workers}")
print(f" Config: {json.dumps(config, indent=2)}")
print(f" Output Path: {output_path}")
print(f" TTL: {ttl}")
print(f" Verbose: {args.verbose}")
if not os.path.exists(tig_worker_path):
raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}")
if not os.path.exists(tig_runtime_path):
raise FileNotFoundError(f"tig-runtime not found at path: {tig_runtime_path}")
os.makedirs(downloads_folder, exist_ok=True)
headers = {
@ -313,12 +397,12 @@ def main(
Thread(
target=wrap_thread,
args=(process_batch, tig_worker_path, tig_runtime_path, downloads_folder, num_workers, output_path)
args=(process_batch, tig_runtime_path, downloads_folder, config, output_path)
).start()
Thread(
target=wrap_thread,
args=(send_results, headers, master_ip, master_port, tig_worker_path, downloads_folder, num_workers, output_path)
args=(send_results, headers, master_ip, master_port, output_path)
).start()
Thread(
@ -331,11 +415,10 @@ def main(
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TIG Slave Benchmarker")
parser.add_argument("--tig_worker_path", type=str, default=shutil.which("tig-worker"), help="Path to tig-worker executable")
parser.add_argument("config", type=str, help="Path to config file")
parser.add_argument("--tig_runtime_path", type=str, default=shutil.which("tig-runtime"), help="Path to tig-runtime executable")
parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)")
parser.add_argument("--download", type=str, default="libs", help="Folder to download algorithm libraries to (default: libs)")
parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)")
parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)")
parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)")
parser.add_argument("--verbose", action='store_true', help="Print debug logs")
@ -349,4 +432,4 @@ if __name__ == "__main__":
level=logging.DEBUG if args.verbose else logging.INFO
)
main(args.master, args.tig_worker_path, args.tig_runtime_path, args.download, args.workers, args.name, args.port, args.output, args.ttl)
main(args.master, args.tig_runtime_path, args.download, args.config, args.name, args.port, args.output, args.ttl)