# -*- coding: utf-8 -*-
"""
Perceptron Tagger.

This tagger is a port of the Textblob Averaged Perceptron Tagger
Author: Matthew Honnibal <honnibal+gh@gmail.com>,
        Long Duong <longdt219@gmail.com> (NLTK port)
        Wannaphong Phatthiyaphaibun <wannaphong@kkumail.com> (PyThaiNLP port)
URL: <https://github.com/sloria/textblob-aptagger>
     <https://nltk.org/>
Copyright 2013 Matthew Honnibal
NLTK modifications Copyright 2015 The NLTK Project
PyThaiNLP modifications Copyright 2020 PyThaiNLP Project

This tagger is provided under the terms of the MIT License.
"""

from __future__ import absolute_import

import pickle
import random
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple, Union


class AveragedPerceptron(object):
    """
    An averaged perceptron, as implemented by Matthew Honnibal.

    See more implementation details here:
    http://honnibal.wordpress.com/2013/09/11/a-good-part-of-speechpos-tagger-in-about-200-lines-of-python/
    """

    def __init__(self) -> None:
        # Each feature gets its own weight vector,
        # so weights is a dict-of-dicts
        self.weights = {}
        self.classes = set()
        # The accumulated values, for the averaging. These will be keyed by
        # feature/class tuples
        self._totals = defaultdict(int)
        # The last time the feature was changed, for the averaging. Also
        # keyed by feature/class tuples
        # (tstamps is short for timestamps)
        self._tstamps = defaultdict(int)
        # Number of instances seen
        self.i = 0

    def predict(self, features: Dict):
        """
        Dot-product the features and current weights and return the best
        label.
        """
        scores = defaultdict(float)
        for feat, value in features.items():
            if feat not in self.weights or value == 0:
                continue
            weights = self.weights[feat]
            for label, weight in weights.items():
                scores[label] += value * weight
        # Do a secondary alphabetic sort, for stability
        return max(self.classes, key=lambda label: (scores[label], label))
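
    # For example (hypothetical weights, for illustration only): with
    # weights {"bias": {"N": 0.3, "V": 0.1}, "i suffix ing": {"V": 0.8}}
    # and features {"bias": 1, "i suffix ing": 1}, the scores come out to
    # N = 0.3 and V = 0.9, so predict returns "V".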

    def update(self, truth, guess, features: Dict) -> None:
        """Update the feature weights."""

        def upd_feat(c, f, w, v):
            param = (f, c)
            self._totals[param] += (self.i - self._tstamps[param]) * w
            self._tstamps[param] = self.i
            self.weights[f][c] = w + v

        self.i += 1
        if truth == guess:
            return
        for f in features:
            weights = self.weights.setdefault(f, {})
            upd_feat(truth, f, weights.get(truth, 0.0), 1.0)
            upd_feat(guess, f, weights.get(guess, 0.0), -1.0)
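
    # Note on the bookkeeping in upd_feat above: instead of adding every
    # weight into a running total after each training instance, the
    # accumulated value is settled lazily, only when a weight actually
    # changes. For example, if a weight has held the value 2.0 since
    # instance 10 and is updated at instance 15, _totals gains
    # (15 - 10) * 2.0 = 10.0 in one step, which equals adding 2.0 at each
    # of the five intervening instances.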

    def average_weights(self) -> None:
        """Average weights from all iterations."""
        for feat, weights in self.weights.items():
            new_feat_weights = {}
            for clas, weight in weights.items():
                param = (feat, clas)
                total = self._totals[param]
                total += (self.i - self._tstamps[param]) * weight
                averaged = round(total / float(self.i), 3)
                if averaged:
                    new_feat_weights[clas] = averaged
            self.weights[feat] = new_feat_weights
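
    # A minimal usage sketch (hypothetical labels and feature names, for
    # illustration only):
    #
    #     ap = AveragedPerceptron()
    #     ap.classes = {"N", "V"}
    #     feats = {"bias": 1, "i suffix ing": 1}
    #     guess = ap.predict(feats)      # best label under current weights
    #     ap.update("V", guess, feats)   # adjusts weights if guess was wrong
    #     ap.average_weights()           # finalize once training is done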


class PerceptronTagger:
    """
    Greedy Averaged Perceptron tagger, as implemented by Matthew Honnibal.

    See more implementation details here:
    http://honnibal.wordpress.com/2013/09/11/a-good-part-of-speechpos-tagger-in-about-200-lines-of-python/

    >>> from pythainlp.tag import PerceptronTagger
    >>> tagger = PerceptronTagger()
    >>> data = [
    ...     [("คน", "N"), ("เดิน", "V")],
    ...     [("แมว", "N"), ("เดิน", "V")],
    ...     [("คน", "N"), ("วิ่ง", "V")],
    ...     [("ปลา", "N"), ("ว่าย", "V")],
    ...     [("นก", "N"), ("บิน", "V")],
    ... ]
    >>> tagger.train(data)
    >>> tagger.tag(["นก", "เดิน"])
    [('นก', 'N'), ('เดิน', 'V')]

    """

    START = ["-START-", "-START2-"]
    END = ["-END-", "-END2-"]
    AP_MODEL_LOC = ""

    def __init__(self, path: str = "") -> None:
        """
        :param str path: model path
        """
        self.model = AveragedPerceptron()
        self.tagdict = {}
        self.classes = set()
        if path != "":
            self.AP_MODEL_LOC = path
            self.load(self.AP_MODEL_LOC)

    def tag(self, tokens: Iterable[str]) -> List[Tuple[str, str]]:
        """Tag a tokenized sentence, returning a list of (word, tag) pairs."""
        tokens = list(tokens)  # allow any iterable; we traverse it twice
        prev, prev2 = self.START
        output = []

        context = self.START + [self._normalize(w) for w in tokens] + self.END
        for i, word in enumerate(tokens):
            tag = self.tagdict.get(word)
            if not tag:
                features = self._get_features(i, word, context, prev, prev2)
                tag = self.model.predict(features)
            output.append((word, tag))
            prev2 = prev
            prev = tag
        return output

    def train(
        self,
        sentences: Iterable[Iterable[Tuple[str, str]]],
        save_loc: Union[str, None] = None,
        nr_iter: int = 5,
    ) -> None:
        """
        Train a model from sentences, and save it at ``save_loc``.
        ``nr_iter`` controls the number of Perceptron training iterations.

        :param sentences: A list of sentences, each a list of (word, tag) \
            tuples.
        :param save_loc: If not ``None``, saves a pickled model in this \
            location.
        :param nr_iter: Number of training iterations.
        """
        # Materialize the sentences so they can be iterated over (and
        # shuffled) once per training iteration.
        sentences = list(sentences)
        self._make_tagdict(sentences)
        self.model.classes = self.classes
        for _ in range(nr_iter):
            c = 0  # correct guesses in this iteration
            n = 0  # tokens seen in this iteration
            for sentence in sentences:
                words, tags = zip(*sentence)

                prev, prev2 = self.START
                context = (
                    self.START + [self._normalize(w) for w in words] + self.END
                )
                for i, word in enumerate(words):
                    guess = self.tagdict.get(word)
                    if not guess:
                        feats = self._get_features(
                            i, word, context, prev, prev2
                        )
                        guess = self.model.predict(feats)
                        self.model.update(tags[i], guess, feats)
                    prev2 = prev
                    prev = guess
                    c += guess == tags[i]
                    n += 1
            random.shuffle(sentences)
        self.model.average_weights()

        # save the model
        if save_loc is not None:
            data = {}
            data["weights"] = self.model.weights
            data["tagdict"] = self.tagdict
            data["classes"] = self.classes
            with open(save_loc, "wb") as f:
                pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)

    def load(self, loc: str) -> None:
        """
        Load a pickled model.

        :param str loc: model path
        """
        try:
            with open(loc, "rb") as f:
                w_td_c = pickle.load(f)
        except IOError:
            msg = "Missing or unreadable model file: " + loc
            raise IOError(msg)
        self.model.weights = w_td_c["weights"]
        self.tagdict = w_td_c["tagdict"]
        self.classes = w_td_c["classes"]
        self.model.classes = self.classes

    def _normalize(self, word: str) -> str:
        """
        Normalization used in pre-processing.

        - Words containing (but not starting with) a hyphen are
          represented as !HYPHEN
        - Four-digit numbers are represented as !YEAR
        - Other words starting with a digit are represented as !DIGITS
        - All remaining words are lower cased

        :rtype: str
        """
        if "-" in word and word[0] != "-":
            return "!HYPHEN"
        elif word.isdigit() and len(word) == 4:
            return "!YEAR"
        elif word and word[0].isdigit():  # guard against empty tokens
            return "!DIGITS"
        else:
            return word.lower()
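
    # For example, under the rules above: "pre-order" -> "!HYPHEN",
    # "1984" -> "!YEAR", "42nd" -> "!DIGITS", and "Cat" -> "cat".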

    def _get_features(
        self, i: int, word: str, context: List[str], prev: str, prev2: str
    ) -> Dict:
        """
        Map tokens into a feature representation, implemented as a
        {hashable: float} dict. If the features change, a new model must be
        trained.
        """

        def add(name: str, *args):
            features[" ".join((name,) + tuple(args))] += 1

        i += len(self.START)
        features = defaultdict(int)
        # It's useful to have a constant feature,
        # which acts sort of like a prior
        add("bias")
        add("i suffix", word[-3:])
        add("i pref1", word[0])
        add("i-1 tag", prev)
        add("i-2 tag", prev2)
        add("i tag+i-2 tag", prev, prev2)
        add("i word", context[i])
        add("i-1 tag+i word", prev, context[i])
        add("i-1 word", context[i - 1])
        add("i-1 suffix", context[i - 1][-3:])
        add("i-2 word", context[i - 2])
        add("i+1 word", context[i + 1])
        add("i+1 suffix", context[i + 1][-3:])
        add("i+2 word", context[i + 2])
        return features
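
    # For a token "walking" with prev="N" and prev2="-START-", the returned
    # dict contains entries such as {"bias": 1, "i suffix ing": 1,
    # "i pref1 w": 1, "i-1 tag N": 1, ...}; each key is the feature name
    # and its arguments joined into one space-separated string.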

    def _make_tagdict(
        self, sentences: Iterable[Iterable[Tuple[str, str]]]
    ) -> None:
        """Make a tag dictionary for single-tag words."""
        counts = defaultdict(lambda: defaultdict(int))
        for sentence in sentences:
            for word, tag in sentence:
                counts[word][tag] += 1
                self.classes.add(tag)
        freq_thresh = 20
        ambiguity_thresh = 0.97
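        # For example, a word seen 25 times, always tagged "N", passes both
        # tests (25 >= 20 and 25/25 >= 0.97); a word tagged "N" 24 times and
        # "V" once does not (24/25 = 0.96 < 0.97), so it is left to the model.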
        for word, tag_freqs in counts.items():
            tag, mode = max(tag_freqs.items(), key=lambda item: item[1])
            n = sum(tag_freqs.values())
            # Don't add rare words to the tag dictionary
            # Only add quite unambiguous words
            if n >= freq_thresh and (float(mode) / n) >= ambiguity_thresh:
                self.tagdict[word] = tag
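

# A minimal end-to-end sketch (hypothetical names, for illustration only):
#
#     tagger = PerceptronTagger()
#     tagger.train(train_sents, save_loc="pos_model.pkl")  # train and save
#     tagger2 = PerceptronTagger(path="pos_model.pkl")     # reload later
#     tagger2.tag(["คน", "เดิน"])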