Update tig-benchmarker and add a runtime Docker image.

FiveMovesAhead 2025-05-01 15:42:08 +01:00
parent f1c92f869f
commit 4d52507075
8 changed files with 174 additions and 134 deletions

Dockerfile.runtime (new file)

@@ -0,0 +1,23 @@
ARG BASE_IMAGE=ubuntu:24.04
ARG DEV_IMAGE=ghcr.io/tig-foundation/tig-monorepo/dev
FROM ${DEV_IMAGE} AS dev
FROM ${BASE_IMAGE}
ENV DEBIAN_FRONTEND=noninteractive
COPY --from=dev /usr/local/bin/tig-runtime /usr/local/bin/tig-runtime
COPY --from=dev /usr/local/bin/tig-worker /usr/local/bin/tig-worker
COPY --from=dev /usr/local/lib/rust /usr/local/lib/rust
RUN chmod +x /usr/local/bin/tig-runtime && \
chmod +x /usr/local/bin/tig-worker && \
echo "export LD_LIBRARY_PATH=\"${LD_LIBRARY_PATH}:/usr/local/lib/rust\"" >> /etc/bash.bashrc
RUN apt update && apt install -y python3 python3-pip
RUN pip3 install blake3 requests randomname --break-system-packages
COPY tig-benchmarker /app
WORKDIR /app
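
The runtime image copies tig-runtime and tig-worker into /usr/local/bin and installs the Python dependencies slave.py needs, so the slave's new shutil.which defaults (added further down in this commit) resolve both binaries without extra flags. A minimal sketch for checking a container before pointing it at a master; the fallback message is illustrative, not part of this commit:

import shutil

# tig-worker and tig-runtime are copied to /usr/local/bin in Dockerfile.runtime,
# which is on PATH, so shutil.which finds them without explicit path flags.
for tool in ("tig-worker", "tig-runtime"):
    path = shutil.which(tool)
    if path is None:
        print(f"{tool}: not on PATH, pass --{tool.replace('-', '_')}_path explicitly")
    else:
        print(f"{tool}: {path}")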

(changed file)

@@ -12,7 +12,6 @@ class AlgorithmDetails(FromDict):
player_id: str
challenge_id: str
breakthrough_id: Optional[str]
type: str
fee_paid: PreciseNumber
@dataclass
@@ -158,6 +157,7 @@ class BlockDetails(FromDict):
timestamp: int
num_confirmed: Dict[str, int]
num_active: Dict[str, int]
emissions: Dict[str, PreciseNumber]
@dataclass
class BlockData(FromDict):

(changed file)

@@ -117,8 +117,7 @@ class DifficultySampler:
self.challenges = {}
def on_new_block(self, challenges: Dict[str, Challenge], **kwargs):
config = CONFIG["difficulty_sampler_config"]
for c in challenges.values():
for c_id, c in challenges.items():
if c.block_data is None:
continue
logger.debug(f"Calculating valid difficulties and frontiers for challenge {c.details.name}")
@@ -126,22 +125,23 @@
upper_frontier, lower_frontier = c.block_data.scaled_frontier, c.block_data.base_frontier
else:
upper_frontier, lower_frontier = c.block_data.base_frontier, c.block_data.scaled_frontier
self.valid_difficulties[c.details.name] = calc_valid_difficulties(list(upper_frontier), list(lower_frontier))
if config["difficulty_ranges"] is None:
self.frontiers[c.details.name] = []
else:
self.frontiers[c.details.name] = calc_all_frontiers(self.valid_difficulties[c.details.name])
self.valid_difficulties[c_id] = calc_valid_difficulties(list(upper_frontier), list(lower_frontier))
self.frontiers[c_id] = None
self.challenges = [c.details.name for c in challenges.values()]
self.challenge_id_2_name = {
c_id: c.details.name for c_id, c in challenges.items()
}
def run(self) -> Dict[str, Point]:
samples = {}
config = CONFIG["difficulty_sampler_config"]
for c_name in self.challenges:
for config in CONFIG["algo_selection"]:
found_valid = False
a_id = config["algorithm_id"]
c_id = a_id[:4]
c_name = self.challenge_id_2_name[c_id]
if len(selected_difficulties := config["selected_difficulties"].get(c_name, [])) > 0:
if len(selected_difficulties := config["selected_difficulties"]) > 0:
valid_difficulties = set(tuple(d) for d in self.valid_difficulties[c_id])
selected_difficulties = [tuple(d) for d in selected_difficulties]
selected_difficulties = [
@@ -157,17 +157,20 @@ class DifficultySampler:
logger.debug(f"No valid difficulties found for {c_name} - skipping selected difficulties")
if not found_valid:
if len(self.frontiers[c_name]) == 0 or config["difficulty_ranges"] is None:
valid_difficulties = self.valid_difficulties[c_name]
if config["difficulty_range"] is None:
valid_difficulties = self.valid_difficulties[c_id]
difficulty = random.choice(valid_difficulties)
else:
frontiers = self.frontiers[c_name]
difficulty_range = config["difficulty_ranges"][c_name]
if self.frontiers[c_id] is None:
logger.debug(f"Calculating frontiers for {c_name}")
self.frontiers[c_id] = calc_all_frontiers(self.valid_difficulties[c_id])
frontiers = self.frontiers[c_id]
difficulty_range = config["difficulty_range"]
idx1 = math.floor(difficulty_range[0] * (len(frontiers) - 1))
idx2 = math.ceil(difficulty_range[1] * (len(frontiers) - 1))
difficulties = [p for frontier in frontiers[idx1:idx2 + 1] for p in frontier]
difficulty = random.choice(difficulties)
samples[c_name] = difficulty
logger.debug(f"Sampled difficulty {difficulty} for challenge {c_name}")
samples[a_id] = difficulty
logger.debug(f"Sampled difficulty {difficulty} for algorithm {a_id} in challenge {c_name}")
return samples
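
Sampling is now keyed by algorithm rather than by challenge: each algo_selection entry carries its own selected_difficulties and difficulty_range, the challenge id is the first four characters of algorithm_id, and frontiers are computed lazily per challenge. A self-contained sketch of the fallback path (explicit difficulties first, then a slice of the frontier band); the helper name and example data are illustrative, not part of the benchmarker:

import math
import random

def sample_difficulty(entry, valid_difficulties, frontiers):
    # entry mirrors one "algo_selection" item from the new config
    valid = set(tuple(d) for d in valid_difficulties)
    selected = [tuple(d) for d in entry["selected_difficulties"] if tuple(d) in valid]
    if selected:
        return random.choice(selected)
    if entry["difficulty_range"] is None:
        return random.choice(list(valid))
    # map the [0, 1] range onto frontier indices, then pool the points in that band
    lo, hi = entry["difficulty_range"]
    idx1 = math.floor(lo * (len(frontiers) - 1))
    idx2 = math.ceil(hi * (len(frontiers) - 1))
    return random.choice([p for frontier in frontiers[idx1:idx2 + 1] for p in frontier])

frontiers = [[(50, 300), (60, 290)], [(55, 310), (65, 300)]]      # illustrative data
valid_difficulties = [p for frontier in frontiers for p in frontier]
entry = {"algorithm_id": "c001_a001", "selected_difficulties": [], "difficulty_range": [0, 0.5]}
print(sample_difficulty(entry, valid_difficulties, frontiers))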

(changed file)

@@ -29,6 +29,7 @@ class JobManager:
):
api_url = CONFIG["api_url"]
config = CONFIG["job_manager_config"]
algo_selection = CONFIG["algo_selection"]
# create jobs from confirmed precommits
challenge_id_2_name = {
c.id: c.details.name
@@ -75,7 +76,14 @@
if wasm.details.download_url is None:
logger.error(f"no download_url found for wasm {wasm.algorithm_id}")
continue
num_batches = math.ceil(x.details.num_nonces / config["batch_sizes"][c_name])
batch_size = next(
(s["batch_size"] for s in algo_selection if s["algorithm_id"] == x.settings.algorithm_id),
None
)
if batch_size is None:
batch_size = config["default_batch_sizes"][x.settings.challenge_id]
logger.error(f"No batch size found for algorithm_id {x.settings.algorithm_id}, using default {batch_size}")
num_batches = math.ceil(x.details.num_nonces / batch_size)
atomic_inserts = [
(
"""
@@ -105,7 +113,7 @@
num_batches,
x.details.rand_hash,
json.dumps(block.config["benchmarks"]["runtime_configs"]["wasm"]),
config["batch_sizes"][c_name],
batch_size,
c_name,
a_name,
wasm.details.download_url,
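
Batch size is now resolved per algorithm from algo_selection, with a per-challenge fallback from job_manager_config. A short sketch of that lookup and the resulting batch count; the function name and example values are assumptions for illustration, and the fallback key follows the default_batch_sizes spelling used in the config further down:

import math

def resolve_batch_size(algorithm_id, algo_selection, default_batch_sizes):
    # per-algorithm batch size, else the default for the challenge (the "cNNN" prefix)
    batch_size = next(
        (s["batch_size"] for s in algo_selection if s["algorithm_id"] == algorithm_id),
        None
    )
    if batch_size is None:
        batch_size = default_batch_sizes[algorithm_id[:4]]
    return batch_size

algo_selection = [{"algorithm_id": "c001_a001", "batch_size": 8}]
default_batch_sizes = {"c001": 8, "c002": 16}
batch_size = resolve_batch_size("c002_a005", algo_selection, default_batch_sizes)
print(batch_size, math.ceil(40 / batch_size))   # falls back to 16, so 40 nonces -> 3 batches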

(changed file)

@@ -43,14 +43,6 @@ class PrecommitManager:
):
self.last_block_id = block.id
self.num_precommits_submitted = 0
self.challenge_name_2_id = {
c.details.name: c.id
for c in challenges.values()
}
self.algorithm_name_2_id = {
f"{challenges[a.details.challenge_id].details.name}_{a.details.name}": a.id
for a in algorithms.values()
}
self.curr_base_fees = {
c.details.name: c.block_data.base_fee
for c in challenges.values()
@@ -103,22 +95,16 @@
)["count"]
config = CONFIG["precommit_manager_config"]
algo_selection = CONFIG["algo_selection"]
num_pending_benchmarks = num_pending_jobs + self.num_precommits_submitted
if num_pending_benchmarks >= config["max_pending_benchmarks"]:
logger.debug(f"number of pending benchmarks has reached max of {config['max_pending_benchmarks']}")
return
selections = [
(c_name, x) for c_name, x in config["algo_selection"].items()
#if self.curr_base_fees[c_name] <= x.base_fee_limit
]
if len(selections) == 0:
logger.warning("No challenges under base fee limit")
return None
logger.debug(f"Selecting challenge from: {[(c_name, x['weight']) for c_name, x in selections]}")
selection = random.choices(selections, weights=[x["weight"] for _, x in selections])[0]
c_id = self.challenge_name_2_id[selection[0]]
a_id = self.algorithm_name_2_id[f"{selection[0]}_{selection[1]['algorithm']}"]
logger.debug(f"Selecting algorithm from: {[(x['algorithm_id'], x['weight']) for x in algo_selection]}")
selection = random.choices(algo_selection, weights=[x["weight"] for x in algo_selection])[0]
a_id = selection["algorithm_id"]
c_id = a_id[:4]
self.num_precommits_submitted += 1
req = SubmitPrecommitRequest(
settings=BenchmarkSettings(
@@ -126,9 +112,9 @@
algorithm_id=a_id,
player_id=CONFIG["player_id"],
block_id=self.last_block_id,
difficulty=difficulty_samples[selection[0]]
difficulty=difficulty_samples[a_id]
),
num_nonces=selection[1]["num_nonces"]
num_nonces=selection["num_nonces"]
)
logger.info(f"Created precommit (challenge: {selection[0]}, algorithm: {selection[1]['algorithm']}, difficulty: {req.settings.difficulty}, num_nonces: {req.num_nonces})")
logger.info(f"Created precommit (algorithm_id: {a_id}, difficulty: {req.settings.difficulty}, num_nonces: {req.num_nonces})")
return req
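
Precommit creation now draws directly from the flat algo_selection list: a weighted choice picks the entry, the challenge id is the algorithm id's four-character prefix, and num_nonces comes from the same entry. A standalone sketch with illustrative weights:

import random

algo_selection = [
    {"algorithm_id": "c001_a001", "num_nonces": 40, "weight": 1},
    {"algorithm_id": "c002_a001", "num_nonces": 40, "weight": 3},   # sampled ~3x as often
]

selection = random.choices(algo_selection, weights=[x["weight"] for x in algo_selection])[0]
a_id = selection["algorithm_id"]
c_id = a_id[:4]
print(f"algorithm {a_id} for challenge {c_id}, num_nonces {selection['num_nonces']}")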

(changed file)

@@ -34,7 +34,6 @@ class SlaveManager:
A.slave,
A.start_time,
A.end_time,
B.challenge,
JSONB_BUILD_OBJECT(
'id', A.benchmark_id || '_' || A.batch_idx,
'benchmark_id', A.benchmark_id,
@@ -47,7 +46,9 @@
'rand_hash', B.rand_hash,
'batch_size', B.batch_size,
'batch_idx', A.batch_idx,
'hash_threshold', B.hash_threshold
'hash_threshold', B.hash_threshold,
'challenge', B.challenge,
'algorithm', B.algorithm
) AS batch
FROM proofs_batch A
INNER JOIN job B
@@ -65,7 +66,6 @@
A.slave,
A.start_time,
A.end_time,
B.challenge,
JSONB_BUILD_OBJECT(
'id', A.benchmark_id || '_' || A.batch_idx,
'benchmark_id', A.benchmark_id,
@@ -78,7 +78,9 @@
'rand_hash', B.rand_hash,
'batch_size', B.batch_size,
'batch_idx', A.batch_idx,
'hash_threshold', B.hash_threshold
'hash_threshold', B.hash_threshold,
'challenge', B.challenge,
'algorithm', B.algorithm
) AS batch
FROM root_batch A
INNER JOIN job B
@@ -109,7 +111,6 @@
concurrent = []
updates = []
now = time.time() * 1000
selected_challenges = set(slave["selected_challenges"])
with self.lock:
concurrent = [
b["batch"] for b in self.batches
@@ -121,7 +122,7 @@
break
if (
b["slave"] == slave_name or
b["challenge"] not in selected_challenges or
not re.match(slave["algorithm_id_regex"], b["batch"]["settings"]["algorithm_id"]) or
b["end_time"] is not None
):
continue
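
Slaves are now matched to batches by algorithm rather than by challenge: each slave config entry carries an algorithm_id_regex that is matched against the batch settings' algorithm_id. A minimal sketch of that filter with illustrative batch records:

import re

slave = {"name_regex": ".*", "algorithm_id_regex": "c001_.*", "max_concurrent_batches": 1}
batches = [
    {"slave": None, "end_time": None, "batch": {"settings": {"algorithm_id": "c001_a001"}}},
    {"slave": None, "end_time": None, "batch": {"settings": {"algorithm_id": "c003_a002"}}},
]

eligible = [
    b["batch"] for b in batches
    if b["end_time"] is None
    and re.match(slave["algorithm_id_regex"], b["batch"]["settings"]["algorithm_id"])
]
print([b["settings"]["algorithm_id"] for b in eligible])   # ['c001_a001']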

(changed file)

@@ -106,72 +106,65 @@ SELECT '
"player_id": "0x0000000000000000000000000000000000000000",
"api_key": "00000000000000000000000000000000",
"api_url": "https://mainnet-api.tig.foundation",
"difficulty_sampler_config": {
"difficulty_ranges": {
"satisfiability": [0, 0.5],
"vehicle_routing": [0, 0.5],
"knapsack": [0, 0.5],
"vector_search": [0, 0.5]
},
"selected_difficulties": {
"satisfiability": [],
"vehicle_routing": [],
"knapsack": [],
"vector_search": []
}
},
"job_manager_config": {
"batch_sizes": {
"satisfiability": 8,
"vehicle_routing": 8,
"knapsack": 8,
"vector_search": 8
}
},
"submissions_manager_config": {
"time_between_retries": 60000
},
"precommit_manager_config": {
"max_pending_benchmarks": 4,
"algo_selection": {
"satisfiability": {
"algorithm": "schnoing",
"num_nonces": 40,
"weight": 1,
"base_fee_limit": "10000000000000000"
},
"vehicle_routing": {
"algorithm": "clarke_wright",
"num_nonces": 40,
"weight": 1,
"base_fee_limit": "10000000000000000"
},
"knapsack": {
"algorithm": "dynamic",
"num_nonces": 40,
"weight": 1,
"base_fee_limit": "10000000000000000"
},
"vector_search": {
"algorithm": "optimal_ann",
"num_nonces": 40,
"weight": 1,
"base_fee_limit": "10000000000000000"
}
}
"job_manager_config": {
"default_batch_sizes": {
"c001": 8,
"c002": 8,
"c003": 8,
"c004": 8
}
},
"precommit_manager_config": {
"max_pending_benchmarks": 4
},
"algo_selection": [
{
"algorithm_id": "c001_a001",
"num_nonces": 40,
"difficulty_range": [0, 0.5],
"selected_difficulties": [],
"weight": 1,
"batch_size": 8,
"base_fee_limit": "10000000000000000"
},
{
"algorithm_id": "c002_a001",
"num_nonces": 40,
"difficulty_range": [0, 0.5],
"selected_difficulties": [],
"weight": 1,
"batch_size": 8,
"base_fee_limit": "10000000000000000"
},
{
"algorithm_id": "c003_a001",
"num_nonces": 40,
"difficulty_range": [0, 0.5],
"selected_difficulties": [],
"weight": 1,
"batch_size": 8,
"base_fee_limit": "10000000000000000"
},
{
"algorithm_id": "c004_a001",
"num_nonces": 40,
"difficulty_range": [0, 0.5],
"selected_difficulties": [],
"weight": 1,
"batch_size": 8,
"base_fee_limit": "10000000000000000"
}
],
"slave_manager_config": {
"time_before_batch_retry": 60000,
"slaves": [
{
"name_regex": ".*",
"max_concurrent_batches": 1,
"selected_challenges": [
"satisfiability",
"vehicle_routing",
"knapsack",
"vector_search"
]
"algorithm_id_regex": ".*",
"max_concurrent_batches": 1
}
]
}
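
Since the config moved from per-challenge dictionaries to a flat algo_selection list keyed by algorithm_id, a quick consistency check can catch hand-edited configs before the benchmarker loads them. A hedged helper; the function and the embedded example are illustrative, not shipped with this commit:

REQUIRED_KEYS = {"algorithm_id", "num_nonces", "difficulty_range",
                 "selected_difficulties", "weight", "batch_size", "base_fee_limit"}

def check_config(config):
    defaults = config["job_manager_config"]["default_batch_sizes"]
    for entry in config["algo_selection"]:
        missing = REQUIRED_KEYS - entry.keys()
        assert not missing, f"{entry.get('algorithm_id')} is missing keys: {missing}"
        # challenge id is the algorithm id's 4-character prefix
        assert entry["algorithm_id"][:4] in defaults, \
            f"no default batch size for challenge {entry['algorithm_id'][:4]}"
        assert entry["weight"] > 0, "weight must be positive"

check_config({
    "job_manager_config": {"default_batch_sizes": {"c001": 8}},
    "algo_selection": [{
        "algorithm_id": "c001_a001", "num_nonces": 40, "difficulty_range": [0, 0.5],
        "selected_difficulties": [], "weight": 1, "batch_size": 8,
        "base_fee_limit": "10000000000000000",
    }],
})
print("config looks consistent")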

(changed file)

@@ -1,4 +1,7 @@
#!/usr/bin/env python3
import argparse
import io
import json
import os
import logging
@@ -6,8 +9,10 @@ import randomname
import requests
import shutil
import subprocess
import tarfile
import time
import zlib
from glob import glob
from threading import Thread
from common.structs import OutputData, MerkleProof
from common.merkle_tree import MerkleTree, MerkleHash
@@ -21,34 +26,42 @@ FINISHED_BATCH_IDS = {}
def now():
return int(time.time() * 1000)
def download_wasm(download_url, wasm_path):
if not os.path.exists(wasm_path):
def download_library(downloads_folder, batch):
challenge_folder = f"{downloads_folder}/{batch['challenge']}"
so_path = (glob(f"{challenge_folder}/*/{batch['algorithm']}.so") or [None])[0]
if so_path is None:
start = now()
logger.info(f"downloading WASM from {download_url}")
resp = requests.get(download_url)
logger.info(f"downloading {batch['algorithm']}.tar.gz from {batch['download_url']}")
resp = requests.get(batch['download_url'], stream=True)
if resp.status_code != 200:
raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}")
with open(wasm_path, 'wb') as f:
f.write(resp.content)
logger.debug(f"downloading WASM: took {now() - start}ms")
logger.debug(f"WASM Path: {wasm_path}")
raise Exception(f"status {resp.status_code} when downloading algorithm library: {resp.text}")
with tarfile.open(fileobj=io.BytesIO(resp.content), mode="r:gz") as tar:
tar.extractall(path=challenge_folder)
logger.debug(f"downloading {batch['algorithm']}.tar.gz took {now() - start}ms")
so_path = (glob(f"{challenge_folder}/*/{batch['algorithm']}.so") or [None])[0]
ptx_path = (glob(f"{challenge_folder}/*/{batch['algorithm']}.ptx") or [None])[0]
return so_path, ptx_path
def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path):
def run_tig_worker(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path, num_workers, output_path):
start = now()
cmd = [
tig_worker_path, "compute_batch",
json.dumps(batch["settings"]),
batch["rand_hash"],
tig_worker_path,
tig_runtime_path,
json.dumps(batch["settings"]),
batch["rand_hash"],
str(batch["start_nonce"]),
str(batch["num_nonces"]),
str(batch["batch_size"]),
wasm_path,
"--mem", str(batch["runtime_config"]["max_memory"]),
str(batch["batch_size"]),
so_path,
"--fuel", str(batch["runtime_config"]["max_fuel"]),
"--workers", str(num_workers),
"--output", f"{output_path}/{batch['id']}",
]
if ptx_path is not None:
cmd += ["--ptx", ptx_path]
logger.info(f"computing batch: {' '.join(cmd)}")
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
@@ -97,7 +110,7 @@ def purge_folders(output_path, ttl):
FINISHED_BATCH_IDS.pop(batch_id)
def send_results(headers, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path):
def send_results(headers, master_ip, master_port, tig_worker_path, downloads_folder, num_workers, output_path):
try:
batch_id = READY_BATCH_IDS.pop()
except KeyError:
@@ -185,7 +198,7 @@ def send_results(headers, master_ip, master_port, tig_worker_path, download_wasm
time.sleep(2)
def process_batch(tig_worker_path, download_wasms_folder, num_workers, output_path):
def process_batch(tig_worker_path, tig_runtime_path, downloads_folder, num_workers, output_path):
try:
batch_id = PENDING_BATCH_IDS.pop()
except KeyError:
@@ -208,12 +221,11 @@ def process_batch(tig_worker_path, download_wasms_folder, num_workers, output_pa
with open(f"{output_path}/{batch_id}/batch.json") as f:
batch = json.load(f)
wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
download_wasm(batch['download_url'], wasm_path)
so_path, ptx_path = download_library(downloads_folder, batch)
Thread(
target=run_tig_worker,
args=(tig_worker_path, batch, wasm_path, num_workers, output_path)
args=(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path, num_workers, output_path)
).start()
@@ -258,18 +270,31 @@ def wrap_thread(func, *args):
def main(
master_ip: str,
tig_worker_path: str,
download_wasms_folder: str,
tig_runtime_path: str,
downloads_folder: str,
num_workers: int,
slave_name: str,
master_port: int,
output_path: str,
ttl: int,
):
print(f"Starting slave {slave_name}")
print(f"Starting slave with config:")
print(f" Slave Name: {slave_name}")
print(f" Master IP: {master_ip}")
print(f" Master Port: {master_port}")
print(f" Worker Path: {tig_worker_path}")
print(f" Runtime Path: {tig_runtime_path}")
print(f" Downloads Folder: {downloads_folder}")
print(f" Number of Workers: {num_workers}")
print(f" Output Path: {output_path}")
print(f" TTL: {ttl}")
print(f" Verbose: {args.verbose}")
if not os.path.exists(tig_worker_path):
raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}")
os.makedirs(download_wasms_folder, exist_ok=True)
if not os.path.exists(tig_runtime_path):
raise FileNotFoundError(f"tig-runtime not found at path: {tig_runtime_path}")
os.makedirs(downloads_folder, exist_ok=True)
headers = {
"User-Agent": slave_name
@@ -277,12 +302,12 @@ def main(
Thread(
target=wrap_thread,
args=(process_batch, tig_worker_path, download_wasms_folder, num_workers, output_path)
args=(process_batch, tig_worker_path, tig_runtime_path, downloads_folder, num_workers, output_path)
).start()
Thread(
target=wrap_thread,
args=(send_results, headers, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path)
args=(send_results, headers, master_ip, master_port, tig_worker_path, downloads_folder, num_workers, output_path)
).start()
Thread(
@@ -295,9 +320,10 @@
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TIG Slave Benchmarker")
parser.add_argument("tig_worker_path", help="Path to tig-worker executable")
parser.add_argument("--tig_worker_path", type=str, default=shutil.which("tig-worker"), help="Path to tig-worker executable")
parser.add_argument("--tig_runtime_path", type=str, default=shutil.which("tig-runtime"), help="Path to tig-runtime executable")
parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)")
parser.add_argument("--download", type=str, default="wasms", help="Folder to download WASMs to (default: wasms)")
parser.add_argument("--download", type=str, default="libs", help="Folder to download algorithm libraries to (default: libs)")
parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)")
parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)")
parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)")
@@ -312,4 +338,4 @@ if __name__ == "__main__":
level=logging.DEBUG if args.verbose else logging.INFO
)
main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl)
main(args.master, args.tig_worker_path, args.tig_runtime_path, args.download, args.workers, args.name, args.port, args.output, args.ttl)