Remove legacy scripts.
Some checks failed
Test Workspace / Test Workspace (push) Has been cancelled

This commit is contained in:
FiveMovesAhead 2025-10-19 23:09:11 +01:00
parent bef4043b00
commit 1c6bfe14fd
6 changed files with 2 additions and 502 deletions

View File

@ -1,114 +0,0 @@
import numpy as np
import json
import requests
from common.structs import *
from common.calcs import *
API_URL = "https://mainnet-api.tig.foundation"
deposit = input("Enter deposit in TIG (leave blank to fetch deposit from your player_id): ")
if deposit != "":
lock_period = input("Enter number of weeks to lock (longer lock will have higher APY): ")
deposit = float(deposit)
lock_period = float(lock_period)
player_id = None
else:
player_id = input("Enter player_id: ").lower()
print("Fetching data...")
block = Block.from_dict(requests.get(f"{API_URL}/get-block?include_data").json()["block"])
opow_data = {
x["player_id"]: OPoW.from_dict(x)
for x in requests.get(f"{API_URL}/get-opow?block_id={block.id}").json()["opow"]
}
factors = {
benchmarker: {
**{
f: (
opow_data[benchmarker].block_data.num_qualifiers_by_challenge.get(f, 0)
)
for f in block.data.active_ids["challenge"]
},
"weighted_deposit": opow_data[benchmarker].block_data.delegated_weighted_deposit.to_float()
}
for benchmarker in opow_data
}
if player_id is None:
blocks_till_round_ends = block.config["rounds"]["blocks_per_round"] - (block.details.height % block.config["rounds"]["blocks_per_round"])
seconds_till_round_ends = blocks_till_round_ends * block.config["rounds"]["seconds_between_blocks"]
weighted_deposit = calc_weighted_deposit(deposit, seconds_till_round_ends, lock_period * 604800)
else:
player_data = Player.from_dict(requests.get(f"{API_URL}/get-player-data?player_id={player_id}&block_id={block.id}").json()["player"])
deposit = sum(x.to_float() for x in player_data.block_data.deposit_by_locked_period)
weighted_deposit = player_data.block_data.weighted_deposit.to_float()
for delegatee, fraction in player_data.block_data.delegatees.items():
factors[delegatee]["weighted_deposit"] -= fraction * weighted_deposit
total_factors = {
f: sum(factors[benchmarker][f] for benchmarker in opow_data)
for f in list(block.data.active_ids["challenge"]) + ["weighted_deposit"]
}
reward_shares = {
benchmarker: opow_data[benchmarker].block_data.reward_share.to_float() / (opow_data[benchmarker].block_data.reward.to_float() + 1e-12)
for benchmarker in opow_data
}
print("Optimising delegation by splitting into 100 chunks...")
chunk = weighted_deposit / 100
delegate = {}
for i in range(100):
print(f"Chunk {i + 1}: simulating delegation...")
total_factors["weighted_deposit"] += chunk
if len(delegate) == 10:
potential_delegatees = list(delegate)
else:
potential_delegatees = [benchmarker for benchmarker in opow_data if opow_data[benchmarker].block_data.self_deposit.to_float() >= 10000]
highest_apy_benchmarker = max(
potential_delegatees,
key=lambda delegatee: (
calc_influence({
benchmarker: {
f: (factors[benchmarker][f] + chunk * (benchmarker == delegatee and f == "weighted_deposit")) / total_factors[f]
for f in total_factors
}
for benchmarker in opow_data
}, block.config["opow"])[delegatee] *
reward_shares[delegatee] * chunk / (factors[delegatee]["weighted_deposit"] + chunk)
)
)
print(f"Chunk {i + 1}: best delegatee is {highest_apy_benchmarker}")
if highest_apy_benchmarker not in delegate:
delegate[highest_apy_benchmarker] = 0
delegate[highest_apy_benchmarker] += 1
factors[highest_apy_benchmarker]["weighted_deposit"] += chunk
influences = calc_influence({
benchmarker: {
f: factors[benchmarker][f] / total_factors[f]
for f in total_factors
}
for benchmarker in opow_data
}, block.config["opow"])
print("")
print("Optimised delegation split:")
reward_pool = block.config["rewards"]["distribution"]["opow"] * next(x["block_reward"] for x in reversed(block.config["rewards"]["schedule"]) if x["round_start"] <= block.details.round)
deposit_chunk = deposit / 100
total_reward = 0
for delegatee, num_chunks in delegate.items():
share_fraction = influences[delegatee] * reward_shares[delegatee] * (num_chunks * chunk) / factors[delegatee]["weighted_deposit"]
reward = share_fraction * reward_pool
total_reward += reward
apy = reward * block.config["rounds"]["blocks_per_round"] * 52 / (num_chunks * deposit_chunk)
print(f"{delegatee}: %delegated = {num_chunks}%, apy = {apy * 100:.2f}%")
print(f"average_apy = {total_reward * 10080 * 52 / deposit * 100:.2f}% on your deposit of {deposit} TIG")
print("")
print("To set this delegation split, run the following command:")
req = {"delegatees": {k: v / 100 for k, v in delegate.items()}}
print("API_KEY=<YOUR API KEY HERE>")
print(f"curl -H \"X-Api-Key: $API_KEY\" -X POST -d '{json.dumps(req)}' {API_URL}/set-delegatees")

