From c96321ec8ad107ea33086c7d7e6bb711718f0be3 Mon Sep 17 00:00:00 2001 From: amzid Date: Sat, 26 Oct 2019 12:42:16 +0200 Subject: [PATCH 1/2] Merge lrpc and prpc and make the node choice more flexible --- readme.md | 6 + src/api/provider_factory.py | 8 +- src/pay/payment_producer.py | 2 +- src/rpc/lrpc_reward_api.py | 211 -------------------------------- src/rpc/prpc_reward_api.py | 236 ------------------------------------ src/rpc/rpc_reward_api.py | 212 +++++++++++++++++++------------- 6 files changed, 141 insertions(+), 534 deletions(-) delete mode 100644 src/rpc/lrpc_reward_api.py delete mode 100644 src/rpc/prpc_reward_api.py diff --git a/readme.md b/readme.md index 20f3f273..fc43d128 100644 --- a/readme.md +++ b/readme.md @@ -58,6 +58,12 @@ The most common use case is to run in mainnet and start to make payments from la python3 src/main.py ``` +TRD necessitates an interface to get provided with income and delegator data in order to perform the needed calculations. +The default provider is the public rpc node mainnet.tezrpc.me. However, it is possible to change the data provider with the flag -P rpc. +Please note that in this case, the default node would be localhost:8732. In order to change the node url for the provider, you can give the desired url +under the flag -A followed with node_url:port (e.g. -P rpc -A 127.0.0.1:8733). +Please note that the node should be an archive node, and that the port should be the rpc port specified while launching the node. + For more example commands please see wiki page: https://github.com/habanoz/tezos-reward-distributor/wiki/How-to-Run diff --git a/src/api/provider_factory.py b/src/api/provider_factory.py index d6906b26..32288d35 100644 --- a/src/api/provider_factory.py +++ b/src/api/provider_factory.py @@ -1,5 +1,3 @@ -from rpc.lrpc_reward_api import LRpcRewardApiImpl -from rpc.prpc_reward_api import PRpcRewardApiImpl from rpc.rpc_block_api import RpcBlockApiImpl from rpc.rpc_reward_api import RpcRewardApiImpl from tzscan.tzscan_mirror_selection_helper import TzScanMirrorSelector @@ -16,12 +14,12 @@ def __init__(self, provider, verbose=False): self.mirror_selector = None self.verbose = verbose - def newRewardApi(self, network_config, baking_address, wllt_clnt_mngr, node_url): + def newRewardApi(self, network_config, baking_address, node_url): if self.provider == 'rpc': - return LRpcRewardApiImpl(network_config, baking_address, node_url, wllt_clnt_mngr, validate=False, verbose=self.verbose) + return RpcRewardApiImpl(network_config, baking_address, node_url, protocol='http', validate=False, verbose=self.verbose) elif self.provider == 'prpc': url_prefix = self.url_prefixes[network_config['NAME']] - return PRpcRewardApiImpl(network_config, baking_address, self.URL.format(url_prefix), validate=False, verbose=self.verbose) + return RpcRewardApiImpl(network_config, baking_address, self.URL.format(url_prefix), protocol='https', validate=False, verbose=self.verbose) elif self.provider == 'tzscan': if not self.mirror_selector: self.init_mirror_selector(network_config) diff --git a/src/pay/payment_producer.py b/src/pay/payment_producer.py index 4a735580..94dfd848 100644 --- a/src/pay/payment_producer.py +++ b/src/pay/payment_producer.py @@ -39,7 +39,7 @@ def __init__(self, name, initial_payment_cycle, network_config, payments_dir, ca self.name = name - self.reward_api = provider_factory.newRewardApi(network_config, self.baking_address, wllt_clnt_mngr, node_url) + self.reward_api = provider_factory.newRewardApi(network_config, self.baking_address, node_url) self.block_api = provider_factory.newBlockApi(network_config, wllt_clnt_mngr, node_url) self.fee_calc = service_fee_calc diff --git a/src/rpc/lrpc_reward_api.py b/src/rpc/lrpc_reward_api.py deleted file mode 100644 index eb43a542..00000000 --- a/src/rpc/lrpc_reward_api.py +++ /dev/null @@ -1,211 +0,0 @@ -import logging - -import requests - -from NetworkConfiguration import init_network_config -from api.reward_api import RewardApi -from log_config import main_logger -from model.reward_provider_model import RewardProviderModel -from tzscan.tzscan_mirror_selection_helper import TzScanMirrorSelector -from tzscan.tzscan_reward_api import TzScanRewardApiImpl -from util.rpc_utils import parse_json_response - -logger = main_logger - - -class LRpcRewardApiImpl(RewardApi): - - COMM_HEAD = "%protocol%://{}/chains/main/blocks/head" - COMM_DELEGATES = "%protocol%://{}/chains/main/blocks/{}/context/delegates/{}" - COMM_BLOCK = "%protocol%://{}/chains/main/blocks/{}" - COMM_SNAPSHOT = COMM_BLOCK + "/context/raw/json/rolls/owner/snapshot/{}/" - COMM_DELEGATE_BALANCE = "%protocol%://{}/chains/main/blocks/{}/context/contracts/{}" - - def __init__(self, nw, baking_address, node_url, wllt_clnt_mngr, validate=False, verbose=True): - super(LRpcRewardApiImpl, self).__init__() - - self.blocks_per_cycle = nw['BLOCKS_PER_CYCLE'] - self.preserved_cycles = nw['NB_FREEZE_CYCLE'] - self.blocks_per_roll_snapshot = nw['BLOCKS_PER_ROLL_SNAPSHOT'] - - self.baking_address = baking_address - self.node_url = node_url - self.wllt_clnt_mngr = wllt_clnt_mngr - - self.verbose = verbose - self.validate = validate - - # replace protocol placeholder - protocol = 'http' - self.COMM_HEAD = self.COMM_HEAD.replace('%protocol%',protocol) - self.COMM_DELEGATES = self.COMM_DELEGATES.replace('%protocol%',protocol) - self.COMM_BLOCK = self.COMM_BLOCK.replace('%protocol%',protocol) - self.COMM_SNAPSHOT = self.COMM_SNAPSHOT.replace('%protocol%',protocol) - self.COMM_DELEGATE_BALANCE = self.COMM_DELEGATE_BALANCE.replace('%protocol%',protocol) - - if self.validate: - mirror_selector = TzScanMirrorSelector(nw) - mirror_selector.initialize() - self.validate_api = TzScanRewardApiImpl(nw, self.baking_address, mirror_selector) - - def get_nb_delegators(self, cycle, current_level): - _, delegators = self.__get_delegators_and_delgators_balance(cycle, current_level) - return len(delegators) - - def get_rewards_for_cycle_map(self, cycle): - current_level, current_cycle = self.__get_current_level() - logger.debug("Current level {}, current cycle {}".format(current_level, current_cycle)) - - reward_data = {} - reward_data["delegate_staking_balance"], reward_data["delegators"] = self.__get_delegators_and_delgators_balance(cycle, current_level) - reward_data["delegators_nb"] = len(reward_data["delegators"]) - - # Get last block in cycle where rewards are unfrozen - level_of_last_block_in_unfreeze_cycle = (cycle+self.preserved_cycles+1) * self.blocks_per_cycle - - logger.debug("Cycle {}, preserved cycles {}, blocks per cycle {}, last_block_cycle {}".format(cycle, self.preserved_cycles, self.blocks_per_cycle, level_of_last_block_in_unfreeze_cycle)) - - if current_level - level_of_last_block_in_unfreeze_cycle >= 0: - unfrozen_rewards = self.__get_unfrozen_rewards(level_of_last_block_in_unfreeze_cycle, cycle) - reward_data["total_rewards"] = unfrozen_rewards - - else: - logger.warn("Please wait until the rewards and fees for cycle {} are unfrozen".format(cycle)) - reward_data["total_rewards"] = 0 - - reward_model = RewardProviderModel(reward_data["delegate_staking_balance"], reward_data["total_rewards"], reward_data["delegators"]) - - logger.debug("delegate_staking_balance={}, total_rewards = {}".format(reward_data["delegate_staking_balance"],reward_data["total_rewards"])) - logger.debug("delegators = {}".format(reward_data["delegators"])) - - if self.validate: - self.__validate_reward_data(reward_model, cycle) - - return reward_model - - def __get_unfrozen_rewards(self, level_of_last_block_in_unfreeze_cycle, cycle): - request_metadata = self.COMM_BLOCK.format(self.node_url, level_of_last_block_in_unfreeze_cycle) + '/metadata' - metadata = self.do_rpc_request(request_metadata) - balance_updates = metadata["balance_updates"] - unfrozen_rewards = unfrozen_fees = 0 - - for i in range(len(balance_updates)): - balance_update = balance_updates[i] - if balance_update["kind"] == "freezer": - if balance_update["delegate"] == self.baking_address: - if int(balance_update["cycle"]) == cycle or int(balance_update["change"])<0: - if balance_update["category"] == "rewards": - unfrozen_rewards = -int(balance_update["change"]) - logger.debug("[__get_unfrozen_rewards] Found balance update for reward {}".format(balance_update)) - elif balance_update["category"] == "fees": - unfrozen_fees = -int(balance_update["change"]) - logger.debug("[__get_unfrozen_rewards] Found balance update for fee {}".format(balance_update)) - else: - logger.debug("[__get_unfrozen_rewards] Found balance update, not including: {}".format(balance_update)) - else: - logger.debug("[__get_unfrozen_rewards] Found balance update, cycle does not match or change is non-zero, not including: {}".format(balance_update)) - - return unfrozen_fees + unfrozen_rewards - - def do_rpc_request(self, request): - request = " rpc get "+request - if self.verbose: - logger.debug("[do_rpc_request] running command {}".format(request)) - - try: - _, resp = self.wllt_clnt_mngr.send_request(request) - response = parse_json_response(resp) - except Exception as e: - raise Exception("RPC request failed. Make sure you are using an Archive Node!") from e - - if self.verbose: - logger.debug("[do_rpc_request] Response {}".format(response)) - return response - - def __get_current_level(self): - head = self.do_rpc_request(self.COMM_HEAD.format(self.node_url)) - current_level = int(head["metadata"]["level"]["level"]) - current_cycle = int(head["metadata"]["level"]["cycle"]) - # head_hash = head["hash"] - - return current_level, current_cycle - - - - def __get_delegators_and_delgators_balance(self, cycle, current_level): - - hash_snapshot_block = self.__get_snapshot_block_hash(cycle, current_level) - if hash_snapshot_block == "": - return 0, [] - - request = self.COMM_DELEGATES.format(self.node_url, hash_snapshot_block, self.baking_address) - - delegate_staking_balance = 0 - delegators = {} - - try: - response = self.do_rpc_request(request) - delegate_staking_balance = int(response["staking_balance"]) - - delegators_addresses = response["delegated_contracts"] - for idx, delegator in enumerate(delegators_addresses): - request = self.COMM_DELEGATE_BALANCE.format(self.node_url, hash_snapshot_block, delegator) - response = self.do_rpc_request(request) - delegators[delegator] = int(response["balance"]) - - logger.debug( - "Delegator info ({}/{}) fetched: address {}, balance {}".format(idx, len(delegators_addresses), - delegator, delegators[delegator])) - except: - logger.warn('No delegators or unexpected error', exc_info=True) - - return delegate_staking_balance, delegators - - def __get_snapshot_block_hash(self, cycle, current_level): - - snapshot_level = (cycle - self.preserved_cycles) * self.blocks_per_cycle + 1 - logger.debug("Reward cycle {}, snapshot level {}".format(cycle,snapshot_level)) - - block_level = cycle * self.blocks_per_cycle + 1 - - if current_level - snapshot_level >= 0: - request = self.COMM_SNAPSHOT.format(self.node_url, block_level, cycle) - snapshots = self.do_rpc_request(request) - - if len(snapshots) == 1: - chosen_snapshot = snapshots[0] - else: - logger.error("Too few or too many possible snapshots found!") - return "" - - level_snapshot_block = (cycle - self.preserved_cycles - 2) * self.blocks_per_cycle + (chosen_snapshot+1) * self.blocks_per_roll_snapshot - return level_snapshot_block - # request = self.COMM_BLOCK.format(self.node_url, level_snapshot_block) - # response = self.do_rpc_request(request) - # snapshot = response['hash'] - # logger.debug("Hash of snapshot block is {}".format(snapshot)) - - # return snapshot - else: - logger.info("Cycle too far in the future") - return "" - - - def __validate_reward_data(self, reward_data_rpc, cycle): - reward_data_tzscan = self.validate_api.get_rewards_for_cycle_map(cycle) - if not (reward_data_rpc.delegate_staking_balance == int(reward_data_tzscan.delegate_staking_balance)): - raise Exception("Delegate staking balance from local node and tzscan are not identical. local node {}, tzscan {}".format(reward_data_rpc.delegate_staking_balance,reward_data_tzscan.delegate_staking_balance )) - - if not (len(reward_data_rpc.delegator_balance_dict) == len(reward_data_tzscan.delegator_balance_dict)): - raise Exception("Delegators number from local node and tzscan are not identical.") - - if (len(reward_data_rpc.delegator_balance_dict)) == 0: - return - - if not (reward_data_rpc.delegator_balance_dict == reward_data_tzscan.delegator_balance_dict): - raise Exception("Delegators' balances from local node and tzscan are not identical.") - - if not reward_data_rpc.total_reward_amount == reward_data_tzscan.total_reward_amount: - raise Exception("Total rewards from local node and tzscan are not identical.") - - logger.debug("[__validate_reward_data] validation passed") diff --git a/src/rpc/prpc_reward_api.py b/src/rpc/prpc_reward_api.py deleted file mode 100644 index 71853d5d..00000000 --- a/src/rpc/prpc_reward_api.py +++ /dev/null @@ -1,236 +0,0 @@ -import logging -from time import sleep - -import requests - -from NetworkConfiguration import init_network_config -from api.reward_api import RewardApi -from log_config import main_logger -from model.reward_provider_model import RewardProviderModel -from tzscan.tzscan_mirror_selection_helper import TzScanMirrorSelector -from tzscan.tzscan_reward_api import TzScanRewardApiImpl - -logger = main_logger - - -class PRpcRewardApiImpl(RewardApi): - - COMM_HEAD = "%protocol%://{}/chains/main/blocks/head" - COMM_DELEGATES = "%protocol%://{}/chains/main/blocks/{}/context/delegates/{}" - COMM_BLOCK = "%protocol%://{}/chains/main/blocks/{}" - COMM_SNAPSHOT = COMM_BLOCK + "/context/raw/json/rolls/owner/snapshot/{}/" - COMM_DELEGATE_BALANCE = "%protocol%://{}/chains/main/blocks/{}/context/contracts/{}" - - def __init__(self, nw, baking_address, node_url, validate=False, verbose=True): - super(PRpcRewardApiImpl, self).__init__() - - self.blocks_per_cycle = nw['BLOCKS_PER_CYCLE'] - self.preserved_cycles = nw['NB_FREEZE_CYCLE'] - self.blocks_per_roll_snapshot = nw['BLOCKS_PER_ROLL_SNAPSHOT'] - - self.baking_address = baking_address - self.node_url = node_url - - self.verbose = verbose - self.validate = validate - - # replace protocol placeholder - protocol = 'https' - self.COMM_HEAD = self.COMM_HEAD.replace('%protocol%',protocol) - self.COMM_DELEGATES = self.COMM_DELEGATES.replace('%protocol%',protocol) - self.COMM_BLOCK = self.COMM_BLOCK.replace('%protocol%',protocol) - self.COMM_SNAPSHOT = self.COMM_SNAPSHOT.replace('%protocol%',protocol) - self.COMM_DELEGATE_BALANCE = self.COMM_DELEGATE_BALANCE.replace('%protocol%',protocol) - - if self.validate: - mirror_selector = TzScanMirrorSelector(nw) - mirror_selector.initialize() - self.validate_api = TzScanRewardApiImpl(nw, self.baking_address, mirror_selector) - - def get_nb_delegators(self, cycle, current_level): - _, delegators = self.__get_delegators_and_delgators_balance(cycle, current_level) - return len(delegators) - - def get_rewards_for_cycle_map(self, cycle): - current_level, current_cycle = self.__get_current_level() - logger.debug("Current level {}, current cycle {}".format(current_level, current_cycle)) - - reward_data = {} - reward_data["delegate_staking_balance"], reward_data["delegators"] = self.__get_delegators_and_delgators_balance(cycle, current_level) - reward_data["delegators_nb"] = len(reward_data["delegators"]) - - # Get last block in cycle where rewards are unfrozen - level_of_last_block_in_unfreeze_cycle = (cycle+self.preserved_cycles+1) * self.blocks_per_cycle - - logger.debug("Cycle {}, preserved cycles {}, blocks per cycle {}, last_block_cycle {}".format(cycle, self.preserved_cycles, self.blocks_per_cycle, level_of_last_block_in_unfreeze_cycle)) - - if current_level - level_of_last_block_in_unfreeze_cycle >= 0: - unfrozen_rewards = self.__get_unfrozen_rewards(level_of_last_block_in_unfreeze_cycle, cycle) - reward_data["total_rewards"] = unfrozen_rewards - - else: - logger.warn("Please wait until the rewards and fees for cycle {} are unfrozen".format(cycle)) - reward_data["total_rewards"] = 0 - - reward_model = RewardProviderModel(reward_data["delegate_staking_balance"], reward_data["total_rewards"], reward_data["delegators"]) - - logger.debug("delegate_staking_balance={}, total_rewards = {}".format(reward_data["delegate_staking_balance"],reward_data["total_rewards"])) - logger.debug("delegators = {}".format(reward_data["delegators"])) - - if self.validate: - self.__validate_reward_data(reward_model, cycle) - - return reward_model - - def __get_unfrozen_rewards(self, level_of_last_block_in_unfreeze_cycle, cycle): - request_metadata = self.COMM_BLOCK.format(self.node_url, level_of_last_block_in_unfreeze_cycle) + '/metadata' - metadata = self.do_rpc_request(request_metadata) - balance_updates = metadata["balance_updates"] - unfrozen_rewards = unfrozen_fees = 0 - - for i in range(len(balance_updates)): - balance_update = balance_updates[i] - if balance_update["kind"] == "freezer": - if balance_update["delegate"] == self.baking_address: - if int(balance_update["cycle"]) == cycle or int(balance_update["change"]) < 0: - if balance_update["category"] == "rewards": - unfrozen_rewards = -int(balance_update["change"]) - logger.debug("[__get_unfrozen_rewards] Found balance update for reward {}".format(balance_update)) - elif balance_update["category"] == "fees": - unfrozen_fees = -int(balance_update["change"]) - logger.debug("[__get_unfrozen_rewards] Found balance update for fee {}".format(balance_update)) - else: - logger.debug("[__get_unfrozen_rewards] Found balance update, not including: {}".format(balance_update)) - else: - logger.debug("[__get_unfrozen_rewards] Found balance update, cycle does not match or change is non-zero, not including: {}".format(balance_update)) - - return unfrozen_fees + unfrozen_rewards - - def do_rpc_request(self, request): - if self.verbose: - logger.debug("[do_rpc_request] Requesting URL {}".format(request)) - - sleep(0.1) # be nice to public node service - - resp = requests.get(request) - if resp.status_code != 200: - raise Exception("Request '{} failed with status code {}".format(request, resp.status_code)) - - response = resp.json() - if self.verbose: - logger.debug("[do_rpc_request] Response {}".format(response)) - return response - - def __get_current_level(self): - head = self.do_rpc_request(self.COMM_HEAD.format(self.node_url)) - current_level = int(head["metadata"]["level"]["level"]) - current_cycle = int(head["metadata"]["level"]["cycle"]) - # head_hash = head["hash"] - - return current_level, current_cycle - - - - def __get_delegators_and_delgators_balance(self, cycle, current_level): - - hash_snapshot_block = self.__get_snapshot_block_hash(cycle, current_level) - if hash_snapshot_block == "": - return 0, [] - - request = self.COMM_DELEGATES.format(self.node_url, hash_snapshot_block, self.baking_address) - - delegate_staking_balance = 0 - delegators = {} - - try: - response = self.do_rpc_request(request) - delegate_staking_balance = int(response["staking_balance"]) - - delegators_addresses = response["delegated_contracts"] - for idx, delegator in enumerate(delegators_addresses): - request = self.COMM_DELEGATE_BALANCE.format(self.node_url, hash_snapshot_block, delegator) - response = self.do_rpc_request(request) - delegators[delegator] = int(response["balance"]) - - logger.debug( - "Delegator info ({}/{}) fetched: address {}, balance {}".format(idx, len(delegators_addresses), - delegator, delegators[delegator])) - except: - logger.warn('No delegators or unexpected error', exc_info=True) - - return delegate_staking_balance, delegators - - def __get_snapshot_block_hash(self, cycle, current_level): - - snapshot_level = (cycle - self.preserved_cycles) * self.blocks_per_cycle + 1 - logger.debug("Reward cycle {}, snapshot level {}".format(cycle,snapshot_level)) - - block_level = cycle * self.blocks_per_cycle + 1 - - if current_level - snapshot_level >= 0: - request = self.COMM_SNAPSHOT.format(self.node_url, block_level, cycle) - snapshots = self.do_rpc_request(request) - - if len(snapshots) == 1: - chosen_snapshot = snapshots[0] - else: - logger.error("Too few or too many possible snapshots found!") - return "" - - level_snapshot_block = (cycle - self.preserved_cycles - 2) * self.blocks_per_cycle + (chosen_snapshot+1) * self.blocks_per_roll_snapshot - return level_snapshot_block - # request = self.COMM_BLOCK.format(self.node_url, level_snapshot_block) - # response = self.do_rpc_request(request) - # snapshot = response['hash'] - # logger.debug("Hash of snapshot block is {}".format(snapshot)) - - # return snapshot - else: - logger.info("Cycle too far in the future") - return "" - - - def __validate_reward_data(self, reward_data_rpc, cycle): - reward_data_tzscan = self.validate_api.get_rewards_for_cycle_map(cycle) - if not (reward_data_rpc.delegate_staking_balance == int(reward_data_tzscan.delegate_staking_balance)): - raise Exception("Delegate staking balance from local node and tzscan are not identical. local node {}, tzscan {}".format(reward_data_rpc.delegate_staking_balance,reward_data_tzscan.delegate_staking_balance )) - - if not (len(reward_data_rpc.delegator_balance_dict) == len(reward_data_tzscan.delegator_balance_dict)): - raise Exception("Delegators number from local node and tzscan are not identical.") - - if (len(reward_data_rpc.delegator_balance_dict)) == 0: - return - - if not (reward_data_rpc.delegator_balance_dict == reward_data_tzscan.delegator_balance_dict): - raise Exception("Delegators' balances from local node and tzscan are not identical.") - - if not reward_data_rpc.total_reward_amount == reward_data_tzscan.total_reward_amount: - raise Exception("Total rewards from local node and tzscan are not identical.") - - logger.debug("[__validate_reward_data] validation passed") - - -def test(): - configure_test_logger() - - network_config_map = init_network_config("MAINNET", None, None) - network_config = network_config_map["MAINNET"] - - prpc = PRpcRewardApiImpl(network_config, "tz1Z1tMai15JWUWeN2PKL9faXXVPMuWamzJj", "mainnet.tezrpc.me",True, True) - prpc.get_rewards_for_cycle_map(42) - - -def configure_test_logger(): - test_logger = logging.getLogger('main') - test_logger.setLevel(logging.DEBUG) - ch = logging.StreamHandler() - ch.setLevel(logging.DEBUG) - formatter = logging.Formatter('%(asctime)s - %(threadName)-9s - %(message)s') - ch.setFormatter(formatter) - test_logger.addHandler(ch) - global logger - logger = test_logger - - -if __name__ == '__main__': - test() \ No newline at end of file diff --git a/src/rpc/rpc_reward_api.py b/src/rpc/rpc_reward_api.py index d6f15a7f..7f79dfdf 100644 --- a/src/rpc/rpc_reward_api.py +++ b/src/rpc/rpc_reward_api.py @@ -1,23 +1,27 @@ -from api.reward_api import RewardApi +import logging +from time import sleep + +import requests +from NetworkConfiguration import init_network_config +from api.reward_api import RewardApi from log_config import main_logger from model.reward_provider_model import RewardProviderModel from tzscan.tzscan_mirror_selection_helper import TzScanMirrorSelector -from cli.cmd_manager import CommandManager -from util.rpc_utils import parse_json_response, extract_json_part from tzscan.tzscan_reward_api import TzScanRewardApiImpl logger = main_logger -COMM_HEAD = " rpc get http://{}/chains/main/blocks/head" -COMM_DELEGATES = " rpc get http://{}/chains/main/blocks/{}/context/delegates/{}" -COMM_BLOCK = " rpc get http://{}/chains/main/blocks/{}/" -COMM_SNAPSHOT = COMM_BLOCK + "context/raw/json/rolls/owner/snapshot/{}/" -COMM_DELEGATE_BALANCE = " rpc get http://{}/chains/main/blocks/{}/context/contracts/{}" class RpcRewardApiImpl(RewardApi): - def __init__(self, nw, baking_address, wllt_clnt_mngr, node_url, validate=True): + COMM_HEAD = "%protocol%://{}/chains/main/blocks/head" + COMM_DELEGATES = "%protocol%://{}/chains/main/blocks/{}/context/delegates/{}" + COMM_BLOCK = "%protocol%://{}/chains/main/blocks/{}" + COMM_SNAPSHOT = COMM_BLOCK + "/context/raw/json/rolls/owner/snapshot/{}/" + COMM_DELEGATE_BALANCE = "%protocol%://{}/chains/main/blocks/{}/context/contracts/{}" + + def __init__(self, nw, baking_address, node_url, protocol='http', validate=False, verbose=True): super(RpcRewardApiImpl, self).__init__() self.blocks_per_cycle = nw['BLOCKS_PER_CYCLE'] @@ -25,96 +29,126 @@ def __init__(self, nw, baking_address, wllt_clnt_mngr, node_url, validate=True): self.blocks_per_roll_snapshot = nw['BLOCKS_PER_ROLL_SNAPSHOT'] self.baking_address = baking_address - self.wllt_clnt_mngr = wllt_clnt_mngr self.node_url = node_url + self.verbose = verbose self.validate = validate + + # replace protocol placeholder + self.COMM_HEAD = self.COMM_HEAD.replace('%protocol%',protocol) + self.COMM_DELEGATES = self.COMM_DELEGATES.replace('%protocol%',protocol) + self.COMM_BLOCK = self.COMM_BLOCK.replace('%protocol%',protocol) + self.COMM_SNAPSHOT = self.COMM_SNAPSHOT.replace('%protocol%',protocol) + self.COMM_DELEGATE_BALANCE = self.COMM_DELEGATE_BALANCE.replace('%protocol%',protocol) + if self.validate: mirror_selector = TzScanMirrorSelector(nw) mirror_selector.initialize() self.validate_api = TzScanRewardApiImpl(nw, self.baking_address, mirror_selector) - def get_nb_delegators(self, cycle, verbose=False): - _, delegators = self.__get_delegators_and_delgators_balance(cycle,verbose ) + def get_nb_delegators(self, cycle, current_level): + _, delegators = self.__get_delegators_and_delgators_balance(cycle, current_level) return len(delegators) - def get_rewards_for_cycle_map(self, cycle, verbose=False): + def get_rewards_for_cycle_map(self, cycle): + current_level, current_cycle = self.__get_current_level() + logger.debug("Current level {}, current cycle {}".format(current_level, current_cycle)) reward_data = {} - - reward_data["delegate_staking_balance"], reward_data[ - "delegators"] = self.__get_delegators_and_delgators_balance(cycle, verbose) + reward_data["delegate_staking_balance"], reward_data["delegators"] = self.__get_delegators_and_delgators_balance(cycle, current_level) reward_data["delegators_nb"] = len(reward_data["delegators"]) - current_level, head_hash, current_cycle = self.__get_current_level(verbose) + # Get last block in cycle where rewards are unfrozen + level_of_last_block_in_unfreeze_cycle = (cycle+self.preserved_cycles+1) * self.blocks_per_cycle - logger.debug("Current level {}, head hash {}".format(current_level, head_hash)) + logger.debug("Cycle {}, preserved cycles {}, blocks per cycle {}, last_block_cycle {}".format(cycle, self.preserved_cycles, self.blocks_per_cycle, level_of_last_block_in_unfreeze_cycle)) - # Get last block in cycle where rewards are unfrozen - level_for_relevant_request = (cycle + self.preserved_cycles + 1) * self.blocks_per_cycle - - logger.debug("Cycle {}, preserved cycles {}, blocks per cycle {}, level of interest {}" - .format(cycle, self.preserved_cycles, self.blocks_per_cycle, level_for_relevant_request)) - - if current_level - level_for_relevant_request >= 0: - request_metadata = COMM_BLOCK.format(self.node_url, head_hash, - current_level - level_for_relevant_request) + '/metadata/' - _, response_metadata = self.wllt_clnt_mngr.send_request(request_metadata) - metadata = parse_json_response(response_metadata) - balance_updates = metadata["balance_updates"] - - unfrozen_rewards = unfrozen_fees = 0 - for i in range(len(balance_updates)): - balance_update = balance_updates[i] - if balance_update["kind"] == "freezer": - if balance_update["delegate"] == self.baking_address: - if balance_update["category"] == "rewards": - unfrozen_rewards = -int(balance_update["change"]) - elif balance_update["category"] == "fees": - unfrozen_fees = -int(balance_update["change"]) - reward_data["total_rewards"] = unfrozen_rewards + unfrozen_fees + if current_level - level_of_last_block_in_unfreeze_cycle >= 0: + unfrozen_rewards = self.__get_unfrozen_rewards(level_of_last_block_in_unfreeze_cycle, cycle) + reward_data["total_rewards"] = unfrozen_rewards else: logger.warn("Please wait until the rewards and fees for cycle {} are unfrozen".format(cycle)) reward_data["total_rewards"] = 0 - reward_model = RewardProviderModel(reward_data["delegate_staking_balance"], reward_data["total_rewards"], - reward_data["delegators"]) + reward_model = RewardProviderModel(reward_data["delegate_staking_balance"], reward_data["total_rewards"], reward_data["delegators"]) + + logger.debug("delegate_staking_balance={}, total_rewards = {}".format(reward_data["delegate_staking_balance"],reward_data["total_rewards"])) + logger.debug("delegators = {}".format(reward_data["delegators"])) if self.validate: self.__validate_reward_data(reward_model, cycle) return reward_model - def __get_current_level(self, verbose=False): - _, response = self.wllt_clnt_mngr.send_request(COMM_HEAD.format(self.node_url)) - head = parse_json_response(response) + def __get_unfrozen_rewards(self, level_of_last_block_in_unfreeze_cycle, cycle): + request_metadata = self.COMM_BLOCK.format(self.node_url, level_of_last_block_in_unfreeze_cycle) + '/metadata' + metadata = self.do_rpc_request(request_metadata) + balance_updates = metadata["balance_updates"] + unfrozen_rewards = unfrozen_fees = 0 + + for i in range(len(balance_updates)): + balance_update = balance_updates[i] + if balance_update["kind"] == "freezer": + if balance_update["delegate"] == self.baking_address: + if int(balance_update["cycle"]) == cycle or int(balance_update["change"]) < 0: + if balance_update["category"] == "rewards": + unfrozen_rewards = -int(balance_update["change"]) + logger.debug("[__get_unfrozen_rewards] Found balance update for reward {}".format(balance_update)) + elif balance_update["category"] == "fees": + unfrozen_fees = -int(balance_update["change"]) + logger.debug("[__get_unfrozen_rewards] Found balance update for fee {}".format(balance_update)) + else: + logger.debug("[__get_unfrozen_rewards] Found balance update, not including: {}".format(balance_update)) + else: + logger.debug("[__get_unfrozen_rewards] Found balance update, cycle does not match or change is non-zero, not including: {}".format(balance_update)) + + return unfrozen_fees + unfrozen_rewards + + def do_rpc_request(self, request): + if self.verbose: + logger.debug("[do_rpc_request] Requesting URL {}".format(request)) + + sleep(0.1) # be nice to public node service + + resp = requests.get(request) + if resp.status_code != 200: + raise Exception("Request '{} failed with status code {}".format(request, resp.status_code)) + + response = resp.json() + if self.verbose: + logger.debug("[do_rpc_request] Response {}".format(response)) + return response + + def __get_current_level(self): + head = self.do_rpc_request(self.COMM_HEAD.format(self.node_url)) current_level = int(head["metadata"]["level"]["level"]) current_cycle = int(head["metadata"]["level"]["cycle"]) - head_hash = head["hash"] - return current_level, head_hash, current_cycle + # head_hash = head["hash"] + + return current_level, current_cycle - def __get_delegators_and_delgators_balance(self, cycle, verbose=False): - hash_snapshot_block = self.__get_snapshot_block_hash(cycle) + + def __get_delegators_and_delgators_balance(self, cycle, current_level): + + hash_snapshot_block = self.__get_snapshot_block_hash(cycle, current_level) if hash_snapshot_block == "": return 0, [] - request = COMM_DELEGATES.format(self.node_url, hash_snapshot_block, self.baking_address) - _, response = self.wllt_clnt_mngr.send_request(request) + request = self.COMM_DELEGATES.format(self.node_url, hash_snapshot_block, self.baking_address) delegate_staking_balance = 0 delegators = {} try: - response = parse_json_response(response) + response = self.do_rpc_request(request) delegate_staking_balance = int(response["staking_balance"]) delegators_addresses = response["delegated_contracts"] for idx, delegator in enumerate(delegators_addresses): - request = COMM_DELEGATE_BALANCE.format(self.node_url, hash_snapshot_block, delegator) - _, response = self.wllt_clnt_mngr.send_request(request) - response = parse_json_response(response) + request = self.COMM_DELEGATE_BALANCE.format(self.node_url, hash_snapshot_block, delegator) + response = self.do_rpc_request(request) delegators[delegator] = int(response["balance"]) logger.debug( @@ -125,22 +159,16 @@ def __get_delegators_and_delgators_balance(self, cycle, verbose=False): return delegate_staking_balance, delegators - def __get_snapshot_block_hash(self, cycle, verbose=False): - - current_level, head_hash, current_cycle = self.__get_current_level(verbose) + def __get_snapshot_block_hash(self, cycle, current_level): - level_for_snapshot_request = (cycle - self.preserved_cycles) * self.blocks_per_cycle + 1 - - logger.debug("Current level {}, head hash {}".format(current_level, head_hash)) - logger.debug("Cycle {}, preserved cycles {}, blocks per cycle {}, level of interest {}" - .format(cycle, self.preserved_cycles, self.blocks_per_cycle, level_for_snapshot_request)) + snapshot_level = (cycle - self.preserved_cycles) * self.blocks_per_cycle + 1 + logger.debug("Reward cycle {}, snapshot level {}".format(cycle,snapshot_level)) block_level = cycle * self.blocks_per_cycle + 1 - if current_level - level_for_snapshot_request >= 0: - request = COMM_SNAPSHOT.format(self.node_url, block_level, cycle) - _, response = self.wllt_clnt_mngr.send_request(request) - snapshots = parse_json_response(response) + if current_level - snapshot_level >= 0: + request = self.COMM_SNAPSHOT.format(self.node_url, block_level, cycle) + snapshots = self.do_rpc_request(request) if len(snapshots) == 1: chosen_snapshot = snapshots[0] @@ -148,17 +176,14 @@ def __get_snapshot_block_hash(self, cycle, verbose=False): logger.error("Too few or too many possible snapshots found!") return "" - level_snapshot_block = (cycle - self.preserved_cycles - 2) * self.blocks_per_cycle + ( chosen_snapshot + 1) * self.blocks_per_roll_snapshot - request = COMM_BLOCK.format(self.node_url, head_hash, current_level - level_snapshot_block) - _, comm_block_response = self.wllt_clnt_mngr.send_request(request) - comm_block_response = comm_block_response.rstrip() - comm_block_response_json = extract_json_part(comm_block_response, verbose=True) - cmd_mngr = CommandManager(verbose=verbose) - _, hash_snapshot_block = cmd_mngr.execute("echo '{}' | jq -r .hash".format(comm_block_response_json)) - - logger.debug("Hash of snapshot block is {}".format(hash_snapshot_block)) + level_snapshot_block = (cycle - self.preserved_cycles - 2) * self.blocks_per_cycle + (chosen_snapshot+1) * self.blocks_per_roll_snapshot + return level_snapshot_block + # request = self.COMM_BLOCK.format(self.node_url, level_snapshot_block) + # response = self.do_rpc_request(request) + # snapshot = response['hash'] + # logger.debug("Hash of snapshot block is {}".format(snapshot)) - return hash_snapshot_block + # return snapshot else: logger.info("Cycle too far in the future") return "" @@ -175,11 +200,36 @@ def __validate_reward_data(self, reward_data_rpc, cycle): if (len(reward_data_rpc.delegator_balance_dict)) == 0: return - # delegators_balance_tzscan = [ int(reward_data_tzscan["delegators_balance"][i][1]) for i in range(len(reward_data_tzscan["delegators_balance"]))] - # print(set(list(reward_data_rpc["delegators"].values()))) - # print(set(delegators_balance_tzscan)) if not (reward_data_rpc.delegator_balance_dict == reward_data_tzscan.delegator_balance_dict): raise Exception("Delegators' balances from local node and tzscan are not identical.") if not reward_data_rpc.total_reward_amount == reward_data_tzscan.total_reward_amount: raise Exception("Total rewards from local node and tzscan are not identical.") + + logger.debug("[__validate_reward_data] validation passed") + + +def test(): + configure_test_logger() + + network_config_map = init_network_config("MAINNET", None, None) + network_config = network_config_map["MAINNET"] + + prpc = PRpcRewardApiImpl(network_config, "tz1Z1tMai15JWUWeN2PKL9faXXVPMuWamzJj", "mainnet.tezrpc.me",True, True) + prpc.get_rewards_for_cycle_map(42) + + +def configure_test_logger(): + test_logger = logging.getLogger('main') + test_logger.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s - %(threadName)-9s - %(message)s') + ch.setFormatter(formatter) + test_logger.addHandler(ch) + global logger + logger = test_logger + + +if __name__ == '__main__': + test() \ No newline at end of file From 47a041a1f51d9e4f7a4f3aacba4e21076848a3b1 Mon Sep 17 00:00:00 2001 From: amzid Date: Sat, 26 Oct 2019 12:59:35 +0200 Subject: [PATCH 2/2] adjust unit tests with the provider class changes --- src/calc/test_calculatePhase0.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/calc/test_calculatePhase0.py b/src/calc/test_calculatePhase0.py index eb0e8635..f6e3fea8 100644 --- a/src/calc/test_calculatePhase0.py +++ b/src/calc/test_calculatePhase0.py @@ -2,7 +2,7 @@ from calc.calculate_phase0 import CalculatePhase0 from model import reward_log -from rpc.prpc_reward_api import PRpcRewardApiImpl +from api.provider_factory import ProviderFactory BAKING_ADDRESS = "tz1Z1tMai15JWUWeN2PKL9faXXVPMuWamzJj" @@ -14,7 +14,7 @@ def test_calculate(self): nw = {'NAME': 'MAINNET', 'NB_FREEZE_CYCLE': 5, 'BLOCK_TIME_IN_SEC': 60, 'BLOCKS_PER_CYCLE': 4096, 'BLOCKS_PER_ROLL_SNAPSHOT': 256} - api = PRpcRewardApiImpl(nw, BAKING_ADDRESS, "mainnet.tezrpc.me") + api = ProviderFactory(provider='prpc').newRewardApi(nw, BAKING_ADDRESS, '') model = api.get_rewards_for_cycle_map(153) phase0 = CalculatePhase0(model)