Source code for ps.herald.bare_html_api

from datetime import datetime, timedelta
from flask import (
    Blueprint,
    redirect,
    render_template,
    request,
    url_for,
)

# from werkzeug.exceptions import abort

import sqlalchemy

from ps.basic import Config
from ps.herald import __version__
from ps.herald.model import Log
from ps.herald.database import get_session


bp = Blueprint("bare_html_api", __name__)


[docs]@bp.route("/rm_logs", methods=["GET"]) def rm_logs(): session = get_session() Config.logger.debug( "got request to rm logentries", extra={"package_version": __version__} ) # 30 days before Today start_date = str(datetime.now() - timedelta(days=30)) deleted = session.query(Log).filter(Log.created < start_date).delete() Config.logger.debug( f"deleted {deleted} Logs ", extra={"package_version": __version__} ) session.commit() return redirect(url_for("bare_html_api.index"))
[docs]@bp.route("/", methods=["GET", "POST"]) def index(): session = get_session() Config.logger.debug("got request", extra={"package_version": __version__}) if request.method == "POST": set_search_params(request) pids = session.query(Log.produkt_id).distinct().all() + [("not_selected",)] pid_options = [ isset(row[0], SEARCH_ATTRIBUTES["PRODUKT_ID"]) for row in pids ] sids = session.query(Log.system_id).distinct().all() + [("not_selected",)] system_id_options = [ isset(row[0], SEARCH_ATTRIBUTES["SYSTEM_ID"]) for row in sids ] sub_sids = session.query(Log.sub_system_id).distinct().all() + [ ("not_selected",) ] sub_system_id_options = [ isset(row[0], SEARCH_ATTRIBUTES["SUB_SYSTEM_ID"]) for row in sub_sids ] sub_sub_sids = session.query(Log.sub_sub_system_id).distinct().all() + [ ("not_selected",) ] sub_sub_system_id_options = [ isset(row[0], SEARCH_ATTRIBUTES["SUB_SUB_SYSTEM_ID"]) for row in sub_sub_sids ] u1s = session.query(Log.user_spec_1).distinct().all() + [("not_selected",)] user_spec_1_id_options = [ isset(row[0], SEARCH_ATTRIBUTES["USER_SPEC_1"]) for row in u1s ] u2s = session.query(Log.user_spec_2).distinct().all() + [("not_selected",)] user_spec_2_id_options = [ isset(row[0], SEARCH_ATTRIBUTES["USER_SPEC_2"]) for row in u2s ] pattern = SEARCH_ATTRIBUTES["pattern"] starting_at = SEARCH_ATTRIBUTES["starting_at"] notify_level = SEARCH_ATTRIBUTES["notify_level"] max_rows = SEARCH_ATTRIBUTES["max_rows"] old_row_first = SEARCH_ATTRIBUTES["old_row_first"] query_obj = session.query(Log) if SEARCH_ATTRIBUTES["PRODUKT_ID"] != "not_selected": query_obj = query_obj.filter( Log.produkt_id == SEARCH_ATTRIBUTES["PRODUKT_ID"] ) if SEARCH_ATTRIBUTES["SYSTEM_ID"] != "not_selected": query_obj = query_obj.filter( Log.system_id == SEARCH_ATTRIBUTES["SYSTEM_ID"] ) if SEARCH_ATTRIBUTES["SUB_SYSTEM_ID"] != "not_selected": query_obj = query_obj.filter( Log.sub_system_id == SEARCH_ATTRIBUTES["SUB_SYSTEM_ID"] ) if SEARCH_ATTRIBUTES["SUB_SUB_SYSTEM_ID"] != "not_selected": query_obj = query_obj.filter( Log.sub_sub_system_id == SEARCH_ATTRIBUTES["SUB_SUB_SYSTEM_ID"] ) if SEARCH_ATTRIBUTES["USER_SPEC_1"] != "not_selected": query_obj = query_obj.filter( Log.user_spec_1 == SEARCH_ATTRIBUTES["USER_SPEC_1"] ) if SEARCH_ATTRIBUTES["USER_SPEC_2"] != "not_selected": query_obj = query_obj.filter( Log.user_spec_2 == SEARCH_ATTRIBUTES["USER_SPEC_2"] ) if SEARCH_ATTRIBUTES["pattern"] != "not_selected": query_obj = query_obj.filter( Log.message.like("%%%s%%" % (SEARCH_ATTRIBUTES["pattern"])) | Log.funcname.like("%%%s%%" % (SEARCH_ATTRIBUTES["pattern"])) | Log.module.like("%%%s%%" % (SEARCH_ATTRIBUTES["pattern"])) ) if SEARCH_ATTRIBUTES["starting_at"] != "not_selected": query_obj = query_obj.filter( Log.created >= SEARCH_ATTRIBUTES["starting_at"] ) if SEARCH_ATTRIBUTES["notify_level"] != "not_selected": query_obj = query_obj.filter( Log.levelno >= int(SEARCH_ATTRIBUTES["notify_level"]) ) if SEARCH_ATTRIBUTES["old_row_first"] == "asc": ordered_by_expr = sqlalchemy.sql.expression.asc(Log.created) else: ordered_by_expr = sqlalchemy.sql.expression.desc(Log.created) if SEARCH_ATTRIBUTES["max_rows"] != "not_selected": records = ( query_obj.order_by(ordered_by_expr) .limit(int(SEARCH_ATTRIBUTES["max_rows"])) .all() ) else: records = query_obj.order_by(ordered_by_expr).all() k = locals() k.pop("self", None) return render_template("bare_html_api/index.html", **k) # return render_template("index.html" ) return "Hello, World!"
[docs]@bp.route("/hello") def hello(): return "Hello, World!"
[docs]@bp.route("/hello_to_ps_basic_logger") def hello_to_ps_basic_logger(): Config.logger.info("Hello World called") return "Hello, World!"
AVAILABLE_SEARCH_ATTRIBUTES = [ "PRODUKT_ID", "SYSTEM_ID", "SUB_SYSTEM_ID", "SUB_SUB_SYSTEM_ID", "USER_SPEC_1", "USER_SPEC_2", "pattern", "starting_at", "notify_level", "max_rows", "old_row_first", "max_rows", ] SEARCH_ATTRIBUTES = {} for attribute in AVAILABLE_SEARCH_ATTRIBUTES: SEARCH_ATTRIBUTES[attribute] = "not_selected" SEARCH_ATTRIBUTES["max_rows"] = "50"
[docs]def set_search_params(request): for attribute in AVAILABLE_SEARCH_ATTRIBUTES: new_value = request.form[attribute].strip() if new_value != "not_selected" and new_value != "": SEARCH_ATTRIBUTES[attribute] = new_value else: SEARCH_ATTRIBUTES[attribute] = "not_selected"
[docs]def isset(param1, param2): if str(param1) == str(param2): return {"value": param1, "selected": True} return {"value": param1, "selected": False}