diff --git a/.github/workflows/pep8.yml b/.github/workflows/pep8.yml new file mode 100644 index 0000000..26bcc20 --- /dev/null +++ b/.github/workflows/pep8.yml @@ -0,0 +1,22 @@ +name: pep8 + +on: + push + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.x' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pycodestyle + - name: Run pycodestyle + run: | + pycodestyle --show-source --show-pep8 app/* + pycodestyle --show-source --show-pep8 test/* diff --git a/app/__init__.py b/app/__init__.py index a349acc..dccc85d 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -6,20 +6,35 @@ from flask_session import Session import os from stem import Signal -app = Flask(__name__, static_folder=os.path.dirname(os.path.abspath(__file__)) + '/static') +app = Flask(__name__, static_folder=os.path.dirname( + os.path.abspath(__file__)) + '/static') app.user_elements = {} app.default_key_set = generate_user_keys() app.no_cookie_ips = [] app.config['SECRET_KEY'] = os.urandom(32) app.config['SESSION_TYPE'] = 'filesystem' app.config['VERSION_NUMBER'] = '0.2.1' -app.config['APP_ROOT'] = os.getenv('APP_ROOT', os.path.dirname(os.path.abspath(__file__))) -app.config['STATIC_FOLDER'] = os.getenv('STATIC_FOLDER', os.path.join(app.config['APP_ROOT'], 'static')) -app.config['CONFIG_PATH'] = os.getenv('CONFIG_VOLUME', os.path.join(app.config['STATIC_FOLDER'], 'config')) -app.config['DEFAULT_CONFIG'] = os.path.join(app.config['CONFIG_PATH'], 'config.json') -app.config['SESSION_FILE_DIR'] = os.path.join(app.config['CONFIG_PATH'], 'session') -app.config['BANG_PATH'] = os.getenv('CONFIG_VOLUME', os.path.join(app.config['STATIC_FOLDER'], 'bangs')) -app.config['BANG_FILE'] = os.path.join(app.config['BANG_PATH'], 'bangs.json') +app.config['APP_ROOT'] = os.getenv( + 'APP_ROOT', + os.path.dirname(os.path.abspath(__file__))) +app.config['STATIC_FOLDER'] = os.getenv( + 'STATIC_FOLDER', + os.path.join(app.config['APP_ROOT'], 'static')) +app.config['CONFIG_PATH'] = os.getenv( + 'CONFIG_VOLUME', + os.path.join(app.config['STATIC_FOLDER'], 'config')) +app.config['DEFAULT_CONFIG'] = os.path.join( + app.config['CONFIG_PATH'], + 'config.json') +app.config['SESSION_FILE_DIR'] = os.path.join( + app.config['CONFIG_PATH'], + 'session') +app.config['BANG_PATH'] = os.getenv( + 'CONFIG_VOLUME', + os.path.join(app.config['STATIC_FOLDER'], 'bangs')) +app.config['BANG_FILE'] = os.path.join( + app.config['BANG_PATH'], + 'bangs.json') if not os.path.exists(app.config['CONFIG_PATH']): os.makedirs(app.config['CONFIG_PATH']) @@ -38,4 +53,4 @@ Session(app) # Attempt to acquire tor identity, to determine if Tor config is available send_tor_signal(Signal.HEARTBEAT) -from app import routes +from app import routes # noqa diff --git a/app/filter.py b/app/filter.py index 5299b92..5fede0e 100644 --- a/app/filter.py +++ b/app/filter.py @@ -32,20 +32,27 @@ class Filter: def reskin(self, page): # Aesthetic only re-skinning if self.dark: - page = page.replace('fff', '000').replace('202124', 'ddd').replace('1967D2', '3b85ea') + page = page.replace( + 'fff', '000').replace( + '202124', 'ddd').replace( + '1967D2', '3b85ea') return page def encrypt_path(self, msg, is_element=False): # Encrypts path to avoid plaintext results in logs if is_element: - # Element paths are tracked differently in order for the element key to be regenerated - # once all elements have been loaded - enc_path = Fernet(self.user_keys['element_key']).encrypt(msg.encode()).decode() + # Element paths are encrypted separately from text, to allow key + # regeneration once all items have been served to the user + enc_path = Fernet( + self.user_keys['element_key'] + ).encrypt(msg.encode()).decode() self._elements += 1 return enc_path - return Fernet(self.user_keys['text_key']).encrypt(msg.encode()).decode() + return Fernet( + self.user_keys['text_key'] + ).encrypt(msg.encode()).decode() def clean(self, soup): self.main_divs = soup.find('div', {'id': 'main'}) @@ -74,8 +81,8 @@ class Filter: footer = soup.find('footer') if footer: # Remove divs that have multiple links beyond just page navigation - [_.decompose() for _ in footer.find_all('div', recursive=False) - if len(_.find_all('a', href=True)) > 3] + [_.decompose() for _ in footer.find_all('div', recursive=False) + if len(_.find_all('a', href=True)) > 3] header = soup.find('header') if header: @@ -88,8 +95,9 @@ class Filter: return for div in [_ for _ in self.main_divs.find_all('div', recursive=True)]: - has_ad = len([_ for _ in div.find_all('span', recursive=True) if has_ad_content(_.text)]) - _ = div.decompose() if has_ad else None + div_ads = [_ for _ in div.find_all('span', recursive=True) + if has_ad_content(_.text)] + _ = div.decompose() if len(div_ads) else None def fix_question_section(self): if not self.main_divs: @@ -97,14 +105,14 @@ class Filter: question_divs = [_ for _ in self.main_divs.find_all( 'div', recursive=False - ) if len(_.find_all('h2')) > 0] + ) if len(_.find_all('h2')) > 0] if len(question_divs) == 0: return # Wrap section in details element to allow collapse/expand - details = BeautifulSoup(features='lxml').new_tag('details') - summary = BeautifulSoup(features='lxml').new_tag('summary') + details = BeautifulSoup('html.parser').new_tag('details') + summary = BeautifulSoup('html.parser').new_tag('summary') summary.string = question_divs[0].find('h2').text question_divs[0].find('h2').decompose() details.append(summary) @@ -113,7 +121,7 @@ class Filter: for question_div in question_divs: questions = [_ for _ in question_div.find_all( 'div', recursive=True - ) if _.text.endswith('?')] + ) if _.text.endswith('?')] for question in questions: question['style'] = 'padding: 10px; font-style: italic;' @@ -131,11 +139,15 @@ class Filter: element['src'] = BLANK_B64 return - element['src'] = 'element?url=' + self.encrypt_path(element_src, is_element=True) + \ - '&type=' + urlparse.quote(mime) - # TODO: Non-mobile image results link to website instead of image + element['src'] = 'element?url=' + self.encrypt_path( + element_src, + is_element=True) + '&type=' + urlparse.quote(mime) + + # FIXME: Non-mobile image results link to website instead of image # if not self.mobile: - # img.append(BeautifulSoup(FULL_RES_IMG.format(element_src), 'html.parser')) + # img.append( + # BeautifulSoup(FULL_RES_IMG.format(element_src), + # 'html.parser')) def update_styling(self, soup): # Remove unnecessary button(s) @@ -149,8 +161,9 @@ class Filter: # Update logo logo = soup.find('a', {'class': 'l'}) if logo and self.mobile: - logo['style'] = 'display:flex; justify-content:center; align-items:center; color:#685e79; ' \ - 'font-size:18px; ' + logo['style'] = ('display:flex; justify-content:center; ' + 'align-items:center; color:#685e79; ' + 'font-size:18px; ') # Fix search bar length on mobile try: @@ -163,7 +176,7 @@ class Filter: # Replace href with only the intended destination (no "utm" type tags) href = link['href'].replace('https://www.google.com', '') if 'advanced_search' in href or 'tbm=shop' in href: - # TODO: The "Shopping" tab requires further filtering (see #136) + # FIXME: The "Shopping" tab requires further filtering (see #136) # Temporarily removing all links to that tab for now. link.decompose() return @@ -171,20 +184,26 @@ class Filter: link['target'] = '_blank' result_link = urlparse.urlparse(href) - query_link = parse_qs(result_link.query)['q'][0] if '?q=' in href else '' + query_link = parse_qs( + result_link.query + )['q'][0] if '?q=' in href else '' if query_link.startswith('/'): - # Internal google links (i.e. mail, maps, etc) should still be forwarded to Google + # Internal google links (i.e. mail, maps, etc) should still + # be forwarded to Google link['href'] = 'https://google.com' + query_link elif '/search?q=' in href: - # "li:1" implies the query should be interpreted verbatim, so we wrap it in double quotes + # "li:1" implies the query should be interpreted verbatim, + # which is accomplished by wrapping the query in double quotes if 'li:1' in href: query_link = '"' + query_link + '"' new_search = 'search?q=' + self.encrypt_path(query_link) query_params = parse_qs(urlparse.urlparse(href).query) for param in VALID_PARAMS: - param_val = query_params[param][0] if param in query_params else '' + if param not in query_params: + continue + param_val = query_params[param][0] new_search += '&' + param + '=' + param_val link['href'] = new_search elif 'url?q=' in href: @@ -199,9 +218,11 @@ class Filter: # Replace link location if "alts" config is enabled if self.alt_redirect: - # Search and replace all link descriptions with alternative location + # Search and replace all link descriptions + # with alternative location link['href'] = get_site_alt(link['href']) - link_desc = link.find_all(text=re.compile('|'.join(SITE_ALTS.keys()))) + link_desc = link.find_all( + text=re.compile('|'.join(SITE_ALTS.keys()))) if len(link_desc) == 0: return diff --git a/app/models/config.py b/app/models/config.py index e69e037..4fe0092 100644 --- a/app/models/config.py +++ b/app/models/config.py @@ -128,7 +128,7 @@ class Config: {'name': 'Fiji', 'value': 'countryFJ'}, {'name': 'Finland', 'value': 'countryFI'}, {'name': 'France', 'value': 'countryFR'}, - {'name': 'France\, Metropolitan', 'value': 'countryFX'}, + {'name': r'France\, Metropolitan', 'value': 'countryFX'}, {'name': 'French Guiana', 'value': 'countryGF'}, {'name': 'French Polynesia', 'value': 'countryPF'}, {'name': 'French Southern Territories', 'value': 'countryTF'}, @@ -167,7 +167,8 @@ class Config: {'name': 'Kazakhstan', 'value': 'countryKZ'}, {'name': 'Kenya', 'value': 'countryKE'}, {'name': 'Kiribati', 'value': 'countryKI'}, - {'name': 'Korea, Democratic People\'s Republic of', 'value': 'countryKP'}, + {'name': 'Korea, Democratic People\'s Republic of', + 'value': 'countryKP'}, {'name': 'Korea, Republic of', 'value': 'countryKR'}, {'name': 'Kuwait', 'value': 'countryKW'}, {'name': 'Kyrgyzstan', 'value': 'countryKG'}, @@ -181,7 +182,8 @@ class Config: {'name': 'Lithuania', 'value': 'countryLT'}, {'name': 'Luxembourg', 'value': 'countryLU'}, {'name': 'Macao', 'value': 'countryMO'}, - {'name': 'Macedonia, the Former Yugosalv Republic of', 'value': 'countryMK'}, + {'name': 'Macedonia, the Former Yugosalv Republic of', + 'value': 'countryMK'}, {'name': 'Madagascar', 'value': 'countryMG'}, {'name': 'Malawi', 'value': 'countryMW'}, {'name': 'Malaysia', 'value': 'countryMY'}, @@ -253,7 +255,8 @@ class Config: {'name': 'Solomon Islands', 'value': 'countrySB'}, {'name': 'Somalia', 'value': 'countrySO'}, {'name': 'South Africa', 'value': 'countryZA'}, - {'name': 'South Georgia and the South Sandwich Islands', 'value': 'countryGS'}, + {'name': 'South Georgia and the South Sandwich Islands', + 'value': 'countryGS'}, {'name': 'Spain', 'value': 'countryES'}, {'name': 'Sri Lanka', 'value': 'countryLK'}, {'name': 'Sudan', 'value': 'countrySD'}, @@ -310,6 +313,12 @@ class Config: self.alts = False self.new_tab = False self.get_only = False + self.safe_keys = [ + 'lang_search', + 'lang_interface', + 'ctry', + 'dark' + ] for key, value in kwargs.items(): setattr(self, key, value) @@ -338,12 +347,7 @@ class Config: array """ - return key in [ - 'lang_search', - 'lang_interface', - 'ctry', - 'dark' - ] + return key in self.safe_keys def from_params(self, params) -> 'Config': """Modify user config with search parameters. This is primarily diff --git a/app/request.py b/app/request.py index 04ae3db..b64f6c9 100644 --- a/app/request.py +++ b/app/request.py @@ -8,9 +8,9 @@ import os from stem import Signal, SocketError from stem.control import Controller -# Core Google search URLs SEARCH_URL = 'https://www.google.com/search?gbv=1&q=' -AUTOCOMPLETE_URL = 'https://suggestqueries.google.com/complete/search?client=toolbar&' +AUTOCOMPLETE_URL = ('https://suggestqueries.google.com/' + 'complete/search?client=toolbar&') MOBILE_UA = '{}/5.0 (Android 0; Mobile; rv:54.0) Gecko/54.0 {}/59.0' DESKTOP_UA = '{}/5.0 (X11; {} x86_64; rv:75.0) Gecko/20100101 {}/75.0' @@ -72,11 +72,16 @@ def gen_query(query, args, config, near_city=None) -> str: result_tbs = args.get('tbs') param_dict['tbs'] = '&tbs=' + result_tbs - # Occasionally the 'tbs' param provided by google also contains a field for 'lr', but formatted - # strangely. This is a (admittedly not very elegant) solution for this. - # Ex/ &tbs=qdr:h,lr:lang_1pl --> the lr param needs to be extracted and have the "1" digit removed in this case + # Occasionally the 'tbs' param provided by google also contains a + # field for 'lr', but formatted strangely. This is a rough solution + # for this. + # + # Example: + # &tbs=qdr:h,lr:lang_1pl + # -- the lr param needs to be extracted and remove the leading '1' sub_lang = [_ for _ in result_tbs.split(',') if 'lr:' in _] - sub_lang = sub_lang[0][sub_lang[0].find('lr:') + 3:len(sub_lang[0])] if len(sub_lang) > 0 else '' + sub_lang = sub_lang[0][sub_lang[0].find('lr:') + + 3:len(sub_lang[0])] if len(sub_lang) > 0 else '' # Ensure search query is parsable query = urlparse.quote(query) @@ -93,20 +98,26 @@ def gen_query(query, args, config, near_city=None) -> str: if near_city: param_dict['near'] = '&near=' + urlparse.quote(near_city) - # Set language for results (lr) if source isn't set, otherwise use the result - # language param provided by google (but with the strange digit(s) removed) + # Set language for results (lr) if source isn't set, otherwise use the + # result language param provided in the results if 'source' in args: param_dict['source'] = '&source=' + args.get('source') - param_dict['lr'] = ('&lr=' + ''.join([_ for _ in sub_lang if not _.isdigit()])) if sub_lang else '' + param_dict['lr'] = ('&lr=' + ''.join( + [_ for _ in sub_lang if not _.isdigit()] + )) if sub_lang else '' else: - param_dict['lr'] = ('&lr=' + config.lang_search) if config.lang_search else '' + param_dict['lr'] = ( + '&lr=' + config.lang_search + ) if config.lang_search else '' - # Set autocorrected search ignore + # 'nfpr' defines the exclusion of results from an auto-corrected query if 'nfpr' in args: param_dict['nfpr'] = '&nfpr=' + args.get('nfpr') param_dict['cr'] = ('&cr=' + config.ctry) if config.ctry else '' - param_dict['hl'] = ('&hl=' + config.lang_interface.replace('lang_', '')) if config.lang_interface else '' + param_dict['hl'] = ( + '&hl=' + config.lang_interface.replace('lang_', '') + ) if config.lang_interface else '' param_dict['safe'] = '&safe=' + ('active' if config.safe else 'off') for val in param_dict.values(): @@ -126,6 +137,7 @@ class Request: root_path -- the root path of the whoogle instance config -- the user's current whoogle configuration """ + def __init__(self, normal_ua, root_path, config: Config): # Send heartbeat to Tor, used in determining if the user can or cannot # enable Tor for future requests @@ -143,9 +155,10 @@ class Request: ':' + os.environ.get('WHOOGLE_PROXY_PASS') self.proxies = { 'http': os.environ.get('WHOOGLE_PROXY_TYPE') + '://' + - auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'), + auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'), } - self.proxies['https'] = self.proxies['http'].replace('http', 'https') + self.proxies['https'] = self.proxies['http'].replace('http', + 'https') else: self.proxies = { 'http': 'socks5://127.0.0.1:9050', @@ -169,7 +182,8 @@ class Request: """ ac_query = dict(hl=self.language, q=query) - response = self.send(base_url=AUTOCOMPLETE_URL, query=urlparse.urlencode(ac_query)).text + response = self.send(base_url=AUTOCOMPLETE_URL, + query=urlparse.urlencode(ac_query)).text if response: dom = etree.fromstring(response) @@ -178,14 +192,14 @@ class Request: return [] def send(self, base_url=SEARCH_URL, query='', attempt=0) -> Response: - """Sends an outbound request to a URL. Optionally sends the request using Tor, if - enabled by the user. + """Sends an outbound request to a URL. Optionally sends the request + using Tor, if enabled by the user. Args: base_url: The URL to use in the request query: The optional query string for the request - attempt: The number of attempts made for the request (used for cycling - through Tor identities, if enabled) + attempt: The number of attempts made for the request + (used for cycling through Tor identities, if enabled) Returns: Response: The Response object returned by the requests call @@ -195,21 +209,30 @@ class Request: 'User-Agent': self.modified_user_agent } - # Validate Tor connection and request new identity if the last one failed - if self.tor and not send_tor_signal(Signal.NEWNYM if attempt > 0 else Signal.HEARTBEAT): - raise TorError("Tor was previously enabled, but the connection has been dropped. Please check your " + - "Tor configuration and try again.", disable=True) + # Validate Tor conn and request new identity if the last one failed + if self.tor and not send_tor_signal( + Signal.NEWNYM if attempt > 0 else Signal.HEARTBEAT): + raise TorError( + "Tor was previously enabled, but the connection has been " + "dropped. Please check your Tor configuration and try again.", + disable=True) # Make sure that the tor connection is valid, if enabled if self.tor: - tor_check = requests.get('https://check.torproject.org/', proxies=self.proxies, headers=headers) + tor_check = requests.get('https://check.torproject.org/', + proxies=self.proxies, headers=headers) self.tor_valid = 'Congratulations' in tor_check.text if not self.tor_valid: - raise TorError("Tor connection succeeded, but the connection could not be validated by torproject.org", - disable=True) + raise TorError( + "Tor connection succeeded, but the connection could not " + "be validated by torproject.org", + disable=True) - response = requests.get(base_url + query, proxies=self.proxies, headers=headers) + response = requests.get( + base_url + query, + proxies=self.proxies, + headers=headers) # Retry query with new identity if using Tor (max 10 attempts) if 'form id="captcha-form"' in response.text and self.tor: diff --git a/app/routes.py b/app/routes.py index dba4b54..37f7983 100644 --- a/app/routes.py +++ b/app/routes.py @@ -9,7 +9,8 @@ import uuid from functools import wraps import waitress -from flask import jsonify, make_response, request, redirect, render_template, send_file, session, url_for +from flask import jsonify, make_response, request, redirect, render_template, \ + send_file, session, url_for from requests import exceptions from app import app @@ -30,23 +31,30 @@ def auth_required(f): # Skip if username/password not set whoogle_user = os.getenv('WHOOGLE_USER', '') whoogle_pass = os.getenv('WHOOGLE_PASS', '') - if (not whoogle_user or not whoogle_pass) or \ - (auth and whoogle_user == auth.username and whoogle_pass == auth.password): + if (not whoogle_user or not whoogle_pass) or ( + auth + and whoogle_user == auth.username + and whoogle_pass == auth.password): return f(*args, **kwargs) else: - return make_response('Not logged in', 401, {'WWW-Authenticate': 'Basic realm="Login Required"'}) + return make_response('Not logged in', 401, { + 'WWW-Authenticate': 'Basic realm="Login Required"'}) + return decorated @app.before_request def before_request_func(): - g.request_params = request.args if request.method == 'GET' else request.form + g.request_params = ( + request.args if request.method == 'GET' else request.form + ) g.cookies_disabled = False # Generate session values for user if unavailable if not valid_user_session(session): session['config'] = json.load(open(app.config['DEFAULT_CONFIG'])) \ - if os.path.exists(app.config['DEFAULT_CONFIG']) else {'url': request.url_root} + if os.path.exists(app.config['DEFAULT_CONFIG']) else { + 'url': request.url_root} session['uuid'] = str(uuid.uuid4()) session['fernet_keys'] = generate_user_keys(True) @@ -63,12 +71,16 @@ def before_request_func(): is_http = request.url.startswith('http://') if (is_heroku and is_http) or (https_only and is_http): - return redirect(request.url.replace('http://', 'https://', 1), code=308) + return redirect( + request.url.replace('http://', 'https://', 1), + code=308) g.user_config = Config(**session['config']) if not g.user_config.url: - g.user_config.url = request.url_root.replace('http://', 'https://') if https_only else request.url_root + g.user_config.url = request.url_root.replace( + 'http://', + 'https://') if https_only else request.url_root g.user_request = Request( request.headers.get('User-Agent'), @@ -82,13 +94,17 @@ def before_request_func(): def after_request_func(response): if app.user_elements[session['uuid']] <= 0 and '/element' in request.url: # Regenerate element key if all elements have been served to user - session['fernet_keys']['element_key'] = '' if not g.cookies_disabled else app.default_key_set['element_key'] + session['fernet_keys'][ + 'element_key'] = '' if not g.cookies_disabled else \ + app.default_key_set['element_key'] app.user_elements[session['uuid']] = 0 - # Check if address consistently has cookies blocked, in which case start removing session - # files after creation. - # Note: This is primarily done to prevent overpopulation of session directories, since browsers that - # block cookies will still trigger Flask's session creation routine with every request. + # Check if address consistently has cookies blocked, + # in which case start removing session files after creation. + # + # Note: This is primarily done to prevent overpopulation of session + # directories, since browsers that block cookies will still trigger + # Flask's session creation routine with every request. if g.cookies_disabled and request.remote_addr not in app.no_cookie_ips: app.no_cookie_ips.append(request.remote_addr) elif g.cookies_disabled and request.remote_addr in app.no_cookie_ips: @@ -101,6 +117,7 @@ def after_request_func(response): @app.errorhandler(404) def unknown_page(e): + app.logger.warn(e) return redirect(g.app_location) @@ -109,7 +126,8 @@ def unknown_page(e): def index(): # Reset keys session['fernet_keys'] = generate_user_keys(g.cookies_disabled) - error_message = session['error_message'] if 'error_message' in session else '' + error_message = session[ + 'error_message'] if 'error_message' in session else '' session['error_message'] = '' return render_template('index.html', @@ -128,7 +146,8 @@ def opensearch(): if opensearch_url.endswith('/'): opensearch_url = opensearch_url[:-1] - get_only = g.user_config.get_only or 'Chrome' in request.headers.get('User-Agent') + get_only = g.user_config.get_only or 'Chrome' in request.headers.get( + 'User-Agent') return render_template( 'opensearch.xml', @@ -147,16 +166,23 @@ def autocomplete(): # Search bangs if the query begins with "!", but not "! " (feeling lucky) if q.startswith('!') and len(q) > 1 and not q.startswith('! '): - return jsonify([q, [bang_json[_]['suggestion'] for _ in bang_json if _.startswith(q)]]) + return jsonify([q, [bang_json[_]['suggestion'] for _ in bang_json if + _.startswith(q)]]) if not q and not request.data: return jsonify({'?': []}) elif request.data: - q = urlparse.unquote_plus(request.data.decode('utf-8').replace('q=', '')) + q = urlparse.unquote_plus( + request.data.decode('utf-8').replace('q=', '')) # Return a list of suggestions for the query - # Note: If Tor is enabled, this returns nothing, as the request is almost always rejected - return jsonify([q, g.user_request.autocomplete(q) if not g.user_config.tor else []]) + # + # Note: If Tor is enabled, this returns nothing, as the request is + # almost always rejected + return jsonify([ + q, + g.user_request.autocomplete(q) if not g.user_config.tor else [] + ]) @app.route('/search', methods=['GET', 'POST']) @@ -168,7 +194,8 @@ def search(): # Update user config if specified in search args g.user_config = g.user_config.from_params(g.request_params) - search_util = RoutingUtils(request, g.user_config, session, cookies_disabled=g.cookies_disabled) + search_util = RoutingUtils(request, g.user_config, session, + cookies_disabled=g.cookies_disabled) query = search_util.new_search_query() resolved_bangs = search_util.bang_operator(bang_json) @@ -183,14 +210,17 @@ def search(): try: response, elements = search_util.generate_response() except TorError as e: - session['error_message'] = e.message + ("\\n\\nTor config is now disabled!" if e.disable else "") - session['config']['tor'] = False if e.disable else session['config']['tor'] + session['error_message'] = e.message + ( + "\\n\\nTor config is now disabled!" if e.disable else "") + session['config']['tor'] = False if e.disable else session['config'][ + 'tor'] return redirect(url_for('.index')) if search_util.feeling_lucky or elements < 0: return redirect(response, code=303) - # Keep count of external elements to fetch before element key can be regenerated + # Keep count of external elements to fetch before + # the element key can be regenerated app.user_elements[session['uuid']] = elements return render_template( @@ -200,12 +230,13 @@ def search(): dark_mode=g.user_config.dark, response=response, version_number=app.config['VERSION_NUMBER'], - search_header=render_template( + search_header=(render_template( 'header.html', dark_mode=g.user_config.dark, query=urlparse.unquote(query), search_type=search_util.search_type, - mobile=g.user_request.mobile) if 'isch' not in search_util.search_type else '') + mobile=g.user_request.mobile) + if 'isch' not in search_util.search_type else '')) @app.route('/config', methods=['GET', 'POST', 'PUT']) @@ -215,8 +246,12 @@ def config(): return json.dumps(g.user_config.__dict__) elif request.method == 'PUT': if 'name' in request.args: - config_pkl = os.path.join(app.config['CONFIG_PATH'], request.args.get('name')) - session['config'] = pickle.load(open(config_pkl, 'rb')) if os.path.exists(config_pkl) else session['config'] + config_pkl = os.path.join( + app.config['CONFIG_PATH'], + request.args.get('name')) + session['config'] = (pickle.load(open(config_pkl, 'rb')) + if os.path.exists(config_pkl) + else session['config']) return json.dumps(session['config']) else: return json.dumps({}) @@ -227,11 +262,16 @@ def config(): # Save config by name to allow a user to easily load later if 'name' in request.args: - pickle.dump(config_data, open(os.path.join(app.config['CONFIG_PATH'], request.args.get('name')), 'wb')) + pickle.dump( + config_data, + open(os.path.join( + app.config['CONFIG_PATH'], + request.args.get('name')), 'wb')) # Overwrite default config if user has cookies disabled if g.cookies_disabled: - open(app.config['DEFAULT_CONFIG'], 'w').write(json.dumps(config_data, indent=4)) + open(app.config['DEFAULT_CONFIG'], 'w').write( + json.dumps(config_data, indent=4)) session['config'] = config_data return redirect(config_data['url']) @@ -274,7 +314,8 @@ def element(): except exceptions.RequestException: pass - empty_gif = base64.b64decode('R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==') + empty_gif = base64.b64decode( + 'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==') return send_file(io.BytesIO(empty_gif), mimetype='image/gif') @@ -282,38 +323,62 @@ def element(): @auth_required def window(): get_body = g.user_request.send(base_url=request.args.get('location')).text - get_body = get_body.replace('src="/', 'src="' + request.args.get('location') + '"') - get_body = get_body.replace('href="/', 'href="' + request.args.get('location') + '"') + get_body = get_body.replace('src="/', + 'src="' + request.args.get('location') + '"') + get_body = get_body.replace('href="/', + 'href="' + request.args.get('location') + '"') - results = BeautifulSoup(get_body, 'html.parser') + results = bsoup(get_body, 'html.parser') - try: - for script in results('script'): - script.decompose() - except Exception: - pass + for script in results('script'): + script.decompose() return render_template('display.html', response=results) def run_app(): - parser = argparse.ArgumentParser(description='Whoogle Search console runner') - parser.add_argument('--port', default=5000, metavar='', - help='Specifies a port to run on (default 5000)') - parser.add_argument('--host', default='127.0.0.1', metavar='', - help='Specifies the host address to use (default 127.0.0.1)') - parser.add_argument('--debug', default=False, action='store_true', - help='Activates debug mode for the server (default False)') - parser.add_argument('--https-only', default=False, action='store_true', - help='Enforces HTTPS redirects for all requests') - parser.add_argument('--userpass', default='', metavar='', - help='Sets a username/password basic auth combo (default None)') - parser.add_argument('--proxyauth', default='', metavar='', - help='Sets a username/password for a HTTP/SOCKS proxy (default None)') - parser.add_argument('--proxytype', default='', metavar='', - help='Sets a proxy type for all connections (default None)') - parser.add_argument('--proxyloc', default='', metavar='', - help='Sets a proxy location for all connections (default None)') + parser = argparse.ArgumentParser( + description='Whoogle Search console runner') + parser.add_argument( + '--port', + default=5000, + metavar='', + help='Specifies a port to run on (default 5000)') + parser.add_argument( + '--host', + default='127.0.0.1', + metavar='', + help='Specifies the host address to use (default 127.0.0.1)') + parser.add_argument( + '--debug', + default=False, + action='store_true', + help='Activates debug mode for the server (default False)') + parser.add_argument( + '--https-only', + default=False, + action='store_true', + help='Enforces HTTPS redirects for all requests') + parser.add_argument( + '--userpass', + default='', + metavar='', + help='Sets a username/password basic auth combo (default None)') + parser.add_argument( + '--proxyauth', + default='', + metavar='', + help='Sets a username/password for a HTTP/SOCKS proxy (default None)') + parser.add_argument( + '--proxytype', + default='', + metavar='', + help='Sets a proxy type for all connections (default None)') + parser.add_argument( + '--proxyloc', + default='', + metavar='', + help='Sets a proxy location for all connections (default None)') args = parser.parse_args() if args.userpass: diff --git a/app/utils/filter_utils.py b/app/utils/filter_utils.py index 0efb4ff..877c38c 100644 --- a/app/utils/filter_utils.py +++ b/app/utils/filter_utils.py @@ -7,14 +7,16 @@ SKIP_ARGS = ['ref_src', 'utm'] FULL_RES_IMG = '
Full Image' GOOG_IMG = '/images/branding/searchlogo/1x/googlelogo' LOGO_URL = GOOG_IMG + '_desk' -BLANK_B64 = ''' - -''' +BLANK_B64 = ('data:image/png;base64,' + 'iVBORw0KGgoAAAANSUhEUgAAAAoAAAAKCAQAAAAnOwc2AAAAD0lEQVR42mNkw' + 'AIYh7IgAAVVAAuInjI5AAAAAElFTkSuQmCC') +# Ad keywords BLACKLIST = [ - 'ad', 'anuncio', 'annuncio', 'annonce', 'Anzeige', '广告', '廣告', 'Reklama', 'Реклама', 'Anunț', '광고', - 'annons', 'Annonse', 'Iklan', '広告', 'Augl.', 'Mainos', 'Advertentie', 'إعلان', 'Գովազդ', 'विज्ञापन', 'Reklam', - 'آگهی', 'Reklāma', 'Reklaam', 'Διαφήμιση', 'מודעה', 'Hirdetés', 'Anúncio' + 'ad', 'anuncio', 'annuncio', 'annonce', 'Anzeige', '广告', '廣告', 'Reklama', + 'Реклама', 'Anunț', '광고', 'annons', 'Annonse', 'Iklan', '広告', 'Augl.', + 'Mainos', 'Advertentie', 'إعلان', 'Գովազդ', 'विज्ञापन', 'Reklam', 'آگهی', + 'Reklāma', 'Reklaam', 'Διαφήμιση', 'מודעה', 'Hirdetés', 'Anúncio' ] SITE_ALTS = { @@ -25,7 +27,8 @@ SITE_ALTS = { def has_ad_content(element: str): - return element.upper() in (value.upper() for value in BLACKLIST) or 'ⓘ' in element + return element.upper() in (value.upper() for value in BLACKLIST) \ + or 'ⓘ' in element def get_first_link(soup): diff --git a/app/utils/routing_utils.py b/app/utils/routing_utils.py index 703b7e0..3822d48 100644 --- a/app/utils/routing_utils.py +++ b/app/utils/routing_utils.py @@ -1,25 +1,26 @@ from app.filter import Filter, get_first_link from app.utils.session_utils import generate_user_keys from app.request import gen_query -from bs4 import BeautifulSoup +from bs4 import BeautifulSoup as bsoup from cryptography.fernet import Fernet, InvalidToken from flask import g from typing import Any, Tuple - TOR_BANNER = '

You are using Tor


' class RoutingUtils: def __init__(self, request, config, session, cookies_disabled=False): - self.request_params = request.args if request.method == 'GET' else request.form + method = request.method + self.request_params = request.args if method == 'GET' else request.form self.user_agent = request.headers.get('User-Agent') self.feeling_lucky = False self.config = config self.session = session self.query = '' self.cookies_disabled = cookies_disabled - self.search_type = self.request_params.get('tbm') if 'tbm' in self.request_params else '' + self.search_type = self.request_params.get( + 'tbm') if 'tbm' in self.request_params else '' def __getitem__(self, name): return getattr(self, name) @@ -45,7 +46,9 @@ class RoutingUtils: else: # Attempt to decrypt if this is an internal link try: - q = Fernet(self.session['fernet_keys']['text_key']).decrypt(q.encode()).decode() + q = Fernet( + self.session['fernet_keys']['text_key'] + ).decrypt(q.encode()).decode() except InvalidToken: pass @@ -53,29 +56,40 @@ class RoutingUtils: self.session['fernet_keys']['text_key'] = generate_user_keys( cookies_disabled=self.cookies_disabled)['text_key'] - # Format depending on whether or not the query is a "feeling lucky" query + # Strip leading '! ' for "feeling lucky" queries self.feeling_lucky = q.startswith('! ') self.query = q[2:] if self.feeling_lucky else q return self.query def bang_operator(self, bangs_dict: dict) -> str: for operator in bangs_dict.keys(): - if self.query.split(' ')[0] == operator: - return bangs_dict[operator]['url'].format(self.query.replace(operator, '').strip()) + if self.query.split(' ')[0] != operator: + continue + + return bangs_dict[operator]['url'].format( + self.query.replace(operator, '').strip()) return '' def generate_response(self) -> Tuple[Any, int]: mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent - content_filter = Filter(self.session['fernet_keys'], mobile=mobile, config=self.config) - full_query = gen_query(self.query, self.request_params, self.config, content_filter.near) + content_filter = Filter( + self.session['fernet_keys'], + mobile=mobile, + config=self.config) + full_query = gen_query( + self.query, + self.request_params, + self.config, + content_filter.near) get_body = g.user_request.send(query=full_query) # Produce cleanable html soup from response - html_soup = BeautifulSoup(content_filter.reskin(get_body.text), 'html.parser') - html_soup.insert(0, BeautifulSoup( - TOR_BANNER, - features='lxml') if g.user_request.tor_valid else BeautifulSoup("", features="lxml")) + html_soup = bsoup(content_filter.reskin(get_body.text), 'html.parser') + html_soup.insert( + 0, + bsoup(TOR_BANNER, 'html.parser') + if g.user_request.tor_valid else bsoup('', 'html.parser')) if self.feeling_lucky: return get_first_link(html_soup), 1 @@ -83,11 +97,13 @@ class RoutingUtils: formatted_results = content_filter.clean(html_soup) # Append user config to all search links, if available - param_str = ''.join('&{}={}'.format(k, v) - for k, v in self.request_params.to_dict(flat=True).items() - if self.config.is_safe_key(k)) + param_str = ''.join('&{}={}'.format(k, v) + for k, v in + self.request_params.to_dict(flat=True).items() + if self.config.is_safe_key(k)) for link in formatted_results.find_all('a', href=True): - if 'search?' not in link['href'] or link['href'].index('search?') > 1: + if 'search?' not in link['href'] or link['href'].index( + 'search?') > 1: continue link['href'] += param_str diff --git a/requirements.txt b/requirements.txt index 508cbcf..399d342 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,6 +18,7 @@ more-itertools==8.3.0 packaging==20.4 pluggy==0.13.1 py==1.8.1 +pycodestyle==2.6.0 pycparser==2.19 pyOpenSSL==19.1.0 pyparsing==2.4.7 diff --git a/test/test_results.py b/test/test_results.py index 119ac7f..74af29c 100644 --- a/test/test_results.py +++ b/test/test_results.py @@ -3,13 +3,12 @@ from app.filter import Filter from app.utils.session_utils import generate_user_keys from datetime import datetime from dateutil.parser import * -import json -import os def get_search_results(data): secret_key = generate_user_keys() - soup = Filter(user_keys=secret_key).clean(BeautifulSoup(data, 'html.parser')) + soup = Filter(user_keys=secret_key).clean( + BeautifulSoup(data, 'html.parser')) main_divs = soup.find('div', {'id': 'main'}) assert len(main_divs) > 1 @@ -17,7 +16,9 @@ def get_search_results(data): result_divs = [] for div in main_divs: # Result divs should only have 1 inner div - if len(list(div.children)) != 1 or not div.findChild() or 'div' not in div.findChild().name: + if (len(list(div.children)) != 1 + or not div.findChild() + or 'div' not in div.findChild().name): continue result_divs.append(div) @@ -78,6 +79,7 @@ def test_recent_results(client): try: date = parse(date_span) - assert (current_date - date).days <= (num_days + 5) # Date can have a little bit of wiggle room + # Date can have a little bit of wiggle room + assert (current_date - date).days <= (num_days + 5) except ParserError: pass