#!/usr/bin/python3 import sqlite3 from flask import Flask, make_response, request, render_template, g, send_from_directory from flask_paginate import Pagination, get_page_parameter from flask_assets import Environment import confuse import re DATABASE = "file:../data/diffs.db?mode=ro" CONFIG_FILE = "../data/config.yaml" config = confuse.Configuration("headline", __name__) config.set_file(CONFIG_FILE) app = Flask(__name__) assets = Environment(app) assets.register("css", "main.css", output="dist/main.%(version)s.css") assets.url_expire = False def get_db(): db = getattr(g, "_database", None) if db is None: db = g._database = sqlite3.connect(DATABASE, uri=True) db.row_factory = sqlite3.Row return db @app.teardown_appcontext def close_connection(exception): db = getattr(g, "_database", None) if db is not None: db.close() def websearch_to_fts_query(search: str): """ Converts web searches into fts queries: 'this is "a test"' -> '"this" OR "is" OR "a test"' """ return " OR ".join( [ '"' + m.group(0) + '"' for m in re.finditer(r'(?<=")[^"]+(?=")|[^\s"]+', search) ] ) def get_feeds(): return [ { "rss_source": str(conf["rss_source"]), "unique_tag": str(conf["unique_tag"]), "feed_name": str(conf["name"]), } for conf in config["feeds"] ] @app.route("/") def index(): db = get_db().cursor() search = request.args.get("search", type=str, default="") query = websearch_to_fts_query(search) if search else None selected_feeds = request.args.getlist("feeds[]") sql_select = "SELECT * FROM diffs " sql_select_args = [] sql_count = "SELECT count(*) FROM diffs " sql_count_args = [] if query: sql_part_query = "JOIN (SELECT rowid FROM diffs_fts(?)) filter ON filter.rowid = diffs.diff_id " sql_select = sql_select + sql_part_query sql_select_args.append(query) sql_count = sql_count + sql_part_query sql_count_args.append(query) if selected_feeds: sql_part_feeds = f"WHERE feed_name in ({','.join(['?' for _ in range(len(selected_feeds))])}) " sql_select = sql_select + sql_part_feeds sql_select_args += selected_feeds sql_count = sql_count + sql_part_feeds sql_count_args += selected_feeds # flask-paginate page = request.args.get(get_page_parameter(), type=int, default=1) db.execute(sql_count, sql_count_args) diff_count = db.fetchall()[0][0] pagination = Pagination( page=page, total=diff_count, record_name="diffs", css_framework="bootstrap5" ) page_skip = pagination.skip per_page = pagination.per_page # Create and execute final query after getting page info sql_part_pagination = "ORDER BY diff_id DESC LIMIT ? OFFSET ? " sql_select = sql_select + sql_part_pagination sql_select_args += [per_page, page_skip] print(sql_select) db.execute(sql_select, sql_select_args) # This would be a cleaner way to do it, but I have no clue how to make it work. Giving multiple feeds to the query is just seemingly impossible in sqlite. # What about switching to Elasticsearch? :) # if selected_feeds and query: # feeds = str(selected_feeds).strip("[]") # db.execute( # "SELECT * FROM diffs JOIN (SELECT rowid FROM diffs_fts(?)) filter ON filter.rowid = diffs.diff_id WHERE feed_name IN (?) ORDER BY diff_id DESC LIMIT ? OFFSET ?", # (query, feeds, per_page, page_skip), # ) # elif query: # db.execute( # "SELECT * FROM diffs JOIN (SELECT rowid FROM diffs_fts(?)) filter ON filter.rowid = diffs.diff_id ORDER BY diff_id DESC LIMIT ? OFFSET ?", # (query, per_page, page_skip), # ) # elif selected_feeds: # feeds = str(selected_feeds).strip("[]").replace("'", '"') # print(feeds) # db.execute( # f"SELECT * FROM diffs WHERE feed_name IN ({','.join(['?']*len(selected_feeds))}) ORDER BY diff_id DESC LIMIT ? OFFSET ?", # (selected_feeds, per_page, page_skip), # ) # else: # db.execute( # "SELECT * FROM diffs ORDER BY diff_id DESC LIMIT ? OFFSET ?", # (per_page, page_skip), # ) diffs = db.fetchall() html = render_template( "index.html", diffs=diffs, page=page, pagination=pagination, diff_count=diff_count, search=search, feeds=get_feeds(), selected_feeds=selected_feeds, ) res = make_response(html) res.cache_control.max_age = 60 res.cache_control.public = True return res @app.route("/article/") def article_detail(article_id: str): db = get_db().cursor() db.execute("SELECT * FROM diffs WHERE article_id = ?", (article_id,)) result = db.fetchall() if len(result) == 0: return make_response(render_template("not_found.html"), 404) article_url = result[0]["article_url"] return render_template( "article_detail.html", article_id=article_id, article_url=article_url, diffs=result, ) @app.route("/about") def about(): return render_template("about.html") @app.route("/feeds") def feed_list(): return render_template("feeds.html", feeds=get_feeds()) @app.route("/robots.txt") def static_from_root(): return send_from_directory(app.static_folder or "static", request.path[1:]) if __name__ == "__main__": app.run(host="0.0.0.0")