Improve public instance session management (#480)
This introduces a new approach to handling user sessions, which should allow for users to set more reliable config settings on public instances. Previously, when a user with cookies disabled would update their config, this would modify the app's default config file, which would in turn cause new users to inherit these settings when visiting the app for the first time and cause users to inherit these settings when their current session cookie expired (which was after 30 days by default I believe). There was also some half-baked logic for determining on the backend whether or not a user had cookies disabled, which lead to some issues with out of control session file creation by Flask. Now, when a user visits the site, their initial request is forwarded to a session/<session id> endpoint, and during that subsequent request their current session id is matched against the one found in the url. If the ids match, the user has cookies enabled. If not, their original request is modified with a 'cookies_disabled' query param that tells Flask not to bother trying to set up a new session for that user, and instead just use the app's fallback Fernet key for encryption and the default config. Since attempting to create a session for a user with cookies disabled creates a new session file, there is now also a clean-up routine included in the new session decorator, which will remove all sessions that don't include a valid key in the dict. NOTE!!! This means that current user sessions on public instances will be cleared once this update is merged in. In the long run that's a good thing though, since this will allow session mgmt to be a lot more reliable overall for users regardless of their cookie preference. Individual user sessions still use a unique Fernet key for encrypting queries, but users with cookies disabled will use the default app key for encryption and decryption. Sessions are also now (semi)permanent and have a lifetime of 1 year.main
parent
1f18e505ab
commit
e06ff85579
|
@ -21,9 +21,9 @@ if os.getenv("WHOOGLE_DOTENV", ''):
|
||||||
dotenv_path))
|
dotenv_path))
|
||||||
|
|
||||||
app.default_key = generate_user_key()
|
app.default_key = generate_user_key()
|
||||||
app.no_cookie_ips = []
|
|
||||||
app.config['SECRET_KEY'] = os.urandom(32)
|
app.config['SECRET_KEY'] = os.urandom(32)
|
||||||
app.config['SESSION_TYPE'] = 'filesystem'
|
app.config['SESSION_TYPE'] = 'filesystem'
|
||||||
|
app.config['SESSION_COOKIE_SAMESITE'] = 'strict'
|
||||||
app.config['VERSION_NUMBER'] = '0.6.0'
|
app.config['VERSION_NUMBER'] = '0.6.0'
|
||||||
app.config['APP_ROOT'] = os.getenv(
|
app.config['APP_ROOT'] = os.getenv(
|
||||||
'APP_ROOT',
|
'APP_ROOT',
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
from app.models.endpoint import Endpoint
|
||||||
from app.request import VALID_PARAMS, MAPS_URL
|
from app.request import VALID_PARAMS, MAPS_URL
|
||||||
from app.utils.misc import read_config_bool
|
from app.utils.misc import read_config_bool
|
||||||
from app.utils.results import *
|
from app.utils.results import *
|
||||||
|
@ -250,7 +251,7 @@ class Filter:
|
||||||
element['src'] = BLANK_B64
|
element['src'] = BLANK_B64
|
||||||
return
|
return
|
||||||
|
|
||||||
element['src'] = 'element?url=' + self.encrypt_path(
|
element['src'] = f'{Endpoint.element}?url=' + self.encrypt_path(
|
||||||
src,
|
src,
|
||||||
is_element=True) + '&type=' + urlparse.quote(mime)
|
is_element=True) + '&type=' + urlparse.quote(mime)
|
||||||
|
|
||||||
|
@ -385,7 +386,8 @@ class Filter:
|
||||||
if len(urls) != 2:
|
if len(urls) != 2:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
img_url = urlparse.unquote(urls[0].replace('/imgres?imgurl=', ''))
|
img_url = urlparse.unquote(urls[0].replace(
|
||||||
|
f'/{Endpoint.imgres}?imgurl=', ''))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Try to strip out only the necessary part of the web page link
|
# Try to strip out only the necessary part of the web page link
|
||||||
|
|
|
@ -18,7 +18,7 @@ class Config:
|
||||||
self.block_title = os.getenv('WHOOGLE_CONFIG_BLOCK_TITLE', '')
|
self.block_title = os.getenv('WHOOGLE_CONFIG_BLOCK_TITLE', '')
|
||||||
self.block_url = os.getenv('WHOOGLE_CONFIG_BLOCK_URL', '')
|
self.block_url = os.getenv('WHOOGLE_CONFIG_BLOCK_URL', '')
|
||||||
self.ctry = os.getenv('WHOOGLE_CONFIG_COUNTRY', '')
|
self.ctry = os.getenv('WHOOGLE_CONFIG_COUNTRY', '')
|
||||||
self.theme = os.getenv('WHOOGLE_CONFIG_THEME', '')
|
self.theme = os.getenv('WHOOGLE_CONFIG_THEME', 'system')
|
||||||
self.safe = read_config_bool('WHOOGLE_CONFIG_SAFE')
|
self.safe = read_config_bool('WHOOGLE_CONFIG_SAFE')
|
||||||
self.dark = read_config_bool('WHOOGLE_CONFIG_DARK') # deprecated
|
self.dark = read_config_bool('WHOOGLE_CONFIG_DARK') # deprecated
|
||||||
self.alts = read_config_bool('WHOOGLE_CONFIG_ALTS')
|
self.alts = read_config_bool('WHOOGLE_CONFIG_ALTS')
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
|
||||||
|
class Endpoint(Enum):
|
||||||
|
autocomplete = 'autocomplete'
|
||||||
|
home = 'home'
|
||||||
|
healthz = 'healthz'
|
||||||
|
session = 'session'
|
||||||
|
config = 'config'
|
||||||
|
opensearch = 'opensearch.xml'
|
||||||
|
search = 'search'
|
||||||
|
search_html = 'search.html'
|
||||||
|
url = 'url'
|
||||||
|
imgres = 'imgres'
|
||||||
|
element = 'element'
|
||||||
|
window = 'window'
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.value
|
||||||
|
|
||||||
|
def in_path(self, path: str) -> bool:
|
||||||
|
return path.startswith(self.value) or \
|
||||||
|
path.startswith(f'/{self.value}')
|
155
app/routes.py
155
app/routes.py
|
@ -1,16 +1,17 @@
|
||||||
import argparse
|
import argparse
|
||||||
import base64
|
import base64
|
||||||
import html
|
|
||||||
import io
|
import io
|
||||||
import json
|
import json
|
||||||
import pickle
|
import pickle
|
||||||
import urllib.parse as urlparse
|
import urllib.parse as urlparse
|
||||||
import uuid
|
import uuid
|
||||||
|
from datetime import timedelta
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
import waitress
|
import waitress
|
||||||
from app import app
|
from app import app
|
||||||
from app.models.config import Config
|
from app.models.config import Config
|
||||||
|
from app.models.endpoint import Endpoint
|
||||||
from app.request import Request, TorError
|
from app.request import Request, TorError
|
||||||
from app.utils.bangs import resolve_bang
|
from app.utils.bangs import resolve_bang
|
||||||
from app.utils.misc import read_config_bool, get_client_ip
|
from app.utils.misc import read_config_bool, get_client_ip
|
||||||
|
@ -22,6 +23,7 @@ from bs4 import BeautifulSoup as bsoup
|
||||||
from flask import jsonify, make_response, request, redirect, render_template, \
|
from flask import jsonify, make_response, request, redirect, render_template, \
|
||||||
send_file, session, url_for
|
send_file, session, url_for
|
||||||
from requests import exceptions, get
|
from requests import exceptions, get
|
||||||
|
from requests.models import PreparedRequest
|
||||||
|
|
||||||
# Load DDG bang json files only on init
|
# Load DDG bang json files only on init
|
||||||
bang_json = json.load(open(app.config['BANG_FILE']))
|
bang_json = json.load(open(app.config['BANG_FILE']))
|
||||||
|
@ -57,23 +59,79 @@ def auth_required(f):
|
||||||
return decorated
|
return decorated
|
||||||
|
|
||||||
|
|
||||||
|
def session_required(f):
|
||||||
|
@wraps(f)
|
||||||
|
def decorated(*args, **kwargs):
|
||||||
|
if (valid_user_session(session) and
|
||||||
|
'cookies_disabled' not in request.args):
|
||||||
|
g.session_key = session['key']
|
||||||
|
else:
|
||||||
|
session.pop('_permanent', None)
|
||||||
|
g.session_key = app.default_key
|
||||||
|
|
||||||
|
# Clear out old sessions
|
||||||
|
invalid_sessions = []
|
||||||
|
for user_session in os.listdir(app.config['SESSION_FILE_DIR']):
|
||||||
|
session_path = os.path.join(
|
||||||
|
app.config['SESSION_FILE_DIR'],
|
||||||
|
user_session)
|
||||||
|
try:
|
||||||
|
with open(session_path, 'rb') as session_file:
|
||||||
|
_ = pickle.load(session_file)
|
||||||
|
data = pickle.load(session_file)
|
||||||
|
if isinstance(data, dict) and 'valid' in data:
|
||||||
|
continue
|
||||||
|
invalid_sessions.append(session_path)
|
||||||
|
except (EOFError, FileNotFoundError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
for invalid_session in invalid_sessions:
|
||||||
|
os.remove(invalid_session)
|
||||||
|
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
|
||||||
|
return decorated
|
||||||
|
|
||||||
|
|
||||||
@app.before_request
|
@app.before_request
|
||||||
def before_request_func():
|
def before_request_func():
|
||||||
g.request_params = (
|
g.request_params = (
|
||||||
request.args if request.method == 'GET' else request.form
|
request.args if request.method == 'GET' else request.form
|
||||||
)
|
)
|
||||||
g.cookies_disabled = False
|
|
||||||
|
# Skip pre-request actions if verifying session
|
||||||
|
if '/session' in request.path and not valid_user_session(session):
|
||||||
|
return
|
||||||
|
|
||||||
|
default_config = json.load(open(app.config['DEFAULT_CONFIG'])) \
|
||||||
|
if os.path.exists(app.config['DEFAULT_CONFIG']) else {}
|
||||||
|
|
||||||
# Generate session values for user if unavailable
|
# Generate session values for user if unavailable
|
||||||
if not valid_user_session(session):
|
if (not valid_user_session(session) and
|
||||||
session['config'] = json.load(open(app.config['DEFAULT_CONFIG'])) \
|
'cookies_disabled' not in request.args):
|
||||||
if os.path.exists(app.config['DEFAULT_CONFIG']) else {}
|
session['config'] = default_config
|
||||||
session['uuid'] = str(uuid.uuid4())
|
session['uuid'] = str(uuid.uuid4())
|
||||||
session['key'] = generate_user_key(True)
|
session['key'] = generate_user_key()
|
||||||
|
|
||||||
# Flag cookies as possibly disabled in order to prevent against
|
# Skip checking for session on /autocomplete searches,
|
||||||
# unnecessary session directory expansion
|
# since they can be done from the browser search bar (aka
|
||||||
g.cookies_disabled = True
|
# no ability to initialize a session)
|
||||||
|
if not Endpoint.autocomplete.in_path(request.path):
|
||||||
|
return redirect(url_for(
|
||||||
|
'session_check',
|
||||||
|
session_id=session['uuid'],
|
||||||
|
follow=request.url), code=307)
|
||||||
|
else:
|
||||||
|
g.user_config = Config(**session['config'])
|
||||||
|
elif 'cookies_disabled' not in request.args:
|
||||||
|
# Set session as permanent
|
||||||
|
session.permanent = True
|
||||||
|
app.permanent_session_lifetime = timedelta(days=365)
|
||||||
|
g.user_config = Config(**session['config'])
|
||||||
|
else:
|
||||||
|
# User has cookies disabled, fall back to immutable default config
|
||||||
|
session.pop('_permanent', None)
|
||||||
|
g.user_config = Config(**default_config)
|
||||||
|
|
||||||
# Handle https upgrade
|
# Handle https upgrade
|
||||||
if needs_https(request.url):
|
if needs_https(request.url):
|
||||||
|
@ -81,8 +139,6 @@ def before_request_func():
|
||||||
request.url.replace('http://', 'https://', 1),
|
request.url.replace('http://', 'https://', 1),
|
||||||
code=308)
|
code=308)
|
||||||
|
|
||||||
g.user_config = Config(**session['config'])
|
|
||||||
|
|
||||||
if not g.user_config.url:
|
if not g.user_config.url:
|
||||||
g.user_config.url = request.url_root.replace(
|
g.user_config.url = request.url_root.replace(
|
||||||
'http://',
|
'http://',
|
||||||
|
@ -98,19 +154,6 @@ def before_request_func():
|
||||||
|
|
||||||
@app.after_request
|
@app.after_request
|
||||||
def after_request_func(resp):
|
def after_request_func(resp):
|
||||||
# Check if address consistently has cookies blocked,
|
|
||||||
# in which case start removing session files after creation.
|
|
||||||
#
|
|
||||||
# Note: This is primarily done to prevent overpopulation of session
|
|
||||||
# directories, since browsers that block cookies will still trigger
|
|
||||||
# Flask's session creation routine with every request.
|
|
||||||
if g.cookies_disabled and request.remote_addr not in app.no_cookie_ips:
|
|
||||||
app.no_cookie_ips.append(request.remote_addr)
|
|
||||||
elif g.cookies_disabled and request.remote_addr in app.no_cookie_ips:
|
|
||||||
session_list = list(session.keys())
|
|
||||||
for key in session_list:
|
|
||||||
session.pop(key)
|
|
||||||
|
|
||||||
resp.headers['Content-Security-Policy'] = app.config['CSP']
|
resp.headers['Content-Security-Policy'] = app.config['CSP']
|
||||||
if os.environ.get('HTTPS_ONLY', False):
|
if os.environ.get('HTTPS_ONLY', False):
|
||||||
resp.headers['Content-Security-Policy'] += 'upgrade-insecure-requests'
|
resp.headers['Content-Security-Policy'] += 'upgrade-insecure-requests'
|
||||||
|
@ -124,22 +167,28 @@ def unknown_page(e):
|
||||||
return redirect(g.app_location)
|
return redirect(g.app_location)
|
||||||
|
|
||||||
|
|
||||||
@app.route('/healthz', methods=['GET'])
|
@app.route(f'/{Endpoint.healthz}', methods=['GET'])
|
||||||
def healthz():
|
def healthz():
|
||||||
return ''
|
return ''
|
||||||
|
|
||||||
|
|
||||||
@app.route('/home', methods=['GET'])
|
@app.route(f'/{Endpoint.session}/<session_id>', methods=['GET', 'PUT', 'POST'])
|
||||||
def home():
|
def session_check(session_id):
|
||||||
return redirect(url_for('.index'))
|
if 'uuid' in session and session['uuid'] == session_id:
|
||||||
|
session['valid'] = True
|
||||||
|
return redirect(request.args.get('follow'), code=307)
|
||||||
|
else:
|
||||||
|
follow_url = request.args.get('follow')
|
||||||
|
req = PreparedRequest()
|
||||||
|
req.prepare_url(follow_url, {'cookies_disabled': 1})
|
||||||
|
session.pop('_permanent', None)
|
||||||
|
return redirect(req.url, code=307)
|
||||||
|
|
||||||
|
|
||||||
@app.route('/', methods=['GET'])
|
@app.route('/', methods=['GET'])
|
||||||
|
@app.route(f'/{Endpoint.home}', methods=['GET'])
|
||||||
@auth_required
|
@auth_required
|
||||||
def index():
|
def index():
|
||||||
# Reset keys
|
|
||||||
session['key'] = generate_user_key(g.cookies_disabled)
|
|
||||||
|
|
||||||
# Redirect if an error was raised
|
# Redirect if an error was raised
|
||||||
if 'error_message' in session and session['error_message']:
|
if 'error_message' in session and session['error_message']:
|
||||||
error_message = session['error_message']
|
error_message = session['error_message']
|
||||||
|
@ -157,13 +206,16 @@ def index():
|
||||||
logo=render_template(
|
logo=render_template(
|
||||||
'logo.html',
|
'logo.html',
|
||||||
dark=g.user_config.dark),
|
dark=g.user_config.dark),
|
||||||
config_disabled=app.config['CONFIG_DISABLE'],
|
config_disabled=(
|
||||||
|
app.config['CONFIG_DISABLE'] or
|
||||||
|
not valid_user_session(session) or
|
||||||
|
'cookies_disabled' in request.args),
|
||||||
config=g.user_config,
|
config=g.user_config,
|
||||||
tor_available=int(os.environ.get('TOR_AVAILABLE')),
|
tor_available=int(os.environ.get('TOR_AVAILABLE')),
|
||||||
version_number=app.config['VERSION_NUMBER'])
|
version_number=app.config['VERSION_NUMBER'])
|
||||||
|
|
||||||
|
|
||||||
@app.route('/opensearch.xml', methods=['GET'])
|
@app.route(f'/{Endpoint.opensearch}', methods=['GET'])
|
||||||
def opensearch():
|
def opensearch():
|
||||||
opensearch_url = g.app_location
|
opensearch_url = g.app_location
|
||||||
if opensearch_url.endswith('/'):
|
if opensearch_url.endswith('/'):
|
||||||
|
@ -183,7 +235,7 @@ def opensearch():
|
||||||
), 200, {'Content-Disposition': 'attachment; filename="opensearch.xml"'}
|
), 200, {'Content-Disposition': 'attachment; filename="opensearch.xml"'}
|
||||||
|
|
||||||
|
|
||||||
@app.route('/search.html', methods=['GET'])
|
@app.route(f'/{Endpoint.search_html}', methods=['GET'])
|
||||||
def search_html():
|
def search_html():
|
||||||
search_url = g.app_location
|
search_url = g.app_location
|
||||||
if search_url.endswith('/'):
|
if search_url.endswith('/'):
|
||||||
|
@ -191,7 +243,7 @@ def search_html():
|
||||||
return render_template('search.html', url=search_url)
|
return render_template('search.html', url=search_url)
|
||||||
|
|
||||||
|
|
||||||
@app.route('/autocomplete', methods=['GET', 'POST'])
|
@app.route(f'/{Endpoint.autocomplete}', methods=['GET', 'POST'])
|
||||||
def autocomplete():
|
def autocomplete():
|
||||||
ac_var = 'WHOOGLE_AUTOCOMPLETE'
|
ac_var = 'WHOOGLE_AUTOCOMPLETE'
|
||||||
if os.getenv(ac_var) and not read_config_bool(ac_var):
|
if os.getenv(ac_var) and not read_config_bool(ac_var):
|
||||||
|
@ -224,14 +276,14 @@ def autocomplete():
|
||||||
])
|
])
|
||||||
|
|
||||||
|
|
||||||
@app.route('/search', methods=['GET', 'POST'])
|
@app.route(f'/{Endpoint.search}', methods=['GET', 'POST'])
|
||||||
|
@session_required
|
||||||
@auth_required
|
@auth_required
|
||||||
def search():
|
def search():
|
||||||
# Update user config if specified in search args
|
# Update user config if specified in search args
|
||||||
g.user_config = g.user_config.from_params(g.request_params)
|
g.user_config = g.user_config.from_params(g.request_params)
|
||||||
|
|
||||||
search_util = Search(request, g.user_config, session,
|
search_util = Search(request, g.user_config, g.session_key)
|
||||||
cookies_disabled=g.cookies_disabled)
|
|
||||||
query = search_util.new_search_query()
|
query = search_util.new_search_query()
|
||||||
|
|
||||||
bang = resolve_bang(query=query, bangs_dict=bang_json)
|
bang = resolve_bang(query=query, bangs_dict=bang_json)
|
||||||
|
@ -240,7 +292,7 @@ def search():
|
||||||
|
|
||||||
# Redirect to home if invalid/blank search
|
# Redirect to home if invalid/blank search
|
||||||
if not query:
|
if not query:
|
||||||
return redirect('/')
|
return redirect(url_for('.index'))
|
||||||
|
|
||||||
# Generate response and number of external elements from the page
|
# Generate response and number of external elements from the page
|
||||||
try:
|
try:
|
||||||
|
@ -300,10 +352,13 @@ def search():
|
||||||
search_util.search_type else '')), resp_code
|
search_util.search_type else '')), resp_code
|
||||||
|
|
||||||
|
|
||||||
@app.route('/config', methods=['GET', 'POST', 'PUT'])
|
@app.route(f'/{Endpoint.config}', methods=['GET', 'POST', 'PUT'])
|
||||||
|
@session_required
|
||||||
@auth_required
|
@auth_required
|
||||||
def config():
|
def config():
|
||||||
config_disabled = app.config['CONFIG_DISABLE']
|
config_disabled = (
|
||||||
|
app.config['CONFIG_DISABLE'] or
|
||||||
|
not valid_user_session(session))
|
||||||
if request.method == 'GET':
|
if request.method == 'GET':
|
||||||
return json.dumps(g.user_config.__dict__)
|
return json.dumps(g.user_config.__dict__)
|
||||||
elif request.method == 'PUT' and not config_disabled:
|
elif request.method == 'PUT' and not config_disabled:
|
||||||
|
@ -330,18 +385,14 @@ def config():
|
||||||
app.config['CONFIG_PATH'],
|
app.config['CONFIG_PATH'],
|
||||||
request.args.get('name')), 'wb'))
|
request.args.get('name')), 'wb'))
|
||||||
|
|
||||||
# Overwrite default config if user has cookies disabled
|
|
||||||
if g.cookies_disabled:
|
|
||||||
open(app.config['DEFAULT_CONFIG'], 'w').write(
|
|
||||||
json.dumps(config_data, indent=4))
|
|
||||||
|
|
||||||
session['config'] = config_data
|
session['config'] = config_data
|
||||||
return redirect(config_data['url'])
|
return redirect(config_data['url'])
|
||||||
else:
|
else:
|
||||||
return redirect(url_for('.index'), code=403)
|
return redirect(url_for('.index'), code=403)
|
||||||
|
|
||||||
|
|
||||||
@app.route('/url', methods=['GET'])
|
@app.route(f'/{Endpoint.url}', methods=['GET'])
|
||||||
|
@session_required
|
||||||
@auth_required
|
@auth_required
|
||||||
def url():
|
def url():
|
||||||
if 'url' in request.args:
|
if 'url' in request.args:
|
||||||
|
@ -356,16 +407,18 @@ def url():
|
||||||
error_message='Unable to resolve query: ' + q)
|
error_message='Unable to resolve query: ' + q)
|
||||||
|
|
||||||
|
|
||||||
@app.route('/imgres')
|
@app.route(f'/{Endpoint.imgres}')
|
||||||
|
@session_required
|
||||||
@auth_required
|
@auth_required
|
||||||
def imgres():
|
def imgres():
|
||||||
return redirect(request.args.get('imgurl'))
|
return redirect(request.args.get('imgurl'))
|
||||||
|
|
||||||
|
|
||||||
@app.route('/element')
|
@app.route(f'/{Endpoint.element}')
|
||||||
|
@session_required
|
||||||
@auth_required
|
@auth_required
|
||||||
def element():
|
def element():
|
||||||
cipher_suite = Fernet(session['key'])
|
cipher_suite = Fernet(g.session_key)
|
||||||
src_url = cipher_suite.decrypt(request.args.get('url').encode()).decode()
|
src_url = cipher_suite.decrypt(request.args.get('url').encode()).decode()
|
||||||
src_type = request.args.get('type')
|
src_type = request.args.get('type')
|
||||||
|
|
||||||
|
@ -384,7 +437,7 @@ def element():
|
||||||
return send_file(io.BytesIO(empty_gif), mimetype='image/gif')
|
return send_file(io.BytesIO(empty_gif), mimetype='image/gif')
|
||||||
|
|
||||||
|
|
||||||
@app.route('/window')
|
@app.route(f'/{Endpoint.window}')
|
||||||
@auth_required
|
@auth_required
|
||||||
def window():
|
def window():
|
||||||
get_body = g.user_request.send(base_url=request.args.get('location')).text
|
get_body = g.user_request.send(base_url=request.args.get('location')).text
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
from app.models.endpoint import Endpoint
|
||||||
from bs4 import BeautifulSoup, NavigableString
|
from bs4 import BeautifulSoup, NavigableString
|
||||||
import html
|
import html
|
||||||
import os
|
import os
|
||||||
|
@ -177,7 +178,7 @@ def append_nojs(result: BeautifulSoup) -> None:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
nojs_link = BeautifulSoup(features='html.parser').new_tag('a')
|
nojs_link = BeautifulSoup(features='html.parser').new_tag('a')
|
||||||
nojs_link['href'] = '/window?location=' + result['href']
|
nojs_link['href'] = f'/{Endpoint.window}?location=' + result['href']
|
||||||
nojs_link.string = ' NoJS Link'
|
nojs_link.string = ' NoJS Link'
|
||||||
result.append(nojs_link)
|
result.append(nojs_link)
|
||||||
|
|
||||||
|
|
|
@ -52,16 +52,15 @@ class Search:
|
||||||
Attributes:
|
Attributes:
|
||||||
request: the incoming flask request
|
request: the incoming flask request
|
||||||
config: the current user config settings
|
config: the current user config settings
|
||||||
session: the flask user session
|
session_key: the flask user fernet key
|
||||||
"""
|
"""
|
||||||
|
def __init__(self, request, config, session_key, cookies_disabled=False):
|
||||||
def __init__(self, request, config, session, cookies_disabled=False):
|
|
||||||
method = request.method
|
method = request.method
|
||||||
self.request_params = request.args if method == 'GET' else request.form
|
self.request_params = request.args if method == 'GET' else request.form
|
||||||
self.user_agent = request.headers.get('User-Agent')
|
self.user_agent = request.headers.get('User-Agent')
|
||||||
self.feeling_lucky = False
|
self.feeling_lucky = False
|
||||||
self.config = config
|
self.config = config
|
||||||
self.session = session
|
self.session_key = session_key
|
||||||
self.query = ''
|
self.query = ''
|
||||||
self.cookies_disabled = cookies_disabled
|
self.cookies_disabled = cookies_disabled
|
||||||
self.search_type = self.request_params.get(
|
self.search_type = self.request_params.get(
|
||||||
|
@ -96,7 +95,7 @@ class Search:
|
||||||
else:
|
else:
|
||||||
# Attempt to decrypt if this is an internal link
|
# Attempt to decrypt if this is an internal link
|
||||||
try:
|
try:
|
||||||
q = Fernet(self.session['key']).decrypt(q.encode()).decode()
|
q = Fernet(self.session_key).decrypt(q.encode()).decode()
|
||||||
except InvalidToken:
|
except InvalidToken:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -115,7 +114,7 @@ class Search:
|
||||||
"""
|
"""
|
||||||
mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent
|
mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent
|
||||||
|
|
||||||
content_filter = Filter(self.session['key'],
|
content_filter = Filter(self.session_key,
|
||||||
mobile=mobile,
|
mobile=mobile,
|
||||||
config=self.config)
|
config=self.config)
|
||||||
full_query = gen_query(self.query,
|
full_query = gen_query(self.query,
|
||||||
|
|
|
@ -4,7 +4,7 @@ from flask import current_app as app
|
||||||
REQUIRED_SESSION_VALUES = ['uuid', 'config', 'key']
|
REQUIRED_SESSION_VALUES = ['uuid', 'config', 'key']
|
||||||
|
|
||||||
|
|
||||||
def generate_user_key(cookies_disabled=False) -> bytes:
|
def generate_user_key() -> bytes:
|
||||||
"""Generates a key for encrypting searches and element URLs
|
"""Generates a key for encrypting searches and element URLs
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -16,9 +16,6 @@ def generate_user_key(cookies_disabled=False) -> bytes:
|
||||||
str: A unique Fernet key
|
str: A unique Fernet key
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if cookies_disabled:
|
|
||||||
return app.default_key
|
|
||||||
|
|
||||||
# Generate/regenerate unique key per user
|
# Generate/regenerate unique key per user
|
||||||
return Fernet.generate_key()
|
return Fernet.generate_key()
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,7 @@ chardet==3.0.4
|
||||||
click==8.0.3
|
click==8.0.3
|
||||||
cryptography==3.3.2
|
cryptography==3.3.2
|
||||||
Flask==1.1.1
|
Flask==1.1.1
|
||||||
Flask-Session==0.3.2
|
Flask-Session==0.4.0
|
||||||
idna==2.9
|
idna==2.9
|
||||||
itsdangerous==1.1.0
|
itsdangerous==1.1.0
|
||||||
Jinja2==2.11.3
|
Jinja2==2.11.3
|
||||||
|
|
|
@ -1,12 +1,16 @@
|
||||||
|
from app.models.endpoint import Endpoint
|
||||||
|
|
||||||
|
|
||||||
def test_autocomplete_get(client):
|
def test_autocomplete_get(client):
|
||||||
rv = client.get('/autocomplete?q=green+eggs+and')
|
rv = client.get(f'/{Endpoint.autocomplete}?q=green+eggs+and')
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
assert len(rv.data) >= 1
|
assert len(rv.data) >= 1
|
||||||
assert b'green eggs and ham' in rv.data
|
assert b'green eggs and ham' in rv.data
|
||||||
|
|
||||||
|
|
||||||
def test_autocomplete_post(client):
|
def test_autocomplete_post(client):
|
||||||
rv = client.post('/autocomplete', data=dict(q='the+cat+in+the'))
|
rv = client.post(f'/{Endpoint.autocomplete}',
|
||||||
|
data=dict(q='the+cat+in+the'))
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
assert len(rv.data) >= 1
|
assert len(rv.data) >= 1
|
||||||
assert b'the cat in the hat' in rv.data
|
assert b'the cat in the hat' in rv.data
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
from cryptography.fernet import Fernet
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
from app import app
|
from app import app
|
||||||
|
from app.models.endpoint import Endpoint
|
||||||
from app.utils.session import generate_user_key, valid_user_session
|
from app.utils.session import generate_user_key, valid_user_session
|
||||||
|
|
||||||
|
|
||||||
|
@ -37,13 +38,13 @@ def test_query_decryption(client):
|
||||||
rv = client.get('/')
|
rv = client.get('/')
|
||||||
cookie = rv.headers['Set-Cookie']
|
cookie = rv.headers['Set-Cookie']
|
||||||
|
|
||||||
rv = client.get('/search?q=test+1', headers={'Cookie': cookie})
|
rv = client.get(f'/{Endpoint.search}?q=test+1', headers={'Cookie': cookie})
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
|
|
||||||
with client.session_transaction() as session:
|
with client.session_transaction() as session:
|
||||||
assert valid_user_session(session)
|
assert valid_user_session(session)
|
||||||
|
|
||||||
rv = client.get('/search?q=test+2', headers={'Cookie': cookie})
|
rv = client.get(f'/{Endpoint.search}?q=test+2', headers={'Cookie': cookie})
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
|
|
||||||
with client.session_transaction() as session:
|
with client.session_transaction() as session:
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
from app.filter import Filter
|
from app.filter import Filter
|
||||||
|
from app.models.endpoint import Endpoint
|
||||||
from app.utils.session import generate_user_key
|
from app.utils.session import generate_user_key
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from dateutil.parser import *
|
from dateutil.parser import *
|
||||||
|
@ -30,7 +31,7 @@ def get_search_results(data):
|
||||||
|
|
||||||
|
|
||||||
def test_get_results(client):
|
def test_get_results(client):
|
||||||
rv = client.get('/search?q=test')
|
rv = client.get(f'/{Endpoint.search}?q=test')
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
|
|
||||||
# Depending on the search, there can be more
|
# Depending on the search, there can be more
|
||||||
|
@ -41,7 +42,7 @@ def test_get_results(client):
|
||||||
|
|
||||||
|
|
||||||
def test_post_results(client):
|
def test_post_results(client):
|
||||||
rv = client.post('/search', data=dict(q='test'))
|
rv = client.post(f'/{Endpoint.search}', data=dict(q='test'))
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
|
|
||||||
# Depending on the search, there can be more
|
# Depending on the search, there can be more
|
||||||
|
@ -52,7 +53,7 @@ def test_post_results(client):
|
||||||
|
|
||||||
|
|
||||||
def test_translate_search(client):
|
def test_translate_search(client):
|
||||||
rv = client.post('/search', data=dict(q='translate hola'))
|
rv = client.post(f'/{Endpoint.search}', data=dict(q='translate hola'))
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
|
|
||||||
# Pretty weak test, but better than nothing
|
# Pretty weak test, but better than nothing
|
||||||
|
@ -62,7 +63,7 @@ def test_translate_search(client):
|
||||||
|
|
||||||
|
|
||||||
def test_block_results(client):
|
def test_block_results(client):
|
||||||
rv = client.post('/search', data=dict(q='pinterest'))
|
rv = client.post(f'/{Endpoint.search}', data=dict(q='pinterest'))
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
|
|
||||||
has_pinterest = False
|
has_pinterest = False
|
||||||
|
@ -74,10 +75,10 @@ def test_block_results(client):
|
||||||
assert has_pinterest
|
assert has_pinterest
|
||||||
|
|
||||||
demo_config['block'] = 'pinterest.com'
|
demo_config['block'] = 'pinterest.com'
|
||||||
rv = client.post('/config', data=demo_config)
|
rv = client.post(f'/{Endpoint.config}', data=demo_config)
|
||||||
assert rv._status_code == 302
|
assert rv._status_code == 302
|
||||||
|
|
||||||
rv = client.post('/search', data=dict(q='pinterest'))
|
rv = client.post(f'/{Endpoint.search}', data=dict(q='pinterest'))
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
|
|
||||||
for link in BeautifulSoup(rv.data, 'html.parser').find_all('a', href=True):
|
for link in BeautifulSoup(rv.data, 'html.parser').find_all('a', href=True):
|
||||||
|
@ -106,7 +107,7 @@ def test_recent_results(client):
|
||||||
}
|
}
|
||||||
|
|
||||||
for time, num_days in times.items():
|
for time, num_days in times.items():
|
||||||
rv = client.post('/search', data=dict(q='test :' + time))
|
rv = client.post(f'/{Endpoint.search}', data=dict(q='test :' + time))
|
||||||
result_divs = get_search_results(rv.data)
|
result_divs = get_search_results(rv.data)
|
||||||
|
|
||||||
current_date = datetime.now()
|
current_date = datetime.now()
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
from app import app
|
from app import app
|
||||||
|
from app.models.endpoint import Endpoint
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
@ -11,47 +12,47 @@ def test_main(client):
|
||||||
|
|
||||||
|
|
||||||
def test_search(client):
|
def test_search(client):
|
||||||
rv = client.get('/search?q=test')
|
rv = client.get(f'/{Endpoint.search}?q=test')
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
|
|
||||||
|
|
||||||
def test_feeling_lucky(client):
|
def test_feeling_lucky(client):
|
||||||
rv = client.get('/search?q=!%20test')
|
rv = client.get(f'/{Endpoint.search}?q=!%20test')
|
||||||
assert rv._status_code == 303
|
assert rv._status_code == 303
|
||||||
|
|
||||||
|
|
||||||
def test_ddg_bang(client):
|
def test_ddg_bang(client):
|
||||||
# Bang at beginning of query
|
# Bang at beginning of query
|
||||||
rv = client.get('/search?q=!gh%20whoogle')
|
rv = client.get(f'/{Endpoint.search}?q=!gh%20whoogle')
|
||||||
assert rv._status_code == 302
|
assert rv._status_code == 302
|
||||||
assert rv.headers.get('Location').startswith('https://github.com')
|
assert rv.headers.get('Location').startswith('https://github.com')
|
||||||
|
|
||||||
# Move bang to end of query
|
# Move bang to end of query
|
||||||
rv = client.get('/search?q=github%20!w')
|
rv = client.get(f'/{Endpoint.search}?q=github%20!w')
|
||||||
assert rv._status_code == 302
|
assert rv._status_code == 302
|
||||||
assert rv.headers.get('Location').startswith('https://en.wikipedia.org')
|
assert rv.headers.get('Location').startswith('https://en.wikipedia.org')
|
||||||
|
|
||||||
# Move bang to middle of query
|
# Move bang to middle of query
|
||||||
rv = client.get('/search?q=big%20!r%20chungus')
|
rv = client.get(f'/{Endpoint.search}?q=big%20!r%20chungus')
|
||||||
assert rv._status_code == 302
|
assert rv._status_code == 302
|
||||||
assert rv.headers.get('Location').startswith('https://www.reddit.com')
|
assert rv.headers.get('Location').startswith('https://www.reddit.com')
|
||||||
|
|
||||||
# Move '!' to end of the bang
|
# Move '!' to end of the bang
|
||||||
rv = client.get('/search?q=gitlab%20w!')
|
rv = client.get(f'/{Endpoint.search}?q=gitlab%20w!')
|
||||||
assert rv._status_code == 302
|
assert rv._status_code == 302
|
||||||
assert rv.headers.get('Location').startswith('https://en.wikipedia.org')
|
assert rv.headers.get('Location').startswith('https://en.wikipedia.org')
|
||||||
|
|
||||||
# Ensure bang is case insensitive
|
# Ensure bang is case insensitive
|
||||||
rv = client.get('/search?q=!GH%20whoogle')
|
rv = client.get(f'/{Endpoint.search}?q=!GH%20whoogle')
|
||||||
assert rv._status_code == 302
|
assert rv._status_code == 302
|
||||||
assert rv.headers.get('Location').startswith('https://github.com')
|
assert rv.headers.get('Location').startswith('https://github.com')
|
||||||
|
|
||||||
|
|
||||||
def test_config(client):
|
def test_config(client):
|
||||||
rv = client.post('/config', data=demo_config)
|
rv = client.post(f'/{Endpoint.config}', data=demo_config)
|
||||||
assert rv._status_code == 302
|
assert rv._status_code == 302
|
||||||
|
|
||||||
rv = client.get('/config')
|
rv = client.get(f'/{Endpoint.config}')
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
|
|
||||||
config = json.loads(rv.data)
|
config = json.loads(rv.data)
|
||||||
|
@ -62,15 +63,15 @@ def test_config(client):
|
||||||
app.config['CONFIG_DISABLE'] = 1
|
app.config['CONFIG_DISABLE'] = 1
|
||||||
dark_mod = not demo_config['dark']
|
dark_mod = not demo_config['dark']
|
||||||
demo_config['dark'] = dark_mod
|
demo_config['dark'] = dark_mod
|
||||||
rv = client.post('/config', data=demo_config)
|
rv = client.post(f'/{Endpoint.config}', data=demo_config)
|
||||||
assert rv._status_code == 403
|
assert rv._status_code == 403
|
||||||
|
|
||||||
rv = client.get('/config')
|
rv = client.get(f'/{Endpoint.config}')
|
||||||
config = json.loads(rv.data)
|
config = json.loads(rv.data)
|
||||||
assert config['dark'] != dark_mod
|
assert config['dark'] != dark_mod
|
||||||
|
|
||||||
|
|
||||||
def test_opensearch(client):
|
def test_opensearch(client):
|
||||||
rv = client.get('/opensearch.xml')
|
rv = client.get(f'/{Endpoint.opensearch}')
|
||||||
assert rv._status_code == 200
|
assert rv._status_code == 200
|
||||||
assert '<ShortName>Whoogle</ShortName>' in str(rv.data)
|
assert '<ShortName>Whoogle</ShortName>' in str(rv.data)
|
||||||
|
|
Loading…
Reference in New Issue