From df0b7afa50193aa70b48360a5502ef8cd4b28164 Mon Sep 17 00:00:00 2001 From: Ben Busby Date: Thu, 1 Apr 2021 00:23:30 -0400 Subject: [PATCH] Switch to single Fernet key per session This moves away from the previous (messy) approach of using two separate keys for decrypting text and element URLs and regenerating them for new searches. The current implementation of sessions is not very reliable, which led to keys being regenerated too soon, which would break page navigation. Until that can be addressed, the single key per session approach should work a lot better. Fixes #250 Fixes #90 --- app/__init__.py | 5 ++--- app/filter.py | 12 ++++-------- app/request.py | 4 ++-- app/routes.py | 28 +++++----------------------- app/utils/search.py | 30 ++++++++---------------------- app/utils/session.py | 17 +++++++---------- test/conftest.py | 4 ++-- test/test_misc.py | 22 +++++++++++++--------- test/test_results.py | 6 +++--- test/test_routes.py | 9 ++++++++- 10 files changed, 54 insertions(+), 83 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index f88f9b3..136b5c8 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,5 +1,5 @@ from app.request import send_tor_signal -from app.utils.session import generate_user_keys +from app.utils.session import generate_user_key from app.utils.bangs import gen_bangs_json from flask import Flask from flask_session import Session @@ -17,8 +17,7 @@ if os.getenv("WHOOGLE_DOTENV", ''): load_dotenv(os.path.join(os.path.dirname(os.path.abspath(__file__)), dotenv_path)) -app.user_elements = {} -app.default_key_set = generate_user_keys() +app.default_key = generate_user_key() app.no_cookie_ips = [] app.config['SECRET_KEY'] = os.urandom(32) app.config['SESSION_TYPE'] = 'filesystem' diff --git a/app/filter.py b/app/filter.py index d03a112..e93c91b 100644 --- a/app/filter.py +++ b/app/filter.py @@ -9,7 +9,7 @@ from urllib.parse import parse_qs class Filter: - def __init__(self, user_keys: dict, mobile=False, config=None) -> 
None: + def __init__(self, user_key: str, mobile=False, config=None) -> None: if config is None: config = {} @@ -19,7 +19,7 @@ class Filter: self.new_tab = config['new_tab'] if 'new_tab' in config else False self.alt_redirect = config['alts'] if 'alts' in config else False self.mobile = mobile - self.user_keys = user_keys + self.user_key = user_key self.main_divs = ResultSet('') self._elements = 0 @@ -45,15 +45,11 @@ class Filter: if is_element: # Element paths are encrypted separately from text, to allow key # regeneration once all items have been served to the user - enc_path = Fernet( - self.user_keys['element_key'] - ).encrypt(path.encode()).decode() + enc_path = Fernet(self.user_key).encrypt(path.encode()).decode() self._elements += 1 return enc_path - return Fernet( - self.user_keys['text_key'] - ).encrypt(path.encode()).decode() + return Fernet(self.user_key).encrypt(path.encode()).decode() def clean(self, soup) -> BeautifulSoup: self.main_divs = soup.find('div', {'id': 'main'}) diff --git a/app/request.py b/app/request.py index fadcc18..4cd9fcf 100644 --- a/app/request.py +++ b/app/request.py @@ -108,7 +108,7 @@ def gen_query(query, args, config, near_city=None) -> str: )) if lang else '' else: param_dict['lr'] = ( - '&lr=' + config.lang_search + '&lr=' + config.lang_search ) if config.lang_search else '' # 'nfpr' defines the exclusion of results from an auto-corrected query @@ -117,7 +117,7 @@ def gen_query(query, args, config, near_city=None) -> str: param_dict['cr'] = ('&cr=' + config.ctry) if config.ctry else '' param_dict['hl'] = ( - '&hl=' + config.lang_interface.replace('lang_', '') + '&hl=' + config.lang_interface.replace('lang_', '') ) if config.lang_interface else '' param_dict['safe'] = '&safe=' + ('active' if config.safe else 'off') diff --git a/app/routes.py b/app/routes.py index b463e67..4fae8eb 100644 --- a/app/routes.py +++ b/app/routes.py @@ -56,15 +56,12 @@ def before_request_func(): session['config'] = 
json.load(open(app.config['DEFAULT_CONFIG'])) \ if os.path.exists(app.config['DEFAULT_CONFIG']) else {} session['uuid'] = str(uuid.uuid4()) - session['fernet_keys'] = generate_user_keys(True) + session['key'] = generate_user_key(True) # Flag cookies as possibly disabled in order to prevent against # unnecessary session directory expansion g.cookies_disabled = True - if session['uuid'] not in app.user_elements: - app.user_elements.update({session['uuid']: 0}) - # Handle https upgrade if needs_https(request.url): return redirect( @@ -88,13 +85,6 @@ def before_request_func(): @app.after_request def after_request_func(resp): - if app.user_elements[session['uuid']] <= 0 and '/element' in request.url: - # Regenerate element key if all elements have been served to user - session['fernet_keys'][ - 'element_key'] = '' if not g.cookies_disabled else \ - app.default_key_set['element_key'] - app.user_elements[session['uuid']] = 0 - # Check if address consistently has cookies blocked, # in which case start removing session files after creation. # @@ -125,7 +115,7 @@ def unknown_page(e): @auth_required def index(): # Reset keys - session['fernet_keys'] = generate_user_keys(g.cookies_disabled) + session['key'] = generate_user_key(g.cookies_disabled) # Redirect if an error was raised if 'error_message' in session and session['error_message']: @@ -193,9 +183,6 @@ def autocomplete(): @app.route('/search', methods=['GET', 'POST']) @auth_required def search(): - # Reset element counter - app.user_elements[session['uuid']] = 0 - # Update user config if specified in search args g.user_config = g.user_config.from_params(g.request_params) @@ -213,7 +200,7 @@ def search(): # Generate response and number of external elements from the page try: - response, elements = search_util.generate_response() + response = search_util.generate_response() except TorError as e: session['error_message'] = e.message + ( "\\n\\nTor config is now disabled!" 
if e.disable else "") @@ -221,13 +208,9 @@ def search(): 'tor'] return redirect(url_for('.index')) - if search_util.feeling_lucky or elements < 0: + if search_util.feeling_lucky: return redirect(response, code=303) - # Keep count of external elements to fetch before - # the element key can be regenerated - app.user_elements[session['uuid']] = elements - # Return 503 if temporarily blocked by captcha resp_code = 503 if has_captcha(str(response)) else 200 @@ -309,13 +292,12 @@ def imgres(): @app.route('/element') @auth_required def element(): - cipher_suite = Fernet(session['fernet_keys']['element_key']) + cipher_suite = Fernet(session['key']) src_url = cipher_suite.decrypt(request.args.get('url').encode()).decode() src_type = request.args.get('type') try: file_data = g.user_request.send(base_url=src_url).content - app.user_elements[session['uuid']] -= 1 tmp_mem = io.BytesIO() tmp_mem.write(file_data) tmp_mem.seek(0) diff --git a/app/utils/search.py b/app/utils/search.py index 197186b..0a944d0 100644 --- a/app/utils/search.py +++ b/app/utils/search.py @@ -1,5 +1,5 @@ from app.filter import Filter, get_first_link -from app.utils.session import generate_user_keys +from app.utils.session import generate_user_key from app.request import gen_query from bs4 import BeautifulSoup as bsoup from cryptography.fernet import Fernet, InvalidToken @@ -87,10 +87,6 @@ class Search: str: A valid query string """ - # Generate a new element key each time a new search is performed - self.session['fernet_keys']['element_key'] = generate_user_keys( - cookies_disabled=self.cookies_disabled)['element_key'] - q = self.request_params.get('q') if q is None or len(q) == 0: @@ -98,36 +94,26 @@ class Search: else: # Attempt to decrypt if this is an internal link try: - q = Fernet( - self.session['fernet_keys']['text_key'] - ).decrypt(q.encode()).decode() + q = Fernet(self.session['key']).decrypt(q.encode()).decode() except InvalidToken: pass - # Reset text key - 
self.session['fernet_keys']['text_key'] = generate_user_keys( - cookies_disabled=self.cookies_disabled)['text_key'] - # Strip leading '! ' for "feeling lucky" queries self.feeling_lucky = q.startswith('! ') self.query = q[2:] if self.feeling_lucky else q return self.query - def generate_response(self) -> Tuple[Any, int]: + def generate_response(self) -> str: """Generates a response for the user's query Returns: - Tuple[Any, int]: A tuple in the format (response, # of elements) - For example, in the case of a "feeling lucky" - search, the response is a result URL, with no - encrypted elements to account for. Otherwise, the - response is a BeautifulSoup response body, with - N encrypted elements to track before key regen. + str: A string response to the search query, in the form of a URL + or string representation of HTML content. """ mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent - content_filter = Filter(self.session['fernet_keys'], + content_filter = Filter(self.session['key'], mobile=mobile, config=self.config) full_query = gen_query(self.query, @@ -146,7 +132,7 @@ class Search: html_soup.insert(0, tor_banner) if self.feeling_lucky: - return get_first_link(html_soup), 0 + return get_first_link(html_soup) else: formatted_results = content_filter.clean(html_soup) @@ -161,4 +147,4 @@ class Search: continue link['href'] += param_str - return formatted_results, content_filter.elements + return str(formatted_results) diff --git a/app/utils/session.py b/app/utils/session.py index f34d725..0dc8dc5 100644 --- a/app/utils/session.py +++ b/app/utils/session.py @@ -1,29 +1,26 @@ from cryptography.fernet import Fernet from flask import current_app as app -REQUIRED_SESSION_VALUES = ['uuid', 'config', 'fernet_keys'] +REQUIRED_SESSION_VALUES = ['uuid', 'config', 'key'] -def generate_user_keys(cookies_disabled=False) -> dict: - """Generates a set of user keys +def generate_user_key(cookies_disabled=False) -> bytes: + """Generates a key for encrypting 
searches and element URLs Args: cookies_disabled: Flag for whether or not cookies are disabled by the user. If so, the user can only use the default key - set generated on app init for queries. + generated on app init for queries. Returns: - dict: A new Fernet key set + bytes: A unique Fernet key """ if cookies_disabled: - return app.default_key_set + return app.default_key # Generate/regenerate unique key per user - return { - 'element_key': Fernet.generate_key(), - 'text_key': Fernet.generate_key() - } + return Fernet.generate_key() def valid_user_session(session: dict) -> bool: diff --git a/test/conftest.py b/test/conftest.py index f0912de..34c92c4 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,5 +1,5 @@ from app import app -from app.utils.session import generate_user_keys +from app.utils.session import generate_user_key import pytest import random @@ -18,6 +18,6 @@ def client(): with app.test_client() as client: with client.session_transaction() as session: session['uuid'] = 'test' - session['fernet_keys'] = generate_user_keys() + session['key'] = generate_user_key() session['config'] = {} yield client diff --git a/test/test_misc.py b/test/test_misc.py index e399b4a..65a4ed0 100644 --- a/test/test_misc.py +++ b/test/test_misc.py @@ -1,20 +1,26 @@ -from app.utils.session import generate_user_keys, valid_user_session +from cryptography.fernet import Fernet + +from app.utils.session import generate_user_key, valid_user_session def test_generate_user_keys(): - keys = generate_user_keys() - assert 'text_key' in keys - assert 'element_key' in keys - assert keys['text_key'] not in keys['element_key'] + key = generate_user_key() + assert Fernet(key) + assert generate_user_key() != key def test_valid_session(client): - assert not valid_user_session({'fernet_keys': '', 'config': {}}) + assert not valid_user_session({'key': '', 'config': {}}) with client.session_transaction() as session: assert valid_user_session(session) -def 
test_request_key_generation(client): +def test_query_decryption(client): + # FIXME: Handle decryption errors in search.py and rewrite test + # This previously was used to test swapping decryption keys between + # queries. While this worked in theory and usually didn't cause problems, + # they were tied to session IDs and those are really unreliable (meaning + # that occasionally page navigation would break). rv = client.get('/') cookie = rv.headers['Set-Cookie'] @@ -23,11 +29,9 @@ def test_request_key_generation(client): with client.session_transaction() as session: assert valid_user_session(session) - text_key = session['fernet_keys']['text_key'] rv = client.get('/search?q=test+2', headers={'Cookie': cookie}) assert rv._status_code == 200 with client.session_transaction() as session: assert valid_user_session(session) - assert text_key not in session['fernet_keys']['text_key'] diff --git a/test/test_results.py b/test/test_results.py index c0f7fd1..38b9936 100644 --- a/test/test_results.py +++ b/test/test_results.py @@ -1,13 +1,13 @@ from bs4 import BeautifulSoup from app.filter import Filter -from app.utils.session import generate_user_keys +from app.utils.session import generate_user_key from datetime import datetime from dateutil.parser import * def get_search_results(data): - secret_key = generate_user_keys() - soup = Filter(user_keys=secret_key).clean( + secret_key = generate_user_key() + soup = Filter(user_key=secret_key).clean( BeautifulSoup(data, 'html.parser')) main_divs = soup.find('div', {'id': 'main'}) diff --git a/test/test_routes.py b/test/test_routes.py index e3ba084..fda189d 100644 --- a/test/test_routes.py +++ b/test/test_routes.py @@ -19,14 +19,21 @@ def test_feeling_lucky(client): def test_ddg_bang(client): + # Bang at beginning of query rv = client.get('/search?q=!gh%20whoogle') assert rv._status_code == 302 assert rv.headers.get('Location').startswith('https://github.com') - rv = client.get('/search?q=!w%20github') + # Move bang to end of query 
+ rv = client.get('/search?q=github%20!w') assert rv._status_code == 302 assert rv.headers.get('Location').startswith('https://en.wikipedia.org') + # Move bang to middle of query + rv = client.get('/search?q=big%20!r%20chungus') + assert rv._status_code == 302 + assert rv.headers.get('Location').startswith('https://www.reddit.com') + def test_config(client): rv = client.post('/config', data=demo_config)