View File

@ -1,96 +0,0 @@
import numpy as np
import requests
from common.structs import *
from common.calcs import *
from copy import deepcopy
API_URL = "https://mainnet-api.tig.foundation"
player_id = input("Enter player_id: ").lower()
print("Fetching data...")
block = Block.from_dict(requests.get(f"{API_URL}/get-block?include_data").json()["block"])
opow_data = {
x["player_id"]: OPoW.from_dict(x)
for x in requests.get(f"{API_URL}/get-opow?block_id={block.id}").json()["opow"]
}
player_data = Player.from_dict(requests.get(f"{API_URL}/get-player-data?player_id={player_id}&block_id={block.id}").json()["player"])
factors = {
benchmarker: {
**{
f: (
opow_data[benchmarker].block_data.num_qualifiers_by_challenge.get(f, 0) *
opow_data[benchmarker].block_data.solution_ratio_by_challenge.get(f, 0)
)
for f in block.data.active_ids["challenge"]
},
"weighted_deposit": opow_data[benchmarker].block_data.delegated_weighted_deposit.to_float()
}
for benchmarker in opow_data
}
total_factors = {
f: sum(factors[benchmarker][f] for benchmarker in opow_data)
for f in list(block.data.active_ids["challenge"]) + ["weighted_deposit"]
}
fractions = {
benchmarker: {
f: factors[benchmarker][f] / total_factors[f]
for f in total_factors
}
for benchmarker in opow_data
}
sum_other_deposit_fractions = sum(
fractions[benchmarker]["weighted_deposit"] for benchmarker in opow_data if benchmarker != player_id
)
def simulate_deposit_fraction(deposit_fraction):
delta = fractions[player_id]["weighted_deposit"] - deposit_fraction
fractions2 = deepcopy(fractions)
for benchmarker in opow_data:
if benchmarker != player_id:
fractions2[benchmarker]["weighted_deposit"] += delta * fractions[benchmarker]["weighted_deposit"] / sum_other_deposit_fractions
else:
fractions2[benchmarker]["weighted_deposit"] = deposit_fraction
influence = calc_influence(fractions2, block.config["opow"])
return influence[player_id]
print("Calculating rewards...")
average_fraction_of_qualifiers = sum(fractions[player_id][f] for f in block.data.active_ids["challenge"]) / len(block.data.active_ids["challenge"])
self_only_deposit_fraction = player_data.block_data.weighted_deposit.to_float() / total_factors["weighted_deposit"]
influence = {
"current": opow_data[player_id].block_data.influence.to_float(),
"only_self_deposit": simulate_deposit_fraction(self_only_deposit_fraction),
"at_parity": simulate_deposit_fraction(average_fraction_of_qualifiers)
}
current_reward = opow_data[player_id].block_data.reward.to_float()
reward = {
"current": current_reward,
"only_self_deposit": influence["only_self_deposit"] / influence["current"] * current_reward,
"at_parity": influence["at_parity"] / influence["current"] * current_reward
}
print("-----Current State-----")
for f in sorted(block.data.active_ids["challenge"]):
print(f"%qualifiers for {f}: {fractions[player_id][f] * 100:.2f}%")
print(f"average %qualifiers: {average_fraction_of_qualifiers * 100:.2f}%")
print(f"%weighted_deposit (self only): {self_only_deposit_fraction * 100:.2f}%")
print(f"%weighted_deposit (current self + delegated): {fractions[player_id]['weighted_deposit'] * 100:.2f}%")
print("----------------------")
print(f"Scenario 1 (only self-deposit)")
print(f"%weighted_deposit = {self_only_deposit_fraction * 100:.2f}%")
print(f"reward = {reward['only_self_deposit']:.4f} TIG per block")
print("")
print("Scenario 2 (current self + delegated deposit)")
print(f"%weighted_deposit = {fractions[player_id]['weighted_deposit'] * 100:.2f}%")
print(f"reward = {reward['current']:.4f} TIG per block (recommended max reward_share = {100 - reward['only_self_deposit'] / reward['current'] * 100:.2f}%*)")
print("")
print(f"Scenario 3 (self + delegated deposit at parity)")
print(f"%weighted_deposit = average %qualifiers = {average_fraction_of_qualifiers * 100:.2f}%")
print(f"reward = {reward['at_parity']:.4f} TIG per block (recommended max reward_share = {100 - reward['only_self_deposit'] / reward['at_parity'] * 100:.2f}%*)")
print("")
print("*Recommend not setting reward_share above the max. You will not benefit from delegation (earn the same as Scenario 1 with zero delegation).")
print("")
print("Note: the imbalance penalty is such that your reward increases at a high rate when moving up to parity")

