This commit is contained in:
xnico31 2024-12-19 19:31:29 +01:00
parent b6bcc87830
commit fedcc94e3c
11 changed files with 905 additions and 410 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

View File

@@ -1,314 +0,0 @@
import argparse
import json
import os
import logging
import randomname
import requests
import shutil
import subprocess
import time
import zlib
from threading import Thread
from common.structs import OutputData, MerkleProof
from common.merkle_tree import MerkleTree
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
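# Batch lifecycle (as implied by the handlers below): ids enter PENDING_BATCH_IDS via
# poll_batches, move to PROCESSING_BATCH_IDS while tig-worker runs, to READY_BATCH_IDS
# once result.json is written, and finally into FINISHED_BATCH_IDS (batch_id -> finish
# time in ms) so purge_folders can delete them after the TTL.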
PENDING_BATCH_IDS = set()
PROCESSING_BATCH_IDS = set()
READY_BATCH_IDS = set()
FINISHED_BATCH_IDS = {}
LAST_SUCCESSFUL_PORT = None  # set by poll_batches; initialized here so send_results cannot hit a NameError
def now():
return int(time.time() * 1000)
def download_wasm(session, download_url, wasm_path):
if not os.path.exists(wasm_path):
start = now()
logger.info(f"downloading WASM from {download_url}")
resp = session.get(download_url)
if resp.status_code != 200:
raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}")
with open(wasm_path, 'wb') as f:
f.write(resp.content)
logger.debug(f"downloading WASM: took {now() - start}ms")
logger.debug(f"WASM Path: {wasm_path}")
def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path):
start = now()
cmd = [
tig_worker_path, "compute_batch",
json.dumps(batch["settings"]),
batch["rand_hash"],
str(batch["start_nonce"]),
str(batch["num_nonces"]),
str(batch["batch_size"]),
wasm_path,
"--mem", str(batch["runtime_config"]["max_memory"]),
"--fuel", str(batch["runtime_config"]["max_fuel"]),
"--workers", str(num_workers),
"--output", f"{output_path}/{batch['id']}",
]
logger.info(f"computing batch: {' '.join(cmd)}")
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
if process.returncode != 0:
PROCESSING_BATCH_IDS.remove(batch["id"])
raise Exception(f"tig-worker failed: {stderr.decode()}")
result = json.loads(stdout.decode())
logger.info(f"computing batch took {now() - start}ms")
logger.debug(f"batch result: {result}")
with open(f"{output_path}/{batch['id']}/result.json", "w") as f:
json.dump(result, f)
PROCESSING_BATCH_IDS.remove(batch["id"])
READY_BATCH_IDS.add(batch["id"])
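# purge_folders deletes a batch's output folder once it has been finished for longer
# than the TTL. FINISHED_BATCH_IDS stores finish times in milliseconds, hence the
# ttl * 1000 comparison against now().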
def purge_folders(output_path, ttl):
n = now()
purge_batch_ids = [
batch_id
for batch_id, finish_time in FINISHED_BATCH_IDS.items()
if n >= finish_time + (ttl * 1000)
]
if len(purge_batch_ids) == 0:
time.sleep(5)
return
for batch_id in purge_batch_ids:
if os.path.exists(f"{output_path}/{batch_id}"):
logger.info(f"purging batch {batch_id}")
shutil.rmtree(f"{output_path}/{batch_id}")
FINISHED_BATCH_IDS.pop(batch_id)
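# send_results covers both halves of the protocol: when sampled_nonces is absent it
# posts the cached Merkle root (result.json) to /submit-batch-root; otherwise it
# rebuilds the Merkle tree from data.zlib and posts proofs for the sampled nonces to
# /submit-batch-proofs. A 408 from the master is treated as expired work and is not
# requeued; any other error requeues the batch.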
def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode):
global LAST_SUCCESSFUL_PORT
try:
batch_id = READY_BATCH_IDS.pop()
except KeyError:
logger.debug("No pending batches")
time.sleep(1)
return
port = LAST_SUCCESSFUL_PORT or master_port
output_folder = f"{output_path}/{batch_id}"
with open(f"{output_folder}/batch.json") as f:
batch = json.load(f)
if (
not os.path.exists(f"{output_folder}/result.json")
or not os.path.exists(f"{output_folder}/data.zlib")
):
if os.path.exists(f"{output_folder}/result.json"):
os.remove(f"{output_folder}/result.json")
logger.debug(f"Batch {batch_id} flagged as ready, but missing nonce files")
PENDING_BATCH_IDS.add(batch_id)
return
if batch["sampled_nonces"] is None:
with open(f"{output_folder}/result.json") as f:
result = json.load(f)
submit_url = f"http://{master_ip}:{port_port}/submit-batch-root/{batch_id}"
logger.info(f"posting root to {submit_url}")
resp = session.post(submit_url, json=result)
if resp.status_code == 200:
FINISHED_BATCH_IDS[batch_id] = now()
logger.info(f"successfully posted root for batch {batch_id}")
elif resp.status_code == 408: # took too long
FINISHED_BATCH_IDS[batch_id] = now()
logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
else:
logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
READY_BATCH_IDS.add(batch_id) # requeue
time.sleep(2)
else:
with open(f"{output_folder}/data.zlib", "rb") as f:
leafs = [
OutputData.from_dict(x)
for x in json.loads(zlib.decompress(f.read()).decode())
]
merkle_tree = MerkleTree(
[x.to_merkle_hash() for x in leafs],
batch["batch_size"]
)
proofs_to_submit = [
MerkleProof(
leaf=leafs[n - batch["start_nonce"]],
branch=merkle_tree.calc_merkle_branch(branch_idx=n - batch["start_nonce"])
).to_dict()
for n in batch["sampled_nonces"]
]
submit_url = f"http://{master_ip}:{port_port}/submit-batch-proofs/{batch_id}"
logger.info(f"posting proofs to {submit_url}")
resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit})
if resp.status_code == 200:
FINISHED_BATCH_IDS[batch_id] = now()
logger.info(f"successfully posted proofs for batch {batch_id}")
elif resp.status_code == 408: # took too long
FINISHED_BATCH_IDS[batch_id] = now()
logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
else:
logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
READY_BATCH_IDS.add(batch_id) # requeue
time.sleep(2)
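# process_batch pops one pending id, skips ids already in flight or already computed,
# downloads the algorithm's WASM if needed, and hands the heavy compute off to a
# worker thread so this loop can keep draining PENDING_BATCH_IDS.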
def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path):
try:
batch_id = PENDING_BATCH_IDS.pop()
except KeyError:
logger.debug("No pending batches")
time.sleep(1)
return
if (
batch_id in PROCESSING_BATCH_IDS or
batch_id in READY_BATCH_IDS
):
return
if os.path.exists(f"{output_path}/{batch_id}/result.json"):
logger.info(f"Batch {batch_id} already processed")
READY_BATCH_IDS.add(batch_id)
return
PROCESSING_BATCH_IDS.add(batch_id)
with open(f"{output_path}/{batch_id}/batch.json") as f:
batch = json.load(f)
wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
download_wasm(session, batch['download_url'], wasm_path)
Thread(
target=run_tig_worker,
args=(tig_worker_path, batch, wasm_path, num_workers, output_path)
).start()
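# poll_batches also implements the mainnet port fallback: on a 666 response it retries
# once on master_port - 1, then reverts to the original port, and records whichever
# port answered 200 in LAST_SUCCESSFUL_PORT so send_results reuses it.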
def poll_batches(session, master_ip, master_port, output_path, mode):
global LAST_SUCCESSFUL_PORT
get_batches_url = f"http://{master_ip}:{master_port}/get-batches"
logger.info(f"fetching batches from {get_batches_url}")
resp = session.get(get_batches_url)
if mode == "mainnet" and resp.status_code == 666:
logger.warning(f"Status 666 received. Retrying with master_port {master_port - 1}")
resp = session.get(f"http://{master_ip}:{master_port - 1}/get-batches")
if resp.status_code == 666:
time.sleep(2)
logger.error(f"Retrying failed with status 666 again. Reverting to original master_port {master_port}")
resp = session.get(get_batches_url)
if resp.status_code == 200:
LAST_SUCCESSFUL_PORT = master_port if resp.url.endswith(f":{master_port}/get-batches") else master_port - 1
logger.debug(f"Successfully fetched batches from port {LAST_SUCCESSFUL_PORT}")
batches = resp.json()
root_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is None]
proofs_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is not None]
logger.info(f"root batches: {root_batch_ids}")
logger.info(f"proofs batches: {proofs_batch_ids}")
for batch in batches:
output_folder = f"{output_path}/{batch['id']}"
os.makedirs(output_folder, exist_ok=True)
with open(f"{output_folder}/batch.json", "w") as f:
json.dump(batch, f)
PENDING_BATCH_IDS.clear()
PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids)
time.sleep(5)
else:
logger.error(f"status {resp.status_code} when fetching batch: {resp.text}")
time.sleep(5)
def wrap_thread(func, *args):
logger.info(f"Starting thread for {func.__name__}")
while True:
try:
func(*args)
except Exception as e:
logger.error(f"Error in {func.__name__}: {e}")
time.sleep(5)
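# main wires the loops together: three threads (process_batch, send_results,
# purge_folders) plus poll_batches on the main thread, all sharing the module-level id
# sets. The code leans on CPython's GIL for the atomicity of the individual set
# add/pop/remove calls rather than taking explicit locks.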
def main(
master_ip: str,
tig_worker_path: str,
download_wasms_folder: str,
num_workers: int,
slave_name: str,
master_port: int,
output_path: str,
ttl: int,
mode: str,
):
print(f"Starting slave {slave_name}")
if not os.path.exists(tig_worker_path):
raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}")
os.makedirs(download_wasms_folder, exist_ok=True)
session = requests.Session()
session.headers.update({
"User-Agent": slave_name
})
Thread(
target=wrap_thread,
args=(process_batch, session, tig_worker_path, download_wasms_folder, num_workers, output_path)
).start()
Thread(
target=wrap_thread,
args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode)
).start()
Thread(
target=wrap_thread,
args=(purge_folders, output_path, ttl)
).start()
wrap_thread(poll_batches, session, master_ip, master_port, output_path, mode)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TIG Slave Benchmarker")
parser.add_argument("tig_worker_path", help="Path to tig-worker executable")
parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)")
parser.add_argument("--download", type=str, default="wasms", help="Folder to download WASMs to (default: wasms)")
parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)")
parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)")
parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)")
parser.add_argument("--verbose", action='store_true', help="Print debug logs")
parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)")
parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)")
parser.add_argument("--mode", type=str, default="mainnet", help="mainnet/explo")
args = parser.parse_args()
logging.basicConfig(
format='%(levelname)s - [%(name)s] - %(message)s',
level=logging.DEBUG if args.verbose else logging.INFO
)
main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl, args.mode)

View File

@@ -13,12 +13,12 @@ login=$5
private_key=$6
client_version=$7
\rm -rf tig_pool
\mkdir tig_pool
cd tig_pool
\rm -rf tig_pool_test
\mkdir tig_pool_test
cd tig_pool_test
screen -ls | grep pool_tig | awk '{print $1}' | xargs -I {} screen -S {} -X kill
wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/scripts/tig_pool_master.sh
wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/scripts/tig_pool_master.sh
sudo chmod +x tig_pool_master.sh
./tig_pool_master.sh -id_slave $slave_id -nom_slave $slave_name -ip $server_url -port $port -login $login -tok $private_key -url $server_url -v $client_version

View File

@@ -99,7 +99,10 @@ sudo apt install -y libssl-dev
mkdir -p wasms
sudo chmod -R 777 wasms/
# Clone the Git repository with the specified branch
git clone https://github.com/tig-pool-nk/tig-monorepo.git
git clone -b test https://github.com/tig-pool-nk/tig-monorepo.git
# Navigate to the benchmarker directory and build the project with cargo
cd tig-monorepo/tig-worker/
@@ -112,8 +115,16 @@ python3 -m venv venv
mkdir -p tig-benchmarker
cd tig-benchmarker
wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/tig-benchmarker/slave.py -O slave.py
wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/tig-benchmarker/requirements.txt -O requirements.txt
wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/tig-benchmarker/slave.py -O slave.py
wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/tig-benchmarker/requirements.txt -O requirements.txt
mkdir -p common
wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/test/tig-benchmarker/common/__init__.py -O common/__init__.py
wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/test/tig-benchmarker/common/merkle_tree.py -O common/merkle_tree.py
wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/test/tig-benchmarker/common/structs.py -O common/structs.py
wget https://raw.githubusercontent.com/tig-pool-nk/tig-monorepo/refs/heads/test/tig-benchmarker/common/utils.py -O common/utils.py
cd $current_path
./venv/bin/pip3 install -r tig-benchmarker/requirements.txt
@@ -122,13 +133,13 @@ mkdir -p bin
cd bin
# Download the files and check if the download was successful
wget https://github.com/tig-pool-nk/client/raw/refs/heads/main/bin/client -O client_tig_pool
wget https://github.com/tig-pool-nk/client/raw/refs/heads/test/bin/client -O client_tig_pool
if [ $? -ne 0 ]; then
echo "Error downloading client_tig_pool"
exit 1
fi
wget https://github.com/tig-pool-nk/client/raw/refs/heads/main/bin/bench -O bench
wget https://github.com/tig-pool-nk/client/raw/refs/heads/test/bin/bench -O bench
if [ $? -ne 0 ]; then
echo "Error downloading bench"
exit 1
@@ -141,7 +152,7 @@ chmod +x bench
cd $current_path
# Download the launch file and rename it according to the provided parameters
wget -O pool_tig_launch_${id_slave}_${nom_slave}.sh https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/scripts/pool_tig_launch_master.sh
wget -O pool_tig_launch_${id_slave}_${nom_slave}.sh https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/scripts/pool_tig_launch_master.sh
# Replace placeholders with variable values
sed -i "s|@id@|$id_slave|g" pool_tig_launch_${id_slave}_${nom_slave}.sh
@@ -169,7 +180,7 @@ screen -dmS pool_tig bash -c "cd \"$current_path\" && ./pool_tig_launch_${id_sla
cd $current_path
mkdir game
cd game
wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/main/scripts/snake.sh -O snake.sh
wget https://raw.githubusercontent.com/tig-pool-nk/client/refs/heads/test/scripts/snake.sh -O snake.sh
cd $current_path
set +H
@@ -184,7 +195,7 @@ echo " ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═
echo -e "\e[0m"
echo ""
echo -e "\e[32mTIG Pool has been installed successfully!\e[0m"
echo -e "\e[32mTIG TEST Pool has been installed successfully!\e[0m"
echo ""
echo "To follow the benchmarker, use the commands below:"

BIN
tig-benchmarker/.DS_Store vendored Normal file

Binary file not shown.

BIN
tig-benchmarker/common/.DS_Store vendored Normal file

Binary file not shown.

View File

View File

@@ -0,0 +1,152 @@
from blake3 import blake3
from typing import List, Tuple
from .utils import FromStr
class MerkleHash(FromStr):
def __init__(self, value: bytes):
if len(value) != 32:
raise ValueError("MerkleHash must be exactly 32 bytes")
self.value = value
@classmethod
def from_str(cls, s: str):
return cls(bytes.fromhex(s))
@classmethod
def null(cls):
return cls(bytes([0] * 32))
def to_str(self):
return self.value.hex()
def __eq__(self, other):
return isinstance(other, MerkleHash) and self.value == other.value
def __repr__(self):
return f"MerkleHash({self.to_str()})"
class MerkleTree(FromStr):
def __init__(self, hashed_leafs: List[MerkleHash], n: int):
if len(hashed_leafs) > n:
raise ValueError("Invalid tree size")
if n & (n - 1) != 0:
raise ValueError("n must be a power of 2")
self.hashed_leafs = hashed_leafs
self.n = n
def to_str(self):
"""Serializes the MerkleTree to a string"""
n_hex = f"{self.n:016x}"
hashes_hex = ''.join([h.to_str() for h in self.hashed_leafs])
return n_hex + hashes_hex
def __repr__(self):
return f"MerkleTree([{', '.join([str(h) for h in self.hashed_leafs])}], {self.n})"
@classmethod
def from_str(cls, s: str):
"""Deserializes a MerkleTree from a string"""
if len(s) < 16 or (len(s) - 16) % 64 != 0:
raise ValueError("Invalid MerkleTree string length")
n_hex = s[:16]
n = int(n_hex, 16)
hashes_hex = s[16:]
hashed_leafs = [
MerkleHash.from_str(hashes_hex[i:i + 64])
for i in range(0, len(hashes_hex), 64)
]
return cls(hashed_leafs, n)
def calc_merkle_root(self) -> MerkleHash:
hashes = self.hashed_leafs[:]
while len(hashes) > 1:
new_hashes = []
for i in range(0, len(hashes), 2):
left = hashes[i]
result = MerkleHash(left.value)
if i + 1 < len(hashes):
right = hashes[i + 1]
combined = left.value + right.value
result = MerkleHash(blake3(combined).digest())
new_hashes.append(result)
hashes = new_hashes
return hashes[0]
def calc_merkle_branch(self, branch_idx: int) -> 'MerkleBranch':
if branch_idx >= self.n:
raise ValueError("Invalid branch index")
hashes = self.hashed_leafs[:]
branch = []
idx = branch_idx
depth = 0
while len(hashes) > 1:
new_hashes = []
for i in range(0, len(hashes), 2):
left = hashes[i]
result = MerkleHash(left.value)
if i + 1 < len(hashes):
right = hashes[i + 1]
if idx // 2 == i // 2:
branch.append((depth, right if idx % 2 == 0 else left))
combined = left.value + right.value
result = MerkleHash(blake3(combined).digest())
new_hashes.append(result)
hashes = new_hashes
idx //= 2
depth += 1
return MerkleBranch(branch)
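# calc_merkle_branch collects, level by level, the sibling hash needed to re-derive
# the root from one leaf. Each stem is (depth, sibling); depths can skip levels when a
# node has no right sibling (it is promoted unhashed), which is why calc_merkle_root
# below fast-forwards idx until curr_depth catches up with the stored depth.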
class MerkleBranch:
def __init__(self, stems: List[Tuple[int, MerkleHash]]):
self.stems = stems
def calc_merkle_root(self, hashed_leaf: MerkleHash, branch_idx: int) -> MerkleHash:
root = hashed_leaf
idx = branch_idx
curr_depth = 0
for depth, hash in self.stems:
if curr_depth > depth:
raise ValueError("Invalid branch")
while curr_depth != depth:
idx //= 2
curr_depth += 1
if idx % 2 == 0:
combined = root.value + hash.value
else:
combined = hash.value + root.value
root = MerkleHash(blake3(combined).digest())
idx //= 2
curr_depth += 1
return root
def to_str(self):
"""Serializes the MerkleBranch to a hex string"""
return ''.join([f"{depth:02x}{hash.to_str()}" for depth, hash in self.stems])
def __repr__(self):
return f"MerkleBranch([{', '.join([f'({depth}, {hash})' for depth, hash in self.stems])}])"
@classmethod
def from_str(cls, s: str):
"""Deserializes a MerkleBranch from a hex string"""
if len(s) % 66 != 0:
raise ValueError("Invalid MerkleBranch string length")
stems = []
for i in range(0, len(s), 66):
depth = int(s[i:i+2], 16)
hash_hex = s[i+2:i+66]
stems.append((depth, MerkleHash.from_str(hash_hex)))
return cls(stems)
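# Verification sketch (assumed usage): for a tree and a leaf index i,
#   branch = tree.calc_merkle_branch(i)
#   assert branch.calc_merkle_root(tree.hashed_leafs[i], i) == tree.calc_merkle_root()
# Both MerkleTree and MerkleBranch also round-trip through to_str()/from_str().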

View File

@@ -0,0 +1,297 @@
from .merkle_tree import MerkleHash, MerkleBranch
from .utils import FromDict, u64s_from_str, u8s_from_str, jsonify, PreciseNumber
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Any, Tuple
Point = Tuple[int, ...]
Frontier = Set[Point]
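# Point is a difficulty coordinate (a tuple of ints) and a Frontier is a set of such
# points, as consumed by BenchmarkSettings.difficulty and ChallengeBlockData below.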
@dataclass
class AlgorithmDetails(FromDict):
name: str
player_id: str
challenge_id: str
breakthrough_id: Optional[str]
type: str
fee_paid: PreciseNumber
@dataclass
class AlgorithmState(FromDict):
block_confirmed: int
round_submitted: int
round_pushed: Optional[int]
round_active: Optional[int]
round_merged: Optional[int]
banned: bool
@dataclass
class AlgorithmBlockData(FromDict):
num_qualifiers_by_player: Dict[str, int]
adoption: PreciseNumber
merge_points: int
reward: PreciseNumber
@dataclass
class Algorithm(FromDict):
id: str
details: AlgorithmDetails
state: AlgorithmState
block_data: Optional[AlgorithmBlockData]
@dataclass
class BenchmarkSettings(FromDict):
player_id: str
block_id: str
challenge_id: str
algorithm_id: str
difficulty: Point
def calc_seed(self, rand_hash: str, nonce: int) -> bytes:
return u8s_from_str(f"{jsonify(self)}_{rand_hash}_{nonce}")
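# calc_seed hashes the canonical JSON of the settings together with rand_hash and the
# nonce (u8s_from_str is a blake3 digest), giving each nonce a deterministic 32-byte seed.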
@dataclass
class PrecommitDetails(FromDict):
block_started: int
num_nonces: int
rand_hash: str
fee_paid: PreciseNumber
@dataclass
class PrecommitState(FromDict):
block_confirmed: int
@dataclass
class Precommit(FromDict):
benchmark_id: str
details: PrecommitDetails
settings: BenchmarkSettings
state: PrecommitState
@dataclass
class BenchmarkDetails(FromDict):
num_solutions: int
merkle_root: MerkleHash
sampled_nonces: List[int]
@dataclass
class BenchmarkState(FromDict):
block_confirmed: int
@dataclass
class Benchmark(FromDict):
id: str
details: BenchmarkDetails
state: BenchmarkState
solution_nonces: Optional[Set[int]]
@dataclass
class OutputMetaData(FromDict):
nonce: int
runtime_signature: int
fuel_consumed: int
solution_signature: int
@classmethod
def from_output_data(cls, output_data: 'OutputData') -> 'OutputMetaData':
return output_data.to_output_metadata()
def to_merkle_hash(self) -> MerkleHash:
return MerkleHash(u8s_from_str(jsonify(self)))
@dataclass
class OutputData(FromDict):
nonce: int
runtime_signature: int
fuel_consumed: int
solution: dict
def calc_solution_signature(self) -> int:
return u64s_from_str(jsonify(self.solution))[0]
def to_output_metadata(self) -> OutputMetaData:
return OutputMetaData(
nonce=self.nonce,
runtime_signature=self.runtime_signature,
fuel_consumed=self.fuel_consumed,
solution_signature=self.calc_solution_signature()
)
def to_merkle_hash(self) -> MerkleHash:
return self.to_output_metadata().to_merkle_hash()
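# An OutputData leaf is never hashed directly: it is first reduced to OutputMetaData
# (dropping the full solution but keeping a 64-bit solution_signature), and that
# metadata's canonical JSON is blake3-hashed into the MerkleHash used by the tree.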
@dataclass
class MerkleProof(FromDict):
leaf: OutputData
branch: MerkleBranch
@dataclass
class ProofDetails(FromDict):
submission_delay: int
block_active: int
@dataclass
class ProofState(FromDict):
block_confirmed: int
@dataclass
class Proof(FromDict):
benchmark_id: str
details: ProofDetails
state: ProofState
merkle_proofs: Optional[List[MerkleProof]]
@dataclass
class FraudState(FromDict):
block_confirmed: int
@dataclass
class Fraud(FromDict):
benchmark_id: str
state: FraudState
allegation: Optional[str]
@dataclass
class BlockDetails(FromDict):
prev_block_id: str
height: int
round: int
timestamp: int
num_confirmed: Dict[str, int]
num_active: Dict[str, int]
@dataclass
class BlockData(FromDict):
confirmed_ids: Dict[str, Set[int]]
active_ids: Dict[str, Set[int]]
@dataclass
class Block(FromDict):
id: str
details: BlockDetails
config: dict
data: Optional[BlockData]
@dataclass
class ChallengeDetails(FromDict):
name: str
@dataclass
class ChallengeState(FromDict):
round_active: int
@dataclass
class ChallengeBlockData(FromDict):
num_qualifiers: int
qualifier_difficulties: Set[Point]
base_frontier: Frontier
scaled_frontier: Frontier
scaling_factor: float
base_fee: PreciseNumber
per_nonce_fee: PreciseNumber
@dataclass
class Challenge(FromDict):
id: str
details: ChallengeDetails
state: ChallengeState
block_data: Optional[ChallengeBlockData]
@dataclass
class OPoWBlockData(FromDict):
num_qualifiers_by_challenge: Dict[str, int]
cutoff: int
delegated_weighted_deposit: PreciseNumber
delegators: Set[str]
reward_share: float
imbalance: PreciseNumber
influence: PreciseNumber
reward: PreciseNumber
@dataclass
class OPoW(FromDict):
player_id: str
block_data: Optional[OPoWBlockData]
@dataclass
class PlayerDetails(FromDict):
name: Optional[str]
is_multisig: bool
@dataclass
class PlayerState(FromDict):
total_fees_paid: PreciseNumber
available_fee_balance: PreciseNumber
delegatee: Optional[dict]
votes: dict
reward_share: Optional[dict]
@dataclass
class PlayerBlockData(FromDict):
delegatee: Optional[str]
reward_by_type: Dict[str, PreciseNumber]
deposit_by_locked_period: List[PreciseNumber]
weighted_deposit: PreciseNumber
@dataclass
class Player(FromDict):
id: str
details: PlayerDetails
state: PlayerState
block_data: Optional[PlayerBlockData]
@dataclass
class BinaryDetails(FromDict):
compile_success: bool
download_url: Optional[str]
@dataclass
class BinaryState(FromDict):
block_confirmed: int
@dataclass
class Binary(FromDict):
algorithm_id: str
details: BinaryDetails
state: BinaryState
@dataclass
class TopUpDetails(FromDict):
player_id: str
amount: PreciseNumber
log_idx: int
tx_hash: str
@dataclass
class TopUpState(FromDict):
block_confirmed: int
@dataclass
class TopUp(FromDict):
id: str
details: TopUpDetails
state: TopUpState
@dataclass
class DifficultyData(FromDict):
num_solutions: int
num_nonces: int
difficulty: Point
@dataclass
class DepositDetails(FromDict):
player_id: str
amount: PreciseNumber
log_idx: int
tx_hash: str
start_timestamp: int
end_timestamp: int
@dataclass
class DepositState(FromDict):
block_confirmed: int
@dataclass
class Deposit(FromDict):
id: str
details: DepositDetails
state: DepositState

View File

@@ -0,0 +1,217 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from blake3 import blake3
from dataclasses import dataclass, fields, is_dataclass, asdict
from typing import TypeVar, Type, Dict, Any, List, Union, get_origin, get_args
import json
import time
T = TypeVar('T', bound='FromDict')
class FromStr(ABC):
@classmethod
@abstractmethod
def from_str(cls, s: str):
raise NotImplementedError
@abstractmethod
def to_str(self) -> str:
raise NotImplementedError
@dataclass
class FromDict:
@classmethod
def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
field_types = {f.name: f.type for f in fields(cls)}
kwargs = {}
for field in fields(cls):
value = d.pop(field.name, None)
field_type = field_types[field.name]
if value is None:
if cls._is_optional(field_type):
kwargs[field.name] = None
else:
raise ValueError(f"Missing required field: {field.name}")
continue
kwargs[field.name] = cls._process_value(value, field_type)
return cls(**kwargs)
@classmethod
def _process_value(cls, value: Any, field_type: Type) -> Any:
origin_type = get_origin(field_type)
if cls._is_optional(field_type):
if value is None:
return None
non_none_type = next(arg for arg in get_args(field_type) if arg is not type(None))
return cls._process_value(value, non_none_type)
if hasattr(field_type, 'from_dict') and isinstance(value, dict):
return field_type.from_dict(value)
elif hasattr(field_type, 'from_str') and isinstance(value, str):
return field_type.from_str(value)
elif origin_type in (list, set, tuple):
elem_type = get_args(field_type)[0]
return origin_type(cls._process_value(item, elem_type) for item in value)
elif origin_type is dict:
key_type, val_type = get_args(field_type)
return {cls._process_value(k, key_type): cls._process_value(v, val_type) for k, v in value.items()}
else:
return field_type(value)
@staticmethod
def _is_optional(field_type: Type) -> bool:
return get_origin(field_type) is Union and type(None) in get_args(field_type)
def to_dict(self) -> Dict[str, Any]:
d = {}
for field in fields(self):
value = getattr(self, field.name)
if value is not None:
if hasattr(value, 'to_dict'):
d[field.name] = value.to_dict()
elif hasattr(value, 'to_str'):
d[field.name] = value.to_str()
elif isinstance(value, (list, set, tuple)):
d[field.name] = [
item.to_dict() if hasattr(item, 'to_dict')
else item.to_str() if hasattr(item, 'to_str')
else item
for item in value
]
elif isinstance(value, dict):
d[field.name] = {
k: (v.to_dict() if hasattr(v, 'to_dict')
else v.to_str() if hasattr(v, 'to_str')
else v)
for k, v in value.items()
}
elif is_dataclass(value):
d[field.name] = asdict(value)
else:
d[field.name] = value
return d
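# Sketch of the intended round trip (hypothetical dataclass, not part of this module):
#   @dataclass
#   class Foo(FromDict):
#       x: int
#       y: Union[PreciseNumber, None]
#   Foo.from_dict({"x": 1, "y": "2000000000000000000"}).to_dict()
# from_dict recurses through Optionals, containers, and anything exposing
# from_dict/from_str; to_dict mirrors it with to_dict/to_str.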
class PreciseNumber(FromStr):
PRECISION = 10**18 # 18 decimal places of precision
def __init__(self, value: Union[int, float, str, PreciseNumber]):
if isinstance(value, PreciseNumber):
self._value = value._value
elif isinstance(value, int):
self._value = value * self.PRECISION
elif isinstance(value, float):
self._value = int(value * self.PRECISION)
elif isinstance(value, str):
self._value = int(value)
else:
raise TypeError(f"Unsupported type for PreciseNumber: {type(value)}")
@classmethod
def from_str(cls, s: str) -> 'PreciseNumber':
return cls(s)
def to_str(self) -> str:
return str(self._value)
def __repr__(self) -> str:
return f"PreciseNumber({self.to_float()})"
def to_float(self) -> float:
return self._value / self.PRECISION
def __add__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
return PreciseNumber(self._value + other._value)
def __sub__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
return PreciseNumber(self._value - other._value)
def __mul__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
return PreciseNumber((self._value * other._value) // self.PRECISION)
def __truediv__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
if other._value == 0:
raise ZeroDivisionError
return PreciseNumber((self._value * self.PRECISION) // other._value)
def __floordiv__(self, other: Union[PreciseNumber, int, float]) -> PreciseNumber:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
if other._value == 0:
raise ZeroDivisionError
return PreciseNumber((self._value * self.PRECISION // other._value))
def __eq__(self, other: Union[PreciseNumber, int, float]) -> bool:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
return self._value == other._value
def __lt__(self, other: Union[PreciseNumber, int, float]) -> bool:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
return self._value < other._value
def __le__(self, other: Union[PreciseNumber, int, float]) -> bool:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
return self._value <= other._value
def __gt__(self, other: Union[PreciseNumber, int, float]) -> bool:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
return self._value > other._value
def __ge__(self, other: Union[PreciseNumber, int, float]) -> bool:
if isinstance(other, (int, float)):
other = PreciseNumber(other)
return self._value >= other._value
def __radd__(self, other: Union[int, float]) -> PreciseNumber:
return self + other
def __rsub__(self, other: Union[int, float]) -> PreciseNumber:
return PreciseNumber(other) - self
def __rmul__(self, other: Union[int, float]) -> PreciseNumber:
return self * other
def __rtruediv__(self, other: Union[int, float]) -> PreciseNumber:
return PreciseNumber(other) / self
def __rfloordiv__(self, other: Union[int, float]) -> PreciseNumber:
return PreciseNumber(other) // self
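# PreciseNumber is fixed-point with 18 decimals: ints and floats are scaled up by
# PRECISION on the way in, while strings are taken as already-scaled raw values
# (matching to_str, which emits the raw integer). So PreciseNumber(1) ==
# PreciseNumber("1000000000000000000"). Note __floordiv__ is currently identical to
# __truediv__.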
def jsonify(obj: Any) -> str:
if hasattr(obj, 'to_dict'):
obj = obj.to_dict()
return json.dumps(obj, sort_keys=True, separators=(',', ':'))
def u8s_from_str(input: str) -> bytes:
return blake3(input.encode()).digest()
def u64s_from_str(input: str) -> List[int]:
u8s = u8s_from_str(input)
return [
int.from_bytes(
u8s[i * 8:(i + 1) * 8],
byteorder='little',
signed=False
)
for i in range(4)
]
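# u8s_from_str is a 32-byte blake3 digest; u64s_from_str splits those bytes into four
# little-endian u64s. jsonify's sorted keys and compact separators keep these hashes
# stable across runs and implementations.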
def now():
return int(time.time() * 1000)

View File

@@ -3,154 +3,286 @@ import json
import os
import logging
import randomname
import aiohttp
import asyncio
import requests
import shutil
import subprocess
import time
import zlib
from threading import Thread
from common.structs import OutputData, MerkleProof
from common.merkle_tree import MerkleTree
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
PENDING_BATCH_IDS = set()
PROCESSING_BATCH_IDS = set()
READY_BATCH_IDS = set()
FINISHED_BATCH_IDS = {}
def now():
return int(time.time() * 1000)
async def download_wasm(session, download_url, wasm_path):
def download_wasm(session, download_url, wasm_path):
if not os.path.exists(wasm_path):
start = now()
logger.info(f"downloading WASM from {download_url}")
async with session.get(download_url) as resp:
if resp.status != 200:
raise Exception(f"status {resp.status} when downloading WASM: {await resp.text()}")
with open(wasm_path, 'wb') as f:
f.write(await resp.read())
resp = session.get(download_url)
if resp.status_code != 200:
raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}")
with open(wasm_path, 'wb') as f:
f.write(resp.content)
logger.debug(f"downloading WASM: took {now() - start}ms")
logger.debug(f"WASM Path: {wasm_path}")
async def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers):
def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path):
start = now()
cmd = [
tig_worker_path, "compute_batch",
json.dumps(batch["settings"]),
batch["rand_hash"],
str(batch["start_nonce"]),
json.dumps(batch["settings"]),
batch["rand_hash"],
str(batch["start_nonce"]),
str(batch["num_nonces"]),
str(batch["batch_size"]),
str(batch["batch_size"]),
wasm_path,
"--mem", str(batch["runtime_config"]["max_memory"]),
"--fuel", str(batch["runtime_config"]["max_fuel"]),
"--workers", str(num_workers),
"--output", f"{output_path}/{batch['id']}",
]
if batch["sampled_nonces"]:
cmd += ["--sampled", *map(str, batch["sampled_nonces"])]
logger.info(f"computing batch: {' '.join(cmd)}")
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate()
stdout, stderr = process.communicate()
if process.returncode != 0:
PROCESSING_BATCH_IDS.remove(batch["id"])
raise Exception(f"tig-worker failed: {stderr.decode()}")
result = json.loads(stdout.decode())
logger.info(f"computing batch took {now() - start}ms")
logger.debug(f"batch result: {result}")
return result
with open(f"{output_path}/{batch['id']}/result.json", "w") as f:
json.dump(result, f)
PROCESSING_BATCH_IDS.remove(batch["id"])
READY_BATCH_IDS.add(batch["id"])
async def process_batch(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, batch, headers):
def purge_folders(output_path, ttl):
n = now()
purge_batch_ids = [
batch_id
for batch_id, finish_time in FINISHED_BATCH_IDS.items()
if n >= finish_time + (ttl * 1000)
]
if len(purge_batch_ids) == 0:
time.sleep(5)
return
for batch_id in purge_batch_ids:
if os.path.exists(f"{output_path}/{batch_id}"):
logger.info(f"purging batch {batch_id}")
shutil.rmtree(f"{output_path}/{batch_id}")
FINISHED_BATCH_IDS.pop(batch_id)
def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path):
try:
batch_id = f"{batch['benchmark_id']}_{batch['start_nonce']}"
logger.info(f"Processing batch {batch_id}: {batch}")
batch_id = READY_BATCH_IDS.pop()
except KeyError:
logger.debug("No pending batches")
time.sleep(1)
return
output_folder = f"{output_path}/{batch_id}"
with open(f"{output_folder}/batch.json") as f:
batch = json.load(f)
# Step 2: Download WASM
wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
await download_wasm(session, batch['download_url'], wasm_path)
if (
not os.path.exists(f"{output_folder}/result.json")
or not os.path.exists(f"{output_folder}/data.zlib")
):
if os.path.exists(f"{output_folder}/result.json"):
os.remove(f"{output_folder}/result.json")
logger.debug(f"Batch {batch_id} flagged as ready, but missing nonce files")
PENDING_BATCH_IDS.add(batch_id)
return
# Step 3: Run tig-worker
result = await run_tig_worker(tig_worker_path, batch, wasm_path, num_workers)
if batch["sampled_nonces"] is None:
with open(f"{output_folder}/result.json") as f:
result = json.load(f)
# Step 4: Submit results
start = now()
submit_url = f"http://{master_ip}:{master_port}/submit-batch-result/{batch_id}"
logger.info(f"posting results to {submit_url}")
async with session.post(submit_url, json=result, headers=headers) as resp:
if resp.status != 200:
raise Exception(f"status {resp.status} when posting results to master: {await resp.text()}")
logger.debug(f"posting results took {now() - start} ms")
submit_url = f"http://{master_ip}:{master_port}/submit-batch-root/{batch_id}"
logger.info(f"posting root to {submit_url}")
resp = session.post(submit_url, json=result)
if resp.status_code == 200:
FINISHED_BATCH_IDS[batch_id] = now()
logger.info(f"successfully posted root for batch {batch_id}")
elif resp.status_code == 408: # took too long
FINISHED_BATCH_IDS[batch_id] = now()
logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
else:
logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
READY_BATCH_IDS.add(batch_id) # requeue
time.sleep(2)
except Exception as e:
logger.error(f"Error processing batch {batch_id}: {e}")
else:
with open(f"{output_folder}/data.zlib", "rb") as f:
leafs = [
OutputData.from_dict(x)
for x in json.loads(zlib.decompress(f.read()).decode())
]
merkle_tree = MerkleTree(
[x.to_merkle_hash() for x in leafs],
batch["batch_size"]
)
async def main(
proofs_to_submit = [
MerkleProof(
leaf=leafs[n - batch["start_nonce"]],
branch=merkle_tree.calc_merkle_branch(branch_idx=n - batch["start_nonce"])
).to_dict()
for n in batch["sampled_nonces"]
]
submit_url = f"http://{master_ip}:{master_port}/submit-batch-proofs/{batch_id}"
logger.info(f"posting proofs to {submit_url}")
resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit})
if resp.status_code == 200:
FINISHED_BATCH_IDS[batch_id] = now()
logger.info(f"successfully posted proofs for batch {batch_id}")
elif resp.status_code == 408: # took too long
FINISHED_BATCH_IDS[batch_id] = now()
logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
else:
logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
READY_BATCH_IDS.add(batch_id) # requeue
time.sleep(2)
def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path):
try:
batch_id = PENDING_BATCH_IDS.pop()
except KeyError:
logger.debug("No pending batches")
time.sleep(1)
return
if (
batch_id in PROCESSING_BATCH_IDS or
batch_id in READY_BATCH_IDS
):
return
if os.path.exists(f"{output_path}/{batch_id}/result.json"):
logger.info(f"Batch {batch_id} already processed")
READY_BATCH_IDS.add(batch_id)
return
PROCESSING_BATCH_IDS.add(batch_id)
with open(f"{output_path}/{batch_id}/batch.json") as f:
batch = json.load(f)
wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
download_wasm(session, batch['download_url'], wasm_path)
Thread(
target=run_tig_worker,
args=(tig_worker_path, batch, wasm_path, num_workers, output_path)
).start()
def poll_batches(session, master_ip, master_port, output_path):
get_batches_url = f"http://{master_ip}:{master_port}/get-batches"
logger.info(f"fetching batches from {get_batches_url}")
resp = session.get(get_batches_url)
if resp.status_code == 200:
batches = resp.json()
root_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is None]
proofs_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is not None]
logger.info(f"root batches: {root_batch_ids}")
logger.info(f"proofs batches: {proofs_batch_ids}")
for batch in batches:
output_folder = f"{output_path}/{batch['id']}"
os.makedirs(output_folder, exist_ok=True)
with open(f"{output_folder}/batch.json", "w") as f:
json.dump(batch, f)
PENDING_BATCH_IDS.clear()
PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids)
time.sleep(5)
else:
logger.error(f"status {resp.status_code} when fetching batch: {resp.text}")
time.sleep(5)
def wrap_thread(func, *args):
logger.info(f"Starting thread for {func.__name__}")
while True:
try:
func(*args)
except Exception as e:
logger.error(f"Error in {func.__name__}: {e}")
time.sleep(5)
def main(
master_ip: str,
tig_worker_path: str,
download_wasms_folder: str,
num_workers: int,
slave_name: str,
master_port: int
master_port: int,
output_path: str,
ttl: int,
):
print(f"Starting slave {slave_name}")
if not os.path.exists(tig_worker_path):
raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}")
os.makedirs(download_wasms_folder, exist_ok=True)
headers = {
session = requests.Session()
session.headers.update({
"User-Agent": slave_name
}
})
Thread(
target=wrap_thread,
args=(process_batch, session, tig_worker_path, download_wasms_folder, num_workers, output_path)
).start()
async with aiohttp.ClientSession() as session:
while True:
try:
# Step 1: Query for job
start = now()
get_batch_url = f"http://{master_ip}:{master_port}/get-batches"
logger.info(f"fetching job from {get_batch_url}")
try:
resp = await asyncio.wait_for(session.get(get_batch_url, headers=headers), timeout=5)
if resp.status != 200:
text = await resp.text()
if resp.status == 404 and text.strip() == "No batches available":
# Retry with master_port - 1
new_port = master_port - 1
get_batch_url = f"http://{master_ip}:{new_port}/get-batches"
logger.info(f"No batches available on port {master_port}, trying port {new_port}")
resp_retry = await asyncio.wait_for(session.get(get_batch_url, headers=headers), timeout=10)
if resp_retry.status != 200:
raise Exception(f"status {resp_retry.status} when fetching job: {await resp_retry.text()}")
master_port_w = new_port
batches = await resp_retry.json(content_type=None)
else:
raise Exception(f"status {resp.status} when fetching job: {text}")
else:
master_port_w = master_port
batches = await resp.json(content_type=None)
except asyncio.TimeoutError:
logger.error(f"Timeout occurred when fetching job from {get_batch_url}")
continue
logger.debug(f"fetching job: took {now() - start}ms")
Thread(
target=wrap_thread,
args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path)
).start()
# Process batches concurrently
tasks = [
process_batch(session, master_ip, master_port_w, tig_worker_path, download_wasms_folder, num_workers, batch, headers)
for batch in batches
]
await asyncio.gather(*tasks)
Thread(
target=wrap_thread,
args=(purge_folders, output_path, ttl)
).start()
wrap_thread(poll_batches, session, master_ip, master_port, output_path)
except Exception as e:
logger.error(e)
await asyncio.sleep(2)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TIG Slave Benchmarker")
parser.add_argument("master_ip", help="IP address of the master")
parser.add_argument("tig_worker_path", help="Path to tig-worker executable")
parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)")
parser.add_argument("--download", type=str, default="wasms", help="Folder to download WASMs to (default: wasms)")
parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)")
parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)")
parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)")
parser.add_argument("--verbose", action='store_true', help="Print debug logs")
parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)")
parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)")
args = parser.parse_args()
logging.basicConfig(
format='%(levelname)s - [%(name)s] - %(message)s',
level=logging.DEBUG if args.verbose else logging.INFO
)
asyncio.run(main(args.master_ip, args.tig_worker_path, args.download, args.workers, args.name, args.port))
main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl)