From 8ad8e66d374f8345c49d7c5e2d971ba9d3a82c7e Mon Sep 17 00:00:00 2001 From: Ben Busby Date: Wed, 24 Mar 2021 15:13:52 -0400 Subject: [PATCH] Improve static typing throughout repo Eventually this should be part of a separate mypy ci build, but right now it's just a general guideline. Future commits and PRs should be validated for static typing wherever possible. For reference, the testing commands used for this commit were: mypy --ignore-missing-imports --pretty --disallow-untyped-calls app/ mypy --ignore-missing-imports --pretty --disallow-untyped-calls test/ --- app/filter.py | 57 ++++++++++++++++++++++++++++++++++---------- app/request.py | 27 +++++++++++---------- app/routes.py | 2 +- app/utils/results.py | 1 + app/utils/search.py | 41 ++++++++++++++++++------------- 5 files changed, 86 insertions(+), 42 deletions(-) diff --git a/app/filter.py b/app/filter.py index 66e9c6e..d03a112 100644 --- a/app/filter.py +++ b/app/filter.py @@ -1,6 +1,7 @@ from app.request import VALID_PARAMS from app.utils.results import * -from bs4.element import ResultSet +from bs4 import BeautifulSoup +from bs4.element import ResultSet, Tag from cryptography.fernet import Fernet import re import urllib.parse as urlparse @@ -8,7 +9,7 @@ from urllib.parse import parse_qs class Filter: - def __init__(self, user_keys: dict, mobile=False, config=None): + def __init__(self, user_keys: dict, mobile=False, config=None) -> None: if config is None: config = {} @@ -29,7 +30,7 @@ class Filter: def elements(self): return self._elements - def reskin(self, page): + def reskin(self, page: str) -> str: # Aesthetic only re-skinning if self.dark: page = page.replace( @@ -39,22 +40,22 @@ class Filter: return page - def encrypt_path(self, msg, is_element=False): + def encrypt_path(self, path, is_element=False) -> str: # Encrypts path to avoid plaintext results in logs if is_element: # Element paths are encrypted separately from text, to allow key # regeneration once all items have been served to the user enc_path = Fernet( self.user_keys['element_key'] - ).encrypt(msg.encode()).decode() + ).encrypt(path.encode()).decode() self._elements += 1 return enc_path return Fernet( self.user_keys['text_key'] - ).encrypt(msg.encode()).decode() + ).encrypt(path.encode()).decode() - def clean(self, soup): + def clean(self, soup) -> BeautifulSoup: self.main_divs = soup.find('div', {'id': 'main'}) self.remove_ads() self.fix_question_section() @@ -90,7 +91,12 @@ class Filter: return soup - def remove_ads(self): + def remove_ads(self) -> None: + """Removes ads found in the list of search result divs + + Returns: + None (The soup object is modified directly) + """ if not self.main_divs: return @@ -99,7 +105,16 @@ class Filter: if has_ad_content(_.text)] _ = div.decompose() if len(div_ads) else None - def fix_question_section(self): + def fix_question_section(self) -> None: + """Collapses the "People Also Asked" section into a "details" element + + These sections are typically the only sections in the results page that + are structured as

Title

...
, so they are + extracted by checking all result divs for h2 children. + + Returns: + None (The soup object is modified directly) + """ if not self.main_divs: return @@ -126,7 +141,14 @@ class Filter: for question in questions: question['style'] = 'padding: 10px; font-style: italic;' - def update_element_src(self, element, mime): + def update_element_src(self, element: Tag, mime: str) -> None: + """Encrypts the original src of an element and rewrites the element src + to use the "/element?src=" pass-through. + + Returns: + None (The soup element is modified directly) + + """ src = element['src'] if src.startswith('//'): @@ -145,7 +167,8 @@ class Filter: src, is_element=True) + '&type=' + urlparse.quote(mime) - def update_styling(self, soup): + def update_styling(self, soup) -> None: + """""" # Remove unnecessary button(s) for button in soup.find_all('button'): button.decompose() @@ -168,7 +191,17 @@ class Filter: except AttributeError: pass - def update_link(self, link): + def update_link(self, link: Tag) -> None: + """Update internal link paths with encrypted path, otherwise remove + unnecessary redirects and/or marketing params from the url + + Args: + link: A bs4 Tag element to inspect and update + + Returns: + None (the tag is updated directly) + + """ # Replace href with only the intended destination (no "utm" type tags) href = link['href'].replace('https://www.google.com', '') if 'advanced_search' in href or 'tbm=shop' in href: diff --git a/app/request.py b/app/request.py index 71eeb45..fadcc18 100644 --- a/app/request.py +++ b/app/request.py @@ -29,10 +29,10 @@ class TorError(Exception): altogether). """ - def __init__(self, message, disable=False): + def __init__(self, message, disable=False) -> None: self.message = message self.disable = disable - super().__init__(self.message) + super().__init__(message) def send_tor_signal(signal: Signal) -> bool: @@ -64,7 +64,7 @@ def gen_query(query, args, config, near_city=None) -> str: # Use :past(hour/day/week/month/year) if available # example search "new restaurants :past month" - sub_lang = '' + lang = '' if ':past' in query and 'tbs' not in args: time_range = str.strip(query.split(':past', 1)[-1]) param_dict['tbs'] = '&tbs=' + ('qdr:' + str.lower(time_range[0])) @@ -79,9 +79,10 @@ def gen_query(query, args, config, near_city=None) -> str: # Example: # &tbs=qdr:h,lr:lang_1pl # -- the lr param needs to be extracted and remove the leading '1' - sub_lang = [_ for _ in result_tbs.split(',') if 'lr:' in _] - sub_lang = sub_lang[0][sub_lang[0].find('lr:') + - 3:len(sub_lang[0])] if len(sub_lang) > 0 else '' + result_params = [_ for _ in result_tbs.split(',') if 'lr:' in _] + if len(result_params) > 0: + result_param = result_params[0] + lang = result_param[result_param.find('lr:') + 3:len(result_param)] # Ensure search query is parsable query = urlparse.quote(query) @@ -103,8 +104,8 @@ def gen_query(query, args, config, near_city=None) -> str: if 'source' in args: param_dict['source'] = '&source=' + args.get('source') param_dict['lr'] = ('&lr=' + ''.join( - [_ for _ in sub_lang if not _.isdigit()] - )) if sub_lang else '' + [_ for _ in lang if not _.isdigit()] + )) if lang else '' else: param_dict['lr'] = ( '&lr=' + config.lang_search @@ -150,12 +151,12 @@ class Request: # Set up proxy, if previously configured if os.environ.get('WHOOGLE_PROXY_LOC'): auth_str = '' - if os.environ.get('WHOOGLE_PROXY_USER'): - auth_str = os.environ.get('WHOOGLE_PROXY_USER') + \ - ':' + os.environ.get('WHOOGLE_PROXY_PASS') + if os.environ.get('WHOOGLE_PROXY_USER', ''): + auth_str = os.environ.get('WHOOGLE_PROXY_USER', '') + \ + ':' + os.environ.get('WHOOGLE_PROXY_PASS', '') self.proxies = { - 'http': os.environ.get('WHOOGLE_PROXY_TYPE') + '://' + - auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'), + 'http': os.environ.get('WHOOGLE_PROXY_TYPE', '') + '://' + + auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC', ''), } self.proxies['https'] = self.proxies['http'].replace('http', 'https') diff --git a/app/routes.py b/app/routes.py index 006be07..35f1066 100644 --- a/app/routes.py +++ b/app/routes.py @@ -347,7 +347,7 @@ def window(): return render_template('display.html', response=results) -def run_app(): +def run_app() -> None: parser = argparse.ArgumentParser( description='Whoogle Search console runner') parser.add_argument( diff --git a/app/utils/results.py b/app/utils/results.py index 58c450f..2a9e60e 100644 --- a/app/utils/results.py +++ b/app/utils/results.py @@ -57,6 +57,7 @@ def get_first_link(soup: BeautifulSoup) -> str: # Return the first search result URL if 'url?q=' in a['href']: return filter_link_args(a['href']) + return '' def get_site_alt(link: str) -> str: diff --git a/app/utils/search.py b/app/utils/search.py index ee75f3f..9694d14 100644 --- a/app/utils/search.py +++ b/app/utils/search.py @@ -24,15 +24,24 @@ def needs_https(url: str) -> bool: bool: True/False representing the need to upgrade """ - https_only = os.getenv('HTTPS_ONLY', False) + https_only = bool(os.getenv('HTTPS_ONLY', 0)) is_heroku = url.endswith('.herokuapp.com') is_http = url.startswith('http://') return (is_heroku and is_http) or (https_only and is_http) -def has_captcha(site_contents: str) -> bool: - return CAPTCHA in site_contents +def has_captcha(results: str) -> bool: + """Checks to see if the search results are blocked by a captcha + + Args: + results: The search page html as a string + + Returns: + bool: True/False indicating if a captcha element was found + + """ + return CAPTCHA in results class Search: @@ -118,23 +127,23 @@ class Search: """ mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent - content_filter = Filter( - self.session['fernet_keys'], - mobile=mobile, - config=self.config) - full_query = gen_query( - self.query, - self.request_params, - self.config, - content_filter.near) + content_filter = Filter(self.session['fernet_keys'], + mobile=mobile, + config=self.config) + full_query = gen_query(self.query, + self.request_params, + self.config, + content_filter.near) get_body = g.user_request.send(query=full_query) # Produce cleanable html soup from response html_soup = bsoup(content_filter.reskin(get_body.text), 'html.parser') - html_soup.insert( - 0, - bsoup(TOR_BANNER, 'html.parser') - if g.user_request.tor_valid else bsoup('', 'html.parser')) + + # Indicate whether or not a Tor connection is active + tor_banner = bsoup('', 'html.parser') + if g.user_request.tor_valid: + tor_banner = bsoup(TOR_BANNER, 'html.parser') + html_soup.insert(0, tor_banner) if self.feeling_lucky: return get_first_link(html_soup), 0