View File

@ -1,54 +0,0 @@
import requests
API_URL = "https://mainnet-api.tig.foundation"
print(f"Fetching block, challenges, and benchmarkers..")
block = requests.get(f"{API_URL}/get-block").json()["block"]
challenges = sorted(
requests.get(f"{API_URL}/get-challenges?block_id={block['id']}").json()["challenges"],
key=lambda x: x["id"]
)
opow = requests.get(f"{API_URL}/get-opow?block_id={block['id']}").json()["opow"]
print("Challenges:")
for i, c in enumerate(challenges):
print(f"{i+1}) {c['details']['name']}")
c = challenges[int(input("Enter challenge index: ")) - 1]
print("")
for x in opow:
x["reliability"] = x["block_data"]["solution_ratio_by_challenge"].get(c["id"], 0) / c["block_data"]["average_solution_ratio"]
opow = sorted(opow, key=lambda x: x["reliability"], reverse=True)
print(f"Benchmarkers Reliability for {c['details']['name']}:")
for i, x in enumerate(opow):
print(f"{i+1}) {x['player_id']}: {x['reliability']:.4f}")
player = opow[int(input("Enter benchmarker index: ")) - 1]
print("")
print(f"Fetching benchmarks for {player['player_id']}..")
d = requests.get(f"{API_URL}/get-benchmarks?player_id={player['player_id']}&block_id={block['id']}").json()
precommits = {x["benchmark_id"]: x for x in d["precommits"]}
benchmarks = {x["id"]: x for x in d["benchmarks"]}
proofs = {x["benchmark_id"]: x for x in d["proofs"]}
frauds = {x["benchmark_id"]: x for x in d["frauds"]}
total_solutions = {}
total_nonces = {}
for b_id in proofs:
p = precommits[b_id]
b = benchmarks[b_id]
if b_id in frauds or p["settings"]["challenge_id"] != c["id"]:
continue
k = f"difficulty {p['settings']['difficulty']}, algorithm_id: {p['settings']['algorithm_id']}"
total_solutions[k] = total_solutions.get(k, 0) + b["details"]["num_solutions"]
total_nonces[k] = total_nonces.get(k, 0) + p["details"]["num_nonces"]
reliability = {
k: total_solutions[k] / total_nonces[k] / c["block_data"]["average_solution_ratio"]
for k in total_solutions
}
reliability = sorted(reliability.items(), key=lambda x: x[1], reverse=True)
for k, r in reliability:
print(f"{k}, num_solutions: {total_solutions[k]}, num_nonces: {total_nonces[k]}, reliability: {r:.4f}")

View File

