Skip to content

Commit

Permalink
Redoing endpoint for getting counts of unread articles
Browse files Browse the repository at this point in the history
* denormalizing counts onto feeds
* counting on cluster `read` update
* counting on cluster create
* counting on feed crawling
  • Loading branch information
jaesivsm committed May 26, 2022
1 parent 96c9683 commit 2abb2c5
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 75 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ init-env:
$(RUN) flask bootstrap-database
$(RUN) flask db stamp $(DB_VER)

# `make db COMMAND=<sub-command>`: proxy to the flask-migrate / alembic CLI
# (e.g. COMMAND=upgrade, COMMAND=history) with JARR_CONFIG pointing at the
# project configuration file
db: export JARR_CONFIG = $(CONF_FILE)
db:
	$(RUN) flask db $(COMMAND)

stop-env:
$(COMPOSE) down --remove-orphans

Expand Down
16 changes: 8 additions & 8 deletions jarr/api/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from jarr.api.common import parse_meaningful_params
from jarr.controllers import ClusterController
from jarr.lib.enums import ReadReason
from jarr.lib.content_generator import migrate_content
from jarr.lib.enums import ReadReason
from jarr.metrics import READ

cluster_ns = Namespace('cluster', description='Cluster related operations')
Expand Down Expand Up @@ -58,16 +58,16 @@ class ClusterResource(Resource):
@cluster_ns.response(404, 'Not found')
@jwt_required()
def get(cluster_id):
cluc = ClusterController()
cluster = cluc.get(id=cluster_id)
cctrl = ClusterController()
cluster = cctrl.get(id=cluster_id)
if cluster.user_id != current_identity.id:
raise Forbidden()
cctrl.user_id = current_identity.id
code = 200
cluster.content = migrate_content(cluster.content)
if not cluster.read:
cluc.update({'id': cluster_id},
{'read': True,
'read_reason': ReadReason.read})
cctrl.update({'id': cluster_id},
{'read': True, 'read_reason': ReadReason.read})
READ.labels(reason=ReadReason.read.value).inc()
cluster.read = True
cluster.read_reason = ReadReason.read
Expand Down Expand Up @@ -106,9 +106,9 @@ def put(cluster_id):
def delete(cluster_id):
try:
ClusterController(current_identity.id).delete(cluster_id)
except NotFound:
except NotFound as not_found:
user_id = ClusterController().get(id=cluster_id).user_id
if user_id != current_identity.id:
raise Forbidden()
raise Forbidden() from not_found
raise
return None, 204
8 changes: 4 additions & 4 deletions jarr/api/one_page_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ def _get_filters(in_dict):
search_content = in_dict.get("search_content")
filters = []
if search_title or not filters:
filters.append({"title__ilike": "%%%s%%" % search_str})
filters.append({"title__ilike": f"%{search_str}%"})
if search_content:
filters.append({"content__ilike": "%%%s%%" % search_str})
filters.append({"content__ilike": f"%{search_str}%"})
if len(filters) == 1:
filters = filters[0]
else:
Expand Down Expand Up @@ -146,10 +146,10 @@ def put():
clu_ctrl = ClusterController(current_identity.id)
clusters = [clu for clu in clu_ctrl.join_read(limit=None, **filters)
if not attrs.get("only_singles")
or len(clu["feeds_id"]) == 1]
or len(clu["feeds_id"]) == 1]
if clusters:
clu_ctrl.update({'id__in': [clu['id'] for clu in clusters]},
{'read': True,
'read_reason': ReadReason.mass_marked})
READ.labels(ReadReason.mass_marked.value).inc(len(clusters))
return ClusterController(current_identity.id).get_unreads(), 200
return clu_ctrl.get_unreads(), 200
76 changes: 48 additions & 28 deletions jarr/controllers/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from sqlalchemy.sql import exists, select

from jarr.bootstrap import session
from jarr.controllers.article import ArticleController
from jarr.controllers.article import ArticleController, FeedController
from jarr.controllers.article_clusterizer import Clusterizer
from jarr.lib.filter import process_filters
from jarr.metrics import WORKER_BATCH
from jarr.models import Article, Cluster
from jarr.models import Article, Cluster, Feed

from .abstract import AbstractController

Expand All @@ -36,44 +36,61 @@ def clusterize_pending_articles(self):
self.user_id, art_count)
WORKER_BATCH.labels(worker_type='clusterizer').observe(art_count)
clusterizer = Clusterizer(self.user_id)
feed_ids, fctrl = set(), FeedController(self.user_id)
for article in actrl.read(cluster_id=None):
filter_result = process_filters(article.feed.filters,
{'tags': article.tags,
'title': article.title,
'link': article.link})
result = clusterizer.main(article, filter_result).id
results.append(result)
feed_ids.add(article.feed_id)
for feed_id in feed_ids:
fctrl.update_unread_count(feed_id)
return results

def update(self, filters, attrs, return_objs=False, commit=True):
if 'read' in attrs:
fctrl = FeedController(self.user_id)
if attrs['read']:
fctrl.decrease_unread_count(self._to_filters(
**{'read': False, **filters}))
else:
fctrl.increase_unread_count(self._to_filters(
**{'read': True, **filters}))
return super().update(filters, attrs, return_objs, commit)

# UI methods

def _preprocess_per_article_filters(self, filters):
"""Removing filters aimed at articles and transform them into filters
for clusters"""
art_filters = {}
for key in {'__or__', 'title__ilike', 'content__ilike'}\
.intersection(filters):
.intersection(filters):
art_filters[key] = filters.pop(key)

if art_filters:
art_contr = ArticleController(self.user_id)
filters['id__in'] = {line[0] for line in art_contr
.read(**art_filters).with_entities(Article.cluster_id)}
actrl = ArticleController(self.user_id)
filters['id__in'] = {
line[0] for line in
actrl.read(**art_filters).with_entities(Article.cluster_id)}

@staticmethod
def _get_selected(fields, art_f_alias, art_c_alias):
"""Return selected fields"""
selected_fields = list(fields.values())
selected_fields.append(func.array_agg(art_f_alias.feed_id,
type_=ARRAY(Integer)).label('feeds_id'))
selected_fields.append(func.array_agg(
art_f_alias.feed_id, type_=ARRAY(Integer)).label('feeds_id'))
return selected_fields

def _join_on_exist(self, query, alias, attr, value, filters):
val_col = getattr(alias, attr)
exist_query = exists(select([val_col])
.where(and_(alias.cluster_id == Cluster.id,
alias.user_id == self.user_id, val_col == value))
.correlate(Cluster).limit(1))
exist_query = exists(
select([val_col])
.where(and_(alias.cluster_id == Cluster.id,
alias.user_id == self.user_id, val_col == value))
.correlate(Cluster).limit(1))
return query.join(alias, and_(alias.user_id == self.user_id,
alias.cluster_id == Cluster.id,
*filters))\
Expand Down Expand Up @@ -111,8 +128,8 @@ def join_read(self, feed_id=None, limit=JR_PAGE_LENGTH, **filters):
art_feed_alias, art_cat_alias = aliased(Article), aliased(Article)
# DESC of what's going on below :
# base query with the above fields and the aggregations
query = session.query(*self._get_selected(JR_FIELDS,
art_feed_alias, art_cat_alias))
query = session.query(
*self._get_selected(JR_FIELDS, art_feed_alias, art_cat_alias))

# adding parent filter, but we can't just filter on one id, because
# we'll miss all the other parent of the cluster
Expand Down Expand Up @@ -140,6 +157,10 @@ def join_read(self, feed_id=None, limit=JR_PAGE_LENGTH, **filters):
.limit(limit))

def delete(self, obj_id, delete_articles=True):
# handling unread count
fctrl = FeedController(self.user_id)
fctrl.decrease_unread_count(self._to_filters(id=obj_id, read=False))

self.update({'id': obj_id}, {'main_article_id': None}, commit=False)
actrl = ArticleController(self.user_id)
if delete_articles:
Expand Down Expand Up @@ -168,22 +189,21 @@ def _count_by(self, group_on, **filters):
if self.user_id:
filters['user_id'] = self.user_id
return dict(session.query(group_on, func.count(Article.cluster_id))
.outerjoin(Cluster,
Article.cluster_id == Cluster.id)
.filter(*self._to_filters(**filters))
.group_by(group_on).all())
.outerjoin(Cluster,
Article.cluster_id == Cluster.id)
.filter(*self._to_filters(**filters))
.group_by(group_on).all())

def get_unreads(self):
counters = defaultdict(int)
for cid, fid, unread in session.query(Article.category_id,
Article.feed_id,
func.count(Cluster.id))\
.join(Article, and_(Article.cluster_id == Cluster.id,
Article.user_id == self.user_id))\
.filter(and_(Cluster.user_id == self.user_id,
Cluster.read.__eq__(False)))\
.group_by(Article.category_id, Article.feed_id):
fctrl = FeedController(self.user_id)
query = session.query(Feed.category_id, Feed.id, Feed.unread_count,
).where(Feed.user_id == self.user_id)
for cid, fid, unread in query:
counters[f"feed-{fid}"] = unread or 0
if counters[f"feed-{fid}"] < 0:
counters[f"feed-{fid}"] = fctrl.update_unread_count(
fid, return_count=True)
if cid:
counters["categ-%d" % cid] += unread
counters["feed-%d" % fid] = unread
counters[f"categ-{cid}"] += counters[f"feed-{fid}"]
return counters
36 changes: 36 additions & 0 deletions jarr/controllers/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import datetime, timedelta, timezone

import dateutil.parser
from sqlalchemy import and_, func
from sqlalchemy.sql import delete, select
from werkzeug.exceptions import Forbidden

Expand Down Expand Up @@ -35,6 +36,25 @@ def __actrl(self):
from .article import ArticleController
return ArticleController(self.user_id)

    def _set_unread_count(self, filters: set, fctr: int):
        """Shift the denormalized ``Feed.unread_count`` of impacted feeds.

        :param filters: presumably SQLAlchemy filter expressions (built via
            ``_to_filters`` by the callers) selecting the clusters whose
            read-status changed — TODO confirm against AbstractController
        :param fctr: +1 or -1, the direction of the shift (see
            ``increase_unread_count`` / ``decrease_unread_count``)
        """
        # count, per feed, how many of this user's articles belong to the
        # filtered clusters
        q = session.query(Article.feed_id, func.count(Article.id))\
            .join(Cluster, and_(Cluster.user_id == self.user_id,
                                Cluster.id == Article.cluster_id,
                                Article.user_id == self.user_id, *filters))\
            .group_by(Article.feed_id)
        # bucket feeds by identical delta so all feeds sharing a count are
        # shifted with a single UPDATE statement
        counts = defaultdict(list)
        for fid, cnt in q:
            counts[cnt].append(fid)
        for cnt, fids in counts.items():
            self.update({'id__in': fids},
                        {Feed.unread_count: Feed.unread_count + (cnt * fctr)})

    def decrease_unread_count(self, filters: set):
        """Decrement ``unread_count`` of feeds matching ``filters``."""
        self._set_unread_count(filters, -1)

    def increase_unread_count(self, filters: set):
        """Increment ``unread_count`` of feeds matching ``filters``."""
        self._set_unread_count(filters, 1)

def list_w_categ(self):
feeds = defaultdict(list)
for row in session.query(*LIST_W_CATEG_MAPPING.values())\
Expand Down Expand Up @@ -247,3 +267,19 @@ def select_art(col):
Cluster.user_id == feed.user_id,
Cluster.main_article_id.__eq__(None)))
return super().delete(obj_id)

    def update_unread_count(self, feed_id, return_count=False):
        """Recompute one feed's ``unread_count`` from its actual clusters.

        :param feed_id: id of the feed to refresh
        :param return_count: when True the count is evaluated in Python and
            returned; otherwise the count query object itself is handed to
            ``update`` — NOTE(review): presumably coerced to a subselect by
            SQLAlchemy so the count happens DB-side; confirm
        """
        where = tuple()
        if self.user_id:
            # user-scoped controller: restrict the count to that user's rows
            where = (Cluster.user_id == self.user_id,
                     Article.user_id == self.user_id)
        # unread articles of the feed = articles whose cluster isn't read
        unread = session.query(func.count(Article.id))\
            .join(Cluster, and_(Article.cluster_id == Cluster.id,
                                Cluster.user_id == Article.user_id,
                                Cluster.read.__eq__(False), *where)
            ).filter(Article.feed_id == feed_id)
        if return_count:
            unread = unread.first()[0]
        self.update({'id': feed_id}, {'unread_count': unread})
        if return_count:
            return unread
7 changes: 4 additions & 3 deletions jarr/crawler/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
@celery_app.task(name='crawler')
@lock('process-feed')
def process_feed(feed_id):
crawler = FeedController().get(id=feed_id).crawler
logger.warning("%r is gonna crawl", crawler)
crawler.crawl()
feed = FeedController().get(id=feed_id)
logger.warning("%r is gonna crawl", feed.crawler)
feed.crawler.crawl()
FeedController(feed.user_id).update_unread_count(feed.id)


@celery_app.task(name='clusterizer')
Expand Down
1 change: 1 addition & 0 deletions jarr/models/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Feed(Base): # type: ignore
nullable=False)
created_date = Column(UTCDateTime, default=utc_now)
filters = Column(PickleType, default=[])
unread_count = Column(Integer, default=0)

# integration control
feed_type = Column(Enum(FeedType),
Expand Down
24 changes: 24 additions & 0 deletions migrations/versions/20220524_adding_feed_unread_count.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Adding `Feed.unread_count` column
Revision ID: f67f8fbefe1c
Revises: 511346f4372e
Create Date: 2022-05-24 15:37:02.985536
"""

# revision identifiers, used by Alembic.
revision = 'f67f8fbefe1c'
down_revision = '511346f4372e'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa


def upgrade():
    """Add the denormalized ``feed.unread_count`` column.

    Uses ``server_default='0'`` rather than the client-side ``default=0``:
    a client-side default is ignored by ALTER TABLE, which would leave every
    pre-existing row with NULL; the server default backfills them with 0,
    matching the model-level default for new rows.
    """
    op.add_column('feed', sa.Column('unread_count', sa.Integer(),
                                    nullable=True, server_default='0'))

def downgrade():
    """Drop the ``feed.unread_count`` column."""
    op.drop_column('feed', 'unread_count')
Loading

0 comments on commit 2abb2c5

Please sign in to comment.