Skip to content

Commit

Permalink
add field registrar_abusemail
Browse files Browse the repository at this point in the history
  • Loading branch information
rvyhnal committed Oct 16, 2024
1 parent 6611411 commit 9e0aa9d
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 22 deletions.
4 changes: 4 additions & 0 deletions convey/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ def get_method(start: Type, target: Type):
raise LookupError

whois = Type("whois", TypeGroup.whois, "ask whois servers", is_private=True)
whoisdomain = Type("whoisdomain", TypeGroup.whois, "ask whois servers for domain", is_private=True)
web = Type("web", TypeGroup.web, "scrape web contents", is_private=True)

external = Type("external", TypeGroup.custom, from_message="from a method in your .py file")
Expand All @@ -304,6 +305,7 @@ def get_method(start: Type, target: Type):
reg_m = Type("reg_m", TypeGroup.custom, from_message="match from a regular expression")
netname = Type("netname", TypeGroup.whois)
country = Type("country", TypeGroup.whois)
# Reached through `whoisdomain`; described separately from `abusemail` so that the two fields can be told apart.
registrar_abusemail = Type("registrar_abusemail", TypeGroup.whois, "Registrar abuse e-mail contact from domain whois")
abusemail = Type("abusemail", TypeGroup.whois, "Abuse e-mail contact from whois")
prefix = Type("prefix", TypeGroup.whois) # XX rename to 'inetnum'? to 'range'?
csirt_contact = Type("csirt_contact", TypeGroup.whois,
Expand Down Expand Up @@ -510,12 +512,14 @@ def _get_methods():
"FIELDS") else Checker.hostname_ip,
# (t.url, t.ip): Whois.url2ip,
(t.ip, t.whois): Whois,
(t.hostname, t.whoisdomain): lambda x: Whois(ip=None, hostname=x),
# (t.asn, t.whois): Whois, # XX can be easily allowed, however Whois object will huff there is no IP prefix range
(t.cidr, t.ip): Checker.cidr_ips if Config.get("multiple_cidr_ip", "FIELDS") else
lambda x: str(ipaddress.ip_interface(x).ip),
(t.whois, t.prefix): lambda x: str(x.get[0]),
(t.whois, t.asn): lambda x: x.get[3],
(t.whois, t.abusemail): lambda x: x.get[6],
(t.whoisdomain, t.registrar_abusemail): lambda x: x.get[8],
(t.whois, t.country): lambda x: x.get[5],
(t.whois, t.netname): lambda x: x.get[4],
(t.whois, t.csirt_contact):
Expand Down
89 changes: 67 additions & 22 deletions convey/whois.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from time import time, sleep

from netaddr import IPRange, IPNetwork
from tldextract import tldextract

from .contacts import Contacts
from .config import Config, subprocess_env
Expand Down Expand Up @@ -76,27 +77,36 @@ def init(cls, stats, ranges, ip_seen, csvstats, slow_mode=False, unknown_mode=Fa
# ["whois.ripe.net -r", "whois.arin.net", "whois.lacnic.net", "whois.apnic.net", "whois.afrinic.net"]):
# Whois.servers[name] = val

def __init__(self, ip):
def __init__(self, ip, hostname=None):
"""
self.get stores tuple: prefix, location, mail, asn, netname, country, ttl
"""
self.ip = ip
self.hostname = hostname
self.whois_response = []
prefix = self.cache_load() # try load prefix from earlier WHOIS responses
if prefix:
if (self.ttl != -1 and self.get[7] + self.ttl < time()) or (Whois.unknown_mode and not self.get[6]):
# the TTL is too old, we cannot guarantee IP stayed in the same prefix, let's get rid of the old results
# OR we are in unknown_mode which means we want abusemail. If not here, maybe another IP claimed
# a range superset without abuse e-mail. Delete this possible superset
# We do not have to call now `self.get = None; del self.ip_seen[ip]` if there is no need to be thread safe,
# these lines will be called at the function end.
del self.ranges[prefix]

if self.hostname:
if len(self.hostname.split(".")) > 2:
self.hostname_registerable = self.to_registerable(self.hostname)
else:
self.count_stats()
return
self.hostname_registerable = self.hostname

if self.ip:
prefix = self.cache_load() # try load prefix from earlier WHOIS responses
if prefix:
if (self.ttl != -1 and self.get[7] + self.ttl < time()) or (Whois.unknown_mode and not self.get[6]):
# the TTL is too old, we cannot guarantee IP stayed in the same prefix, let's get rid of the old results
# OR we are in unknown_mode which means we want abusemail. If not here, maybe another IP claimed
# a range superset without abuse e-mail. Delete this possible superset
# We do not have to call now `self.get = None; del self.ip_seen[ip]` if there is no need to be thread safe,
# these lines will be called at the function end.
del self.ranges[prefix]
else:
self.count_stats()
return

if self.see:
print(f"Whois {ip}... ", end="", flush=True)
print(f"Whois {self.ip or self.hostname_registerable}... ", end="", flush=True)
if Whois.slow_mode:
if self.see:
print("waiting 7 seconds... ", end="", flush=True)
Expand All @@ -105,7 +115,7 @@ def __init__(self, ip):
if self.see:
print(get[2] or "no incident contact.")
prefix = get[0]
if not prefix:
if not prefix and self.ip:
logger.info(f"No prefix found for IP {ip}")
prefix = IPRange(0, 0) # make key consistent when saving into cache
self.ip_seen[ip] = prefix
Expand Down Expand Up @@ -202,14 +212,14 @@ def _match_response(self, patterns, last_word=False):
:param patterns: pattern string or list of strings
:param last_word: returns only the last word of whole matched expression else last group (ex: the one in parentheses)
:return:
"""
# , take_nth=None, group=None
# :param take_nth: if available, return n-th result instead of the first available
# I.E. `whois 131.72.138.234 | grep ountr` returns three countries: UY, CL, CL.
# ARIN registry informs us that this IP is a LACNIC resource and prints out LACNIC address in UY.
# However, CL is the country the IP is hosted in.
# :param group: returned group - default: last group is returned (ex: the one in parentheses)
:return:
"""
if type(patterns) is str:
patterns = [patterns]

Expand Down Expand Up @@ -350,6 +360,7 @@ def analyze(self):
asn = self._match_response(r'\norigin(.*)\d+', last_word=True)
netname = self._match_response([r'netname:\s*([^\s]*)', r'network:network-name:\s*([^\s]*)'])

registrar_ab = self.get_registrar_abusemail()
ab = self.get_abusemail()
if Whois.unknown_mode and not ab:
ab = self.resolve_unknown_mail()
Expand All @@ -362,7 +373,7 @@ def analyze(self):
else:
get1 = "local"
get2 = ab
return prefix, get1, get2, asn, netname, country, ab, int(time())
return prefix, get1, get2, asn, netname, country, ab, int(time()), registrar_ab

def _load_country_from_addresses(self):
# let's try to find country in the non-standardised address field
Expand All @@ -373,7 +384,9 @@ def _load_country_from_addresses(self):
return c
return ""

reAbuse = re.compile(r'[a-z0-9._%+-]{1,64}@(?:[a-z0-9-]{1,63}\.){1,125}[a-z]{2,63}')
# email regex
email_regex = r"[a-z0-9._%+-]{1,64}@(?:[a-z0-9-]{1,63}\.){1,125}[a-z]{2,63}"
reAbuse = re.compile(email_regex)

def get_abusemail(self):
""" Loads abusemail from last whois response OR from whois json api. """
Expand All @@ -384,16 +397,48 @@ def get_abusemail(self):
]))
return match.group(0) if match else ""

