diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..b9164a4
Binary files /dev/null and b/.DS_Store differ
diff --git a/bin/.DS_Store b/bin/.DS_Store
new file mode 100644
index 0000000..5008ddf
Binary files /dev/null and b/bin/.DS_Store differ
diff --git a/bin/client b/bin/client
index 0da8daf..0fe3e45 100644
Binary files a/bin/client and b/bin/client differ
diff --git a/bin/new_slave/slave.py b/bin/new_slave/slave.py
deleted file mode 100644
index a3c6d77..0000000
--- a/bin/new_slave/slave.py
+++ /dev/null
@@ -1,314 +0,0 @@
-import argparse
-import json
-import os
-import logging
-import randomname
-import requests
-import shutil
-import subprocess
-import time
-import zlib
-from threading import Thread
-from common.structs import OutputData, MerkleProof
-from common.merkle_tree import MerkleTree
-
-logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
-PENDING_BATCH_IDS = set()
-PROCESSING_BATCH_IDS = set()
-READY_BATCH_IDS = set()
-FINISHED_BATCH_IDS = {}
-
-def now():
-    return int(time.time() * 1000)
-
-def download_wasm(session, download_url, wasm_path):
-    if not os.path.exists(wasm_path):
-        start = now()
-        logger.info(f"downloading WASM from {download_url}")
-        resp = session.get(download_url)
-        if resp.status_code != 200:
-            raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}")
-        with open(wasm_path, 'wb') as f:
-            f.write(resp.content)
-        logger.debug(f"downloading WASM: took {now() - start}ms")
-    logger.debug(f"WASM Path: {wasm_path}")
-
-
-def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path):
-    start = now()
-    cmd = [
-        tig_worker_path, "compute_batch",
-        json.dumps(batch["settings"]),
-        batch["rand_hash"],
-        str(batch["start_nonce"]),
-        str(batch["num_nonces"]),
-        str(batch["batch_size"]),
-        wasm_path,
-        "--mem", str(batch["runtime_config"]["max_memory"]),
-        "--fuel", str(batch["runtime_config"]["max_fuel"]),
-        "--workers", str(num_workers),
-        "--output", f"{output_path}/{batch['id']}",
-    ]
-    logger.info(f"computing batch: {' '.join(cmd)}")
-    process = subprocess.Popen(
-        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
-    )
-    stdout, stderr = process.communicate()
-    if process.returncode != 0:
-        PROCESSING_BATCH_IDS.remove(batch["id"])
-        raise Exception(f"tig-worker failed: {stderr.decode()}")
-    result = json.loads(stdout.decode())
-    logger.info(f"computing batch took {now() - start}ms")
-    logger.debug(f"batch result: {result}")
-    with open(f"{output_path}/{batch['id']}/result.json", "w") as f:
-        json.dump(result, f)
-
-    PROCESSING_BATCH_IDS.remove(batch["id"])
-    READY_BATCH_IDS.add(batch["id"])
-
-
-def purge_folders(output_path, ttl):
-    n = now()
-    purge_batch_ids = [
-        batch_id
-        for batch_id, finish_time in FINISHED_BATCH_IDS.items()
-        if n >= finish_time + (ttl * 1000)
-    ]
-    if len(purge_batch_ids) == 0:
-        time.sleep(5)
-        return
-
-    for batch_id in purge_batch_ids:
-        if os.path.exists(f"{output_path}/{batch_id}"):
-            logger.info(f"purging batch {batch_id}")
-            shutil.rmtree(f"{output_path}/{batch_id}")
-        FINISHED_BATCH_IDS.pop(batch_id)
-
-
-def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode):
-    global LAST_SUCCESSFUL_PORT
-
-    try:
-        batch_id = READY_BATCH_IDS.pop()
-    except KeyError:
-        logger.debug("No pending batches")
-        time.sleep(1)
-        return
-
-    port_port = LAST_SUCCESSFUL_PORT or master_port
-
-    output_folder = f"{output_path}/{batch_id}"
-    with open(f"{output_folder}/batch.json") as f:
-        batch = json.load(f)
-
-    if (
-        not os.path.exists(f"{output_folder}/result.json")
-        or not os.path.exists(f"{output_folder}/data.zlib")
-    ):
-        if os.path.exists(f"{output_folder}/result.json"):
-            os.remove(f"{output_folder}/result.json")
-        logger.debug(f"Batch {batch_id} flagged as ready, but missing nonce files")
-        PENDING_BATCH_IDS.add(batch_id)
-        return
-
-    if batch["sampled_nonces"] is None:
-        with open(f"{output_folder}/result.json") as f:
-            result = json.load(f)
-
-        submit_url = f"http://{master_ip}:{port_port}/submit-batch-root/{batch_id}"
-        logger.info(f"posting root to {submit_url}")
-        resp = session.post(submit_url, json=result)
-
-        if resp.status_code == 200:
-            FINISHED_BATCH_IDS[batch_id] = now()
-            logger.info(f"successfully posted root for batch {batch_id}")
-        elif resp.status_code == 408:  # took too long
-            FINISHED_BATCH_IDS[batch_id] = now()
-            logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
-        else:
-            logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
-            READY_BATCH_IDS.add(batch_id)  # requeue
-            time.sleep(2)
-
-    else:
-        with open(f"{output_folder}/data.zlib", "rb") as f:
-            leafs = [
-                OutputData.from_dict(x)
-                for x in json.loads(zlib.decompress(f.read()).decode())
-            ]
-
-        merkle_tree = MerkleTree(
-            [x.to_merkle_hash() for x in leafs],
-            batch["batch_size"]
-        )
-
-        proofs_to_submit = [
-            MerkleProof(
-                leaf=leafs[n - batch["start_nonce"]],
-                branch=merkle_tree.calc_merkle_branch(branch_idx=n - batch["start_nonce"])
-            ).to_dict()
-            for n in batch["sampled_nonces"]
-        ]
-
-        submit_url = f"http://{master_ip}:{port_port}/submit-batch-proofs/{batch_id}"
-        logger.info(f"posting proofs to {submit_url}")
-        resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit})
-
-        if resp.status_code == 200:
-            FINISHED_BATCH_IDS[batch_id] = now()
-            logger.info(f"successfully posted proofs for batch {batch_id}")
-        elif resp.status_code == 408:  # took too long
-            FINISHED_BATCH_IDS[batch_id] = now()
-            logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
-        else:
-            logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
-            READY_BATCH_IDS.add(batch_id)  # requeue
-            time.sleep(2)
-
-
-def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path):
-    try:
-        batch_id = PENDING_BATCH_IDS.pop()
-    except KeyError:
-        logger.debug("No pending batches")
-        time.sleep(1)
-        return
-
-    if (
-        batch_id in PROCESSING_BATCH_IDS or
-        batch_id in READY_BATCH_IDS
-    ):
-        return
-
-    if os.path.exists(f"{output_path}/{batch_id}/result.json"):
-        logger.info(f"Batch {batch_id} already processed")
-        READY_BATCH_IDS.add(batch_id)
-        return
-    PROCESSING_BATCH_IDS.add(batch_id)
-
-    with open(f"{output_path}/{batch_id}/batch.json") as f:
-        batch = json.load(f)
-
-    wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
-    download_wasm(session, batch['download_url'], wasm_path)
-
-    Thread(
-        target=run_tig_worker,
-        args=(tig_worker_path, batch, wasm_path, num_workers, output_path)
-    ).start()
-
-
-def poll_batches(session, master_ip, master_port, output_path, mode):
-    global LAST_SUCCESSFUL_PORT
-
-    get_batches_url = f"http://{master_ip}:{master_port}/get-batches"
-    logger.info(f"fetching batches from {get_batches_url}")
-    resp = session.get(get_batches_url)
-
-    if mode == "mainnet" and resp.status_code == 666:
-        logger.warning(f"Status 666 received. Retrying with master_port {master_port - 1}")
-        resp = session.get(f"http://{master_ip}:{master_port - 1}/get-batches")
-        if resp.status_code == 666:
-            time.sleep(2)
-            logger.error(f"Retrying failed with status 666 again. Reverting to original master_port {master_port}")
-            resp = session.get(get_batches_url)
-
-    if resp.status_code == 200:
-        LAST_SUCCESSFUL_PORT = master_port if resp.url.endswith(f":{master_port}/get-batches") else master_port - 1
-        logger.debug(f"Successfully fetched batches from port {LAST_SUCCESSFUL_PORT}")
-        batches = resp.json()
-        root_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is None]
-        proofs_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is not None]
-        logger.info(f"root batches: {root_batch_ids}")
-        logger.info(f"proofs batches: {proofs_batch_ids}")
-        for batch in batches:
-            output_folder = f"{output_path}/{batch['id']}"
-            os.makedirs(output_folder, exist_ok=True)
-            with open(f"{output_folder}/batch.json", "w") as f:
-                json.dump(batch, f)
-        PENDING_BATCH_IDS.clear()
-        PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids)
-        time.sleep(5)
-
-    else:
-        logger.error(f"status {resp.status_code} when fetching batch: {resp.text}")
-        time.sleep(5)
-
-
-def wrap_thread(func, *args):
-    logger.info(f"Starting thread for {func.__name__}")
-    while True:
-        try:
-            func(*args)
-        except Exception as e:
-            logger.error(f"Error in {func.__name__}: {e}")
-            time.sleep(5)
-
-
-def main(
-    master_ip: str,
-    tig_worker_path: str,
-    download_wasms_folder: str,
-    num_workers: int,
-    slave_name: str,
-    master_port: int,
-    output_path: str,
-    ttl: int,
-    mode: str,
-):
-    print(f"Starting slave {slave_name}")
-
-    if not os.path.exists(tig_worker_path):
-        raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}")
-    os.makedirs(download_wasms_folder, exist_ok=True)
-
-    session = requests.Session()
-    session.headers.update({
-        "User-Agent": slave_name
-    })
-
-    Thread(
-        target=wrap_thread,
-        args=(process_batch, session, tig_worker_path, download_wasms_folder, num_workers, output_path)
-    ).start()
-
-    Thread(
-        target=wrap_thread,
-        args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode)
-    ).start()
-
-    Thread(
-        target=wrap_thread,
-        args=(purge_folders, output_path, ttl)
-    ).start()
-
-    wrap_thread(poll_batches, session, master_ip, master_port, output_path, mode)
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="TIG Slave Benchmarker")
-    parser.add_argument("tig_worker_path", help="Path to tig-worker executable")
-    parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)")
-    parser.add_argument("--download", type=str, default="wasms", help="Folder to download WASMs to (default: wasms)")
-    parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)")
-    parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)")
-    parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)")
-    parser.add_argument("--verbose", action='store_true', help="Print debug logs")
-    parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)")
-    parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)")
-    parser.add_argument("--mode", type=str, default="mainnet", help="mainnet /explo")
-
-    args = parser.parse_args()
-
-    logging.basicConfig(
-        format='%(levelname)s - [%(name)s] - %(message)s',
-        level=logging.DEBUG if args.verbose else logging.INFO
-    )
-
-    main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl, args.mode)
diff --git a/bin/tig_idle b/bin/tig_idle
new file mode 100644
index 0000000..585e58a
Binary files /dev/null and b/bin/tig_idle differ
diff --git a/install.sh b/install.sh
index 1a9b787..c2ea42a 100755
--- a/install.sh
+++ b/install.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-if [ "$#" -ne 7 ]; then
+if [ "$#" -ne 7 ] && [ "$#" -ne 8 ]; then
     echo "wrong parameters"
     exit 1
 fi
@@ -13,12 +13,30 @@ login=$5
 private_key=$6
 client_version=$7
 
-\rm -rf tig_pool
-\mkdir tig_pool
-cd tig_pool
+branch="main"
+if [ "$#" -eq 8 ] && [ "$8" = "testnet" ]; then
+    branch="test"
+fi
+
+# Remove existing directory and recreate
+rm -rf "tig_pool_$branch"
+mkdir "tig_pool_$branch"
+cd "tig_pool_$branch" || exit 1
+
+# Kill any existing screens named pool_tig
 screen -ls | grep pool_tig | awk '{print $1}' | xargs -I {} screen -S {} -X kill
 
-wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/scripts/tig_pool_master.sh
+# Download and run the updated script
+wget "https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/$branch/scripts/tig_pool_master.sh"
 sudo chmod +x tig_pool_master.sh
-./tig_pool_master.sh -id_slave $slave_id -nom_slave $slave_name -ip $server_url -port $port -login $login -tok $private_key -url $server_url -v $client_version
+
+./tig_pool_master.sh \
+    -id_slave "$slave_id" \
+    -nom_slave "$slave_name" \
+    -ip "$server_url" \
+    -port "$port" \
+    -login "$login" \
+    -tok "$private_key" \
+    -url "$server_url" \
+    -v "$client_version" \
+    -b "$branch"
diff --git a/scripts/tig_pool_master.sh b/scripts/tig_pool_master.sh
index 8a911e3..78bcc73 100755
--- a/scripts/tig_pool_master.sh
+++ b/scripts/tig_pool_master.sh
@@ -2,12 +2,12 @@
 
 # Function to display usage
 usage() {
-    echo "Usage: $0 -id_slave <id_slave> -nom_slave <nom_slave> -ip <ip> -port <port> -login <login> -tok <private_key> -url <url_server>"
+    echo "Usage: $0 -id_slave <id_slave> -nom_slave <nom_slave> -ip <ip> -port <port> -login <login> -tok <private_key> -url <url_server> -b <branch>"
     exit 1
 }
 
 # Check if the total number of arguments ok
-if [ "$#" -ne 16 ]; then
+if [ "$#" -ne 18 ]; then
     usage
 fi
@@ -20,6 +20,7 @@ v=""
 login_discord=""
 private_key=""
 URL_SERVER=""
+branch=""
 
 # Parse input arguments
 while [[ "$#" -gt 0 ]]; do
@@ -56,6 +57,10 @@ while [[ "$#" -gt 0 ]]; do
             URL_SERVER="$2"
             shift 2
             ;;
+        -b)
+            branch="$2"
+            shift 2
+            ;;
         *)
             echo "Unknown parameter: $1"
             usage
@@ -64,7 +69,7 @@ done
 
 # Ensure variables are not empty
-if [ -z "$id_slave" ] || [ -z "$nom_slave" ] || [ -z "$ip" ] || [ -z "$port" ] || [ -z "$login_discord" ] || [ -z "$private_key" ] || [ -z "$URL_SERVER" ]; then
+if [ -z "$id_slave" ] || [ -z "$nom_slave" ] || [ -z "$ip" ] || [ -z "$port" ] || [ -z "$login_discord" ] || [ -z "$private_key" ] || [ -z "$URL_SERVER" ] || [ -z "$branch" ]; then
     usage
 fi
@@ -85,6 +90,7 @@ echo "Login: $login_discord"
 echo "Private Key: $private_key"
 echo "URL Server: $URL_SERVER"
 echo "Current path: $current_path"
+echo "Current branch: $branch"
 
 sudo apt update
 sudo apt install -y python3 python3-venv python3-dev
@@ -97,9 +103,14 @@ sudo apt install -y libssl-dev
 
 # Create the directory tig_pool_test and navigate to it
 mkdir -p wasms
+mkdir -p logs
+sudo chmod -R 777 logs/
 sudo chmod -R 777 wasms/
 
 # Clone the Git repository with the specified branch
-git clone https://github.com/tig-pool-nk/tig-monorepo.git
+git clone -b $branch https://github.com/tig-pool-nk/tig-monorepo.git
 
 # Navigate to the benchmarker directory and build the project with cargo
 cd tig-monorepo/tig-worker/
@@ -112,8 +123,18 @@ python3 -m venv venv
 mkdir -p tig-benchmarker
 cd tig-benchmarker
 
-wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/tig-benchmarker/slave.py -O slave.py
-wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/tig-benchmarker/requirements.txt -O requirements.txt
+wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/$branch/tig-benchmarker/slave.py -O slave.py
+wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/$branch/tig-benchmarker/requirements.txt -O requirements.txt
+mkdir -p common
+cd common
+wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/$branch/tig-benchmarker/common/__init__.py -O __init__.py
+wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/$branch/tig-benchmarker/common/merkle_tree.py -O merkle_tree.py
+wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/$branch/tig-benchmarker/common/structs.py -O structs.py
+wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/$branch/tig-benchmarker/common/utils.py -O utils.py
 
 cd $current_path
 ./venv/bin/pip3 install -r tig-benchmarker/requirements.txt
@@ -122,26 +143,34 @@ mkdir -p bin
 cd bin
 
 # Download the files and check if the download was successful
-wget https://github.com/tig-pool-nk/client/raw/refs/heads/main/bin/client -O client_tig_pool
+wget https://github.com/tig-pool-nk/client/raw/refs/heads/$branch/bin/client -O client_tig_pool
 if [ $? -ne 0 ]; then
     echo "Error downloading client_tig_pool"
     exit 1
 fi
 
-wget https://github.com/tig-pool-nk/client/raw/refs/heads/main/bin/bench -O bench
+wget https://github.com/tig-pool-nk/client/raw/refs/heads/$branch/bin/bench -O bench
 if [ $? -ne 0 ]; then
     echo "Error downloading bench"
     exit 1
 fi
 
+wget https://github.com/tig-pool-nk/client/raw/refs/heads/$branch/bin/tig_idle -O tig_idle
+if [ $? -ne 0 ]; then
+    echo "Error downloading tig_idle"
+    exit 1
+fi
+
 # Grant execution permissions to both files
 chmod +x client_tig_pool
 chmod +x bench
+chmod +x tig_idle
 
 cd $current_path
 
 # Download the launch file and rename it according to the provided parameters
-wget -O pool_tig_launch_${id_slave}_${nom_slave}.sh https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/scripts/pool_tig_launch_master.sh
+wget -O pool_tig_launch_${id_slave}_${nom_slave}.sh https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/$branch/scripts/pool_tig_launch_master.sh
 
 # Replace placeholders with variable values
 sed -i "s|@id@|$id_slave|g" pool_tig_launch_${id_slave}_${nom_slave}.sh
@@ -162,14 +191,15 @@ sed -i "s|@@path@@|$current_path/|g" pool_tig_launch_${id_slave}_${nom_slave}.sh
 
 echo "Script completed successfully. Files have been downloaded, configured, and the path has been updated."
 # Start a new screen called pool_tig and execute the script pool_tig_launch_${id_slave}_${nom_slave}.sh
-screen -dmS pool_tig bash -c "cd \"$current_path\" && ./pool_tig_launch_${id_slave}_${nom_slave}.sh ; exec bash"
+screen -dmL -Logfile "$current_path/logs/pool_tig.log" -S pool_tig bash -c "cd \"$current_path\" && ./pool_tig_launch_${id_slave}_${nom_slave}.sh ; exec bash"
+
 # Download snake
 cd $current_path
 mkdir game
 cd game
-wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/scripts/snake.sh -O snake.sh
+wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/$branch/scripts/snake.sh -O snake.sh
 cd $current_path
 
 set +H
@@ -184,16 +214,16 @@ echo " ╚═╝  ╚═╝ ╚═════╝  ╚═╝ ╚═
 echo -e "\e[0m"
 echo ""
 
-echo -e "\e[32mTIG Pool has been installed successfully!\e[0m"
+echo -e "\e[32mTIG $branch Pool has been installed successfully!\e[0m"
 echo ""
 echo "To follow the benchmarker, use the commands below:"
 echo
 echo "  1. Follow pool:"
-echo "     screen -r pool_tig"
-echo
+echo "     tail -f logs/pool_tig.log"
+echo
 echo "  2. Follow slave:"
-echo "     screen -r slave_tig"
+echo "     tail -f logs/slave_tig.log"
 echo
 echo "  3. Have some time to lose :)"
 echo "     bash game/snake.sh"
diff --git a/tig-benchmarker/.DS_Store b/tig-benchmarker/.DS_Store
new file mode 100644
index 0000000..9dbc999
Binary files /dev/null and b/tig-benchmarker/.DS_Store differ
diff --git a/tig-benchmarker/common/.DS_Store b/tig-benchmarker/common/.DS_Store
new file mode 100644
index 0000000..5008ddf
Binary files /dev/null and b/tig-benchmarker/common/.DS_Store differ
diff --git a/tig-benchmarker/common/__init__.py b/tig-benchmarker/common/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tig-benchmarker/common/merkle_tree.py b/tig-benchmarker/common/merkle_tree.py
new file mode 100644
index 0000000..202a4de
--- /dev/null
+++ b/tig-benchmarker/common/merkle_tree.py
@@ -0,0 +1,152 @@
+from blake3 import blake3
+from typing import List, Tuple
+from .utils import FromStr, u8s_from_str
+
+class MerkleHash(FromStr):
+    def __init__(self, value: bytes):
+        if len(value) != 32:
+            raise ValueError("MerkleHash must be exactly 32 bytes")
+        self.value = value
+
+    @classmethod
+    def from_str(cls, str: str):
+        return cls(bytes.fromhex(str))
+
+    @classmethod
+    def null(cls):
+        return cls(bytes([0] * 32))
+
+    def to_str(self):
+        return self.value.hex()
+
+    def __eq__(self, other):
+        return isinstance(other, MerkleHash) and self.value == other.value
+
+    def __repr__(self):
+        return f"MerkleHash({self.to_str()})"
+
+class MerkleTree(FromStr):
+    def __init__(self, hashed_leafs: List[MerkleHash], n: int):
+        if len(hashed_leafs) > n:
+            raise ValueError("Invalid tree size")
+        if n & (n - 1) != 0:
+            raise ValueError("n must be a power of 2")
+        self.hashed_leafs = hashed_leafs
+        self.n = n
+
+    def to_str(self):
+        """Serializes the MerkleTree to a string"""
+        n_hex = f"{self.n:016x}"
+        hashes_hex = ''.join([h.to_str() for h in self.hashed_leafs])
+        return n_hex + hashes_hex
+
+    def __repr__(self):
+        return f"MerkleTree([{', '.join([str(h) for h in self.hashed_leafs])}], {self.n})"
+
+    @classmethod
+    def from_str(cls, s: str):
+        """Deserializes a MerkleTree from a string"""
+        if len(s) < 16 or (len(s) - 16) % 64 != 0:
+            raise ValueError("Invalid MerkleTree string length")
+
+        n_hex = s[:16]
+        n = int(n_hex, 16)
+
+        hashes_hex = s[16:]
+        hashed_leafs = [
+            MerkleHash.from_str(hashes_hex[i:i + 64])
+            for i in range(0, len(hashes_hex), 64)
+        ]
+
+        return cls(hashed_leafs, n)
+
+    def calc_merkle_root(self) -> MerkleHash:
+        hashes = self.hashed_leafs[:]
+
+        while len(hashes) > 1:
+            new_hashes = []
+            for i in range(0, len(hashes), 2):
+                left = hashes[i]
+                result = MerkleHash(left.value)
+                if i + 1 < len(hashes):
+                    right = hashes[i + 1]
+                    combined = left.value + right.value
+                    result = MerkleHash(blake3(combined).digest())
+                new_hashes.append(result)
+            hashes = new_hashes
+
+        return hashes[0]
+
+    def calc_merkle_branch(self, branch_idx: int) -> 'MerkleBranch':
+        if branch_idx >= self.n:
+            raise ValueError("Invalid branch index")
+
+        hashes = self.hashed_leafs[:]
+        branch = []
+        idx = branch_idx
+        depth = 0
+
+        while len(hashes) > 1:
+            new_hashes = []
+            for i in range(0, len(hashes), 2):
+                left = hashes[i]
+                result = MerkleHash(left.value)
+                if i + 1 < len(hashes):
+                    right = hashes[i + 1]
+                    if idx // 2 == i // 2:
+                        branch.append((depth, right if idx % 2 == 0 else left))
+                    combined = left.value + right.value
+                    result = MerkleHash(blake3(combined).digest())
+                new_hashes.append(result)
+            hashes = new_hashes
+            idx //= 2
+            depth += 1
+
+        return MerkleBranch(branch)
+
+class MerkleBranch:
+    def __init__(self, stems: List[Tuple[int, MerkleHash]]):
+        self.stems = stems
+
+    def calc_merkle_root(self, hashed_leaf: MerkleHash, branch_idx: int) -> MerkleHash:
+        root = hashed_leaf
+        idx = branch_idx
+        curr_depth = 0
+
+        for depth, hash in self.stems:
+            if curr_depth > depth:
+                raise ValueError("Invalid branch")
+            while curr_depth != depth:
+                idx //= 2
+                curr_depth += 1
+
+            if idx % 2 == 0:
+                combined = root.value + hash.value
+            else:
+                combined = hash.value + root.value
+            root = MerkleHash(blake3(combined).digest())
+            idx //= 2
+            curr_depth += 1
+
+        return root
+
+    def to_str(self):
+        """Serializes the MerkleBranch to a hex string"""
+        return ''.join([f"{depth:02x}{hash.to_str()}" for depth, hash in self.stems])
+
+    def __repr__(self):
+        return f"MerkleBranch([{', '.join([f'({depth}, {hash})' for depth, hash in self.stems])}])"
+
+    @classmethod
+    def from_str(cls, s: str):
+        """Deserializes a MerkleBranch from a hex string"""
+        if len(s) % 66 != 0:
+            raise ValueError("Invalid MerkleBranch string length")
+
+        stems = []
+        for i in range(0, len(s), 66):
+            depth = int(s[i:i+2], 16)
+            hash_hex = s[i+2:i+66]
+            stems.append((depth, MerkleHash.from_str(hash_hex)))
+
+        return cls(stems)
\ No newline at end of file
diff --git a/tig-benchmarker/common/structs.py b/tig-benchmarker/common/structs.py
new file mode 100644
index 0000000..c746f3d
--- /dev/null
+++ b/tig-benchmarker/common/structs.py
@@ -0,0 +1,297 @@
+from .merkle_tree import MerkleHash, MerkleBranch
+from .utils import FromDict, u64s_from_str, u8s_from_str, jsonify, PreciseNumber
+from dataclasses import dataclass, field
+from typing import Dict, List, Optional, Set, Any, Tuple
+
+Point = Tuple[int, ...]
+Frontier = Set[Point]
+
+@dataclass
+class AlgorithmDetails(FromDict):
+    name: str
+    player_id: str
+    challenge_id: str
+    breakthrough_id: Optional[str]
+    type: str
+    fee_paid: PreciseNumber
+
+@dataclass
+class AlgorithmState(FromDict):
+    block_confirmed: int
+    round_submitted: int
+    round_pushed: Optional[int]
+    round_active: Optional[int]
+    round_merged: Optional[int]
+    banned: bool
+
+@dataclass
+class AlgorithmBlockData(FromDict):
+    num_qualifiers_by_player: Dict[str, int]
+    adoption: PreciseNumber
+    merge_points: int
+    reward: PreciseNumber
+
+@dataclass
+class Algorithm(FromDict):
+    id: str
+    details: AlgorithmDetails
+    state: AlgorithmState
+    block_data: Optional[AlgorithmBlockData]
+
+@dataclass
+class BenchmarkSettings(FromDict):
+    player_id: str
+    block_id: str
+    challenge_id: str
+    algorithm_id: str
+    difficulty: Point
+
+    def calc_seed(self, rand_hash: str, nonce: int) -> bytes:
+        return u8s_from_str(f"{jsonify(self)}_{rand_hash}_{nonce}")
+
+@dataclass
+class PrecommitDetails(FromDict):
+    block_started: int
+    num_nonces: int
+    rand_hash: str
+    fee_paid: PreciseNumber
+
+@dataclass
+class PrecommitState(FromDict):
+    block_confirmed: int
+
+@dataclass
+class Precommit(FromDict):
+    benchmark_id: str
+    details: PrecommitDetails
+    settings: BenchmarkSettings
+    state: PrecommitState
+
+@dataclass
+class BenchmarkDetails(FromDict):
+    num_solutions: int
+    merkle_root: MerkleHash
+    sampled_nonces: List[int]
+
+@dataclass
+class BenchmarkState(FromDict):
+    block_confirmed: int
+
+@dataclass
+class Benchmark(FromDict):
+    id: str
+    details: BenchmarkDetails
+    state: BenchmarkState
+    solution_nonces: Optional[Set[int]]
+
+@dataclass
+class OutputMetaData(FromDict):
+    nonce: int
+    runtime_signature: int
+    fuel_consumed: int
+    solution_signature: int
+
+    @classmethod
+    def from_output_data(cls, output_data: 'OutputData') -> 'OutputMetaData':
+        return output_data.to_output_metadata()
+
+    def to_merkle_hash(self) -> MerkleHash:
+        return MerkleHash(u8s_from_str(jsonify(self)))
+
+@dataclass
+class OutputData(FromDict):
+    nonce: int
+    runtime_signature: int
+    fuel_consumed: int
+    solution: dict
+
+    def calc_solution_signature(self) -> int:
+        return u64s_from_str(jsonify(self.solution))[0]
+
+    def to_output_metadata(self) -> OutputMetaData:
+        return OutputMetaData(
+            nonce=self.nonce,
+            runtime_signature=self.runtime_signature,
+            fuel_consumed=self.fuel_consumed,
+            solution_signature=self.calc_solution_signature()
+        )
+
+    def to_merkle_hash(self) -> MerkleHash:
+        return self.to_output_metadata().to_merkle_hash()
+
+@dataclass
+class MerkleProof(FromDict):
+    leaf: OutputData
+    branch: MerkleBranch
+
+@dataclass
+class ProofDetails(FromDict):
+    submission_delay: int
+    block_active: int
+
+@dataclass
+class ProofState(FromDict):
+    block_confirmed: int
+
+@dataclass
+class Proof(FromDict):
+    benchmark_id: str
+    details: ProofDetails
+    state: ProofState
+    merkle_proofs: Optional[List[MerkleProof]]
+
+@dataclass
+class FraudState(FromDict):
+    block_confirmed: int
+
+@dataclass
+class Fraud(FromDict):
+    benchmark_id: str
+    state: FraudState
+    allegation: Optional[str]
+
+@dataclass
+class BlockDetails(FromDict):
+    prev_block_id: str
+    height: int
+    round: int
+    timestamp: int
+    num_confirmed: Dict[str, int]
+    num_active: Dict[str, int]
+
+@dataclass
+class BlockData(FromDict):
+    confirmed_ids: Dict[str, Set[int]]
+    active_ids: Dict[str, Set[int]]
+
+@dataclass
+class Block(FromDict):
+    id: str
+    details: BlockDetails
+    config: dict
+    data: Optional[BlockData]
+
+@dataclass
+class ChallengeDetails(FromDict):
+    name: str
+
+@dataclass
+class ChallengeState(FromDict):
+    round_active: int
+
+@dataclass
+class ChallengeBlockData(FromDict):
+    num_qualifiers: int
+    qualifier_difficulties: Set[Point]
+    base_frontier: Frontier
+    scaled_frontier: Frontier
+    scaling_factor: float
+    base_fee: PreciseNumber
+    per_nonce_fee: PreciseNumber
+
+@dataclass
+class Challenge(FromDict):
+    id: str
+    details: ChallengeDetails
+    state: ChallengeState
+    block_data: Optional[ChallengeBlockData]
+
+@dataclass
+class OPoWBlockData(FromDict):
+    num_qualifiers_by_challenge: Dict[str, int]
+    cutoff: int
+    delegated_weighted_deposit: PreciseNumber
+    delegators: Set[str]
+    reward_share: float
+    imbalance: PreciseNumber
+    influence: PreciseNumber
+    reward: PreciseNumber
+
+@dataclass
+class OPoW(FromDict):
+    player_id: str
+    block_data: Optional[OPoWBlockData]
+
+@dataclass
+class PlayerDetails(FromDict):
+    name: Optional[str]
+    is_multisig: bool
+
+@dataclass
+class PlayerState(FromDict):
+    total_fees_paid: PreciseNumber
+    available_fee_balance: PreciseNumber
+    delegatee: Optional[dict]
+    votes: dict
+    reward_share: Optional[dict]
+
+@dataclass
+class PlayerBlockData(FromDict):
+    delegatee: Optional[str]
+    reward_by_type: Dict[str, PreciseNumber]
+    deposit_by_locked_period: List[PreciseNumber]
+    weighted_deposit: PreciseNumber
+
+@dataclass
+class Player(FromDict):
+    id: str
+    details: PlayerDetails
+    state: PlayerState
+    block_data: Optional[PlayerBlockData]
+
+@dataclass
+class BinaryDetails(FromDict):
+    compile_success: bool
+    download_url: Optional[str]
+
+@dataclass
+class BinaryState(FromDict):
+    block_confirmed: int
+
+@dataclass
+class Binary(FromDict):
+    algorithm_id: str
+    details: BinaryDetails
+    state: BinaryState
+
+@dataclass
+class TopUpDetails(FromDict):
+    player_id: str
+    amount: PreciseNumber
+    log_idx: int
+    tx_hash: str
+
+@dataclass
+class TopUpState(FromDict):
+    block_confirmed: int
+
+@dataclass
+class TopUp(FromDict):
+    id: str
+    details: TopUpDetails
+    state: TopUpState
+
+@dataclass
+class DifficultyData(FromDict):
+    num_solutions: int
+    num_nonces: int
+    difficulty: Point
+
+@dataclass
+class DepositDetails(FromDict):
+    player_id: str
+    amount: PreciseNumber
+    log_idx: int
+    tx_hash: str
+    start_timestamp: int
+    end_timestamp: int
+
+@dataclass
+class DepositState(FromDict):
+    block_confirmed: int
+
+@dataclass
+class Deposit(FromDict):
+    id: str
+    details: DepositDetails
+    state: DepositState
\ No newline at end of file
diff --git a/tig-benchmarker/common/utils.py b/tig-benchmarker/common/utils.py
new file mode 100644
index 0000000..becb696
--- /dev/null
+++ b/tig-benchmarker/common/utils.py
@@ -0,0 +1,217 @@
+from __future__ import annotations
+from abc import ABC, abstractclassmethod, abstractmethod
+from blake3 import blake3
+from dataclasses import dataclass, fields, is_dataclass, asdict
+from typing import TypeVar, Type, Dict, Any, List, Union, Optional, get_origin, get_args
+import json
+import time
+
+T = TypeVar('T', bound='DataclassBase')
+
+class FromStr(ABC):
+    @abstractclassmethod
+    def from_str(cls, s: str):
+        raise NotImplementedError
+
+    @abstractmethod
+    def to_str(self) -> str:
+        raise NotImplementedError
+
+@dataclass
+class FromDict:
+    @classmethod
+    def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
+        field_types = {f.name: f.type for f in fields(cls)}
+        kwargs = {}
+
+        for field in fields(cls):
+            value = d.pop(field.name, None)
+            field_type = field_types[field.name]
+
+            if value is None:
+                if cls._is_optional(field_type):
+                    kwargs[field.name] = None
+                else:
+                    raise ValueError(f"Missing required field: {field.name}")
+                continue
+
+            kwargs[field.name] = cls._process_value(value, field_type)
+
+        return cls(**kwargs)
+
+    @classmethod
+    def _process_value(cls, value: Any, field_type: Type) -> Any:
+        origin_type = get_origin(field_type)
+
+        if cls._is_optional(field_type):
+            if value is None:
+                return None
+            non_none_type = next(arg for arg in get_args(field_type) if arg is not type(None))
+            return cls._process_value(value, non_none_type)
+
+        if hasattr(field_type, 'from_dict') and isinstance(value, dict):
+            return field_type.from_dict(value)
+        elif hasattr(field_type, 'from_str') and isinstance(value, str):
+            return field_type.from_str(value)
+        elif origin_type in (list, set, tuple):
+            elem_type = get_args(field_type)[0]
+            return origin_type(cls._process_value(item, elem_type) for item in value)
+        elif origin_type is dict:
+            key_type, val_type = get_args(field_type)
+            return {cls._process_value(k, key_type): cls._process_value(v, val_type) for k, v in value.items()}
+        else:
+            return field_type(value)
+
+    @staticmethod
+    def _is_optional(field_type: Type) -> bool:
+        return get_origin(field_type) is Union and type(None) in get_args(field_type)
+
+    def to_dict(self) -> Dict[str, Any]:
+        d = {}
+        for field in fields(self):
+            value = getattr(self, field.name)
+            if value is not None:
+                if hasattr(value, 'to_dict'):
+                    d[field.name] = value.to_dict()
+                elif hasattr(value, 'to_str'):
+                    d[field.name] = value.to_str()
+                elif isinstance(value, (list, set, tuple)):
+                    d[field.name] = [
+                        item.to_dict() if hasattr(item, 'to_dict')
+                        else item.to_str() if hasattr(item, 'to_str')
+                        else item
+                        for item in value
+                    ]
+                elif isinstance(value, dict):
+                    d[field.name] = {
+                        k: (v.to_dict() if hasattr(v, 'to_dict')
+                            else v.to_str() if hasattr(v, 'to_str')
+                            else v)
+                        for k, v in value.items()
+                    }
+                elif is_dataclass(value):
+                    d[field.name] = asdict(value)
+                else:
+                    d[field.name] = value
+        return d
+
+
+class PreciseNumber(FromStr):
+    PRECISION = 10**18  # 18 decimal places of precision
+
+    def __init__(self, value: Union[int, float, str, PreciseNumber]):
+        if isinstance(value, PreciseNumber):
+            self._value = value._value
+        elif isinstance(value, int):
+            self._value = value * self.PRECISION
+        elif isinstance(value, float):
+            self._value = int(value * self.PRECISION)
+        elif isinstance(value, str):
+            self._value = int(value)
+        else:
+            raise TypeError(f"Unsupported type for PreciseNumber: {type(value)}")
+
+    @classmethod
+    def from_str(cls, s: str) -> 'PreciseNumber':
+        return cls(s)
+
+    def to_str(self) -> str:
+        return str(self._value)
+
+    def __repr__(self) -> str:
+        return f"PreciseNumber({self.to_float()})"
+
+    def to_float(self) -> float:
+        return self._value / self.PRECISION
+
+    def __add__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        return PreciseNumber(self._value + other._value)
+
+    def __sub__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        return PreciseNumber(self._value - other._value)
+
+    def __mul__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        return PreciseNumber((self._value * other._value) // self.PRECISION)
+
+    def __truediv__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        if other._value == 0:
+            raise ZeroDivisionError
+        return PreciseNumber((self._value * self.PRECISION) // other._value)
+
+    def __floordiv__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        if other._value == 0:
+            raise ZeroDivisionError
+        return PreciseNumber((self._value * self.PRECISION // other._value))
+
+    def __eq__(self, other: Union[PreciseNumber, int, float]) -> bool:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        return self._value == other._value
+
+    def __lt__(self, other: Union[PreciseNumber, int, float]) -> bool:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        return self._value < other._value
+
+    def __le__(self, other: Union[PreciseNumber, int, float]) -> bool:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        return self._value <= other._value
+
+    def __gt__(self, other: Union[PreciseNumber, int, float]) -> bool:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        return self._value > other._value
+
+    def __ge__(self, other: Union[PreciseNumber, int, float]) -> bool:
+        if isinstance(other, (int, float)):
+            other = PreciseNumber(other)
+        return self._value >= other._value
+
+    def __radd__(self, other: Union[int, float]) -> PreciseNumber:
+        return self + other
+
+    def __rsub__(self, other: Union[int, float]) -> PreciseNumber:
+        return PreciseNumber(other) - self
+
+    def __rmul__(self, other: Union[int, float]) -> PreciseNumber:
+        return self * other
+
+    def __rtruediv__(self, other: Union[int, float]) -> PreciseNumber:
+        return PreciseNumber(other) / self
+
+    def __rfloordiv__(self, other: Union[int, float]) -> PreciseNumber:
+        return PreciseNumber(other) // self
+
+def jsonify(obj: Any) -> str:
+    if hasattr(obj, 'to_dict'):
+        obj = obj.to_dict()
+    return json.dumps(obj, sort_keys=True, separators=(',', ':'))
+
+def u8s_from_str(input: str) -> bytes:
+    return blake3(input.encode()).digest()
+
+def u64s_from_str(input: str) -> List[int]:
+    u8s = u8s_from_str(input)
+    return [
+        int.from_bytes(
+            u8s[i * 8:(i + 1) * 8],
+            byteorder='little',
+            signed=False
+        )
+        for i in range(4)
+    ]
+
+def now():
+    return int(time.time() * 1000)
+
diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py
index 6ca7f31..82c706e 100644
--- a/tig-benchmarker/slave.py
+++ b/tig-benchmarker/slave.py
@@ -3,154 +3,282 @@ import json
 import os
 import logging
 import randomname
-import aiohttp
-import asyncio
+import requests
+import shutil
 import subprocess
 import time
+import zlib
+from threading import Thread
+from common.structs import OutputData, MerkleProof
+from common.merkle_tree import MerkleTree
+from datetime import datetime, timedelta
 
 logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
+PENDING_BATCH_IDS = set()
+PROCESSING_BATCH_IDS = set()
+READY_BATCH_IDS = set()
+FINISHED_BATCH_IDS = {}
 
 def now():
     return int(time.time() * 1000)
 
-async def download_wasm(session, download_url, wasm_path):
+def download_wasm(session, download_url, wasm_path):
     if not os.path.exists(wasm_path):
         start = now()
         logger.info(f"downloading WASM from {download_url}")
-        async with session.get(download_url) as resp:
-            if resp.status != 200:
-                raise Exception(f"status {resp.status} when downloading WASM: {await resp.text()}")
-            with open(wasm_path, 'wb') as f:
-                f.write(await resp.read())
+        resp = session.get(download_url)
+        if resp.status_code != 200:
+            raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}")
+        with open(wasm_path, 'wb') as f:
+            f.write(resp.content)
         logger.debug(f"downloading WASM: took {now() - start}ms")
     logger.debug(f"WASM Path: {wasm_path}")
 
-async def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers):
+
+def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path):
     start = now()
     cmd = [
         tig_worker_path, "compute_batch",
         json.dumps(batch["settings"]),
         batch["rand_hash"],
         str(batch["start_nonce"]),
         str(batch["num_nonces"]),
         str(batch["batch_size"]),
         wasm_path,
         "--mem", str(batch["runtime_config"]["max_memory"]),
         "--fuel", str(batch["runtime_config"]["max_fuel"]),
         "--workers", str(num_workers),
+        "--output", f"{output_path}/{batch['id']}",
     ]
-    if batch["sampled_nonces"]:
-        cmd += ["--sampled", *map(str, batch["sampled_nonces"])]
     logger.info(f"computing batch: {' '.join(cmd)}")
-    process = await asyncio.create_subprocess_exec(
-        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
+    process = subprocess.Popen(
+        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
     )
-    stdout, stderr = await process.communicate()
+    stdout, stderr = process.communicate()
     if process.returncode != 0:
+        PROCESSING_BATCH_IDS.remove(batch["id"])
         raise Exception(f"tig-worker failed: {stderr.decode()}")
     result = json.loads(stdout.decode())
     logger.info(f"computing batch took {now() - start}ms")
     logger.debug(f"batch result: {result}")
-    return result
+    with open(f"{output_path}/{batch['id']}/result.json", "w") as f:
+        json.dump(result, f)
+
+    PROCESSING_BATCH_IDS.remove(batch["id"])
+    READY_BATCH_IDS.add(batch["id"])
+
 
-async def process_batch(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, batch, headers):
+def purge_folders(output_path, ttl):
+    cutoff_time = datetime.now() - timedelta(hours=2)
+    for folder in os.listdir(output_path):
+        folder_path = os.path.join(output_path, folder)
+        if os.path.isdir(folder_path) and datetime.fromtimestamp(os.path.getmtime(folder_path)) < cutoff_time:
+            logger.info(f"removing old folder: {folder_path}")
+            shutil.rmtree(folder_path)
+
+    n = now()
+    purge_batch_ids = [
+        batch_id
+        for batch_id, finish_time in FINISHED_BATCH_IDS.items()
+        if n >= finish_time + (ttl * 1000)
+    ]
+    if len(purge_batch_ids) == 0:
+        time.sleep(5)
+        return
+
+    for batch_id in purge_batch_ids:
+        if os.path.exists(f"{output_path}/{batch_id}"):
+            logger.info(f"purging batch {batch_id}")
+            shutil.rmtree(f"{output_path}/{batch_id}")
+        FINISHED_BATCH_IDS.pop(batch_id)
+
+
+def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path):
     try:
-        batch_id = f"{batch['benchmark_id']}_{batch['start_nonce']}"
-        logger.info(f"Processing batch {batch_id}: {batch}")
+        batch_id = READY_BATCH_IDS.pop()
+    except KeyError:
+        logger.debug("No pending batches")
+        return
+
+    output_folder = f"{output_path}/{batch_id}"
+    with open(f"{output_folder}/batch.json") as f:
+        batch = json.load(f)
+
+    if (
+        not os.path.exists(f"{output_folder}/result.json")
+        or not os.path.exists(f"{output_folder}/data.zlib")
+    ):
+        if os.path.exists(f"{output_folder}/result.json"):
+            os.remove(f"{output_folder}/result.json")
+        logger.debug(f"Batch {batch_id} flagged as ready, but missing nonce files")
+        PENDING_BATCH_IDS.add(batch_id)
+        return
 
-        # Step 2: Download WASM
-        wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
-        await download_wasm(session, batch['download_url'], wasm_path)
+    if batch["sampled_nonces"] is None:
+        with open(f"{output_folder}/result.json") as f:
+            result = json.load(f)
 
-        # Step 3: Run tig-worker
-        result = await run_tig_worker(tig_worker_path, batch, wasm_path, num_workers)
+        submit_url = f"http://{master_ip}:{master_port}/submit-batch-root/{batch_id}"
+        logger.info(f"posting root to {submit_url}")
+        resp = session.post(submit_url, json=result)
+        if resp.status_code == 200:
+            FINISHED_BATCH_IDS[batch_id] = now()
+            logger.info(f"successfully posted root for batch {batch_id}")
+        elif resp.status_code == 408:  # took too long
+            FINISHED_BATCH_IDS[batch_id] = now()
+            logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
+        else:
+            logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
+            READY_BATCH_IDS.add(batch_id)  # requeue
 
-        # Step 4: Submit results
-        start = now()
-        submit_url = f"http://{master_ip}:{master_port}/submit-batch-result/{batch_id}"
-        logger.info(f"posting results to {submit_url}")
-        async with session.post(submit_url, json=result, headers=headers) as resp:
-            if resp.status != 200:
-                raise Exception(f"status {resp.status} when posting results to master: {await resp.text()}")
-        logger.debug(f"posting results took {now() - start} ms")
+    else:
+        with open(f"{output_folder}/data.zlib", "rb") as f:
+            leafs = [
+                OutputData.from_dict(x)
+                for x in json.loads(zlib.decompress(f.read()).decode())
+            ]
+
+        merkle_tree = MerkleTree(
+            [x.to_merkle_hash() for x in leafs],
+            batch["batch_size"]
+        )
 
-    except Exception as e:
-        logger.error(f"Error processing batch {batch_id}: {e}")
+        proofs_to_submit = [
+            MerkleProof(
+                leaf=leafs[n - batch["start_nonce"]],
+                branch=merkle_tree.calc_merkle_branch(branch_idx=n - batch["start_nonce"])
+            ).to_dict()
+            for n in batch["sampled_nonces"]
+        ]
+
+        submit_url = f"http://{master_ip}:{master_port}/submit-batch-proofs/{batch_id}"
+        logger.info(f"posting proofs to {submit_url}")
+        resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit})
+        if resp.status_code == 200:
+            FINISHED_BATCH_IDS[batch_id] = now()
+            logger.info(f"successfully posted proofs for batch {batch_id}")
+        elif resp.status_code == 408:  # took too long
+            FINISHED_BATCH_IDS[batch_id] = now()
+            logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
+        else:
+            logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
+            READY_BATCH_IDS.add(batch_id)  # requeue
 
-async def main(
+
+def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path):
+    try:
+        batch_id = PENDING_BATCH_IDS.pop()
+    except KeyError:
+        logger.debug("No pending batches")
+        return
+
+    if (
+        batch_id in PROCESSING_BATCH_IDS or
+        batch_id in READY_BATCH_IDS
+    ):
+        return
+
+    if os.path.exists(f"{output_path}/{batch_id}/result.json"):
+        logger.info(f"Batch {batch_id} already processed")
+        READY_BATCH_IDS.add(batch_id)
+        return
+    PROCESSING_BATCH_IDS.add(batch_id)
+
+    with open(f"{output_path}/{batch_id}/batch.json") as f:
+        batch = json.load(f)
+
+    wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
+    download_wasm(session, batch['download_url'], wasm_path)
+
+    run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path)
+
+
+def poll_batches(session, master_ip, master_port, output_path):
+    get_batches_url = f"http://{master_ip}:{master_port}/get-batches"
+    logger.info(f"fetching batches from {get_batches_url}")
+    resp = session.get(get_batches_url)
+
+    if resp.status_code == 200:
+        batches = resp.json()
+        root_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is None]
+        proofs_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is not None]
+        logger.info(f"root batches: {root_batch_ids}")
+        logger.info(f"proofs batches: {proofs_batch_ids}")
+        for batch in batches:
+            output_folder = f"{output_path}/{batch['id']}"
+            os.makedirs(output_folder, exist_ok=True)
+            with open(f"{output_folder}/batch.json", "w") as f:
+                json.dump(batch, f)
+        PENDING_BATCH_IDS.clear()
+        PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids)
+
+    else:
+        logger.error(f"status {resp.status_code} when fetching batch: {resp.text}")
+
+
+def wrap_thread(func, *args):
+    logger.info(f"Starting thread for {func.__name__}")
+    while True:
+        try:
+            func(*args)
+        except Exception as e:
+            logger.error(f"Error in {func.__name__}: {e}")
+            time.sleep(5)
+
+
+def main(
     master_ip: str,
     tig_worker_path: str,
     download_wasms_folder: str,
     num_workers: int,
     slave_name: str,
-    master_port: int
+    master_port: int,
+    output_path: str,
+    ttl: int,
 ):
+    print(f"Starting slave {slave_name}")
+
     if not os.path.exists(tig_worker_path):
         raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}")
     os.makedirs(download_wasms_folder, exist_ok=True)
+    os.makedirs(output_path, exist_ok=True)
 
-    headers = {
+    session = requests.Session()
+    session.headers.update({
         "User-Agent": slave_name
-    }
+    })
 
+    Thread(
+        target=wrap_thread,
+        args=(purge_folders, output_path, ttl)
+    ).start()
 
-    async with aiohttp.ClientSession() as session:
-        while True:
-            try:
-                # Step 1: Query for job test maj
-                start = now()
-                get_batch_url = f"http://{master_ip}:{master_port}/get-batches"
-                logger.info(f"fetching job from {get_batch_url}")
-                try:
-                    resp = await asyncio.wait_for(session.get(get_batch_url, headers=headers), timeout=5)
-                    if resp.status != 200:
-                        text = await resp.text()
-                        if resp.status == 404 and text.strip() == "No batches available":
-                            # Retry with master_port - 1
-                            new_port = master_port - 1
-                            get_batch_url = f"http://{master_ip}:{new_port}/get-batches"
-                            logger.info(f"No batches available on port {master_port}, trying port {new_port}")
-                            resp_retry = await asyncio.wait_for(session.get(get_batch_url, headers=headers), timeout=10)
-                            if resp_retry.status != 200:
-                                raise Exception(f"status {resp_retry.status} when fetching job: {await resp_retry.text()}")
-                            master_port_w = new_port
-                            batches = await resp_retry.json(content_type=None)
-                        else:
-                            raise Exception(f"status {resp.status} when fetching job: {text}")
-                    else:
-                        master_port_w = master_port
-                        batches = await resp.json(content_type=None)
-                except asyncio.TimeoutError:
-                    logger.error(f"Timeout occurred when fetching job from {get_batch_url}")
-                    continue
-                logger.debug(f"fetching job: took {now() - start}ms")
-
-                # Process batches concurrently
-                tasks = [
-                    process_batch(session, master_ip, master_port_w, tig_worker_path, download_wasms_folder, num_workers, batch, headers)
-                    for batch in batches
-                ]
-                await asyncio.gather(*tasks)
-
-            except Exception as e:
-                logger.error(e)
-                await asyncio.sleep(2)
+    while True:
+        try:
+            poll_batches(session, master_ip, master_port, output_path)
+            batches_size = len(PENDING_BATCH_IDS)
+            for _ in range(batches_size):
+                process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path)
+                send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path)
+            if batches_size == 0:
+                time.sleep(5)
+        except Exception as e:
+            logger.error(f"Error in main loop: {e}")
+            time.sleep(5)
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="TIG Slave Benchmarker")
-    parser.add_argument("master_ip", help="IP address of the master")
     parser.add_argument("tig_worker_path", help="Path to tig-worker executable")
+    parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)")
     parser.add_argument("--download", type=str, default="wasms", help="Folder to download WASMs to (default: wasms)")
     parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)")
     parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)")
     parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)")
     parser.add_argument("--verbose", action='store_true', help="Print debug logs")
-
+    parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)")
+    parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)")
     args = parser.parse_args()
-
     logging.basicConfig(
         format='%(levelname)s - [%(name)s] - %(message)s',
         level=logging.DEBUG if args.verbose else logging.INFO
     )
-    asyncio.run(main(args.master_ip, args.tig_worker_path, args.download, args.workers, args.name, args.port))
\ No newline at end of file
+    main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl)
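
For reference, a minimal usage sketch of the common/merkle_tree.py API added above (not part of the patch; the leaf values here are hypothetical, while in the slave the leaves come from OutputData.to_merkle_hash() and n is the batch's batch_size):

    from blake3 import blake3
    from common.merkle_tree import MerkleHash, MerkleTree

    # Hypothetical 32-byte leaf hashes for a 6-nonce batch padded to n=8.
    leafs = [MerkleHash(blake3(bytes([i])).digest()) for i in range(6)]
    tree = MerkleTree(leafs, 8)  # n must be a power of 2 and >= len(leafs)
    root = tree.calc_merkle_root()

    # Prove leaf 3 and verify it against the root, as the master side
    # presumably does for each sampled nonce in submit-batch-proofs.
    branch = tree.calc_merkle_branch(branch_idx=3)
    assert branch.calc_merkle_root(leafs[3], 3) == root

    # Both structures survive the hex serialization used on the wire.
    assert MerkleTree.from_str(tree.to_str()).calc_merkle_root() == root

This mirrors the two-phase flow in the new slave.py: a batch without sampled_nonces posts only the root via submit-batch-root, and a later request with sampled_nonces posts MerkleProof objects built exactly this way.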
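Similarly, a small sketch of how FromDict in common/utils.py round-trips the dataclasses in structs.py, rebuilding nested objects, Optionals, and containers from type hints (the field values are made up):

    from common.structs import BenchmarkSettings

    d = {
        "player_id": "player1",
        "block_id": "block1",
        "challenge_id": "c001",
        "algorithm_id": "a001",
        "difficulty": [50, 300],
    }
    # from_dict pops keys from its argument, so hand it a copy.
    s = BenchmarkSettings.from_dict(dict(d))
    assert s.difficulty == (50, 300)               # Point = Tuple[int, ...]
    assert s.to_dict()["difficulty"] == [50, 300]  # tuples serialize as lists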
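Finally, one subtlety of PreciseNumber worth noting: ints and floats are scaled by 10**18 on construction, while strings are parsed as already-scaled raw values, which is the format to_str()/from_str() exchange on the wire (again a sketch, not part of the patch):

    from common.utils import PreciseNumber

    # int input is scaled up by PRECISION...
    assert PreciseNumber(2).to_str() == "2000000000000000000"
    # ...while string input is taken as the raw fixed-point value.
    assert PreciseNumber.from_str("500000000000000000").to_float() == 0.5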