@ -1,102 +0,0 @@
import numpy as np
from typing import List, Dict
def calc_influence(fractions: Dict[str, Dict[str, float]], opow_config: dict) -> Dict[str, float]:
"""
Calculate the influence of each benchmarker based on their fractions and the OPoW configuration.
Args:
fractions: A dictionary of dictionaries, mapping benchmarker_ids to their fraction of each factor (challenges & weighted_deposit).
opow_config: A dictionary containing configuration parameters for the calculation.
Returns:
Dict[str, float]: A dictionary mapping each benchmarker_id to their calculated influence.
"""
benchmarkers = list(fractions)
factors = list(next(iter(fractions.values())))
num_challenges = len(factors) - 1
avg_qualifier_fractions = {
benchmarker: sum(
fractions[benchmarker][f]
for f in factors
if f != "weighted_deposit"
) / num_challenges
for benchmarker in benchmarkers
}
deposit_fraction_cap = {
benchmarker: avg_qualifier_fractions[benchmarker] * opow_config["max_deposit_to_qualifier_ratio"]
for benchmarker in benchmarkers
}
capped_fractions = {
benchmarker: {
**fractions[benchmarker],
"weighted_deposit": min(
fractions[benchmarker]["weighted_deposit"],
deposit_fraction_cap[benchmarker]
)
}
for benchmarker in benchmarkers
}
avg_fraction = {
benchmarker: np.mean(list(capped_fractions[benchmarker].values()))
for benchmarker in benchmarkers
}
var_fraction = {
benchmarker: np.var(list(capped_fractions[benchmarker].values()))
for benchmarker in benchmarkers
}
imbalance = {
benchmarker: (var_fraction[benchmarker] / np.square(avg_fraction[benchmarker]) / num_challenges) if avg_fraction[benchmarker] > 0 else 0
for benchmarker in benchmarkers
}
imbalance_penalty = {
benchmarker: 1.0 - np.exp(-opow_config["imbalance_multiplier"] * imbalance[benchmarker])
for benchmarker in benchmarkers
}
weighted_avg_fraction = {
benchmarker: ((avg_qualifier_fractions[benchmarker] * num_challenges) + capped_fractions[benchmarker]["weighted_deposit"] * opow_config["deposit_multiplier"]) / (num_challenges + opow_config["deposit_multiplier"])
for benchmarker in benchmarkers
}
unormalised_influence = {
benchmarker: weighted_avg_fraction[benchmarker] * (1.0 - imbalance_penalty[benchmarker])
for benchmarker in benchmarkers
}
total = sum(unormalised_influence.values())
influence = {
benchmarker: unormalised_influence[benchmarker] / total
for benchmarker in benchmarkers
}
return influence
def calc_weighted_deposit(deposit: float, seconds_till_round_end: int, lock_seconds: int) -> float:
"""
Calculate weighted deposit
Args:
deposit: Amount to deposit
seconds_till_round_end: Seconds remaining in current round
lock_seconds: Total lock duration in seconds
Returns:
Weighted deposit
"""
weighted_deposit = 0
if lock_seconds <= 0:
return weighted_deposit
# Calculate first chunk (partial week)
weighted_deposit += deposit * min(seconds_till_round_end, lock_seconds) // lock_seconds
remaining_seconds = lock_seconds - min(seconds_till_round_end, lock_seconds)
weight = 2
while remaining_seconds > 0:
chunk_seconds = min(remaining_seconds, 604800)
chunk = deposit * chunk_seconds // lock_seconds
weighted_deposit += chunk * weight
remaining_seconds -= chunk_seconds
weight = min(weight + 1, 26)
return weighted_deposit

View File

@ -206,8 +206,8 @@ class Challenge(FromDict):
class OPoWBlockData(FromDict):
num_qualifiers_by_challenge: Dict[str, int]
cutoff: int
delegated_weighted_deposit: PreciseNumber
self_deposit: PreciseNumber
weighted_delegated_deposit: PreciseNumber
weighted_self_deposit: PreciseNumber
delegators: Set[str]
coinbase: Dict[str, PreciseNumber]
solution_ratio_by_challenge: Dict[str, float]

View File

