
Merge pull request #173 from openedx/saleem-latif/ENT-7294-more-updates
ENT-7294: Further performance enhancements for the algolia index command.
saleem-latif authored Aug 1, 2023
2 parents 25e881e + 5ea5208 commit c7c4182
Showing 6 changed files with 144 additions and 112 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -13,6 +13,10 @@ Change Log
Unreleased

+[1.43.2] - 2023-08-01
+---------------------
+* perf: Further performance enhancements for the algolia index command.
+
[1.43.1] - 2023-07-31
---------------------
* perf: Performance enhancements in job recommendations calculation.
2 changes: 1 addition & 1 deletion taxonomy/__init__.py
@@ -15,6 +15,6 @@
# 2. MINOR version when you add functionality in a backwards compatible manner, and
# 3. PATCH version when you make backwards compatible bug fixes.
# More details can be found at https://semver.org/
-__version__ = '1.43.1'
+__version__ = '1.43.2'

default_app_config = 'taxonomy.apps.TaxonomyConfig' # pylint: disable=invalid-name
4 changes: 2 additions & 2 deletions taxonomy/algolia/constants.py
@@ -19,7 +19,7 @@
],
}

-JOBS_PAGE_SIZE = 1
+JOBS_PAGE_SIZE = 1000

-# This is the the maximum number of objects that should be embedded inside an algolia record.
+# This is the maximum number of objects that should be embedded inside an algolia record.
EMBEDDED_OBJECT_LENGTH_CAP = 20
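
The jump from 1 to 1000 above is the heart of this commit: JOBS_PAGE_SIZE sets the queryset slice size in fetch_jobs_data, so it directly controls how many paginated serializer passes the index command makes. A rough sketch of the effect (pages_needed is a hypothetical helper, not part of the codebase):

    def pages_needed(total_jobs, page_size):
        # Each page costs one LIMIT/OFFSET query plus one serializer pass.
        return -(-total_jobs // page_size)  # ceiling division

    assert pages_needed(2500, 1) == 2500  # old setting: 2,500 passes
    assert pages_needed(2500, 1000) == 3  # new setting: 3 passes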
49 changes: 23 additions & 26 deletions taxonomy/algolia/serializers.py
@@ -16,7 +16,7 @@ class JobPostingSerializer(serializers.ModelSerializer):
"""
JobPosting serializer for algolia index.
-    This serializer will contain all of the metadata related to the job posting.
+    This serializer will contain all the metadata related to the job posting.
"""
class Meta:
model = JobPostings
@@ -30,7 +30,7 @@ class JobSerializer(serializers.ModelSerializer):
"""
Job serializer for algolia index.
-    This serializer will contain all of the metadata related to jobs and will also included metadata for skills and
+    This serializer will contain all the metadata related to jobs and will also include metadata for skills and
courses.
"""
skills = serializers.SerializerMethodField()
@@ -96,9 +96,15 @@ def get_industry_names(self, obj):
Arguments:
obj (Job): Job instance whose industries need to be fetched.
"""
-        return list(IndustryJobSkill.objects.filter(job=obj)
-                    .order_by("industry__name")
-                    .values_list('industry__name', flat=True).distinct())
+        return list(
+            IndustryJobSkill.objects.filter(
+                job=obj
+            ).order_by(
+                'industry__name'
+            ).values_list(
+                'industry__name', flat=True
+            ).distinct()
+        )

def get_industries(self, obj):
"""
@@ -107,35 +113,26 @@ def get_industries(self, obj):
obj (Job): Job instance whose industries need to be fetched.
"""
industries = []
-        job_industries = list(IndustryJobSkill.objects.filter(job=obj).order_by("industry__name").
-                              values_list('industry__name', flat=True).distinct())
+        job_industries = list(
+            IndustryJobSkill.objects.filter(
+                job=obj
+            ).order_by(
+                'industry__name'
+            ).values_list(
+                'industry__name', flat=True
+            ).distinct()
+        )
for industry_name in job_industries:
industry_skills = self.context.get('industry_skills')[industry_name]
industries.append({'name': industry_name, 'skills': industry_skills})
return industries

-    @staticmethod
-    def extract_similar_jobs(recommendations, name):
-        """
-        Extract similar jobs from recommendations.
-        Arguments:
-            recommendations (list): List containing dictionaries of job names and recommendations.
-            name (str): Name of the job for which recommendations are being extracted.
-        """
-        similar_jobs = []
-        for recommendation in recommendations:
-            if recommendation['name'] == name:
-                similar_jobs = recommendation['similar_jobs']
-                break
-        return similar_jobs

def get_similar_jobs(self, obj):
"""
Get a list of recommendations.
"""
-        recommendations_data = self.context.get('jobs_with_recommendations', None)
-        return self.extract_similar_jobs(recommendations_data, obj.name)
+        jobs_data = self.context.get('jobs_data', {})
+        return jobs_data[obj.name]['similar_jobs']
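
The new get_similar_jobs body replaces a per-job linear scan (the removed extract_similar_jobs helper walked the whole recommendations list for every serialized job) with a constant-time dict lookup keyed by job name. A minimal sketch with illustrative data:

    jobs_data = {'Job Name 1': {'similar_jobs': ['Job A', 'Job B', 'Job C']}}

    # Old shape: O(n) scan over a list of {'name': ..., 'similar_jobs': ...} dicts.
    # New shape: O(1) lookup.
    similar_jobs = jobs_data['Job Name 1']['similar_jobs']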

def get_b2c_opt_in(self, obj):
"""
@@ -168,7 +165,7 @@ class JobSkillSerializer(serializers.ModelSerializer):
"""
JobSkill serializer for algolia index.
-    This serializer will contain all of the metadata related to the skill and will also included metadata for skills and
+    This serializer will contain all the metadata related to the skill and will also include metadata for skills and
courses.
"""
external_id = serializers.CharField(source='skill.external_id', default=None)
169 changes: 100 additions & 69 deletions taxonomy/algolia/utils.py
@@ -3,7 +3,7 @@
Utility functions related to algolia indexing.
"""
import logging
-import datetime
+from datetime import datetime
from collections import deque, namedtuple

from django.conf import settings
@@ -19,6 +19,32 @@
JobRecommendation = namedtuple('JobRecommendation', 'name similarity')


+class LogTime:
+    """
+    Context manager to calculate and log the time taken by a piece of code.
+    """
+    start = None
+
+    def __init__(self, message_prefix):
+        """
+        Initialize the context with the message prefix.
+        """
+        self.message_prefix = message_prefix
+
+    def __enter__(self):
+        """
+        Start tracking the time.
+        """
+        self.start = datetime.now()
+
+    def __exit__(self, *args, **kwargs):
+        """
+        End time tracking and log the time taken by a piece of code.
+        """
+        end = datetime.now()
+        LOGGER.info('%s: %s', self.message_prefix, end - self.start)
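
A quick usage sketch for the new context manager (the message string and the timed call are illustrative):

    # Logs something like '[TAXONOMY] Time taken to index jobs: 0:00:42.123456' on exit.
    with LogTime('[TAXONOMY] Time taken to index jobs'):
        index_jobs_data_in_algolia()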


def index_jobs_data_in_algolia():
"""
Re-Index all jobs data to algolia.
@@ -57,27 +83,85 @@ def calculate_jaccard_similarity(set_a, set_b):
return float(0)
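
Only the final fallback line of calculate_jaccard_similarity is visible in this hunk. For reference, a standard Jaccard implementation consistent with that fallback would look like the following sketch (not necessarily the repository's exact body):

    def jaccard_similarity(set_a, set_b):
        # |A ∩ B| / |A ∪ B|, falling back to 0.0 when both sets are empty.
        union = set_a | set_b
        if not union:
            return float(0)
        return len(set_a & set_b) / len(union)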


-def combine_jobs_and_skills_data(jobs_qs):
+def calculate_job_skills(jobs_qs):
"""
-    Combine jobs and skills data.
+    Fetch skills for each job.
Arguments:
jobs_qs (QuerySet): Django queryset of Job model that will be used as a starting point to fetch skills data.
Returns:
-        (list<dict>): A list of dicts containing job and their skills in a list.
+        (dict<str: dict>): A dictionary with job name as the key and the value against each key is a dict containing
+            job details, including `skills`.
"""
-    all_job_and_skills_data = []
+    job_details = {}
for job in jobs_qs.all():
-        skills = list(
+        skills = set(
JobSkills.objects.filter(job=job).values_list('skill__name', flat=True)
)
-        all_job_and_skills_data.append({
-            'name': job.name,
+        job_details[job.name] = {
            'skills': skills,
-        })
+        }

+    return job_details
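
Returning skills as a set (rather than the old list) matters downstream: calculate_job_recommendations feeds these values straight into calculate_jaccard_similarity, and the old code had to rebuild a set from each skills list inside its nested loop (see the removed comment further down). The returned structure, with illustrative values:

    {
        'Data Engineer': {'skills': {'Python', 'SQL', 'Airflow'}},
        'Data Analyst': {'skills': {'Python', 'SQL', 'Tableau'}},
    }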


+def calculate_job_recommendations(jobs_data):
+    """
+    Calculate job recommendations.
+    Note: `jobs_data` will be treated as mutable (instead of creating a new dict to return)
+        to reduce memory footprint of this function.
+    Args:
+        jobs_data (dict<str: dict>): A dictionary containing jobs data like skills. key of the dict is jobs name and
+            the value dict should at-least contain a set of skills against `skills` key.
+    Returns:
+        (dict<str: dict>): The same dict from the argument, with `similar_jobs` added against each job.
+    """
+    SIMILAR_JOBS_COUNT = 3
+    job_recommendations = deque([], maxlen=SIMILAR_JOBS_COUNT)
+    for job_name, job in jobs_data.items():
+        for candidate_job_name, candidate_job in jobs_data.items():
+            if job_name == candidate_job_name:
+                continue
+
+            jaccard_similarity = calculate_jaccard_similarity(job['skills'], candidate_job['skills'])
+
+            insert_item_in_ordered_queue(
+                queue=job_recommendations,
+                item=JobRecommendation(job_name, jaccard_similarity),
+                key=lambda item: item.similarity,
+            )
+
+        jobs_data[job_name]['similar_jobs'] = [item.name for item in job_recommendations]
+
+    return jobs_data


+def fetch_and_combine_job_details(jobs_qs):
+    """
+    Fetch data related to jobs, combine it in the form of a dict and return.
-    return all_job_and_skills_data
+    The jobs data that we are interested in is listed below.
+        1. skills: A set of skills that are associated with the corresponding job.
+        2. similar_jobs: Other jobs that are similar to the corresponding job.
+    Arguments:
+        jobs_qs (QuerySet): Django queryset of Job model that will be used as a starting point to fetch skills data.
+    Returns:
+        (dict<str: dict>): A dictionary with job name as the key and the value against each key is a dict containing
+            job details, including `skills` and `similar_jobs`.
+    """
+    with LogTime('[TAXONOMY] Time taken to fetch and combine skills data for jobs'):
+        jobs_data = calculate_job_skills(jobs_qs)
+
+    with LogTime('[TAXONOMY] Time taken to fetch and combine job recommendations'):
+        jobs_data = calculate_job_recommendations(jobs_data=jobs_data)
+
+    return jobs_data


def insert_item_in_ordered_queue(queue, item, key=lambda arg: arg):
@@ -110,46 +194,6 @@ def insert_item_in_ordered_queue(queue, item, key=lambda arg: arg):
return
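
insert_item_in_ordered_queue (only its trailing return is visible above) pairs with a maxlen-bounded deque to keep the top-N items sorted by key, highest first. A self-contained sketch of that pattern, assuming the smallest item is evicted when the deque is full:

    from collections import deque

    def insert_in_ordered_queue(queue, item, key=lambda arg: arg):
        # Keep `queue` ordered from highest to lowest key.
        for index, existing in enumerate(queue):
            if key(item) > key(existing):
                if len(queue) == queue.maxlen:
                    queue.pop()  # evict the smallest so insert() cannot overflow
                queue.insert(index, item)
                return
        if queue.maxlen is None or len(queue) < queue.maxlen:
            queue.append(item)

    top_three = deque([], maxlen=3)
    for score in (0.2, 0.9, 0.5, 0.7):
        insert_in_ordered_queue(top_three, score)
    # top_three is now deque([0.9, 0.7, 0.5], maxlen=3)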


-def calculate_job_recommendations(jobs):
-    """
-    Calculate job recommendations.
-    Args:
-        jobs (list<dict>): A list of dicts containing job and their skills in a list.
-    Returns:
-        (list<dict>): A list of dicts containing jobs and their recommended jobs.
-    """
-    SIMILAR_JOBS_COUNT = 3
-    job_recommendations = deque([], maxlen=SIMILAR_JOBS_COUNT)
-    jobs_and_recommendations = []
-
-    # converting skills list into set, to avoid repeated converting in the nested loop.
-    jobs = [
-        {'name': job['name'], 'skills': set(job['skills'])} for job in jobs
-    ]
-
-    for job in jobs:
-        for candidate_job in jobs:
-            if job['name'] == candidate_job['name']:
-                continue
-
-            jaccard_similarity = calculate_jaccard_similarity(job['skills'], candidate_job['skills'])
-
-            insert_item_in_ordered_queue(
-                queue=job_recommendations,
-                item=JobRecommendation(job['name'], jaccard_similarity),
-                key=lambda item: item.similarity,
-            )
-
-        jobs_and_recommendations.append({
-            'name': job['name'],
-            'similar_jobs': [item.name for item in job_recommendations],
-        })
-
-    return jobs_and_recommendations


def combine_industry_skills():
"""
Constructs a dict with keys as industry names and values as their skills.
@@ -196,25 +240,12 @@ def fetch_jobs_data():
"""
qs = Job.objects.exclude(name__isnull=True)

-    combine_start_time = datetime.datetime.now()
-    LOGGER.info('[TAXONOMY] Started combining Jobs and their skills for recommendations calculation.')
-    all_job_and_skills = combine_jobs_and_skills_data(qs)
-    industry_skills = combine_industry_skills()
-    combine_end_time = datetime.datetime.now()
-    LOGGER.info(
-        '[TAXONOMY] Time taken to combine jobs and skills data: %s',
-        combine_end_time - combine_start_time
-    )
-
-    recommendations_start_time = datetime.datetime.now()
-    LOGGER.info('[TAXONOMY] Started calculating Job recommendations.')
-    jobs_with_recommendations = calculate_job_recommendations(all_job_and_skills)
-    recommendations_end_time = datetime.datetime.now()
+    LOGGER.info('[TAXONOMY] Started combining skills and recommendations data for the jobs.')
+    jobs_data = fetch_and_combine_job_details(qs)
+    LOGGER.info('[TAXONOMY] Finished calculating job recommendations and skills.')

-    LOGGER.info(
-        '[TAXONOMY] Time taken to combine jobs and skills data: %s',
-        recommendations_end_time - recommendations_start_time
-    )
+    with LogTime('[TAXONOMY] Time taken to combine industry skills data'):
+        industry_skills = combine_industry_skills()

start, page_size = 0, JOBS_PAGE_SIZE
jobs = []
@@ -224,7 +255,7 @@ def fetch_jobs_data():
qs[start:start + page_size],
many=True,
context={
-                'jobs_with_recommendations': jobs_with_recommendations,
+                'jobs_data': jobs_data,
'industry_skills': industry_skills,
'jobs_having_job_skills': get_job_ids(JobSkills.objects),
'jobs_having_industry_skills': get_job_ids(IndustryJobSkill.objects),
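
For context, the pagination loop truncated in this hunk slices the queryset one page at a time and hands each slice to JobSerializer with the shared context shown above. A sketch of its shape (loop details assumed, not copied from the file):

    start, page_size = 0, JOBS_PAGE_SIZE
    jobs = []
    while True:
        page = qs[start:start + page_size]  # one LIMIT/OFFSET query per page
        if not page:
            break
        serializer = JobSerializer(page, many=True, context=shared_context)
        jobs.extend(serializer.data)
        start += page_size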
28 changes: 14 additions & 14 deletions tests/algolia/test_serializers.py
@@ -19,18 +19,6 @@ class TestJobSerializer(TaxonomyTestCase, TestCase):
def setUp(self):
super().setUp()
Job.objects.all().delete()
-        self.data = {
-            'jobs_with_recommendations': [
-                {
-                    "name": "Job Name 1",
-                    "similar_jobs": ["Job A", "Job B", "Job C"]
-                },
-                {
-                    "name": "Job Name 2",
-                    "similar_jobs": ["Job A", "Job B", "Job C"]
-                },
-            ]
-        }

@mock.patch('taxonomy.algolia.utils.JOBS_PAGE_SIZE', 5) # this is done to trigger the pagination flow.
def test_jobs_data(self):
@@ -42,7 +30,12 @@ def test_jobs_data(self):
for job_skill in job_skills:
factories.JobPostingsFactory.create(job=job_skill.job)

-        job_serializer = JobSerializer(Job.objects, context=self.data, many=True)
+        context = {
+            'jobs_data': {
+                job_skill.job.name: {'similar_jobs': ["Job A", "Job B", "Job C"]} for job_skill in job_skills
+            }
+        }
+        job_serializer = JobSerializer(Job.objects, context=context, many=True)
jobs_data = job_serializer.data

# Assert all jobs are included in the data returned by the serializer
@@ -83,7 +76,14 @@ def test_job_allowlist_attribute(self):
job_skills.append(factories.JobSkillFactory.create(job=job))
for job_skill in job_skills:
factories.JobPostingsFactory.create(job=job_skill.job)
-        job_serializer = JobSerializer(Job.objects, context=self.data, many=True)
+
+        context = {
+            'jobs_data': {
+                job_skill.job.name: {
+                    'similar_jobs': ["Job A", "Job B", "Job C"]} for job_skill in job_skills
+            }
+        }
+        job_serializer = JobSerializer(Job.objects, context=context, many=True)
jobs_data = job_serializer.data
for job in jobs_data:
if job["external_id"] == "ET123456789":
Expand Down