def get_registrar_abusemail(self):
    """Return the registrar's abuse e-mail found in the most recent whois response.

    The first line of `self.whois_response[0]` that contains the words `abuse` and
    `registrar` together with an `@` placed between two word characters is picked.
    The first address on that line matching `self.email_regex` is returned.

    :return: the e-mail address, or "" when there is no stored response, no such line,
        or the picked line holds no parsable address
    """
    if not self.whois_response:
        # nothing has been stored (the list is reset to [] when a query cannot be read)
        return ""

    # `.` never crosses a newline, so all the look-aheads have to be satisfied within a single line
    match = re.search(r".*(?=.*\babuse\b).*(?=.*\b@\b).*(?=.*\bregistrar\b).*(?:.*\b(email|contact)\b)?.*", self.whois_response[0])
    if not match:
        return ""

    abusemail = re.search(self.email_regex, match.group(0))
    # always hand back a string so that callers never receive None
    return abusemail.group(0) if abusemail else ""

def to_registerable(self, url: str) -> str:
    """Reduce a hostname to its registerable domain.

    ex: website.xyz.com.br -> xyz.com.br

    :param url: hostname to be reduced
    :return: the registerable domain; when none can be derived (e.g. `com.br`),
        the input is returned unchanged
    """
    registered = tldextract.extract(url).registered_domain
    # `registered_domain` is empty when no registerable part is recognised;
    # fall back to the input so that whois is never queried with an empty target
    return registered or url

regRe = re.compile(r"using server (.*)\.")

def _exec(self, server, server_url=None):
""" Query whois server """
"""Query whois server"""
target = self.hostname_registerable if self.hostname else self.ip

if server == "general":
cmd = ["whois", "--verbose", self.ip]
cmd = ["whois", "--verbose", target]
else:
if not server_url:
server_url = Whois.servers[server]
cmd = ["whois", "--verbose", "-h", server_url, "--", self.ip]
cmd = ["whois", "--verbose", "-h", server_url, "--", target]
self.last_server = None # check what registry whois asks - may use a strange LIR that returns non-senses
try:
# in case wrong env is set to whois, we get `147.32.106.205` country NL and not CZ
Expand All @@ -404,7 +449,7 @@ def _exec(self, server, server_url=None):
except UnicodeDecodeError:
# ip address 94.230.155.109 had this string 'Jan Krivsky Hl\xc3\x83\x83\xc3\x82\xc2\xa1dkov' and everything failed
self.whois_response = []
logger.warning("Whois response for IP {} on server {} cannot be parsed.".format(self.ip, server))
logger.warning("Whois response for IP {} on server {} cannot be parsed.".format(target, server))
except TypeError: # could not resolve host
self.whois_response = []
except FileNotFoundError:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ pythondialog
requests
tabulate
xlrd
tldextract

0 comments on commit 9e0aa9d

Please sign in to comment.