Crawler: refactor of the clusterizing process
Fetching the corpus for TF-IDF once, avoiding recalculation of simple vectors (usage sketched below)
Filtering the corpus per article by vector size
Introducing a delay between clusterizing runs
Introducing a minimum vector size
No clustering on empty processed content (video / image / embedded)
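
A rough usage sketch of the refactored flow, using names from jarr/controllers/article_clusterizer.py in this diff (the crawler-side loop and the user_id / new_articles variables are illustrative assumptions, not part of the commit):

from jarr.controllers.article_clusterizer import Clusterizer

# one Clusterizer per user and per batch, so the TF-IDF corpus is fetched
# once and reused instead of being rebuilt for every article
clusterizer = Clusterizer(user_id=user_id)
for article in new_articles:
    # attach the article to a matching cluster, or create a new one
    clusterizer.main(article, filter_result={'clustering': True})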
jaesivsm committed May 20, 2020
1 parent 86e7eb9 commit c793afc
Showing 19 changed files with 523 additions and 326 deletions.
2 changes: 1 addition & 1 deletion Pipfile
@@ -38,7 +38,7 @@ rauth = "==0.7.3"
redis = "==2.10.6"
requests = ">=2.21.0"
SQLAlchemy = "==1.3.3"
the-conf = "==0.0.15"
the-conf = "==0.0.16"
trafilatura = "==0.4.1"
flask-cors = "==3.0.8"

6 changes: 3 additions & 3 deletions Pipfile.lock

Some generated files are not rendered by default.

8 changes: 7 additions & 1 deletion example_conf/jarr.circleci.json
@@ -4,6 +4,11 @@
"login": "admin",
"passwd": "admin"
},
"clustering": {
"tfidf": {
"min_vector_size": 2
}
},
"log": {
"level": 0
},
@@ -13,5 +18,6 @@
},
"celery": {"broker": "amqp://0.0.0.0//",
"backend": "redis://0.0.0.0:6379/0",
"BROKER_URL": "amqp://0.0.0.0//"}
"BROKER_URL": "amqp://0.0.0.0//"},
"auth": {"secret_key": "my not so secret key"}
}
8 changes: 7 additions & 1 deletion example_conf/jarr.test.json
@@ -4,6 +4,11 @@
"login": "admin",
"passwd": "admin"
},
"clustering": {
"tfidf": {
"min_vector_size": 2
}
},
"log": {
"level": 0
},
@@ -14,5 +19,6 @@
},
"celery": {"broker": "amqp://0.0.0.0//",
"backend": "redis://0.0.0.0:6379/0",
"BROKER_URL": "amqp://0.0.0.0//"}
"BROKER_URL": "amqp://0.0.0.0//"},
"auth": {"secret_key": "my not so secret key"}
}
73 changes: 1 addition & 72 deletions jarr/bootstrap.py
@@ -14,78 +14,7 @@

from prometheus_distributed_client import set_redis_conn

conf = TheConf({'config_files': ['/etc/jarr/jarr.json', '~/.config/jarr.json'],
'config_file_environ': ['JARR_CONFIG'],
'source_order': ['env', 'files'],
'parameters': [
{'jarr_testing': {'default': False, 'type': bool}},
{'cluster_default': [
{'time_delta': {'default': 20, 'type': int}},
{'tfidf_enabled': {'default': True, 'type': bool}},
{'tfidf_min_sample_size': {'default': 10, 'type': int}},
{'tfidf_min_score': {'default': .75, 'type': float}}]},
{'timezone': {'default': 'Europe/Paris', 'type': str}},
{'app': [{'url': {'default': 'http://0.0.0.0:3000'}}]},
{'api': [{'scheme': {'default': 'http'}},
{'admin_mail': {'default': None}},
{'server_name': {'default': '', 'type': str}}]},
{'db': [{'pg_uri': {'default': 'postgresql://postgresql/jarr'}},
{'redis': [{'host': {'default': 'redis'}},
{'db': {'default': 1, 'type': int}},
{'port': {'default': 6379, 'type': int}},
{'password': {'default': None}}]},
{'metrics': [{'host': {'default': 'redis'}},
{'db': {'default': 2, 'type': int}},
{'port': {'default': 6379, 'type': int}},
{'password': {'default': None}}]},
]},
{'celery': [{'broker': {'default': 'amqp://rabbitmq//'}},
{'backend': {'default': 'redis://redis:6379/0'}},
{'broker_url': {'default': 'amqp://rabbitmq//'}},
{'task_serializer': {'default': 'json'}},
{'result_serializer': {'default': 'json'}},
{'timezone': {'default': 'Europe/Paris'}},
{'enable_utc': {'default': True, 'type': bool}},
{'imports': {'default': ('ep_celery',
'jarr.crawler.main'),
'type': tuple}},
{'task_default_queue': {'default': 'jarr'}},
{'task_default_exchange': {'default': 'jarr'}}]},
{'log': [{'level': {'default': logging.WARNING, 'type': int}},
{'path': {'default': "jarr.log"}}]},
{'crawler': [{'idle_delay': {'default': 2 * 60, 'type': int}},
{'user_agent': {'default': 'Mozilla/5.0 (compatible; '
'jarr.info)'}},
{'batch_size': {'default': 0, 'type': int}},
{'timeout': {'default': 30, 'type': int}}]},
{'plugins': [{'readability_key': {'default': ''}},
{'rss_bridge': {'default': ''}}]},
{'auth': [{'secret_key': {'default': str(random.getrandbits(128))
}},
{'jwt_header_prefix': {'default': 'JWT', 'type': str}},
{'expiration_sec': {'default': 24 * 3600, 'type': int}},
{'allow_signup': {'default': True, 'type': bool}}]},
{'oauth': [{'allow_signup': {'default': False, 'type': bool}},
{'twitter': [{'id': {'default': ''}},
{'secret': {'default': ''}}]},
{'facebook': [{'id': {'default': ''}},
{'secret': {'default': ''}}]},
{'google': [{'id': {'default': ''}},
{'secret': {'default': ''}}]},
{'linuxfr': [{'id': {'default': ''}},
{'secret': {'default': ''}}]}]},
{'notification': [{'email': {'default': ''}},
{'host': {'default': ''}},
{'starttls': {'type': bool, 'default': True}},
{'port': {'type': int, 'default': 587}},
{'login': {'default': ''}},
{'password': {'default': ''}}]},
{'feed': [{'error_max': {'type': int, 'default': 6}},
{'error_threshold': {'type': int, 'default': 3}},
{'min_expires': {'type': int, 'default': 3600 / 2}},
{'max_expires': {'type': int, 'default': 7 * 24 * 3600}},
{'stop_fetch': {'default': 30, 'type': int}}]},
]})
conf = TheConf('jarr/metaconf.yml')


def is_secure_served():
4 changes: 4 additions & 0 deletions jarr/controllers/abstract.py
@@ -66,6 +66,10 @@ def _to_filters(cls, **filters):
if key == '__or__':
db_filters.add(or_(*[and_(*cls._to_filters(**sub_filter))
for sub_filter in value]))
elif key == '__and__':
for sub_filter in value:
for k, v in sub_filter.items():
db_filters.add(cls._to_comparison(k, cls._db_cls)(v))
else:
db_filters.add(cls._to_comparison(key, cls._db_cls)(value))
return db_filters
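
For reference, the new __and__ key lets several comparisons target the same column, something a plain keyword filter cannot express; the clusterizer introduced below relies on it. A minimal illustration (the filter dict is copied from get_neighbors in article_clusterizer.py; user_id is assumed to be in scope):

from jarr.controllers import ArticleController

# keep only articles whose vector is neither NULL nor the empty string
filters = {"__and__": [{'vector__ne': None}, {'vector__ne': ''}],
           "article_type": None}
query = ArticleController(user_id).read(**filters)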
225 changes: 225 additions & 0 deletions jarr/controllers/article_clusterizer.py
@@ -0,0 +1,225 @@
import logging
from functools import partial
from collections import defaultdict
from datetime import timedelta

from sqlalchemy import and_, or_

from jarr.bootstrap import session, conf
from jarr.controllers import ArticleController
from jarr.controllers.article import to_vector
from jarr.lib.article_cleaner import fetch_and_parse
from jarr.lib.clustering_af.grouper import get_best_match_and_score
from jarr.lib.content_generator import generate_content
from jarr.lib.enums import ArticleType, ClusterReason, ReadReason
from jarr.metrics import ARTICLE_CREATION
from jarr.signals import event
from jarr.models import Article, Cluster, Feed
from jarr.utils import get_tfidf_pref

logger = logging.getLogger(__name__)
NO_CLUSTER_TYPE = {ArticleType.image, ArticleType.video, ArticleType.embedded}
WAKABLE_REASONS = {ReadReason.marked, ReadReason.mass_marked,
ReadReason.filtered}
cluster_event = partial(event.send, module=__name__)


class Clusterizer:

def __init__(self, user_id=None):
self.user_id = user_id
self.corpus = None # type: list
self._config_cache = defaultdict(lambda: defaultdict(dict))

def get_config(self, obj, attr):
def cache(val):
self._config_cache[obj.__class__.__name__][attr][obj.id] = val
return val
        if obj.id in self._config_cache[obj.__class__.__name__][attr]:
            return self._config_cache[obj.__class__.__name__][attr][obj.id]
if obj.__class__.__name__ == "Article":
return cache(self.get_config(obj.feed, attr))
if obj.__class__.__name__ == "Cluster":
return cache(all(self.get_config(article, attr)
for article in obj.articles))
val = getattr(obj, attr)
if val is not None:
logger.debug("%r.%s is %r", obj, attr, val)
return cache(val)
if obj.__class__.__name__ == "Feed" and obj.category_id:
return cache(self.get_config(obj.category, attr))
return cache(self.get_config(obj.user, attr))

def add_to_corpus(self, article):
if self.corpus is None:
self.corpus = []
if article.article_type not in NO_CLUSTER_TYPE:
self.corpus.append(article)

def get_neighbors(self, article):
if self.corpus is None:
filters = {"__and__": [{'vector__ne': None}, {'vector__ne': ''}],
"article_type": None}
self.corpus = list(self._get_query_for_clustering(
article, filters=filters, filter_tfidf=True))
tfidf_conf = conf.clustering.tfidf
low_bound = article.simple_vector_magnitude / tfidf_conf.size_factor
high_bound = article.simple_vector_magnitude * tfidf_conf.size_factor
low_bound = max(tfidf_conf.min_vector_size, low_bound)
for candidate in self.corpus:
if low_bound <= candidate.simple_vector_magnitude <= high_bound:
yield candidate

def _get_cluster_by_link(self, article):
for candidate in self._get_query_for_clustering(article,
{'link_hash': article.link_hash}):
article.cluster_reason = ClusterReason.link
cluster_event(context='link', result='match', level=logging.INFO)
return candidate.cluster

def _get_cluster_by_similarity(self, article):
neighbors = list(self.get_neighbors(article))

min_sample_size = get_tfidf_pref(article.feed, 'min_sample_size')
if len(neighbors) < min_sample_size:
logger.info('only %d docs against %d required, no TFIDF for %r',
len(neighbors), min_sample_size, article)
            cluster_event(context='tfidf', result='sample size forbid')
return None

best_match, score = get_best_match_and_score(article, neighbors)
if score > get_tfidf_pref(article.feed, 'min_score'):
article.cluster_reason = ClusterReason.tf_idf
article.cluster_score = int(score * 1000)
article.cluster_tfidf_neighbor_size = len(neighbors)
article.cluster_tfidf_with = best_match.id
cluster_event(context='tfidf', result='match', level=logging.INFO)
return best_match.cluster
cluster_event(context='tfidf', result='miss')

def _get_query_for_clustering(self, article, filters, filter_tfidf=False):
time_delta = timedelta(days=conf.clustering.time_delta)
date_cond = {'date__lt': article.date + time_delta,
'date__gt': article.date - time_delta}
retr_cond = {'retrieved_date__lt': article.retrieved_date + time_delta,
'retrieved_date__gt': article.retrieved_date - time_delta}
filters.update({'cluster_id__ne': None,
'user_id': article.user_id,
'id__ne': article.id,
'__or__': [date_cond, retr_cond]})
if article.category_id \
and not self.get_config(article, 'cluster_same_category'):
filters['category_id__ne'] = article.category_id
if not self.get_config(article, 'cluster_same_feed'):
filters['feed_id__ne'] = article.feed_id

feed_join = [Feed.id == Article.feed_id,
or_(Feed.cluster_enabled.__eq__(True),
Feed.cluster_enabled.__eq__(None))]
if filter_tfidf:
feed_join.append(or_(Feed.cluster_tfidf_enabled.__eq__(True),
Feed.cluster_tfidf_enabled.__eq__(None)))

query = ArticleController(article.user_id).read(**filters)\
.join(Feed, and_(*feed_join))

# operations involving categories are complicated, handling in software
for candidate in query:
if not self.get_config(candidate, "cluster_enabled"):
continue
if filter_tfidf and \
not self.get_config(candidate, "cluster_tfidf_enabled"):
continue
yield candidate

def _create_from_article(self, article,
cluster_read=None, cluster_liked=False,
parsing_result=None):
cluster = Cluster(user_id=article.user_id)
article.cluster_reason = ClusterReason.original
return self.enrich_cluster(cluster, article,
cluster_read, cluster_liked,
force_article_as_main=True,
parsing_result=parsing_result)

def enrich_cluster(self, cluster, article,
cluster_read=None, cluster_liked=False,
force_article_as_main=False, parsing_result=None):
parsing_result = parsing_result or {}
article.cluster = cluster
# handling read status
if cluster.read is None: # no read status, new cluster
cluster.read = bool(cluster_read)
elif cluster_read is not None: # filters indicate a read status
cluster.read = cluster.read and cluster_read
cluster.read_reason = ReadReason.filtered
logger.debug('marking as read because of filter %r', cluster)
elif (cluster.read # waking up a cluster
and cluster.read_reason in WAKABLE_REASONS
and self.get_config(article, 'cluster_wake_up')
and self.get_config(cluster, 'cluster_wake_up')):
cluster.read = False
logger.debug('waking up %r', cluster)
# once one article is liked the cluster is liked
cluster.liked = cluster.liked or cluster_liked
if force_article_as_main or cluster.main_date > article.date:
cluster.main_title = parsing_result.get('title', article.title)
cluster.main_date = article.date
cluster.main_link = article.link
cluster.main_feed_title = article.feed.title
cluster.main_article_id = article.id
if not cluster.content:
success, content = generate_content(article, parsing_result)
if success:
cluster.content = content
self.add_to_corpus(article)
session.add(cluster)
session.add(article)
session.commit()
ARTICLE_CREATION.labels(read_reason=cluster.read_reason,
read='read' if cluster.read else 'unread',
cluster=article.cluster_reason.value).inc()
return cluster

def main(self, article, filter_result=None):
"""Will add given article to a fitting cluster or create a cluster
fitting that article."""
filter_result = filter_result or {}
allow_clustering = filter_result.get('clustering', True)
filter_read = filter_result.get('read')
filter_liked = filter_result.get('liked')
logger.info('%r - processed filter: %r', article, filter_result)
cluster_config = self.get_config(article.feed, 'cluster_enabled')

# fetching article so that vector comparison is made on full content
parsing_result = None
if article.feed.truncated_content:
parsing_result = fetch_and_parse(article.link)
if parsing_result.get('parsed_content'):
article.reset_simple_vector()
article.vector = to_vector(
article.title, article.tags, article.content,
parsing_result)
session.add(article)
session.commit()
article = ArticleController().get(id=article.id)

if not allow_clustering:
cluster_event(context='clustering', result='filter forbid')
elif not cluster_config:
cluster_event(context='clustering', result='config forbid')
else:
cluster = self._get_cluster_by_link(article)
if not cluster:
if not self.get_config(article.feed, 'cluster_tfidf_enabled'):
cluster_event(context='tfidf', result='config forbid')
elif article.article_type in NO_CLUSTER_TYPE:
cluster_event(context='tfidf', result='wrong article type')
else:
cluster = self._get_cluster_by_similarity(article)
if cluster:
return self.enrich_cluster(cluster, article,
filter_read, filter_liked,
parsing_result=parsing_result)
return self._create_from_article(article, filter_read, filter_liked,
parsing_result)
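
To make the new size filtering concrete: with the min_vector_size of 2 set in the example configs above, and assuming a size_factor of 2 (its default lives in the metaconf file and is not visible in this diff), an article whose simple vector holds 6 distinct terms is only compared against candidates holding between 3 and 12 terms. A minimal sketch of the bounds computed in Clusterizer.get_neighbors:

size_factor = 2        # assumption: default not shown in this diff
min_vector_size = 2    # from the example_conf files above
magnitude = 6          # simple_vector_magnitude of the incoming article

low_bound = max(min_vector_size, magnitude / size_factor)
high_bound = magnitude * size_factor
print(low_bound, high_bound)  # 3.0 12 -> only similarly sized vectors are compared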
