diff --git a/app/__init__.py b/app/__init__.py index 4d2adb0..c3fe504 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -21,9 +21,9 @@ if os.getenv("WHOOGLE_DOTENV", ''): dotenv_path)) app.default_key = generate_user_key() -app.no_cookie_ips = [] app.config['SECRET_KEY'] = os.urandom(32) app.config['SESSION_TYPE'] = 'filesystem' +app.config['SESSION_COOKIE_SAMESITE'] = 'strict' app.config['VERSION_NUMBER'] = '0.6.0' app.config['APP_ROOT'] = os.getenv( 'APP_ROOT', diff --git a/app/filter.py b/app/filter.py index 65bc73b..452b71c 100644 --- a/app/filter.py +++ b/app/filter.py @@ -1,3 +1,4 @@ +from app.models.endpoint import Endpoint from app.request import VALID_PARAMS, MAPS_URL from app.utils.misc import read_config_bool from app.utils.results import * @@ -250,7 +251,7 @@ class Filter: element['src'] = BLANK_B64 return - element['src'] = 'element?url=' + self.encrypt_path( + element['src'] = f'{Endpoint.element}?url=' + self.encrypt_path( src, is_element=True) + '&type=' + urlparse.quote(mime) @@ -385,7 +386,8 @@ class Filter: if len(urls) != 2: continue - img_url = urlparse.unquote(urls[0].replace('/imgres?imgurl=', '')) + img_url = urlparse.unquote(urls[0].replace( + f'/{Endpoint.imgres}?imgurl=', '')) try: # Try to strip out only the necessary part of the web page link diff --git a/app/models/config.py b/app/models/config.py index ef4204f..bb10d12 100644 --- a/app/models/config.py +++ b/app/models/config.py @@ -18,7 +18,7 @@ class Config: self.block_title = os.getenv('WHOOGLE_CONFIG_BLOCK_TITLE', '') self.block_url = os.getenv('WHOOGLE_CONFIG_BLOCK_URL', '') self.ctry = os.getenv('WHOOGLE_CONFIG_COUNTRY', '') - self.theme = os.getenv('WHOOGLE_CONFIG_THEME', '') + self.theme = os.getenv('WHOOGLE_CONFIG_THEME', 'system') self.safe = read_config_bool('WHOOGLE_CONFIG_SAFE') self.dark = read_config_bool('WHOOGLE_CONFIG_DARK') # deprecated self.alts = read_config_bool('WHOOGLE_CONFIG_ALTS') diff --git a/app/models/endpoint.py b/app/models/endpoint.py new file mode 
100644 index 0000000..eeddc64 --- /dev/null +++ b/app/models/endpoint.py @@ -0,0 +1,23 @@ +from enum import Enum + + +class Endpoint(Enum): + autocomplete = 'autocomplete' + home = 'home' + healthz = 'healthz' + session = 'session' + config = 'config' + opensearch = 'opensearch.xml' + search = 'search' + search_html = 'search.html' + url = 'url' + imgres = 'imgres' + element = 'element' + window = 'window' + + def __str__(self): + return self.value + + def in_path(self, path: str) -> bool: + return path.startswith(self.value) or \ + path.startswith(f'/{self.value}') diff --git a/app/routes.py b/app/routes.py index 0889407..2e066fc 100644 --- a/app/routes.py +++ b/app/routes.py @@ -1,16 +1,17 @@ import argparse import base64 -import html import io import json import pickle import urllib.parse as urlparse import uuid +from datetime import timedelta from functools import wraps import waitress from app import app from app.models.config import Config +from app.models.endpoint import Endpoint from app.request import Request, TorError from app.utils.bangs import resolve_bang from app.utils.misc import read_config_bool, get_client_ip @@ -22,6 +23,7 @@ from bs4 import BeautifulSoup as bsoup from flask import jsonify, make_response, request, redirect, render_template, \ send_file, session, url_for from requests import exceptions, get +from requests.models import PreparedRequest # Load DDG bang json files only on init bang_json = json.load(open(app.config['BANG_FILE'])) @@ -57,23 +59,79 @@ def auth_required(f): return decorated +def session_required(f): + @wraps(f) + def decorated(*args, **kwargs): + if (valid_user_session(session) and + 'cookies_disabled' not in request.args): + g.session_key = session['key'] + else: + session.pop('_permanent', None) + g.session_key = app.default_key + + # Clear out old sessions + invalid_sessions = [] + for user_session in os.listdir(app.config['SESSION_FILE_DIR']): + session_path = os.path.join( + app.config['SESSION_FILE_DIR'], + 
user_session) + try: + with open(session_path, 'rb') as session_file: + _ = pickle.load(session_file) + data = pickle.load(session_file) + if isinstance(data, dict) and 'valid' in data: + continue + invalid_sessions.append(session_path) + except (EOFError, FileNotFoundError): + pass + + for invalid_session in invalid_sessions: + os.remove(invalid_session) + + return f(*args, **kwargs) + + return decorated + + @app.before_request def before_request_func(): g.request_params = ( request.args if request.method == 'GET' else request.form ) - g.cookies_disabled = False + + # Skip pre-request actions if verifying session + if '/session' in request.path and not valid_user_session(session): + return + + default_config = json.load(open(app.config['DEFAULT_CONFIG'])) \ + if os.path.exists(app.config['DEFAULT_CONFIG']) else {} # Generate session values for user if unavailable - if not valid_user_session(session): - session['config'] = json.load(open(app.config['DEFAULT_CONFIG'])) \ - if os.path.exists(app.config['DEFAULT_CONFIG']) else {} + if (not valid_user_session(session) and + 'cookies_disabled' not in request.args): + session['config'] = default_config session['uuid'] = str(uuid.uuid4()) - session['key'] = generate_user_key(True) + session['key'] = generate_user_key() - # Flag cookies as possibly disabled in order to prevent against - # unnecessary session directory expansion - g.cookies_disabled = True + # Skip checking for session on /autocomplete searches, + # since they can be done from the browser search bar (aka + # no ability to initialize a session) + if not Endpoint.autocomplete.in_path(request.path): + return redirect(url_for( + 'session_check', + session_id=session['uuid'], + follow=request.url), code=307) + else: + g.user_config = Config(**session['config']) + elif 'cookies_disabled' not in request.args: + # Set session as permanent + session.permanent = True + app.permanent_session_lifetime = timedelta(days=365) + g.user_config = Config(**session['config']) 
+ else: + # User has cookies disabled, fall back to immutable default config + session.pop('_permanent', None) + g.user_config = Config(**default_config) # Handle https upgrade if needs_https(request.url): @@ -81,8 +139,6 @@ def before_request_func(): request.url.replace('http://', 'https://', 1), code=308) - g.user_config = Config(**session['config']) - if not g.user_config.url: g.user_config.url = request.url_root.replace( 'http://', @@ -98,19 +154,6 @@ def before_request_func(): @app.after_request def after_request_func(resp): - # Check if address consistently has cookies blocked, - # in which case start removing session files after creation. - # - # Note: This is primarily done to prevent overpopulation of session - # directories, since browsers that block cookies will still trigger - # Flask's session creation routine with every request. - if g.cookies_disabled and request.remote_addr not in app.no_cookie_ips: - app.no_cookie_ips.append(request.remote_addr) - elif g.cookies_disabled and request.remote_addr in app.no_cookie_ips: - session_list = list(session.keys()) - for key in session_list: - session.pop(key) - resp.headers['Content-Security-Policy'] = app.config['CSP'] if os.environ.get('HTTPS_ONLY', False): resp.headers['Content-Security-Policy'] += 'upgrade-insecure-requests' @@ -124,22 +167,28 @@ def unknown_page(e): return redirect(g.app_location) -@app.route('/healthz', methods=['GET']) +@app.route(f'/{Endpoint.healthz}', methods=['GET']) def healthz(): return '' -@app.route('/home', methods=['GET']) -def home(): - return redirect(url_for('.index')) +@app.route(f'/{Endpoint.session}/<session_id>', methods=['GET', 'PUT', 'POST']) +def session_check(session_id): + if 'uuid' in session and session['uuid'] == session_id: + session['valid'] = True + return redirect(request.args.get('follow'), code=307) + else: + follow_url = request.args.get('follow') + req = PreparedRequest() + req.prepare_url(follow_url, {'cookies_disabled': 1}) + session.pop('_permanent', None) +
return redirect(req.url, code=307) @app.route('/', methods=['GET']) +@app.route(f'/{Endpoint.home}', methods=['GET']) @auth_required def index(): - # Reset keys - session['key'] = generate_user_key(g.cookies_disabled) - # Redirect if an error was raised if 'error_message' in session and session['error_message']: error_message = session['error_message'] @@ -157,13 +206,16 @@ def index(): logo=render_template( 'logo.html', dark=g.user_config.dark), - config_disabled=app.config['CONFIG_DISABLE'], + config_disabled=( + app.config['CONFIG_DISABLE'] or + not valid_user_session(session) or + 'cookies_disabled' in request.args), config=g.user_config, tor_available=int(os.environ.get('TOR_AVAILABLE')), version_number=app.config['VERSION_NUMBER']) -@app.route('/opensearch.xml', methods=['GET']) +@app.route(f'/{Endpoint.opensearch}', methods=['GET']) def opensearch(): opensearch_url = g.app_location if opensearch_url.endswith('/'): @@ -183,7 +235,7 @@ def opensearch(): ), 200, {'Content-Disposition': 'attachment; filename="opensearch.xml"'} -@app.route('/search.html', methods=['GET']) +@app.route(f'/{Endpoint.search_html}', methods=['GET']) def search_html(): search_url = g.app_location if search_url.endswith('/'): @@ -191,7 +243,7 @@ def search_html(): return render_template('search.html', url=search_url) -@app.route('/autocomplete', methods=['GET', 'POST']) +@app.route(f'/{Endpoint.autocomplete}', methods=['GET', 'POST']) def autocomplete(): ac_var = 'WHOOGLE_AUTOCOMPLETE' if os.getenv(ac_var) and not read_config_bool(ac_var): @@ -224,14 +276,14 @@ def autocomplete(): ]) -@app.route('/search', methods=['GET', 'POST']) +@app.route(f'/{Endpoint.search}', methods=['GET', 'POST']) +@session_required @auth_required def search(): # Update user config if specified in search args g.user_config = g.user_config.from_params(g.request_params) - search_util = Search(request, g.user_config, session, - cookies_disabled=g.cookies_disabled) + search_util = Search(request, g.user_config, 
g.session_key) query = search_util.new_search_query() bang = resolve_bang(query=query, bangs_dict=bang_json) @@ -240,7 +292,7 @@ def search(): # Redirect to home if invalid/blank search if not query: - return redirect('/') + return redirect(url_for('.index')) # Generate response and number of external elements from the page try: @@ -300,10 +352,13 @@ def search(): search_util.search_type else '')), resp_code -@app.route('/config', methods=['GET', 'POST', 'PUT']) +@app.route(f'/{Endpoint.config}', methods=['GET', 'POST', 'PUT']) +@session_required @auth_required def config(): - config_disabled = app.config['CONFIG_DISABLE'] + config_disabled = ( + app.config['CONFIG_DISABLE'] or + not valid_user_session(session)) if request.method == 'GET': return json.dumps(g.user_config.__dict__) elif request.method == 'PUT' and not config_disabled: @@ -330,18 +385,14 @@ def config(): app.config['CONFIG_PATH'], request.args.get('name')), 'wb')) - # Overwrite default config if user has cookies disabled - if g.cookies_disabled: - open(app.config['DEFAULT_CONFIG'], 'w').write( - json.dumps(config_data, indent=4)) - session['config'] = config_data return redirect(config_data['url']) else: return redirect(url_for('.index'), code=403) -@app.route('/url', methods=['GET']) +@app.route(f'/{Endpoint.url}', methods=['GET']) +@session_required @auth_required def url(): if 'url' in request.args: @@ -356,16 +407,18 @@ def url(): error_message='Unable to resolve query: ' + q) -@app.route('/imgres') +@app.route(f'/{Endpoint.imgres}') +@session_required @auth_required def imgres(): return redirect(request.args.get('imgurl')) -@app.route('/element') +@app.route(f'/{Endpoint.element}') +@session_required @auth_required def element(): - cipher_suite = Fernet(session['key']) + cipher_suite = Fernet(g.session_key) src_url = cipher_suite.decrypt(request.args.get('url').encode()).decode() src_type = request.args.get('type') @@ -384,7 +437,7 @@ def element(): return send_file(io.BytesIO(empty_gif), 
mimetype='image/gif') -@app.route('/window') +@app.route(f'/{Endpoint.window}') @auth_required def window(): get_body = g.user_request.send(base_url=request.args.get('location')).text diff --git a/app/utils/results.py b/app/utils/results.py index 416d0ea..0b445af 100644 --- a/app/utils/results.py +++ b/app/utils/results.py @@ -1,3 +1,4 @@ +from app.models.endpoint import Endpoint from bs4 import BeautifulSoup, NavigableString import html import os @@ -177,7 +178,7 @@ def append_nojs(result: BeautifulSoup) -> None: """ nojs_link = BeautifulSoup(features='html.parser').new_tag('a') - nojs_link['href'] = '/window?location=' + result['href'] + nojs_link['href'] = f'/{Endpoint.window}?location=' + result['href'] nojs_link.string = ' NoJS Link' result.append(nojs_link) diff --git a/app/utils/search.py b/app/utils/search.py index 7ee191c..0bc9335 100644 --- a/app/utils/search.py +++ b/app/utils/search.py @@ -52,16 +52,15 @@ class Search: Attributes: request: the incoming flask request config: the current user config settings - session: the flask user session + session_key: the flask user fernet key """ - - def __init__(self, request, config, session, cookies_disabled=False): + def __init__(self, request, config, session_key, cookies_disabled=False): method = request.method self.request_params = request.args if method == 'GET' else request.form self.user_agent = request.headers.get('User-Agent') self.feeling_lucky = False self.config = config - self.session = session + self.session_key = session_key self.query = '' self.cookies_disabled = cookies_disabled self.search_type = self.request_params.get( @@ -96,7 +95,7 @@ class Search: else: # Attempt to decrypt if this is an internal link try: - q = Fernet(self.session['key']).decrypt(q.encode()).decode() + q = Fernet(self.session_key).decrypt(q.encode()).decode() except InvalidToken: pass @@ -115,7 +114,7 @@ class Search: """ mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent - content_filter = 
Filter(self.session['key'], + content_filter = Filter(self.session_key, mobile=mobile, config=self.config) full_query = gen_query(self.query, diff --git a/app/utils/session.py b/app/utils/session.py index 0dc8dc5..7aea933 100644 --- a/app/utils/session.py +++ b/app/utils/session.py @@ -4,7 +4,7 @@ from flask import current_app as app REQUIRED_SESSION_VALUES = ['uuid', 'config', 'key'] -def generate_user_key(cookies_disabled=False) -> bytes: +def generate_user_key() -> bytes: """Generates a key for encrypting searches and element URLs Args: @@ -16,9 +16,6 @@ def generate_user_key(cookies_disabled=False) -> bytes: str: A unique Fernet key """ - if cookies_disabled: - return app.default_key - # Generate/regenerate unique key per user return Fernet.generate_key() diff --git a/requirements.txt b/requirements.txt index b8f8aef..09e2f14 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ chardet==3.0.4 click==8.0.3 cryptography==3.3.2 Flask==1.1.1 -Flask-Session==0.3.2 +Flask-Session==0.4.0 idna==2.9 itsdangerous==1.1.0 Jinja2==2.11.3 diff --git a/test/test_autocomplete.py b/test/test_autocomplete.py index 4f55f6d..194a5ab 100644 --- a/test/test_autocomplete.py +++ b/test/test_autocomplete.py @@ -1,12 +1,16 @@ +from app.models.endpoint import Endpoint + + def test_autocomplete_get(client): - rv = client.get('/autocomplete?q=green+eggs+and') + rv = client.get(f'/{Endpoint.autocomplete}?q=green+eggs+and') assert rv._status_code == 200 assert len(rv.data) >= 1 assert b'green eggs and ham' in rv.data def test_autocomplete_post(client): - rv = client.post('/autocomplete', data=dict(q='the+cat+in+the')) + rv = client.post(f'/{Endpoint.autocomplete}', + data=dict(q='the+cat+in+the')) assert rv._status_code == 200 assert len(rv.data) >= 1 assert b'the cat in the hat' in rv.data diff --git a/test/test_misc.py b/test/test_misc.py index 13dde2e..e3cd7e2 100644 --- a/test/test_misc.py +++ b/test/test_misc.py @@ -1,6 +1,7 @@ from cryptography.fernet import Fernet from 
app import app +from app.models.endpoint import Endpoint from app.utils.session import generate_user_key, valid_user_session @@ -37,13 +38,13 @@ def test_query_decryption(client): rv = client.get('/') cookie = rv.headers['Set-Cookie'] - rv = client.get('/search?q=test+1', headers={'Cookie': cookie}) + rv = client.get(f'/{Endpoint.search}?q=test+1', headers={'Cookie': cookie}) assert rv._status_code == 200 with client.session_transaction() as session: assert valid_user_session(session) - rv = client.get('/search?q=test+2', headers={'Cookie': cookie}) + rv = client.get(f'/{Endpoint.search}?q=test+2', headers={'Cookie': cookie}) assert rv._status_code == 200 with client.session_transaction() as session: diff --git a/test/test_results.py b/test/test_results.py index 7ec9462..b462242 100644 --- a/test/test_results.py +++ b/test/test_results.py @@ -1,5 +1,6 @@ from bs4 import BeautifulSoup from app.filter import Filter +from app.models.endpoint import Endpoint from app.utils.session import generate_user_key from datetime import datetime from dateutil.parser import * @@ -30,7 +31,7 @@ def get_search_results(data): def test_get_results(client): - rv = client.get('/search?q=test') + rv = client.get(f'/{Endpoint.search}?q=test') assert rv._status_code == 200 # Depending on the search, there can be more @@ -41,7 +42,7 @@ def test_get_results(client): def test_post_results(client): - rv = client.post('/search', data=dict(q='test')) + rv = client.post(f'/{Endpoint.search}', data=dict(q='test')) assert rv._status_code == 200 # Depending on the search, there can be more @@ -52,7 +53,7 @@ def test_post_results(client): def test_translate_search(client): - rv = client.post('/search', data=dict(q='translate hola')) + rv = client.post(f'/{Endpoint.search}', data=dict(q='translate hola')) assert rv._status_code == 200 # Pretty weak test, but better than nothing @@ -62,7 +63,7 @@ def test_translate_search(client): def test_block_results(client): - rv = client.post('/search', 
data=dict(q='pinterest')) + rv = client.post(f'/{Endpoint.search}', data=dict(q='pinterest')) assert rv._status_code == 200 has_pinterest = False @@ -74,10 +75,10 @@ def test_block_results(client): assert has_pinterest demo_config['block'] = 'pinterest.com' - rv = client.post('/config', data=demo_config) + rv = client.post(f'/{Endpoint.config}', data=demo_config) assert rv._status_code == 302 - rv = client.post('/search', data=dict(q='pinterest')) + rv = client.post(f'/{Endpoint.search}', data=dict(q='pinterest')) assert rv._status_code == 200 for link in BeautifulSoup(rv.data, 'html.parser').find_all('a', href=True): @@ -106,7 +107,7 @@ def test_recent_results(client): } for time, num_days in times.items(): - rv = client.post('/search', data=dict(q='test :' + time)) + rv = client.post(f'/{Endpoint.search}', data=dict(q='test :' + time)) result_divs = get_search_results(rv.data) current_date = datetime.now() diff --git a/test/test_routes.py b/test/test_routes.py index 4aaaf68..e71e995 100644 --- a/test/test_routes.py +++ b/test/test_routes.py @@ -1,4 +1,5 @@ from app import app +from app.models.endpoint import Endpoint import json @@ -11,47 +12,47 @@ def test_main(client): def test_search(client): - rv = client.get('/search?q=test') + rv = client.get(f'/{Endpoint.search}?q=test') assert rv._status_code == 200 def test_feeling_lucky(client): - rv = client.get('/search?q=!%20test') + rv = client.get(f'/{Endpoint.search}?q=!%20test') assert rv._status_code == 303 def test_ddg_bang(client): # Bang at beginning of query - rv = client.get('/search?q=!gh%20whoogle') + rv = client.get(f'/{Endpoint.search}?q=!gh%20whoogle') assert rv._status_code == 302 assert rv.headers.get('Location').startswith('https://github.com') # Move bang to end of query - rv = client.get('/search?q=github%20!w') + rv = client.get(f'/{Endpoint.search}?q=github%20!w') assert rv._status_code == 302 assert rv.headers.get('Location').startswith('https://en.wikipedia.org') # Move bang to middle of query 
- rv = client.get('/search?q=big%20!r%20chungus') + rv = client.get(f'/{Endpoint.search}?q=big%20!r%20chungus') assert rv._status_code == 302 assert rv.headers.get('Location').startswith('https://www.reddit.com') # Move '!' to end of the bang - rv = client.get('/search?q=gitlab%20w!') + rv = client.get(f'/{Endpoint.search}?q=gitlab%20w!') assert rv._status_code == 302 assert rv.headers.get('Location').startswith('https://en.wikipedia.org') # Ensure bang is case insensitive - rv = client.get('/search?q=!GH%20whoogle') + rv = client.get(f'/{Endpoint.search}?q=!GH%20whoogle') assert rv._status_code == 302 assert rv.headers.get('Location').startswith('https://github.com') def test_config(client): - rv = client.post('/config', data=demo_config) + rv = client.post(f'/{Endpoint.config}', data=demo_config) assert rv._status_code == 302 - rv = client.get('/config') + rv = client.get(f'/{Endpoint.config}') assert rv._status_code == 200 config = json.loads(rv.data) @@ -62,15 +63,15 @@ def test_config(client): app.config['CONFIG_DISABLE'] = 1 dark_mod = not demo_config['dark'] demo_config['dark'] = dark_mod - rv = client.post('/config', data=demo_config) + rv = client.post(f'/{Endpoint.config}', data=demo_config) assert rv._status_code == 403 - rv = client.get('/config') + rv = client.get(f'/{Endpoint.config}') config = json.loads(rv.data) assert config['dark'] != dark_mod def test_opensearch(client): - rv = client.get('/opensearch.xml') + rv = client.get(f'/{Endpoint.opensearch}') assert rv._status_code == 200 assert 'Whoogle' in str(rv.data)