4 changes: 4 additions & 0 deletions _hands-on/parallelise/README.md
@@ -0,0 +1,4 @@
# Parallelise

## Exercises
* [Fetch text and do topic modeling](lda.md) - this exercise involves installing prerequisites and modifying non-parallel code to be parallel in four places.
293 changes: 293 additions & 0 deletions _hands-on/parallelise/lda.md

Large diffs are not rendered by default.

69 changes: 69 additions & 0 deletions _hands-on/parallelise/parse_vrt.py
@@ -0,0 +1,69 @@
import os
import sys
import time
from lxml import etree

# a set of common, semantically uninformative lemmas
stopwords = set([line.strip() for line in open('stopwords.txt')])

def is_content_word(lemma):
    return lemma.isalpha() and lemma not in stopwords

# EDIT THIS FUNCTION
def parse_vrt_in_dir(dirname):
    '''
    Parse each file ending in .vrt in dirname in parallel, and return their concatenation.
    '''
    start_time = time.time()
    sys.stderr.write(f"Running parse_vrt_in_dir...\n")

    # Exercise 1: parallelise parsing the corpora
    # Hint: you can use the Python standard library for this
    retval = []
    for filename in os.listdir(dirname):
        if not filename.endswith('.vrt'):
            continue
        retval += vrt2lemmalists(os.path.join(dirname, filename))

    # How long did we take?
    sys.stderr.write(
        f"...finished in {time.time() - start_time:.2f} s\n")
    return retval

def vrt2lemmalists(filename, max_texts = None, lemma_col = 3):
    '''
    Parse each text in a VRT file into a list of lemmas, and return a list of those lists.
    '''

    sys.stderr.write(f" Reading {filename}\n")
    retval = []
    fobj = open(filename, "rb")
    parser = etree.XMLParser(recover = True)

    text_count = 0
    token_count = 0
    for line in fobj:
        if max_texts and text_count >= max_texts:
            break
        parser.feed(line)
        if line.strip() != b'</text>':
            continue
        # A text has ended
        text_count += 1
        text = parser.close()
        this_text = []
        for leaf in text.iter():
            tokens = leaf.text.strip()
            if tokens == "":
                continue
            for token in tokens.split('\n'):
                token_count += 1
                lemma = token.split('\t')[lemma_col - 1]
                if is_content_word(lemma):
                    this_text.append(lemma)
        retval.append(this_text)
    sys.stderr.write(f" Finished reading {filename}, {text_count} texts and {token_count} tokens\n")
    return retval

if __name__ == '__main__':
    parse_vrt_in_dir(sys.argv[1])
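The Exercise 1 hint points at the Python standard library. The provided solution (`parse_vrt_solution.py` below) uses `multiprocessing.Pool`; as a hedged alternative sketch, `concurrent.futures` from the standard library would also work, assuming the same `vrt2lemmalists` helper and imports as above:

```python
# A sketch only: one possible Exercise 1 answer using concurrent.futures
# instead of multiprocessing.Pool (both are in the standard library).
import os
from concurrent.futures import ProcessPoolExecutor

def parse_vrt_in_dir(dirname):
    # collect the .vrt files, as in the serial version
    filenames = [os.path.join(dirname, f)
                 for f in os.listdir(dirname) if f.endswith('.vrt')]
    retval = []
    # each worker process parses one file at a time; by default the
    # executor starts one worker per core
    with ProcessPoolExecutor() as executor:
        for result in executor.map(vrt2lemmalists, filenames):
            retval += result
    return retval
```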
71 changes: 71 additions & 0 deletions _hands-on/parallelise/parse_vrt_solution.py
@@ -0,0 +1,71 @@
import os
import sys
import time
from lxml import etree

# process-based parallelism
from multiprocessing import Pool

# a set of common, semantically uninformative lemmas
stopwords = set([line.strip() for line in open('stopwords.txt')])

def is_content_word(lemma):
    return lemma.isalpha() and lemma not in stopwords

def parse_vrt_in_dir(dirname):
    '''
    Parse each file ending in .vrt in dirname in parallel, and return their concatenation.
    '''
    start_time = time.time()
    sys.stderr.write(f"Running parse_vrt_in_dir...\n")

    # Exercise 1 solution (one possible one): we map each filename to a
    # vrt2lemmalists call using multiprocessing.Pool
    retval = []
    # First we get the valid file names
    filenames = [os.path.join(dirname, filename)
                 for filename in os.listdir(dirname)
                 if filename.endswith('.vrt')]
    # Then we initialize a Pool object and let it distribute the per-file work
    with Pool() as pool:  # by default, processes = number of cores
        for result in pool.map(vrt2lemmalists, filenames):
            retval += result
    # How long did we take?
    sys.stderr.write(
        f"...finished in {time.time() - start_time:.2f} s\n")
    return retval

def vrt2lemmalists(filename, max_texts = None, lemma_col = 3):
    '''
    Parse each text in a VRT file into a list of lemmas, and return a list of those lists.
    '''

    sys.stderr.write(f"Reading {filename}\n")
    retval = []
    fobj = open(filename, "rb")
    parser = etree.XMLParser(recover = True)

    text_count = 0
    token_count = 0
    for line in fobj:
        if max_texts and text_count >= max_texts:
            break
        parser.feed(line)
        if line.strip() != b'</text>':
            continue
        # A text has ended
        text_count += 1
        text = parser.close()
        this_text = []
        for leaf in text.iter():
            tokens = leaf.text.strip()
            if tokens != "":
                for token in tokens.split('\n'):
                    token_count += 1
                    lemma = token.split('\t')[lemma_col - 1]
                    if is_content_word(lemma):
                        this_text.append(lemma)
        retval.append(this_text)
    sys.stderr.write(f"Finished reading {filename}, {text_count} texts and {token_count} tokens\n")
    return retval

if __name__ == '__main__':
    parse_vrt_in_dir(sys.argv[1])
2 changes: 2 additions & 0 deletions _hands-on/parallelise/requirements.txt
@@ -0,0 +1,2 @@
lxml
gensim
50 changes: 50 additions & 0 deletions _hands-on/parallelise/stopwords.txt
@@ -0,0 +1,50 @@
ei
että
hän
ja
jo
joka
jos
kuin
kun
mikä
mutta
muu
myös
niin
olla
oma
pitää
saada
se
tehdä
tulla
tuo
tämä
voida
vuosi
mukaan
aika
uusi
kertoa
sanoa
nyt
kaikki
tai
kaksi
toinen
viime
hyvä
jälkeen
vielä
yksi
asia
kanssa
yli
osa
ensimmäinen
sekä
alkaa
vain
sitten
mennä
58 changes: 58 additions & 0 deletions _hands-on/parallelise/topics.py
@@ -0,0 +1,58 @@
# Comments beginning with "Exercise" mark places to edit the code

import gensim
import os
import sys
import time
from parse_vrt import parse_vrt_in_dir

n_topics = 10

processed_corpus = []
dirname = sys.argv[1]

start_time = time.time()
sys.stderr.write(f"Running parse_vrt_in_dir...\n")
corpus_lemmalists = parse_vrt_in_dir(dirname)
sys.stderr.write(
    f"...finished in {time.time() - start_time:.2f} s\n")

sys.stderr.write("Building gensim dictionary... "); sys.stderr.flush()
start_time = time.time()

# Exercise 4: Parallelise building the dictionary
# Hint: the dictionary has a merge_with(other) method
dictionary = gensim.corpora.Dictionary(corpus_lemmalists)
sys.stderr.write(f"Done in {time.time() - start_time:.2f} s\n")
sys.stderr.write("Computing BOW corpus... "); sys.stderr.flush()
start_time = time.time()

# Exercise 3: Parallelise computing bow_corpus
# Hint: send the corpus in suitable-sized chunks to processes that map
# the corpus with the function dictionary.doc2bow
bow_corpus = [dictionary.doc2bow(text) for text in corpus_lemmalists]
sys.stderr.write(f"Done in {time.time() - start_time:.2f} s\n")
sys.stderr.write("Computing LDA model... "); sys.stderr.flush()
start_time = time.time()

# Exercise 2: replace LdaModel with a parallel version
# Hint: you can simply replace the model name, but do look at the API,
# choose a number of processes, and test which one works best. Warning:
# memory consumption grows with the number of processes; it's possible to
# run out if you have a lot of cores!
lda = gensim.models.LdaModel(bow_corpus, num_topics = n_topics)
sys.stderr.write(f"Done in {time.time() - start_time:.2f} s\n")

sys.stderr.write("Computing model coherence... \n")
start_time = time.time()
cm = gensim.models.coherencemodel.CoherenceModel(
    model=lda, corpus=bow_corpus, dictionary=dictionary, coherence='u_mass')
print(f" Coherence with {n_topics} topics was {cm.get_coherence()}")
sys.stderr.write(f"Done in {time.time() - start_time:.2f} s\n")

for topic in enumerate(lda.show_topics(num_topics = n_topics,
                                        num_words = 10,
                                        formatted = False)):
    print(f"Topic {topic[0] + 1}:")
    for word, probability in topic[1][1]:
        print(" " + dictionary[int(word)])
59 changes: 59 additions & 0 deletions _hands-on/parallelise/topics_solution.py
@@ -0,0 +1,59 @@
import gensim
import os
import sys
import time
from parse_vrt_solution import parse_vrt_in_dir
from multiprocessing import Pool

processed_corpus = []
dirname = sys.argv[1]
n_workers = 4

corpus_lemmalists = parse_vrt_in_dir(dirname)

sys.stderr.write("Building gensim dictionary... "); sys.stderr.flush()
start_time = time.time()

# Solution to exercise 4: Parallelise computing the gensim dictionary
# In this case, we need to build one object (the dictionary) out of a
# collection of data. The object in question has a method .merge_with(other),
# which we can use to turn a collection of objects into one. But we also
# need to split the source data, which is a list, into sublists.

# Split a list into sublists of length n.
# Returns a generator, so we don't generate a whole new list with everything.
def split_list(l, n):
    return (l[i:i+n] for i in range(0, len(l), n))

dictionary = None
with Pool(processes = n_workers) as pool:
    for sub_dictionary in pool.map(gensim.corpora.Dictionary,
                                   split_list(corpus_lemmalists, 5000)):
        if dictionary is None:
            dictionary = sub_dictionary
        else:
            dictionary.merge_with(sub_dictionary)

sys.stderr.write(f"Done in {time.time() - start_time:.2f} s\n")
sys.stderr.write("Computing BOW corpus... "); sys.stderr.flush()
start_time = time.time()

# Solution to exercise 3: Parallelise computing bow_corpus
with Pool(processes = n_workers) as pool:
    bow_corpus = pool.map(dictionary.doc2bow, corpus_lemmalists)
sys.stderr.write(f"Done in {time.time() - start_time:.2f} s\n")

# Solution to exercise 2: replace LdaModel with the parallel LdaMulticore
sys.stderr.write("Computing LDA model... "); sys.stderr.flush()
start_time = time.time()
# Workers should be at most the number of physical cores, up to a limit;
# it's a good idea to test this if performance matters.
lda = gensim.models.LdaMulticore(bow_corpus, num_topics = 10, workers = n_workers)
sys.stderr.write(f"Done in {time.time() - start_time:.2f} s\n")
for topic in enumerate(lda.show_topics(num_topics = 10,
                                        num_words = 10,
                                        formatted = False)):
    print(f"Topic {topic[0] + 1}:")
    for word, probability in topic[1][1]:
        print(" " + dictionary[int(word)])