From fedcc94e3cca6b5ea3a0d57fa07c91685726fd59 Mon Sep 17 00:00:00 2001 From: xnico31 <54291310+xnico31@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:31:29 +0100 Subject: [PATCH] maj test --- .DS_Store | Bin 0 -> 6148 bytes bin/new_slave/slave.py | 314 -------------------------- install.sh | 8 +- scripts/tig_pool_master.sh | 27 ++- tig-benchmarker/.DS_Store | Bin 0 -> 6148 bytes tig-benchmarker/common/.DS_Store | Bin 0 -> 6148 bytes tig-benchmarker/common/__init__.py | 0 tig-benchmarker/common/merkle_tree.py | 152 +++++++++++++ tig-benchmarker/common/structs.py | 297 ++++++++++++++++++++++++ tig-benchmarker/common/utils.py | 217 ++++++++++++++++++ tig-benchmarker/slave.py | 300 +++++++++++++++++------- 11 files changed, 905 insertions(+), 410 deletions(-) create mode 100644 .DS_Store delete mode 100644 bin/new_slave/slave.py create mode 100644 tig-benchmarker/.DS_Store create mode 100644 tig-benchmarker/common/.DS_Store create mode 100644 tig-benchmarker/common/__init__.py create mode 100644 tig-benchmarker/common/merkle_tree.py create mode 100644 tig-benchmarker/common/structs.py create mode 100644 tig-benchmarker/common/utils.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..df82a4a84b1e41560c71183f5937abba7f95f818 GIT binary patch literal 6148 zcmeHK!Ab)$5S_GDw-m7l1@RQ{+G1;^C|;JTKj4ZURO+rRx^%lK+pRs6!hV4MkRRgr zIFqDesp3UM$_z~2WHOVGmnE|S0MQzC>HuW`a8LwJg_YGcXU!=& zx89=~d#Rs}hAlt2rrw28Q82Up;4q7u3mGlTl- zz=nPwX}my4f;PP+2&F~WVrCFWP=rZEG^xTqF@#A+zqE0##mt~d2ccKSdF;x=gK!OU%M36B^9K!Tx o#bpLRQqa()7-OjvZ=-5Kza#_EwU`-14+{SXXd1X-2L6= finish_time + (ttl * 1000) - ] - if len(purge_batch_ids) == 0: - time.sleep(5) - return - - for batch_id in purge_batch_ids: - if os.path.exists(f"{output_path}/{batch_id}"): - logger.info(f"purging batch {batch_id}") - shutil.rmtree(f"{output_path}/{batch_id}") - FINISHED_BATCH_IDS.pop(batch_id) - - -def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode): - global LAST_SUCCESSFUL_PORT - - try: - batch_id = READY_BATCH_IDS.pop() - except KeyError: - logger.debug("No pending batches") - time.sleep(1) - return - - port_port = LAST_SUCCESSFUL_PORT or master_port - - output_folder = f"{output_path}/{batch_id}" - with open(f"{output_folder}/batch.json") as f: - batch = json.load(f) - - if ( - not os.path.exists(f"{output_folder}/result.json") - or not os.path.exists(f"{output_folder}/data.zlib") - ): - if os.path.exists(f"{output_folder}/result.json"): - os.remove(f"{output_folder}/result.json") - logger.debug(f"Batch {batch_id} flagged as ready, but missing nonce files") - PENDING_BATCH_IDS.add(batch_id) - return - - if batch["sampled_nonces"] is None: - with open(f"{output_folder}/result.json") as f: - result = json.load(f) - - submit_url = f"http://{master_ip}:{port_port}/submit-batch-root/{batch_id}" - logger.info(f"posting root to {submit_url}") - resp = session.post(submit_url, json=result) - - - if resp.status_code == 200: - FINISHED_BATCH_IDS[batch_id] = now() - logger.info(f"successfully posted root for batch {batch_id}") - elif resp.status_code == 408: # took too long - FINISHED_BATCH_IDS[batch_id] = now() - logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") - else: - logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") - READY_BATCH_IDS.add(batch_id) # requeue - time.sleep(2) - - else: - with open(f"{output_folder}/data.zlib", "rb") as f: - leafs = [ - OutputData.from_dict(x) - for x in json.loads(zlib.decompress(f.read()).decode()) - ] - - merkle_tree = MerkleTree( - [x.to_merkle_hash() for x in leafs], - batch["batch_size"] - ) - - proofs_to_submit = [ - MerkleProof( - leaf=leafs[n - batch["start_nonce"]], - branch=merkle_tree.calc_merkle_branch(branch_idx=n - batch["start_nonce"]) - ).to_dict() - for n in batch["sampled_nonces"] - ] - - submit_url = f"http://{master_ip}:{port_port}/submit-batch-proofs/{batch_id}" - logger.info(f"posting proofs to {submit_url}") - resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit}) - - if resp.status_code == 200: - FINISHED_BATCH_IDS[batch_id] = now() - logger.info(f"successfully posted proofs for batch {batch_id}") - elif resp.status_code == 408: # took too long - FINISHED_BATCH_IDS[batch_id] = now() - logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") - else: - logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") - READY_BATCH_IDS.add(batch_id) # requeue - time.sleep(2) - - - -def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path): - try: - batch_id = PENDING_BATCH_IDS.pop() - except KeyError: - logger.debug("No pending batches") - time.sleep(1) - return - - if ( - batch_id in PROCESSING_BATCH_IDS or - batch_id in READY_BATCH_IDS - ): - return - - if os.path.exists(f"{output_path}/{batch_id}/result.json"): - logger.info(f"Batch {batch_id} already processed") - READY_BATCH_IDS.add(batch_id) - return - PROCESSING_BATCH_IDS.add(batch_id) - - with open(f"{output_path}/{batch_id}/batch.json") as f: - batch = json.load(f) - - wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") - download_wasm(session, batch['download_url'], wasm_path) - - Thread( - target=run_tig_worker, - args=(tig_worker_path, batch, wasm_path, num_workers, output_path) - ).start() - - -def poll_batches(session, master_ip, master_port, output_path, mode): - - global LAST_SUCCESSFUL_PORT - - get_batches_url = f"http://{master_ip}:{master_port}/get-batches" - logger.info(f"fetching batches from {get_batches_url}") - resp = session.get(get_batches_url) - - if mode == "mainnet" and resp.status_code == 666: - logger.warning(f"Status 666 received. Retrying with master_port {master_port - 1}") - resp = session.get(f"http://{master_ip}:{master_port - 1}/get-batches") - if resp.status_code == 666: - time.sleep(2) - logger.error(f"Retrying failed with status 666 again. Reverting to original master_port {master_port}") - resp = session.get(get_batches_url) - - - if resp.status_code == 200: - LAST_SUCCESSFUL_PORT = master_port if resp.url.endswith(f":{master_port}/get-batches") else master_port - 1 - logger.debug(f"Successfully fetched batches from port {LAST_SUCCESSFUL_PORT}") - batches = resp.json() - root_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is None] - proofs_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is not None] - logger.info(f"root batches: {root_batch_ids}") - logger.info(f"proofs batches: {proofs_batch_ids}") - for batch in batches: - output_folder = f"{output_path}/{batch['id']}" - os.makedirs(output_folder, exist_ok=True) - with open(f"{output_folder}/batch.json", "w") as f: - json.dump(batch, f) - PENDING_BATCH_IDS.clear() - PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids) - time.sleep(5) - - else: - logger.error(f"status {resp.status_code} when fetching batch: {resp.text}") - time.sleep(5) - - -def wrap_thread(func, *args): - logger.info(f"Starting thread for {func.__name__}") - while True: - try: - func(*args) - except Exception as e: - logger.error(f"Error in {func.__name__}: {e}") - time.sleep(5) - - -def main( - master_ip: str, - tig_worker_path: str, - download_wasms_folder: str, - num_workers: int, - slave_name: str, - master_port: int, - output_path: str, - ttl: int, - mode :str, -): - print(f"Starting slave {slave_name}") - - - - if not os.path.exists(tig_worker_path): - raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}") - os.makedirs(download_wasms_folder, exist_ok=True) - - session = requests.Session() - session.headers.update({ - "User-Agent": slave_name - }) - - Thread( - target=wrap_thread, - args=(process_batch, session, tig_worker_path, download_wasms_folder, num_workers, output_path) - ).start() - - Thread( - target=wrap_thread, - args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode) - ).start() - - Thread( - target=wrap_thread, - args=(purge_folders, output_path, ttl) - ).start() - - wrap_thread(poll_batches, session, master_ip, master_port, output_path, mode) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="TIG Slave Benchmarker") - parser.add_argument("tig_worker_path", help="Path to tig-worker executable") - parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)") - parser.add_argument("--download", type=str, default="wasms", help="Folder to download WASMs to (default: wasms)") - parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)") - parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)") - parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)") - parser.add_argument("--verbose", action='store_true', help="Print debug logs") - parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)") - parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)") - parser.add_argument("--mode", type=str, default="mainnet", help="mainnet/explo") - - args = parser.parse_args() - - logging.basicConfig( - format='%(levelname)s - [%(name)s] - %(message)s', - level=logging.DEBUG if args.verbose else logging.INFO - ) - - main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl, args.mode) diff --git a/install.sh b/install.sh index 1a9b787..626de8b 100755 --- a/install.sh +++ b/install.sh @@ -13,12 +13,12 @@ login=$5 private_key=$6 client_version=$7 -\rm -rf tig_pool -\mkdir tig_pool -cd tig_pool +\rm -rf tig_pool_test +\mkdir tig_pool_test +cd tig_pool_test screen -ls | grep pool_tig | awk '{print $1}' | xargs -I {} screen -S {} -X kill -wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/scripts/tig_pool_master.sh +wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/scripts/tig_pool_master.sh sudo chmod +x tig_pool_master.sh ./tig_pool_master.sh -id_slave $slave_id -nom_slave $slave_name -ip $server_url -port $port -login $login -tok $private_key -url $server_url -v $client_version diff --git a/scripts/tig_pool_master.sh b/scripts/tig_pool_master.sh index 8a911e3..213fcd2 100755 --- a/scripts/tig_pool_master.sh +++ b/scripts/tig_pool_master.sh @@ -99,7 +99,10 @@ sudo apt install -y libssl-dev mkdir -p wasms sudo chmod -R 777 wasms/ # Clone the Git repository with the specified branch -git clone https://github.com/tig-pool-nk/tig-monorepo.git +git clone -b test https://github.com/tig-pool-nk/tig-monorepo.git + + + # Navigate to the benchmarker directory and build the project with cargo cd tig-monorepo/tig-worker/ @@ -112,8 +115,16 @@ python3 -m venv venv mkdir -p tig-benchmarker cd tig-benchmarker -wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/tig-benchmarker/slave.py -O slave.py -wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/tig-benchmarker/requirements.txt -O requirements.txt +wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/tig-benchmarker/slave.py -O slave.py +wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/tig-benchmarker/requirements.txt -O requirements.txt +mkdir -p common +wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/test/tig-benchmarker/common/__init__.py -O __init__.py +wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/test/tig-benchmarker/common/merkle_tree.py -O merkle_tree.py +wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/test/tig-benchmarker/common/structs.py -O structs.py + + + + cd $current_path ./venv/bin/pip3 install -r tig-benchmarker/requirements.txt @@ -122,13 +133,13 @@ mkdir -p bin cd bin # Download the files and check if the download was successful -wget https://github.com/tig-pool-nk/client/raw/refs/heads/main/bin/client -O client_tig_pool +wget https://github.com/tig-pool-nk/client/raw/refs/heads/test/bin/client -O client_tig_pool if [ $? -ne 0 ]; then echo "Error downloading client_tig_pool" exit 1 fi -wget https://github.com/tig-pool-nk/client/raw/refs/heads/main/bin/bench -O bench +wget https://github.com/tig-pool-nk/client/raw/refs/heads/test/bin/bench -O bench if [ $? -ne 0 ]; then echo "Error downloading bench" exit 1 @@ -141,7 +152,7 @@ chmod +x bench cd $current_path # Download the launch file and rename it according to the provided parameters -wget -O pool_tig_launch_${id_slave}_${nom_slave}.sh https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/scripts/pool_tig_launch_master.sh +wget -O pool_tig_launch_${id_slave}_${nom_slave}.sh https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/scripts/pool_tig_launch_master.sh # Replace placeholders with variable values sed -i "s|@id@|$id_slave|g" pool_tig_launch_${id_slave}_${nom_slave}.sh @@ -169,7 +180,7 @@ screen -dmS pool_tig bash -c "cd \"$current_path\" && ./pool_tig_launch_${id_sla cd $current_path mkdir game cd game -wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/scripts/snake.sh -O snake.sh +wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/scripts/snake.sh -O snake.sh cd $current_path set +H @@ -184,7 +195,7 @@ echo " ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═ echo -e "\e[0m" echo "" -echo -e "\e[32mTIG Pool has been installed successfully!\e[0m" +echo -e "\e[32mTIG TEST Pool has been installed successfully!\e[0m" echo "" echo "To follow the benchmarker, use the commands below:" diff --git a/tig-benchmarker/.DS_Store b/tig-benchmarker/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..45f4ec022dc0c7d6f522afa6bf4f089ef967ceae GIT binary patch literal 6148 zcmeHKI|{-;5S>vG!N$^Zuiy<9(G%nXf`W}k5VTI^xjdS0K23;r+Q^$Q`*vsNZOAKj zG9seu+hHZL5Rn<&P#!i6&GyX)Hp++s;W*cPe`@&v^QL+oMqd zDnJFO02QDD-&G*%>umnrV|f}CpaOrOfZY!TZden$K)*ULcnbiWAnb;@_Y%Nj0bot+ z0ug~}P=P_!Y%w(Gh?mT(iCtjOMYH+Pe6wbUqW*TAUp!s326ChVRG_cGIF<{m|6BNn z{=ZM+iV9GHzfwRan`X1dCuMCNe4N$V0^h+c=Lt8%+$k8m90R=^V`1fZ=}D1SY>xe! V*abQrai;_MGhn*VsKBokcme016|DdO literal 0 HcmV?d00001 diff --git a/tig-benchmarker/common/.DS_Store b/tig-benchmarker/common/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0 n: + raise ValueError("Invalid tree size") + if n & (n - 1) != 0: + raise ValueError("n must be a power of 2") + self.hashed_leafs = hashed_leafs + self.n = n + + def to_str(self): + """Serializes the MerkleTree to a string""" + n_hex = f"{self.n:016x}" + hashes_hex = ''.join([h.to_str() for h in self.hashed_leafs]) + return n_hex + hashes_hex + + def __repr__(self): + return f"MerkleTree([{', '.join([str(h) for h in self.hashed_leafs])}], {self.n})" + + @classmethod + def from_str(cls, s: str): + """Deserializes a MerkleTree from a string""" + if len(s) < 16 or (len(s) - 16) % 64 != 0: + raise ValueError("Invalid MerkleTree string length") + + n_hex = s[:16] + n = int(n_hex, 16) + + hashes_hex = s[16:] + hashed_leafs = [ + MerkleHash.from_str(hashes_hex[i:i + 64]) + for i in range(0, len(hashes_hex), 64) + ] + + return cls(hashed_leafs, n) + + def calc_merkle_root(self) -> MerkleHash: + hashes = self.hashed_leafs[:] + + while len(hashes) > 1: + new_hashes = [] + for i in range(0, len(hashes), 2): + left = hashes[i] + result = MerkleHash(left.value) + if i + 1 < len(hashes): + right = hashes[i + 1] + combined = left.value + right.value + result = MerkleHash(blake3(combined).digest()) + new_hashes.append(result) + hashes = new_hashes + + return hashes[0] + + def calc_merkle_branch(self, branch_idx: int) -> 'MerkleBranch': + if branch_idx >= self.n: + raise ValueError("Invalid branch index") + + hashes = self.hashed_leafs[:] + branch = [] + idx = branch_idx + depth = 0 + + while len(hashes) > 1: + new_hashes = [] + for i in range(0, len(hashes), 2): + left = hashes[i] + result = MerkleHash(left.value) + if i + 1 < len(hashes): + right = hashes[i + 1] + if idx // 2 == i // 2: + branch.append((depth, right if idx % 2 == 0 else left)) + combined = left.value + right.value + result = MerkleHash(blake3(combined).digest()) + new_hashes.append(result) + hashes = new_hashes + idx //= 2 + depth += 1 + + return MerkleBranch(branch) + +class MerkleBranch: + def __init__(self, stems: List[Tuple[int, MerkleHash]]): + self.stems = stems + + def calc_merkle_root(self, hashed_leaf: MerkleHash, branch_idx: int) -> MerkleHash: + root = hashed_leaf + idx = branch_idx + curr_depth = 0 + + for depth, hash in self.stems: + if curr_depth > depth: + raise ValueError("Invalid branch") + while curr_depth != depth: + idx //= 2 + curr_depth += 1 + + if idx % 2 == 0: + combined = root.value + hash.value + else: + combined = hash.value + root.value + root = MerkleHash(blake3(combined).digest()) + idx //= 2 + curr_depth += 1 + + return root + + def to_str(self): + """Serializes the MerkleBranch to a hex string""" + return ''.join([f"{depth:02x}{hash.to_str()}" for depth, hash in self.stems]) + + def __repr__(self): + return f"MerkleBranch([{', '.join([f'({depth}, {hash})' for depth, hash in self.stems])}])" + + @classmethod + def from_str(cls, s: str): + """Deserializes a MerkleBranch from a hex string""" + if len(s) % 66 != 0: + raise ValueError("Invalid MerkleBranch string length") + + stems = [] + for i in range(0, len(s), 66): + depth = int(s[i:i+2], 16) + hash_hex = s[i+2:i+66] + stems.append((depth, MerkleHash.from_str(hash_hex))) + + return cls(stems) \ No newline at end of file diff --git a/tig-benchmarker/common/structs.py b/tig-benchmarker/common/structs.py new file mode 100644 index 0000000..c746f3d --- /dev/null +++ b/tig-benchmarker/common/structs.py @@ -0,0 +1,297 @@ +from .merkle_tree import MerkleHash, MerkleBranch +from .utils import FromDict, u64s_from_str, u8s_from_str, jsonify, PreciseNumber +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Set, Any, Tuple + +Point = Tuple[int, ...] +Frontier = Set[Point] + +@dataclass +class AlgorithmDetails(FromDict): + name: str + player_id: str + challenge_id: str + breakthrough_id: Optional[str] + type: str + fee_paid: PreciseNumber + +@dataclass +class AlgorithmState(FromDict): + block_confirmed: int + round_submitted: int + round_pushed: Optional[int] + round_active: Optional[int] + round_merged: Optional[int] + banned: bool + +@dataclass +class AlgorithmBlockData(FromDict): + num_qualifiers_by_player: Dict[str, int] + adoption: PreciseNumber + merge_points: int + reward: PreciseNumber + +@dataclass +class Algorithm(FromDict): + id: str + details: AlgorithmDetails + state: AlgorithmState + block_data: Optional[AlgorithmBlockData] + +@dataclass +class BenchmarkSettings(FromDict): + player_id: str + block_id: str + challenge_id: str + algorithm_id: str + difficulty: Point + + def calc_seed(self, rand_hash: str, nonce: int) -> bytes: + return u8s_from_str(f"{jsonify(self)}_{rand_hash}_{nonce}") + +@dataclass +class PrecommitDetails(FromDict): + block_started: int + num_nonces: int + rand_hash: str + fee_paid: PreciseNumber + +@dataclass +class PrecommitState(FromDict): + block_confirmed: int + +@dataclass +class Precommit(FromDict): + benchmark_id: str + details: PrecommitDetails + settings: BenchmarkSettings + state: PrecommitState + +@dataclass +class BenchmarkDetails(FromDict): + num_solutions: int + merkle_root: MerkleHash + sampled_nonces: List[int] + +@dataclass +class BenchmarkState(FromDict): + block_confirmed: int + +@dataclass +class Benchmark(FromDict): + id: str + details: BenchmarkDetails + state: BenchmarkState + solution_nonces: Optional[Set[int]] + +@dataclass +class OutputMetaData(FromDict): + nonce: int + runtime_signature: int + fuel_consumed: int + solution_signature: int + + @classmethod + def from_output_data(cls, output_data: 'OutputData') -> 'OutputMetaData': + return OutputData.to_output_metadata() + + def to_merkle_hash(self) -> MerkleHash: + return MerkleHash(u8s_from_str(jsonify(self))) + +@dataclass +class OutputData(FromDict): + nonce: int + runtime_signature: int + fuel_consumed: int + solution: dict + + def calc_solution_signature(self) -> int: + return u64s_from_str(jsonify(self.solution))[0] + + def to_output_metadata(self) -> OutputMetaData: + return OutputMetaData( + nonce=self.nonce, + runtime_signature=self.runtime_signature, + fuel_consumed=self.fuel_consumed, + solution_signature=self.calc_solution_signature() + ) + + def to_merkle_hash(self) -> MerkleHash: + return self.to_output_metadata().to_merkle_hash() + +@dataclass +class MerkleProof(FromDict): + leaf: OutputData + branch: MerkleBranch + +@dataclass +class ProofDetails(FromDict): + submission_delay: int + block_active: int + +@dataclass +class ProofState(FromDict): + block_confirmed: int + +@dataclass +class Proof(FromDict): + benchmark_id: str + details: ProofDetails + state: ProofState + merkle_proofs: Optional[List[MerkleProof]] + +@dataclass +class FraudState(FromDict): + block_confirmed: int + +@dataclass +class Fraud(FromDict): + benchmark_id: str + state: FraudState + allegation: Optional[str] + +@dataclass +class BlockDetails(FromDict): + prev_block_id: str + height: int + round: int + timestamp: int + num_confirmed: Dict[str, int] + num_active: Dict[str, int] + +@dataclass +class BlockData(FromDict): + confirmed_ids: Dict[str, Set[int]] + active_ids: Dict[str, Set[int]] + +@dataclass +class Block(FromDict): + id: str + details: BlockDetails + config: dict + data: Optional[BlockData] + +@dataclass +class ChallengeDetails(FromDict): + name: str + +@dataclass +class ChallengeState(FromDict): + round_active: int + +@dataclass +class ChallengeBlockData(FromDict): + num_qualifiers: int + qualifier_difficulties: Set[Point] + base_frontier: Frontier + scaled_frontier: Frontier + scaling_factor: float + base_fee: PreciseNumber + per_nonce_fee: PreciseNumber + +@dataclass +class Challenge(FromDict): + id: str + details: ChallengeDetails + state: ChallengeState + block_data: Optional[ChallengeBlockData] + +@dataclass +class OPoWBlockData(FromDict): + num_qualifiers_by_challenge: Dict[str, int] + cutoff: int + delegated_weighted_deposit: PreciseNumber + delegators: Set[str] + reward_share: float + imbalance: PreciseNumber + influence: PreciseNumber + reward: PreciseNumber + +@dataclass +class OPoW(FromDict): + player_id: str + block_data: Optional[OPoWBlockData] + +@dataclass +class PlayerDetails(FromDict): + name: Optional[str] + is_multisig: bool + +@dataclass +class PlayerState(FromDict): + total_fees_paid: PreciseNumber + available_fee_balance: PreciseNumber + delegatee: Optional[dict] + votes: dict + reward_share: Optional[dict] + +@dataclass +class PlayerBlockData(FromDict): + delegatee: Optional[str] + reward_by_type: Dict[str, PreciseNumber] + deposit_by_locked_period: List[PreciseNumber] + weighted_deposit: PreciseNumber + +@dataclass +class Player(FromDict): + id: str + details: PlayerDetails + state: PlayerState + block_data: Optional[PlayerBlockData] + +@dataclass +class BinaryDetails(FromDict): + compile_success: bool + download_url: Optional[str] + +@dataclass +class BinaryState(FromDict): + block_confirmed: int + +@dataclass +class Binary(FromDict): + algorithm_id: str + details: BinaryDetails + state: BinaryState + +@dataclass +class TopUpDetails(FromDict): + player_id: str + amount: PreciseNumber + log_idx: int + tx_hash: str + +@dataclass +class TopUpState(FromDict): + block_confirmed: int + +@dataclass +class TopUp(FromDict): + id: str + details: TopUpDetails + state: TopUpState + +@dataclass +class DifficultyData(FromDict): + num_solutions: int + num_nonces: int + difficulty: Point + +@dataclass +class DepositDetails(FromDict): + player_id: str + amount: PreciseNumber + log_idx: int + tx_hash: str + start_timestamp: int + end_timestamp: int + +@dataclass +class DepositState(FromDict): + block_confirmed: int + +@dataclass +class Deposit(FromDict): + id: str + details: DepositDetails + state: DepositState \ No newline at end of file diff --git a/tig-benchmarker/common/utils.py b/tig-benchmarker/common/utils.py new file mode 100644 index 0000000..becb696 --- /dev/null +++ b/tig-benchmarker/common/utils.py @@ -0,0 +1,217 @@ +from __future__ import annotations +from abc import ABC, abstractclassmethod, abstractmethod +from blake3 import blake3 +from dataclasses import dataclass, fields, is_dataclass, asdict +from typing import TypeVar, Type, Dict, Any, List, Union, Optional, get_origin, get_args +import json +import time + +T = TypeVar('T', bound='DataclassBase') + +class FromStr(ABC): + @abstractclassmethod + def from_str(cls, s: str): + raise NotImplementedError + + @abstractmethod + def to_str(self) -> str: + raise NotImplementedError + +@dataclass +class FromDict: + @classmethod + def from_dict(cls: Type[T], d: Dict[str, Any]) -> T: + field_types = {f.name: f.type for f in fields(cls)} + kwargs = {} + + for field in fields(cls): + value = d.pop(field.name, None) + field_type = field_types[field.name] + + if value is None: + if cls._is_optional(field_type): + kwargs[field.name] = None + else: + raise ValueError(f"Missing required field: {field.name}") + continue + + kwargs[field.name] = cls._process_value(value, field_type) + + return cls(**kwargs) + + @classmethod + def _process_value(cls, value: Any, field_type: Type) -> Any: + origin_type = get_origin(field_type) + + if cls._is_optional(field_type): + if value is None: + return None + non_none_type = next(arg for arg in get_args(field_type) if arg is not type(None)) + return cls._process_value(value, non_none_type) + + if hasattr(field_type, 'from_dict') and isinstance(value, dict): + return field_type.from_dict(value) + elif hasattr(field_type, 'from_str') and isinstance(value, str): + return field_type.from_str(value) + elif origin_type in (list, set, tuple): + elem_type = get_args(field_type)[0] + return origin_type(cls._process_value(item, elem_type) for item in value) + elif origin_type is dict: + key_type, val_type = get_args(field_type) + return {cls._process_value(k, key_type): cls._process_value(v, val_type) for k, v in value.items()} + else: + return field_type(value) + + @staticmethod + def _is_optional(field_type: Type) -> bool: + return get_origin(field_type) is Union and type(None) in get_args(field_type) + + def to_dict(self) -> Dict[str, Any]: + d = {} + for field in fields(self): + value = getattr(self, field.name) + if value is not None: + if hasattr(value, 'to_dict'): + d[field.name] = value.to_dict() + elif hasattr(value, 'to_str'): + d[field.name] = value.to_str() + elif isinstance(value, (list, set, tuple)): + d[field.name] = [ + item.to_dict() if hasattr(item, 'to_dict') + else item.to_str() if hasattr(item, 'to_str') + else item + for item in value + ] + elif isinstance(value, dict): + d[field.name] = { + k: (v.to_dict() if hasattr(v, 'to_dict') + else v.to_str() if hasattr(v, 'to_str') + else v) + for k, v in value.items() + } + elif is_dataclass(value): + d[field.name] = asdict(value) + else: + d[field.name] = value + return d + + +class PreciseNumber(FromStr): + PRECISION = 10**18 # 18 decimal places of precision + + def __init__(self, value: Union[int, float, str, PreciseNumber]): + if isinstance(value, PreciseNumber): + self._value = value._value + elif isinstance(value, int): + self._value = value * self.PRECISION + elif isinstance(value, float): + self._value = int(value * self.PRECISION) + elif isinstance(value, str): + self._value = int(value) + else: + raise TypeError(f"Unsupported type for PreciseNumber: {type(value)}") + + @classmethod + def from_str(cls, s: str) -> 'PreciseNumber': + return cls(s) + + def to_str(self) -> str: + return str(self._value) + + def __repr__(self) -> str: + return f"PreciseNumber({self.to_float()})" + + def to_float(self) -> float: + return self._value / self.PRECISION + + def __add__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + return PreciseNumber(self._value + other._value) + + def __sub__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + return PreciseNumber(self._value - other._value) + + def __mul__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + return PreciseNumber((self._value * other._value) // self.PRECISION) + + def __truediv__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + if other._value == 0: + raise ZeroDivisionError + return PreciseNumber((self._value * self.PRECISION) // other._value) + + def __floordiv__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + if other._value == 0: + raise ZeroDivisionError + return PreciseNumber((self._value * self.PRECISION // other._value)) + + def __eq__(self, other: Union[PreciseNumber, int, float]) -> bool: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + return self._value == other._value + + def __lt__(self, other: Union[PreciseNumber, int, float]) -> bool: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + return self._value < other._value + + def __le__(self, other: Union[PreciseNumber, int, float]) -> bool: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + return self._value <= other._value + + def __gt__(self, other: Union[PreciseNumber, int, float]) -> bool: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + return self._value > other._value + + def __ge__(self, other: Union[PreciseNumber, int, float]) -> bool: + if isinstance(other, (int, float)): + other = PreciseNumber(other) + return self._value >= other._value + + def __radd__(self, other: Union[int, float]) -> PreciseNumber: + return self + other + + def __rsub__(self, other: Union[int, float]) -> PreciseNumber: + return PreciseNumber(other) - self + + def __rmul__(self, other: Union[int, float]) -> PreciseNumber: + return self * other + + def __rtruediv__(self, other: Union[int, float]) -> PreciseNumber: + return PreciseNumber(other) / self + + def __rfloordiv__(self, other: Union[int, float]) -> PreciseNumber: + return PreciseNumber(other) // self + +def jsonify(obj: Any) -> str: + if hasattr(obj, 'to_dict'): + obj = obj.to_dict() + return json.dumps(obj, sort_keys=True, separators=(',', ':')) + +def u8s_from_str(input: str) -> bytes: + return blake3(input.encode()).digest() + +def u64s_from_str(input: str) -> List[int]: + u8s = u8s_from_str(input) + return [ + int.from_bytes( + u8s[i * 8:(i + 1) * 8], + byteorder='little', + signed=False + ) + for i in range(4) + ] + +def now(): + return int(time.time() * 1000) + diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index 6ca7f31..4d8a51e 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -3,154 +3,286 @@ import json import os import logging import randomname -import aiohttp -import asyncio +import requests +import shutil import subprocess import time +import zlib +from threading import Thread +from common.structs import OutputData, MerkleProof +from common.merkle_tree import MerkleTree logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) +PENDING_BATCH_IDS = set() +PROCESSING_BATCH_IDS = set() +READY_BATCH_IDS = set() +FINISHED_BATCH_IDS = {} def now(): return int(time.time() * 1000) -async def download_wasm(session, download_url, wasm_path): +def download_wasm(session, download_url, wasm_path): if not os.path.exists(wasm_path): start = now() logger.info(f"downloading WASM from {download_url}") - async with session.get(download_url) as resp: - if resp.status != 200: - raise Exception(f"status {resp.status} when downloading WASM: {await resp.text()}") - with open(wasm_path, 'wb') as f: - f.write(await resp.read()) + resp = session.get(download_url) + if resp.status_code != 200: + raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}") + with open(wasm_path, 'wb') as f: + f.write(resp.content) logger.debug(f"downloading WASM: took {now() - start}ms") logger.debug(f"WASM Path: {wasm_path}") -async def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers): + +def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path): start = now() cmd = [ tig_worker_path, "compute_batch", - json.dumps(batch["settings"]), - batch["rand_hash"], - str(batch["start_nonce"]), + json.dumps(batch["settings"]), + batch["rand_hash"], + str(batch["start_nonce"]), str(batch["num_nonces"]), - str(batch["batch_size"]), + str(batch["batch_size"]), wasm_path, "--mem", str(batch["runtime_config"]["max_memory"]), "--fuel", str(batch["runtime_config"]["max_fuel"]), "--workers", str(num_workers), + "--output", f"{output_path}/{batch['id']}", ] - if batch["sampled_nonces"]: - cmd += ["--sampled", *map(str, batch["sampled_nonces"])] logger.info(f"computing batch: {' '.join(cmd)}") - process = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - stdout, stderr = await process.communicate() + stdout, stderr = process.communicate() if process.returncode != 0: + PROCESSING_BATCH_IDS.remove(batch["id"]) raise Exception(f"tig-worker failed: {stderr.decode()}") result = json.loads(stdout.decode()) logger.info(f"computing batch took {now() - start}ms") logger.debug(f"batch result: {result}") - return result + with open(f"{output_path}/{batch['id']}/result.json", "w") as f: + json.dump(result, f) + + PROCESSING_BATCH_IDS.remove(batch["id"]) + READY_BATCH_IDS.add(batch["id"]) + -async def process_batch(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, batch, headers): +def purge_folders(output_path, ttl): + n = now() + purge_batch_ids = [ + batch_id + for batch_id, finish_time in FINISHED_BATCH_IDS.items() + if n >= finish_time + (ttl * 1000) + ] + if len(purge_batch_ids) == 0: + time.sleep(5) + return + + for batch_id in purge_batch_ids: + if os.path.exists(f"{output_path}/{batch_id}"): + logger.info(f"purging batch {batch_id}") + shutil.rmtree(f"{output_path}/{batch_id}") + FINISHED_BATCH_IDS.pop(batch_id) + + +def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path): try: - batch_id = f"{batch['benchmark_id']}_{batch['start_nonce']}" - logger.info(f"Processing batch {batch_id}: {batch}") + batch_id = READY_BATCH_IDS.pop() + except KeyError: + logger.debug("No pending batches") + time.sleep(1) + return + + output_folder = f"{output_path}/{batch_id}" + with open(f"{output_folder}/batch.json") as f: + batch = json.load(f) - # Step 2: Download WASM - wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") - await download_wasm(session, batch['download_url'], wasm_path) + if ( + not os.path.exists(f"{output_folder}/result.json") + or not os.path.exists(f"{output_folder}/data.zlib") + ): + if os.path.exists(f"{output_folder}/result.json"): + os.remove(f"{output_folder}/result.json") + logger.debug(f"Batch {batch_id} flagged as ready, but missing nonce files") + PENDING_BATCH_IDS.add(batch_id) + return - # Step 3: Run tig-worker - result = await run_tig_worker(tig_worker_path, batch, wasm_path, num_workers) + if batch["sampled_nonces"] is None: + with open(f"{output_folder}/result.json") as f: + result = json.load(f) - # Step 4: Submit results - start = now() - submit_url = f"http://{master_ip}:{master_port}/submit-batch-result/{batch_id}" - logger.info(f"posting results to {submit_url}") - async with session.post(submit_url, json=result, headers=headers) as resp: - if resp.status != 200: - raise Exception(f"status {resp.status} when posting results to master: {await resp.text()}") - logger.debug(f"posting results took {now() - start} ms") + submit_url = f"http://{master_ip}:{master_port}/submit-batch-root/{batch_id}" + logger.info(f"posting root to {submit_url}") + resp = session.post(submit_url, json=result) + if resp.status_code == 200: + FINISHED_BATCH_IDS[batch_id] = now() + logger.info(f"successfully posted root for batch {batch_id}") + elif resp.status_code == 408: # took too long + FINISHED_BATCH_IDS[batch_id] = now() + logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") + else: + logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") + READY_BATCH_IDS.add(batch_id) # requeue + time.sleep(2) - except Exception as e: - logger.error(f"Error processing batch {batch_id}: {e}") + else: + with open(f"{output_folder}/data.zlib", "rb") as f: + leafs = [ + OutputData.from_dict(x) + for x in json.loads(zlib.decompress(f.read()).decode()) + ] + + merkle_tree = MerkleTree( + [x.to_merkle_hash() for x in leafs], + batch["batch_size"] + ) -async def main( + proofs_to_submit = [ + MerkleProof( + leaf=leafs[n - batch["start_nonce"]], + branch=merkle_tree.calc_merkle_branch(branch_idx=n - batch["start_nonce"]) + ).to_dict() + for n in batch["sampled_nonces"] + ] + + submit_url = f"http://{master_ip}:{master_port}/submit-batch-proofs/{batch_id}" + logger.info(f"posting proofs to {submit_url}") + resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit}) + if resp.status_code == 200: + FINISHED_BATCH_IDS[batch_id] = now() + logger.info(f"successfully posted proofs for batch {batch_id}") + elif resp.status_code == 408: # took too long + FINISHED_BATCH_IDS[batch_id] = now() + logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") + else: + logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") + READY_BATCH_IDS.add(batch_id) # requeue + time.sleep(2) + + +def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path): + try: + batch_id = PENDING_BATCH_IDS.pop() + except KeyError: + logger.debug("No pending batches") + time.sleep(1) + return + + if ( + batch_id in PROCESSING_BATCH_IDS or + batch_id in READY_BATCH_IDS + ): + return + + if os.path.exists(f"{output_path}/{batch_id}/result.json"): + logger.info(f"Batch {batch_id} already processed") + READY_BATCH_IDS.add(batch_id) + return + PROCESSING_BATCH_IDS.add(batch_id) + + with open(f"{output_path}/{batch_id}/batch.json") as f: + batch = json.load(f) + + wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") + download_wasm(session, batch['download_url'], wasm_path) + + Thread( + target=run_tig_worker, + args=(tig_worker_path, batch, wasm_path, num_workers, output_path) + ).start() + + +def poll_batches(session, master_ip, master_port, output_path): + get_batches_url = f"http://{master_ip}:{master_port}/get-batches" + logger.info(f"fetching batches from {get_batches_url}") + resp = session.get(get_batches_url) + + if resp.status_code == 200: + batches = resp.json() + root_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is None] + proofs_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is not None] + logger.info(f"root batches: {root_batch_ids}") + logger.info(f"proofs batches: {proofs_batch_ids}") + for batch in batches: + output_folder = f"{output_path}/{batch['id']}" + os.makedirs(output_folder, exist_ok=True) + with open(f"{output_folder}/batch.json", "w") as f: + json.dump(batch, f) + PENDING_BATCH_IDS.clear() + PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids) + time.sleep(5) + + else: + logger.error(f"status {resp.status_code} when fetching batch: {resp.text}") + time.sleep(5) + + +def wrap_thread(func, *args): + logger.info(f"Starting thread for {func.__name__}") + while True: + try: + func(*args) + except Exception as e: + logger.error(f"Error in {func.__name__}: {e}") + time.sleep(5) + + +def main( master_ip: str, tig_worker_path: str, download_wasms_folder: str, num_workers: int, slave_name: str, - master_port: int + master_port: int, + output_path: str, + ttl: int, ): + print(f"Starting slave {slave_name}") + if not os.path.exists(tig_worker_path): raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}") os.makedirs(download_wasms_folder, exist_ok=True) - headers = { + session = requests.Session() + session.headers.update({ "User-Agent": slave_name - } + }) + Thread( + target=wrap_thread, + args=(process_batch, session, tig_worker_path, download_wasms_folder, num_workers, output_path) + ).start() - async with aiohttp.ClientSession() as session: - while True: - try: - # Step 1: Query for job test maj - start = now() - get_batch_url = f"http://{master_ip}:{master_port}/get-batches" - logger.info(f"fetching job from {get_batch_url}") - try: - resp = await asyncio.wait_for(session.get(get_batch_url, headers=headers), timeout=5) - if resp.status != 200: - text = await resp.text() - if resp.status == 404 and text.strip() == "No batches available": - # Retry with master_port - 1 - new_port = master_port - 1 - get_batch_url = f"http://{master_ip}:{new_port}/get-batches" - logger.info(f"No batches available on port {master_port}, trying port {new_port}") - resp_retry = await asyncio.wait_for(session.get(get_batch_url, headers=headers), timeout=10) - if resp_retry.status != 200: - raise Exception(f"status {resp_retry.status} when fetching job: {await resp_retry.text()}") - master_port_w = new_port - batches = await resp_retry.json(content_type=None) - else: - raise Exception(f"status {resp.status} when fetching job: {text}") - else: - master_port_w = master_port - batches = await resp.json(content_type=None) - except asyncio.TimeoutError: - logger.error(f"Timeout occurred when fetching job from {get_batch_url}") - continue - logger.debug(f"fetching job: took {now() - start}ms") + Thread( + target=wrap_thread, + args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path) + ).start() - # Process batches concurrently - tasks = [ - process_batch(session, master_ip, master_port_w, tig_worker_path, download_wasms_folder, num_workers, batch, headers) - for batch in batches - ] - await asyncio.gather(*tasks) + Thread( + target=wrap_thread, + args=(purge_folders, output_path, ttl) + ).start() + + wrap_thread(poll_batches, session, master_ip, master_port, output_path) - except Exception as e: - logger.error(e) - await asyncio.sleep(2) if __name__ == "__main__": parser = argparse.ArgumentParser(description="TIG Slave Benchmarker") - parser.add_argument("master_ip", help="IP address of the master") parser.add_argument("tig_worker_path", help="Path to tig-worker executable") + parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)") parser.add_argument("--download", type=str, default="wasms", help="Folder to download WASMs to (default: wasms)") parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)") parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)") parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)") parser.add_argument("--verbose", action='store_true', help="Print debug logs") - + parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)") + parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)") + args = parser.parse_args() - + logging.basicConfig( format='%(levelname)s - [%(name)s] - %(message)s', level=logging.DEBUG if args.verbose else logging.INFO ) - asyncio.run(main(args.master_ip, args.tig_worker_path, args.download, args.workers, args.name, args.port)) \ No newline at end of file + main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl)