refactoring to prevent web.py being the middle of the universe

This commit is contained in:
Ozzieisaacs
2020-12-12 11:23:17 +01:00
parent 0d7f2e157a
commit 1905e0ee6f
15 changed files with 622 additions and 502 deletions

View File

@ -31,20 +31,25 @@ from datetime import datetime, timedelta
from babel import Locale as LC
from babel.dates import format_datetime
from flask import Blueprint, flash, redirect, url_for, abort, request, make_response, send_from_directory
from flask_login import login_required, current_user, logout_user
from flask import Blueprint, flash, redirect, url_for, abort, request, make_response, send_from_directory, g
from flask_login import login_required, current_user, logout_user, confirm_login
from flask_babel import gettext as _
from sqlalchemy import and_
from sqlalchemy.exc import IntegrityError, OperationalError, InvalidRequestError
from sqlalchemy.sql.expression import func
from sqlalchemy.sql.expression import func, or_
from . import constants, logger, helper, services
from . import db, calibre_db, ub, web_server, get_locale, config, updater_thread, babel, gdriveutils
from .helper import check_valid_domain, send_test_mail, reset_password, generate_password_hash
from .gdriveutils import is_gdrive_ready, gdrive_support
from .web import admin_required, render_title_template, before_request, unconfigured
from .render_template import render_title_template
from . import debug_info
try:
from functools import wraps
except ImportError:
pass # We're not using Python 3
log = logger.create()
feature_support = {
@ -73,6 +78,49 @@ feature_support['gdrive'] = gdrive_support
admi = Blueprint('admin', __name__)
def admin_required(f):
"""
Checks if current_user.role == 1
"""
@wraps(f)
def inner(*args, **kwargs):
if current_user.role_admin():
return f(*args, **kwargs)
abort(403)
return inner
def unconfigured(f):
"""
Checks if calibre-web instance is not configured
"""
@wraps(f)
def inner(*args, **kwargs):
if not config.db_configured:
return f(*args, **kwargs)
abort(403)
return inner
@admi.before_app_request
def before_request():
if current_user.is_authenticated:
confirm_login()
g.constants = constants
g.user = current_user
g.allow_registration = config.config_public_reg
g.allow_anonymous = config.config_anonbrowse
g.allow_upload = config.config_uploading
g.current_theme = config.config_theme
g.config_authors_max = config.config_authors_max
g.shelves_access = ub.session.query(ub.Shelf).filter(
or_(ub.Shelf.is_public == 1, ub.Shelf.user_id == current_user.id)).order_by(ub.Shelf.name).all()
if not config.db_configured and request.endpoint not in (
'admin.basic_configuration', 'login') and '/static/' not in request.path:
return redirect(url_for('admin.basic_configuration'))
@admi.route("/admin")
@login_required
@ -1269,3 +1317,110 @@ def get_updater_status():
except Exception:
status['status'] = 11
return json.dumps(status)
@admi.route('/import_ldap_users')
@login_required
@admin_required
def import_ldap_users():
showtext = {}
try:
new_users = services.ldap.get_group_members(config.config_ldap_group_name)
except (services.ldap.LDAPException, TypeError, AttributeError, KeyError) as e:
log.exception(e)
showtext['text'] = _(u'Error: %(ldaperror)s', ldaperror=e)
return json.dumps(showtext)
if not new_users:
log.debug('LDAP empty response')
showtext['text'] = _(u'Error: No user returned in response of LDAP server')
return json.dumps(showtext)
imported = 0
for username in new_users:
user = username.decode('utf-8')
if '=' in user:
# if member object field is empty take user object as filter
if config.config_ldap_member_user_object:
query_filter = config.config_ldap_member_user_object
else:
query_filter = config.config_ldap_user_object
try:
user_identifier = extract_user_identifier(user, query_filter)
except Exception as e:
log.warning(e)
continue
else:
user_identifier = user
query_filter = None
try:
user_data = services.ldap.get_object_details(user=user_identifier, query_filter=query_filter)
except AttributeError as e:
log.exception(e)
continue
if user_data:
user_login_field = extract_dynamic_field_from_filter(user, config.config_ldap_user_object)
username = user_data[user_login_field][0].decode('utf-8')
# check for duplicate username
if ub.session.query(ub.User).filter(func.lower(ub.User.nickname) == username.lower()).first():
# if ub.session.query(ub.User).filter(ub.User.nickname == username).first():
log.warning("LDAP User %s Already in Database", user_data)
continue
kindlemail = ''
if 'mail' in user_data:
useremail = user_data['mail'][0].decode('utf-8')
if (len(user_data['mail']) > 1):
kindlemail = user_data['mail'][1].decode('utf-8')
else:
log.debug('No Mail Field Found in LDAP Response')
useremail = username + '@email.com'
# check for duplicate email
if ub.session.query(ub.User).filter(func.lower(ub.User.email) == useremail.lower()).first():
log.warning("LDAP Email %s Already in Database", user_data)
continue
content = ub.User()
content.nickname = username
content.password = '' # dummy password which will be replaced by ldap one
content.email = useremail
content.kindle_mail = kindlemail
content.role = config.config_default_role
content.sidebar_view = config.config_default_show
content.allowed_tags = config.config_allowed_tags
content.denied_tags = config.config_denied_tags
content.allowed_column_value = config.config_allowed_column_value
content.denied_column_value = config.config_denied_column_value
ub.session.add(content)
try:
ub.session.commit()
imported +=1
except Exception as e:
log.warning("Failed to create LDAP user: %s - %s", user, e)
ub.session.rollback()
showtext['text'] = _(u'Failed to Create at Least One LDAP User')
else:
log.warning("LDAP User: %s Not Found", user)
showtext['text'] = _(u'At Least One LDAP User Not Found in Database')
if not showtext:
showtext['text'] = _(u'{} User Successfully Imported'.format(imported))
return json.dumps(showtext)
def extract_user_data_from_field(user, field):
match = re.search(field + "=([\d\s\w-]+)", user, re.IGNORECASE | re.UNICODE)
if match:
return match.group(1)
else:
raise Exception("Could Not Parse LDAP User: {}".format(user))
def extract_dynamic_field_from_filter(user, filter):
match = re.search("([a-zA-Z0-9-]+)=%s", filter, re.IGNORECASE | re.UNICODE)
if match:
return match.group(1)
else:
raise Exception("Could Not Parse LDAP Userfield: {}", user)
def extract_user_identifier(user, filter):
dynamic_field = extract_dynamic_field_from_filter(user, filter)
return extract_user_data_from_field(user, dynamic_field)