@ -1,134 +0,0 @@
from common.structs import *
import requests
import json
import random
import os
import subprocess
print("THIS IS AN EXAMPLE SCRIPT TO VERIFY A BATCH")
TIG_WORKER_PATH = input("Enter path of tig-worker executable: ")
NUM_WORKERS = int(input("Enter number of workers: "))
if not os.path.exists(TIG_WORKER_PATH):
raise FileNotFound[ERROR](f"tig-worker not found at path: {TIG_WORKER_PATH}")
MASTER_IP = input("Enter Master IP: ")
MASTER_PORT = input("Enter Master Port: ")
jobs = requests.get(f"http://{MASTER_IP}:{MASTER_PORT}/get-jobs").json()
for i, j in enumerate(jobs):
print(f"{i + 1}) benchmark_id: {j['benchmark_id']}, challenge: {j['challenge']}, algorithm: {j['algorithm']}, status: {j['status']}")
job_idx = int(input("Enter the index of the batch you want to verify: ")) - 1
for i, b in enumerate(jobs[job_idx]["batches"]):
print(f"{i + 1}) slave: {b['slave']}, num_solutions: {b['num_solutions']}, status: {b['status']}")
batch_idx = int(input("Enter the index of the batch you want to verify: ")) - 1
benchmark_id = jobs[job_idx]["benchmark_id"]
url = f"http://{MASTER_IP}:{MASTER_PORT}/get-batch-data/{benchmark_id}_{batch_idx}"
print(f"Fetching batch data from {url}")
data = requests.get(url).json()
batch = data['batch']
merkle_root = data['merkle_root']
solution_nonces = data['solution_nonces']
merkle_proofs = data['merkle_proofs']
if merkle_proofs is not None:
merkle_proofs = {x['leaf']['nonce']: x for x in merkle_proofs}
if (
merkle_proofs is None or
len(solution_nonces := set(merkle_proofs) & set(solution_nonces)) == 0
):
print("No solution data to verify for this batch")
else:
for nonce in solution_nonces:
print(f"Verifying solution for nonce: {nonce}")
cmd = [
TIG_WORKER_PATH, "verify_solution",
json.dumps(batch['settings'], separators=(',', ':')),
batch["rand_hash"],
str(nonce),
json.dumps(merkle_proofs[nonce]['leaf']['solution'], separators=(',', ':')),
]
print(f"Running cmd: {' '.join(cmd)}")
ret = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
if ret.returncode == 0:
print(f"[SUCCESS]: {ret.stdout.decode()}")
else:
print(f"[ERROR]: {ret.stderr.decode()}")
if merkle_root is not None:
download_url = batch["download_url"]
print(f"Downloading WASM from {download_url}")
resp = requests.get(download_url)
if resp.status_code != 200:
raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}")
wasm_path = f'{batch["settings"]["algorithm_id"]}.wasm'
with open(wasm_path, 'wb') as f:
f.write(resp.content)
print(f"WASM Path: {wasm_path}")
print("")
if merkle_proofs is None:
print("No merkle proofs to verify for this batch")
else:
for nonce in merkle_proofs:
print(f"Verifying output data for nonce: {nonce}")
cmd = [
TIG_WORKER_PATH, "compute_solution",
json.dumps(batch['settings'], separators=(',', ':')),
batch["rand_hash"],
str(nonce),
wasm_path,
"--mem", str(batch["runtime_config"]["max_memory"]),
"--fuel", str(batch["runtime_config"]["max_fuel"]),
]
print(f"Running cmd: {' '.join(cmd)}")
ret = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
out = json.loads(ret.stdout.decode())
expected = json.dumps(merkle_proofs[nonce]['leaf'], separators=(',', ':'), sort_keys=True)
actual = json.dumps(out, separators=(',', ':'), sort_keys=True)
if expected == actual:
print(f"[SUCCESS]: output data match")
else:
print(f"[ERROR]: output data mismatch")
print(f"Batch data: {expected}")
print(f"Recomputed: {actual}")
print(f"")
if merkle_root is None:
print("No merkle root to verify for this batch")
else:
print("Verifying merkle root")
cmd = [
TIG_WORKER_PATH, "compute_batch",
json.dumps(batch["settings"]),
batch["rand_hash"],
str(batch["start_nonce"]),
str(batch["num_nonces"]),
str(batch["batch_size"]),
wasm_path,
"--mem", str(batch["runtime_config"]["max_memory"]),
"--fuel", str(batch["runtime_config"]["max_fuel"]),
"--workers", str(NUM_WORKERS),
]
print(f"Running cmd: {' '.join(cmd)}")
ret = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
if ret.returncode == 0:
out = json.loads(ret.stdout.decode())
if out["merkle_root"] == merkle_root:
print(f"[SUCCESS]: merkle root match")
else:
print(f"[ERROR]: merkle root mismatch")
print(f"Batch data: {expected}")
print(f"Recomputed: {actual}")
else:
print(f"[ERROR]: {ret.stderr.decode()}")
print("")
print("FINISHED")