-
Notifications
You must be signed in to change notification settings - Fork 0
/
ethereum_data.py
50 lines (44 loc) · 2.09 KB
/
ethereum_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# This is the interface to implement when creating your own data source.
# This class queries Etherscan to get the latest code, balance and storage information.
import requests
import logging
log = logging.getLogger(__name__)
class EthereumData:
    """Small client for the Etherscan HTTP API.

    Looks up the balance and code of arbitrary addresses, and storage
    slots of the contract address the instance was created with. All
    queries use the ``latest`` block tag.
    """

    # Seconds to wait on Etherscan before giving up. Without a timeout,
    # ``requests.get`` can block forever on a stalled connection.
    DEFAULT_TIMEOUT = 30

    # Class-level fallback so that subclasses which override ``__init__``
    # without setting ``timeout`` still issue bounded requests.
    timeout = DEFAULT_TIMEOUT

    def __init__(self, contract_address, api_key=None,
                 timeout=DEFAULT_TIMEOUT):
        """Create a client bound to ``contract_address``.

        Args:
            contract_address: Address used by ``getStorageAt``.
            api_key: Etherscan API key. When omitted, the key embedded in
                this module is used, as before.
            timeout: Value passed to ``requests.get(timeout=...)``;
                ``None`` disables the timeout.
        """
        self.apiDomain = "https://api.etherscan.io/api"
        # NOTE(review): a key committed to source control is effectively
        # public; prefer passing ``api_key`` explicitly.
        if api_key is None:
            api_key = "VT4IW6VK7VES1Q9NYFI74YKH8U7QW9XRHN"
        self.apikey = api_key
        self.contract_addr = contract_address
        self.timeout = timeout

    def _query(self, module, action, address, **extra):
        """Send one GET request to the API and return the decoded JSON body."""
        params = {
            "module": module,
            "action": action,
            "address": address,
            "tag": "latest",
            "apikey": self.apikey,
        }
        params.update(extra)
        # Passing ``params`` lets requests URL-encode every value instead
        # of splicing raw strings into the query string.
        response = requests.get(self.apiDomain, params=params,
                                timeout=self.timeout)
        return response.json()

    def getBalance(self, address):
        """Return the balance reported by Etherscan for ``address``.

        When the response's ``message`` field is ``"OK"`` the ``result``
        field is returned (presumably the balance as a decimal string --
        confirm against the Etherscan docs). For any other message the
        whole decoded response is returned unchanged.

        Raises:
            Exception: any request/decoding error is logged and re-raised.
        """
        try:
            payload = self._query("account", "balance", address)
            if payload['message'] == "OK":
                payload = payload['result']
        except Exception:
            log.exception("Error at: contract address: %s", address)
            raise
        return payload

    def getCode(self, address):
        """Return the ``result`` field of ``eth_getCode`` for ``address``.

        Raises:
            Exception: any request/decoding error is logged and re-raised.
        """
        try:
            code = self._query("proxy", "eth_getCode", address)["result"]
        except Exception:
            log.exception("Error at: contract address: %s", address)
            raise
        return code

    def getStorageAt(self, position):
        """Return the storage word at ``position`` of the bound contract.

        Args:
            position: Integer slot index; it is sent in hexadecimal form.

        Returns:
            The ``result`` field of the response parsed as a base-16 int.

        Raises:
            Exception: any request/decoding error is re-raised. It is
                logged first unless its message is exactly ``'timeout'``.
        """
        try:
            position = hex(position)
            # Python 2 appends "L" to the hex form of long integers.
            if position[-1] == "L":
                position = position[:-1]
            raw = self._query("proxy", "eth_getStorageAt",
                              self.contract_addr,
                              position=position)["result"]
        except Exception as e:
            # Exceptions whose text is exactly 'timeout' are not logged --
            # presumably raised by a caller-side watchdog; confirm.
            if str(e) != 'timeout':
                log.exception("Error at: contract address: %s, position: %s",
                              self.contract_addr, position)
            raise
        return int(raw, 16)