PEP-8: Fix formatting issues, add CI workflow (#161)

Enforces PEP-8 formatting for all python code

Adds a github action build for checking pep8 formatting using pycodestyle
main
Ben Busby 2020-12-17 16:06:47 -05:00 committed by GitHub
parent b55aad3fdf
commit 375f4ee9fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 329 additions and 157 deletions

22
.github/workflows/pep8.yml vendored Normal file
View File

@ -0,0 +1,22 @@
name: pep8
on:
push
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pycodestyle
- name: Run pycodestyle
run: |
pycodestyle --show-source --show-pep8 app/*
pycodestyle --show-source --show-pep8 test/*

View File

@ -6,20 +6,35 @@ from flask_session import Session
import os
from stem import Signal
app = Flask(__name__, static_folder=os.path.dirname(os.path.abspath(__file__)) + '/static')
app = Flask(__name__, static_folder=os.path.dirname(
os.path.abspath(__file__)) + '/static')
app.user_elements = {}
app.default_key_set = generate_user_keys()
app.no_cookie_ips = []
app.config['SECRET_KEY'] = os.urandom(32)
app.config['SESSION_TYPE'] = 'filesystem'
app.config['VERSION_NUMBER'] = '0.2.1'
app.config['APP_ROOT'] = os.getenv('APP_ROOT', os.path.dirname(os.path.abspath(__file__)))
app.config['STATIC_FOLDER'] = os.getenv('STATIC_FOLDER', os.path.join(app.config['APP_ROOT'], 'static'))
app.config['CONFIG_PATH'] = os.getenv('CONFIG_VOLUME', os.path.join(app.config['STATIC_FOLDER'], 'config'))
app.config['DEFAULT_CONFIG'] = os.path.join(app.config['CONFIG_PATH'], 'config.json')
app.config['SESSION_FILE_DIR'] = os.path.join(app.config['CONFIG_PATH'], 'session')
app.config['BANG_PATH'] = os.getenv('CONFIG_VOLUME', os.path.join(app.config['STATIC_FOLDER'], 'bangs'))
app.config['BANG_FILE'] = os.path.join(app.config['BANG_PATH'], 'bangs.json')
app.config['APP_ROOT'] = os.getenv(
'APP_ROOT',
os.path.dirname(os.path.abspath(__file__)))
app.config['STATIC_FOLDER'] = os.getenv(
'STATIC_FOLDER',
os.path.join(app.config['APP_ROOT'], 'static'))
app.config['CONFIG_PATH'] = os.getenv(
'CONFIG_VOLUME',
os.path.join(app.config['STATIC_FOLDER'], 'config'))
app.config['DEFAULT_CONFIG'] = os.path.join(
app.config['CONFIG_PATH'],
'config.json')
app.config['SESSION_FILE_DIR'] = os.path.join(
app.config['CONFIG_PATH'],
'session')
app.config['BANG_PATH'] = os.getenv(
'CONFIG_VOLUME',
os.path.join(app.config['STATIC_FOLDER'], 'bangs'))
app.config['BANG_FILE'] = os.path.join(
app.config['BANG_PATH'],
'bangs.json')
if not os.path.exists(app.config['CONFIG_PATH']):
os.makedirs(app.config['CONFIG_PATH'])
@ -38,4 +53,4 @@ Session(app)
# Attempt to acquire tor identity, to determine if Tor config is available
send_tor_signal(Signal.HEARTBEAT)
from app import routes
from app import routes # noqa

View File

@ -32,20 +32,27 @@ class Filter:
def reskin(self, page):
# Aesthetic only re-skinning
if self.dark:
page = page.replace('fff', '000').replace('202124', 'ddd').replace('1967D2', '3b85ea')
page = page.replace(
'fff', '000').replace(
'202124', 'ddd').replace(
'1967D2', '3b85ea')
return page
def encrypt_path(self, msg, is_element=False):
# Encrypts path to avoid plaintext results in logs
if is_element:
# Element paths are tracked differently in order for the element key to be regenerated
# once all elements have been loaded
enc_path = Fernet(self.user_keys['element_key']).encrypt(msg.encode()).decode()
# Element paths are encrypted separately from text, to allow key
# regeneration once all items have been served to the user
enc_path = Fernet(
self.user_keys['element_key']
).encrypt(msg.encode()).decode()
self._elements += 1
return enc_path
return Fernet(self.user_keys['text_key']).encrypt(msg.encode()).decode()
return Fernet(
self.user_keys['text_key']
).encrypt(msg.encode()).decode()
def clean(self, soup):
self.main_divs = soup.find('div', {'id': 'main'})
@ -75,7 +82,7 @@ class Filter:
if footer:
# Remove divs that have multiple links beyond just page navigation
[_.decompose() for _ in footer.find_all('div', recursive=False)
if len(_.find_all('a', href=True)) > 3]
if len(_.find_all('a', href=True)) > 3]
header = soup.find('header')
if header:
@ -88,8 +95,9 @@ class Filter:
return
for div in [_ for _ in self.main_divs.find_all('div', recursive=True)]:
has_ad = len([_ for _ in div.find_all('span', recursive=True) if has_ad_content(_.text)])
_ = div.decompose() if has_ad else None
div_ads = [_ for _ in div.find_all('span', recursive=True)
if has_ad_content(_.text)]
_ = div.decompose() if len(div_ads) else None
def fix_question_section(self):
if not self.main_divs:
@ -97,14 +105,14 @@ class Filter:
question_divs = [_ for _ in self.main_divs.find_all(
'div', recursive=False
) if len(_.find_all('h2')) > 0]
) if len(_.find_all('h2')) > 0]
if len(question_divs) == 0:
return
# Wrap section in details element to allow collapse/expand
details = BeautifulSoup(features='lxml').new_tag('details')
summary = BeautifulSoup(features='lxml').new_tag('summary')
details = BeautifulSoup('html.parser').new_tag('details')
summary = BeautifulSoup('html.parser').new_tag('summary')
summary.string = question_divs[0].find('h2').text
question_divs[0].find('h2').decompose()
details.append(summary)
@ -113,7 +121,7 @@ class Filter:
for question_div in question_divs:
questions = [_ for _ in question_div.find_all(
'div', recursive=True
) if _.text.endswith('?')]
) if _.text.endswith('?')]
for question in questions:
question['style'] = 'padding: 10px; font-style: italic;'
@ -131,11 +139,15 @@ class Filter:
element['src'] = BLANK_B64
return
element['src'] = 'element?url=' + self.encrypt_path(element_src, is_element=True) + \
'&type=' + urlparse.quote(mime)
# TODO: Non-mobile image results link to website instead of image
element['src'] = 'element?url=' + self.encrypt_path(
element_src,
is_element=True) + '&type=' + urlparse.quote(mime)
# FIXME: Non-mobile image results link to website instead of image
# if not self.mobile:
# img.append(BeautifulSoup(FULL_RES_IMG.format(element_src), 'html.parser'))
# img.append(
# BeautifulSoup(FULL_RES_IMG.format(element_src),
# 'html.parser'))
def update_styling(self, soup):
# Remove unnecessary button(s)
@ -149,8 +161,9 @@ class Filter:
# Update logo
logo = soup.find('a', {'class': 'l'})
if logo and self.mobile:
logo['style'] = 'display:flex; justify-content:center; align-items:center; color:#685e79; ' \
'font-size:18px; '
logo['style'] = ('display:flex; justify-content:center; '
'align-items:center; color:#685e79; '
'font-size:18px; ')
# Fix search bar length on mobile
try:
@ -163,7 +176,7 @@ class Filter:
# Replace href with only the intended destination (no "utm" type tags)
href = link['href'].replace('https://www.google.com', '')
if 'advanced_search' in href or 'tbm=shop' in href:
# TODO: The "Shopping" tab requires further filtering (see #136)
# FIXME: The "Shopping" tab requires further filtering (see #136)
# Temporarily removing all links to that tab for now.
link.decompose()
return
@ -171,20 +184,26 @@ class Filter:
link['target'] = '_blank'
result_link = urlparse.urlparse(href)
query_link = parse_qs(result_link.query)['q'][0] if '?q=' in href else ''
query_link = parse_qs(
result_link.query
)['q'][0] if '?q=' in href else ''
if query_link.startswith('/'):
# Internal google links (i.e. mail, maps, etc) should still be forwarded to Google
# Internal google links (i.e. mail, maps, etc) should still
# be forwarded to Google
link['href'] = 'https://google.com' + query_link
elif '/search?q=' in href:
# "li:1" implies the query should be interpreted verbatim, so we wrap it in double quotes
# "li:1" implies the query should be interpreted verbatim,
# which is accomplished by wrapping the query in double quotes
if 'li:1' in href:
query_link = '"' + query_link + '"'
new_search = 'search?q=' + self.encrypt_path(query_link)
query_params = parse_qs(urlparse.urlparse(href).query)
for param in VALID_PARAMS:
param_val = query_params[param][0] if param in query_params else ''
if param not in query_params:
continue
param_val = query_params[param][0]
new_search += '&' + param + '=' + param_val
link['href'] = new_search
elif 'url?q=' in href:
@ -199,9 +218,11 @@ class Filter:
# Replace link location if "alts" config is enabled
if self.alt_redirect:
# Search and replace all link descriptions with alternative location
# Search and replace all link descriptions
# with alternative location
link['href'] = get_site_alt(link['href'])
link_desc = link.find_all(text=re.compile('|'.join(SITE_ALTS.keys())))
link_desc = link.find_all(
text=re.compile('|'.join(SITE_ALTS.keys())))
if len(link_desc) == 0:
return

View File

@ -128,7 +128,7 @@ class Config:
{'name': 'Fiji', 'value': 'countryFJ'},
{'name': 'Finland', 'value': 'countryFI'},
{'name': 'France', 'value': 'countryFR'},
{'name': 'France\, Metropolitan', 'value': 'countryFX'},
{'name': r'France\, Metropolitan', 'value': 'countryFX'},
{'name': 'French Guiana', 'value': 'countryGF'},
{'name': 'French Polynesia', 'value': 'countryPF'},
{'name': 'French Southern Territories', 'value': 'countryTF'},
@ -167,7 +167,8 @@ class Config:
{'name': 'Kazakhstan', 'value': 'countryKZ'},
{'name': 'Kenya', 'value': 'countryKE'},
{'name': 'Kiribati', 'value': 'countryKI'},
{'name': 'Korea, Democratic People\'s Republic of', 'value': 'countryKP'},
{'name': 'Korea, Democratic People\'s Republic of',
'value': 'countryKP'},
{'name': 'Korea, Republic of', 'value': 'countryKR'},
{'name': 'Kuwait', 'value': 'countryKW'},
{'name': 'Kyrgyzstan', 'value': 'countryKG'},
@ -181,7 +182,8 @@ class Config:
{'name': 'Lithuania', 'value': 'countryLT'},
{'name': 'Luxembourg', 'value': 'countryLU'},
{'name': 'Macao', 'value': 'countryMO'},
{'name': 'Macedonia, the Former Yugosalv Republic of', 'value': 'countryMK'},
{'name': 'Macedonia, the Former Yugosalv Republic of',
'value': 'countryMK'},
{'name': 'Madagascar', 'value': 'countryMG'},
{'name': 'Malawi', 'value': 'countryMW'},
{'name': 'Malaysia', 'value': 'countryMY'},
@ -253,7 +255,8 @@ class Config:
{'name': 'Solomon Islands', 'value': 'countrySB'},
{'name': 'Somalia', 'value': 'countrySO'},
{'name': 'South Africa', 'value': 'countryZA'},
{'name': 'South Georgia and the South Sandwich Islands', 'value': 'countryGS'},
{'name': 'South Georgia and the South Sandwich Islands',
'value': 'countryGS'},
{'name': 'Spain', 'value': 'countryES'},
{'name': 'Sri Lanka', 'value': 'countryLK'},
{'name': 'Sudan', 'value': 'countrySD'},
@ -310,6 +313,12 @@ class Config:
self.alts = False
self.new_tab = False
self.get_only = False
self.safe_keys = [
'lang_search',
'lang_interface',
'ctry',
'dark'
]
for key, value in kwargs.items():
setattr(self, key, value)
@ -338,12 +347,7 @@ class Config:
array
"""
return key in [
'lang_search',
'lang_interface',
'ctry',
'dark'
]
return key in self.safe_keys
def from_params(self, params) -> 'Config':
"""Modify user config with search parameters. This is primarily

View File

@ -8,9 +8,9 @@ import os
from stem import Signal, SocketError
from stem.control import Controller
# Core Google search URLs
SEARCH_URL = 'https://www.google.com/search?gbv=1&q='
AUTOCOMPLETE_URL = 'https://suggestqueries.google.com/complete/search?client=toolbar&'
AUTOCOMPLETE_URL = ('https://suggestqueries.google.com/'
'complete/search?client=toolbar&')
MOBILE_UA = '{}/5.0 (Android 0; Mobile; rv:54.0) Gecko/54.0 {}/59.0'
DESKTOP_UA = '{}/5.0 (X11; {} x86_64; rv:75.0) Gecko/20100101 {}/75.0'
@ -72,11 +72,16 @@ def gen_query(query, args, config, near_city=None) -> str:
result_tbs = args.get('tbs')
param_dict['tbs'] = '&tbs=' + result_tbs
# Occasionally the 'tbs' param provided by google also contains a field for 'lr', but formatted
# strangely. This is a (admittedly not very elegant) solution for this.
# Ex/ &tbs=qdr:h,lr:lang_1pl --> the lr param needs to be extracted and have the "1" digit removed in this case
# Occasionally the 'tbs' param provided by google also contains a
# field for 'lr', but formatted strangely. This is a rough solution
# for this.
#
# Example:
# &tbs=qdr:h,lr:lang_1pl
# -- the lr param needs to be extracted and remove the leading '1'
sub_lang = [_ for _ in result_tbs.split(',') if 'lr:' in _]
sub_lang = sub_lang[0][sub_lang[0].find('lr:') + 3:len(sub_lang[0])] if len(sub_lang) > 0 else ''
sub_lang = sub_lang[0][sub_lang[0].find('lr:') +
3:len(sub_lang[0])] if len(sub_lang) > 0 else ''
# Ensure search query is parsable
query = urlparse.quote(query)
@ -93,20 +98,26 @@ def gen_query(query, args, config, near_city=None) -> str:
if near_city:
param_dict['near'] = '&near=' + urlparse.quote(near_city)
# Set language for results (lr) if source isn't set, otherwise use the result
# language param provided by google (but with the strange digit(s) removed)
# Set language for results (lr) if source isn't set, otherwise use the
# result language param provided in the results
if 'source' in args:
param_dict['source'] = '&source=' + args.get('source')
param_dict['lr'] = ('&lr=' + ''.join([_ for _ in sub_lang if not _.isdigit()])) if sub_lang else ''
param_dict['lr'] = ('&lr=' + ''.join(
[_ for _ in sub_lang if not _.isdigit()]
)) if sub_lang else ''
else:
param_dict['lr'] = ('&lr=' + config.lang_search) if config.lang_search else ''
param_dict['lr'] = (
'&lr=' + config.lang_search
) if config.lang_search else ''
# Set autocorrected search ignore
# 'nfpr' defines the exclusion of results from an auto-corrected query
if 'nfpr' in args:
param_dict['nfpr'] = '&nfpr=' + args.get('nfpr')
param_dict['cr'] = ('&cr=' + config.ctry) if config.ctry else ''
param_dict['hl'] = ('&hl=' + config.lang_interface.replace('lang_', '')) if config.lang_interface else ''
param_dict['hl'] = (
'&hl=' + config.lang_interface.replace('lang_', '')
) if config.lang_interface else ''
param_dict['safe'] = '&safe=' + ('active' if config.safe else 'off')
for val in param_dict.values():
@ -126,6 +137,7 @@ class Request:
root_path -- the root path of the whoogle instance
config -- the user's current whoogle configuration
"""
def __init__(self, normal_ua, root_path, config: Config):
# Send heartbeat to Tor, used in determining if the user can or cannot
# enable Tor for future requests
@ -143,9 +155,10 @@ class Request:
':' + os.environ.get('WHOOGLE_PROXY_PASS')
self.proxies = {
'http': os.environ.get('WHOOGLE_PROXY_TYPE') + '://' +
auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'),
auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'),
}
self.proxies['https'] = self.proxies['http'].replace('http', 'https')
self.proxies['https'] = self.proxies['http'].replace('http',
'https')
else:
self.proxies = {
'http': 'socks5://127.0.0.1:9050',
@ -169,7 +182,8 @@ class Request:
"""
ac_query = dict(hl=self.language, q=query)
response = self.send(base_url=AUTOCOMPLETE_URL, query=urlparse.urlencode(ac_query)).text
response = self.send(base_url=AUTOCOMPLETE_URL,
query=urlparse.urlencode(ac_query)).text
if response:
dom = etree.fromstring(response)
@ -178,14 +192,14 @@ class Request:
return []
def send(self, base_url=SEARCH_URL, query='', attempt=0) -> Response:
"""Sends an outbound request to a URL. Optionally sends the request using Tor, if
enabled by the user.
"""Sends an outbound request to a URL. Optionally sends the request
using Tor, if enabled by the user.
Args:
base_url: The URL to use in the request
query: The optional query string for the request
attempt: The number of attempts made for the request (used for cycling
through Tor identities, if enabled)
attempt: The number of attempts made for the request
(used for cycling through Tor identities, if enabled)
Returns:
Response: The Response object returned by the requests call
@ -195,21 +209,30 @@ class Request:
'User-Agent': self.modified_user_agent
}
# Validate Tor connection and request new identity if the last one failed
if self.tor and not send_tor_signal(Signal.NEWNYM if attempt > 0 else Signal.HEARTBEAT):
raise TorError("Tor was previously enabled, but the connection has been dropped. Please check your " +
"Tor configuration and try again.", disable=True)
# Validate Tor conn and request new identity if the last one failed
if self.tor and not send_tor_signal(
Signal.NEWNYM if attempt > 0 else Signal.HEARTBEAT):
raise TorError(
"Tor was previously enabled, but the connection has been "
"dropped. Please check your Tor configuration and try again.",
disable=True)
# Make sure that the tor connection is valid, if enabled
if self.tor:
tor_check = requests.get('https://check.torproject.org/', proxies=self.proxies, headers=headers)
tor_check = requests.get('https://check.torproject.org/',
proxies=self.proxies, headers=headers)
self.tor_valid = 'Congratulations' in tor_check.text
if not self.tor_valid:
raise TorError("Tor connection succeeded, but the connection could not be validated by torproject.org",
disable=True)
raise TorError(
"Tor connection succeeded, but the connection could not "
"be validated by torproject.org",
disable=True)
response = requests.get(base_url + query, proxies=self.proxies, headers=headers)
response = requests.get(
base_url + query,
proxies=self.proxies,
headers=headers)
# Retry query with new identity if using Tor (max 10 attempts)
if 'form id="captcha-form"' in response.text and self.tor:

View File

@ -9,7 +9,8 @@ import uuid
from functools import wraps
import waitress
from flask import jsonify, make_response, request, redirect, render_template, send_file, session, url_for
from flask import jsonify, make_response, request, redirect, render_template, \
send_file, session, url_for
from requests import exceptions
from app import app
@ -30,23 +31,30 @@ def auth_required(f):
# Skip if username/password not set
whoogle_user = os.getenv('WHOOGLE_USER', '')
whoogle_pass = os.getenv('WHOOGLE_PASS', '')
if (not whoogle_user or not whoogle_pass) or \
(auth and whoogle_user == auth.username and whoogle_pass == auth.password):
if (not whoogle_user or not whoogle_pass) or (
auth
and whoogle_user == auth.username
and whoogle_pass == auth.password):
return f(*args, **kwargs)
else:
return make_response('Not logged in', 401, {'WWW-Authenticate': 'Basic realm="Login Required"'})
return make_response('Not logged in', 401, {
'WWW-Authenticate': 'Basic realm="Login Required"'})
return decorated
@app.before_request
def before_request_func():
g.request_params = request.args if request.method == 'GET' else request.form
g.request_params = (
request.args if request.method == 'GET' else request.form
)
g.cookies_disabled = False
# Generate session values for user if unavailable
if not valid_user_session(session):
session['config'] = json.load(open(app.config['DEFAULT_CONFIG'])) \
if os.path.exists(app.config['DEFAULT_CONFIG']) else {'url': request.url_root}
if os.path.exists(app.config['DEFAULT_CONFIG']) else {
'url': request.url_root}
session['uuid'] = str(uuid.uuid4())
session['fernet_keys'] = generate_user_keys(True)
@ -63,12 +71,16 @@ def before_request_func():
is_http = request.url.startswith('http://')
if (is_heroku and is_http) or (https_only and is_http):
return redirect(request.url.replace('http://', 'https://', 1), code=308)
return redirect(
request.url.replace('http://', 'https://', 1),
code=308)
g.user_config = Config(**session['config'])
if not g.user_config.url:
g.user_config.url = request.url_root.replace('http://', 'https://') if https_only else request.url_root
g.user_config.url = request.url_root.replace(
'http://',
'https://') if https_only else request.url_root
g.user_request = Request(
request.headers.get('User-Agent'),
@ -82,13 +94,17 @@ def before_request_func():
def after_request_func(response):
if app.user_elements[session['uuid']] <= 0 and '/element' in request.url:
# Regenerate element key if all elements have been served to user
session['fernet_keys']['element_key'] = '' if not g.cookies_disabled else app.default_key_set['element_key']
session['fernet_keys'][
'element_key'] = '' if not g.cookies_disabled else \
app.default_key_set['element_key']
app.user_elements[session['uuid']] = 0
# Check if address consistently has cookies blocked, in which case start removing session
# files after creation.
# Note: This is primarily done to prevent overpopulation of session directories, since browsers that
# block cookies will still trigger Flask's session creation routine with every request.
# Check if address consistently has cookies blocked,
# in which case start removing session files after creation.
#
# Note: This is primarily done to prevent overpopulation of session
# directories, since browsers that block cookies will still trigger
# Flask's session creation routine with every request.
if g.cookies_disabled and request.remote_addr not in app.no_cookie_ips:
app.no_cookie_ips.append(request.remote_addr)
elif g.cookies_disabled and request.remote_addr in app.no_cookie_ips:
@ -101,6 +117,7 @@ def after_request_func(response):
@app.errorhandler(404)
def unknown_page(e):
app.logger.warn(e)
return redirect(g.app_location)
@ -109,7 +126,8 @@ def unknown_page(e):
def index():
# Reset keys
session['fernet_keys'] = generate_user_keys(g.cookies_disabled)
error_message = session['error_message'] if 'error_message' in session else ''
error_message = session[
'error_message'] if 'error_message' in session else ''
session['error_message'] = ''
return render_template('index.html',
@ -128,7 +146,8 @@ def opensearch():
if opensearch_url.endswith('/'):
opensearch_url = opensearch_url[:-1]
get_only = g.user_config.get_only or 'Chrome' in request.headers.get('User-Agent')
get_only = g.user_config.get_only or 'Chrome' in request.headers.get(
'User-Agent')
return render_template(
'opensearch.xml',
@ -147,16 +166,23 @@ def autocomplete():
# Search bangs if the query begins with "!", but not "! " (feeling lucky)
if q.startswith('!') and len(q) > 1 and not q.startswith('! '):
return jsonify([q, [bang_json[_]['suggestion'] for _ in bang_json if _.startswith(q)]])
return jsonify([q, [bang_json[_]['suggestion'] for _ in bang_json if
_.startswith(q)]])
if not q and not request.data:
return jsonify({'?': []})
elif request.data:
q = urlparse.unquote_plus(request.data.decode('utf-8').replace('q=', ''))
q = urlparse.unquote_plus(
request.data.decode('utf-8').replace('q=', ''))
# Return a list of suggestions for the query
# Note: If Tor is enabled, this returns nothing, as the request is almost always rejected
return jsonify([q, g.user_request.autocomplete(q) if not g.user_config.tor else []])
#
# Note: If Tor is enabled, this returns nothing, as the request is
# almost always rejected
return jsonify([
q,
g.user_request.autocomplete(q) if not g.user_config.tor else []
])
@app.route('/search', methods=['GET', 'POST'])
@ -168,7 +194,8 @@ def search():
# Update user config if specified in search args
g.user_config = g.user_config.from_params(g.request_params)
search_util = RoutingUtils(request, g.user_config, session, cookies_disabled=g.cookies_disabled)
search_util = RoutingUtils(request, g.user_config, session,
cookies_disabled=g.cookies_disabled)
query = search_util.new_search_query()
resolved_bangs = search_util.bang_operator(bang_json)
@ -183,14 +210,17 @@ def search():
try:
response, elements = search_util.generate_response()
except TorError as e:
session['error_message'] = e.message + ("\\n\\nTor config is now disabled!" if e.disable else "")
session['config']['tor'] = False if e.disable else session['config']['tor']
session['error_message'] = e.message + (
"\\n\\nTor config is now disabled!" if e.disable else "")
session['config']['tor'] = False if e.disable else session['config'][
'tor']
return redirect(url_for('.index'))
if search_util.feeling_lucky or elements < 0:
return redirect(response, code=303)
# Keep count of external elements to fetch before element key can be regenerated
# Keep count of external elements to fetch before
# the element key can be regenerated
app.user_elements[session['uuid']] = elements
return render_template(
@ -200,12 +230,13 @@ def search():
dark_mode=g.user_config.dark,
response=response,
version_number=app.config['VERSION_NUMBER'],
search_header=render_template(
search_header=(render_template(
'header.html',
dark_mode=g.user_config.dark,
query=urlparse.unquote(query),
search_type=search_util.search_type,
mobile=g.user_request.mobile) if 'isch' not in search_util.search_type else '')
mobile=g.user_request.mobile)
if 'isch' not in search_util.search_type else ''))
@app.route('/config', methods=['GET', 'POST', 'PUT'])
@ -215,8 +246,12 @@ def config():
return json.dumps(g.user_config.__dict__)
elif request.method == 'PUT':
if 'name' in request.args:
config_pkl = os.path.join(app.config['CONFIG_PATH'], request.args.get('name'))
session['config'] = pickle.load(open(config_pkl, 'rb')) if os.path.exists(config_pkl) else session['config']
config_pkl = os.path.join(
app.config['CONFIG_PATH'],
request.args.get('name'))
session['config'] = (pickle.load(open(config_pkl, 'rb'))
if os.path.exists(config_pkl)
else session['config'])
return json.dumps(session['config'])
else:
return json.dumps({})
@ -227,11 +262,16 @@ def config():
# Save config by name to allow a user to easily load later
if 'name' in request.args:
pickle.dump(config_data, open(os.path.join(app.config['CONFIG_PATH'], request.args.get('name')), 'wb'))
pickle.dump(
config_data,
open(os.path.join(
app.config['CONFIG_PATH'],
request.args.get('name')), 'wb'))
# Overwrite default config if user has cookies disabled
if g.cookies_disabled:
open(app.config['DEFAULT_CONFIG'], 'w').write(json.dumps(config_data, indent=4))
open(app.config['DEFAULT_CONFIG'], 'w').write(
json.dumps(config_data, indent=4))
session['config'] = config_data
return redirect(config_data['url'])
@ -274,7 +314,8 @@ def element():
except exceptions.RequestException:
pass
empty_gif = base64.b64decode('R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==')
empty_gif = base64.b64decode(
'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==')
return send_file(io.BytesIO(empty_gif), mimetype='image/gif')
@ -282,38 +323,62 @@ def element():
@auth_required
def window():
get_body = g.user_request.send(base_url=request.args.get('location')).text
get_body = get_body.replace('src="/', 'src="' + request.args.get('location') + '"')
get_body = get_body.replace('href="/', 'href="' + request.args.get('location') + '"')
get_body = get_body.replace('src="/',
'src="' + request.args.get('location') + '"')
get_body = get_body.replace('href="/',
'href="' + request.args.get('location') + '"')
results = BeautifulSoup(get_body, 'html.parser')
results = bsoup(get_body, 'html.parser')
try:
for script in results('script'):
script.decompose()
except Exception:
pass
for script in results('script'):
script.decompose()
return render_template('display.html', response=results)
def run_app():
parser = argparse.ArgumentParser(description='Whoogle Search console runner')
parser.add_argument('--port', default=5000, metavar='<port number>',
help='Specifies a port to run on (default 5000)')
parser.add_argument('--host', default='127.0.0.1', metavar='<ip address>',
help='Specifies the host address to use (default 127.0.0.1)')
parser.add_argument('--debug', default=False, action='store_true',
help='Activates debug mode for the server (default False)')
parser.add_argument('--https-only', default=False, action='store_true',
help='Enforces HTTPS redirects for all requests')
parser.add_argument('--userpass', default='', metavar='<username:password>',
help='Sets a username/password basic auth combo (default None)')
parser.add_argument('--proxyauth', default='', metavar='<username:password>',
help='Sets a username/password for a HTTP/SOCKS proxy (default None)')
parser.add_argument('--proxytype', default='', metavar='<socks4|socks5|http>',
help='Sets a proxy type for all connections (default None)')
parser.add_argument('--proxyloc', default='', metavar='<location:port>',
help='Sets a proxy location for all connections (default None)')
parser = argparse.ArgumentParser(
description='Whoogle Search console runner')
parser.add_argument(
'--port',
default=5000,
metavar='<port number>',
help='Specifies a port to run on (default 5000)')
parser.add_argument(
'--host',
default='127.0.0.1',
metavar='<ip address>',
help='Specifies the host address to use (default 127.0.0.1)')
parser.add_argument(
'--debug',
default=False,
action='store_true',
help='Activates debug mode for the server (default False)')
parser.add_argument(
'--https-only',
default=False,
action='store_true',
help='Enforces HTTPS redirects for all requests')
parser.add_argument(
'--userpass',
default='',
metavar='<username:password>',
help='Sets a username/password basic auth combo (default None)')
parser.add_argument(
'--proxyauth',
default='',
metavar='<username:password>',
help='Sets a username/password for a HTTP/SOCKS proxy (default None)')
parser.add_argument(
'--proxytype',
default='',
metavar='<socks4|socks5|http>',
help='Sets a proxy type for all connections (default None)')
parser.add_argument(
'--proxyloc',
default='',
metavar='<location:port>',
help='Sets a proxy location for all connections (default None)')
args = parser.parse_args()
if args.userpass:

View File

@ -7,14 +7,16 @@ SKIP_ARGS = ['ref_src', 'utm']
FULL_RES_IMG = '<br/><a href="{}">Full Image</a>'
GOOG_IMG = '/images/branding/searchlogo/1x/googlelogo'
LOGO_URL = GOOG_IMG + '_desk'
BLANK_B64 = '''

'''
BLANK_B64 = ('data:image/png;base64,'
'iVBORw0KGgoAAAANSUhEUgAAAAoAAAAKCAQAAAAnOwc2AAAAD0lEQVR42mNkw'
'AIYh7IgAAVVAAuInjI5AAAAAElFTkSuQmCC')
# Ad keywords
BLACKLIST = [
'ad', 'anuncio', 'annuncio', 'annonce', 'Anzeige', '广告', '廣告', 'Reklama', 'Реклама', 'Anunț', '광고',
'annons', 'Annonse', 'Iklan', '広告', 'Augl.', 'Mainos', 'Advertentie', 'إعلان', 'Գովազդ', 'विज्ञापन', 'Reklam',
'آگهی', 'Reklāma', 'Reklaam', 'Διαφήμιση', 'מודעה', 'Hirdetés', 'Anúncio'
'ad', 'anuncio', 'annuncio', 'annonce', 'Anzeige', '广告', '廣告', 'Reklama',
'Реклама', 'Anunț', '광고', 'annons', 'Annonse', 'Iklan', '広告', 'Augl.',
'Mainos', 'Advertentie', 'إعلان', 'Գովազդ', 'विज्ञापन', 'Reklam', 'آگهی',
'Reklāma', 'Reklaam', 'Διαφήμιση', 'מודעה', 'Hirdetés', 'Anúncio'
]
SITE_ALTS = {
@ -25,7 +27,8 @@ SITE_ALTS = {
def has_ad_content(element: str):
return element.upper() in (value.upper() for value in BLACKLIST) or '' in element
return element.upper() in (value.upper() for value in BLACKLIST) \
or '' in element
def get_first_link(soup):

View File

@ -1,25 +1,26 @@
from app.filter import Filter, get_first_link
from app.utils.session_utils import generate_user_keys
from app.request import gen_query
from bs4 import BeautifulSoup
from bs4 import BeautifulSoup as bsoup
from cryptography.fernet import Fernet, InvalidToken
from flask import g
from typing import Any, Tuple
TOR_BANNER = '<hr><h1 style="text-align: center">You are using Tor</h1><hr>'
class RoutingUtils:
def __init__(self, request, config, session, cookies_disabled=False):
self.request_params = request.args if request.method == 'GET' else request.form
method = request.method
self.request_params = request.args if method == 'GET' else request.form
self.user_agent = request.headers.get('User-Agent')
self.feeling_lucky = False
self.config = config
self.session = session
self.query = ''
self.cookies_disabled = cookies_disabled
self.search_type = self.request_params.get('tbm') if 'tbm' in self.request_params else ''
self.search_type = self.request_params.get(
'tbm') if 'tbm' in self.request_params else ''
def __getitem__(self, name):
return getattr(self, name)
@ -45,7 +46,9 @@ class RoutingUtils:
else:
# Attempt to decrypt if this is an internal link
try:
q = Fernet(self.session['fernet_keys']['text_key']).decrypt(q.encode()).decode()
q = Fernet(
self.session['fernet_keys']['text_key']
).decrypt(q.encode()).decode()
except InvalidToken:
pass
@ -53,29 +56,40 @@ class RoutingUtils:
self.session['fernet_keys']['text_key'] = generate_user_keys(
cookies_disabled=self.cookies_disabled)['text_key']
# Format depending on whether or not the query is a "feeling lucky" query
# Strip leading '! ' for "feeling lucky" queries
self.feeling_lucky = q.startswith('! ')
self.query = q[2:] if self.feeling_lucky else q
return self.query
def bang_operator(self, bangs_dict: dict) -> str:
for operator in bangs_dict.keys():
if self.query.split(' ')[0] == operator:
return bangs_dict[operator]['url'].format(self.query.replace(operator, '').strip())
if self.query.split(' ')[0] != operator:
continue
return bangs_dict[operator]['url'].format(
self.query.replace(operator, '').strip())
return ''
def generate_response(self) -> Tuple[Any, int]:
mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent
content_filter = Filter(self.session['fernet_keys'], mobile=mobile, config=self.config)
full_query = gen_query(self.query, self.request_params, self.config, content_filter.near)
content_filter = Filter(
self.session['fernet_keys'],
mobile=mobile,
config=self.config)
full_query = gen_query(
self.query,
self.request_params,
self.config,
content_filter.near)
get_body = g.user_request.send(query=full_query)
# Produce cleanable html soup from response
html_soup = BeautifulSoup(content_filter.reskin(get_body.text), 'html.parser')
html_soup.insert(0, BeautifulSoup(
TOR_BANNER,
features='lxml') if g.user_request.tor_valid else BeautifulSoup("", features="lxml"))
html_soup = bsoup(content_filter.reskin(get_body.text), 'html.parser')
html_soup.insert(
0,
bsoup(TOR_BANNER, 'html.parser')
if g.user_request.tor_valid else bsoup('', 'html.parser'))
if self.feeling_lucky:
return get_first_link(html_soup), 1
@ -84,10 +98,12 @@ class RoutingUtils:
# Append user config to all search links, if available
param_str = ''.join('&{}={}'.format(k, v)
for k, v in self.request_params.to_dict(flat=True).items()
if self.config.is_safe_key(k))
for k, v in
self.request_params.to_dict(flat=True).items()
if self.config.is_safe_key(k))
for link in formatted_results.find_all('a', href=True):
if 'search?' not in link['href'] or link['href'].index('search?') > 1:
if 'search?' not in link['href'] or link['href'].index(
'search?') > 1:
continue
link['href'] += param_str

View File

@ -18,6 +18,7 @@ more-itertools==8.3.0
packaging==20.4
pluggy==0.13.1
py==1.8.1
pycodestyle==2.6.0
pycparser==2.19
pyOpenSSL==19.1.0
pyparsing==2.4.7

View File

@ -3,13 +3,12 @@ from app.filter import Filter
from app.utils.session_utils import generate_user_keys
from datetime import datetime
from dateutil.parser import *
import json
import os
def get_search_results(data):
secret_key = generate_user_keys()
soup = Filter(user_keys=secret_key).clean(BeautifulSoup(data, 'html.parser'))
soup = Filter(user_keys=secret_key).clean(
BeautifulSoup(data, 'html.parser'))
main_divs = soup.find('div', {'id': 'main'})
assert len(main_divs) > 1
@ -17,7 +16,9 @@ def get_search_results(data):
result_divs = []
for div in main_divs:
# Result divs should only have 1 inner div
if len(list(div.children)) != 1 or not div.findChild() or 'div' not in div.findChild().name:
if (len(list(div.children)) != 1
or not div.findChild()
or 'div' not in div.findChild().name):
continue
result_divs.append(div)
@ -78,6 +79,7 @@ def test_recent_results(client):
try:
date = parse(date_span)
assert (current_date - date).days <= (num_days + 5) # Date can have a little bit of wiggle room
# Date can have a little bit of wiggle room
assert (current_date - date).days <= (num_days + 5)
except ParserError:
pass