Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions elasticsearch-setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,34 @@ def create_index(self, index, setting):
}
create_index_list.append({"name": "users", "setting": users_setting})

tag_settings = {
'settings': {
'analysis': {
'normalizer': {
'lowercase_normalizer': {
'type': 'custom',
'char_filter': [],
'filter': ['lowercase']
}
}
}
},
'mappings': {
'tag': {
'properties': {
'name': {
'type': 'keyword',
'normalizer': 'lowercase_normalizer'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

このように keyword で指定しておいて、normalizer で小文字フィルターをかけてあげると
Term level queries(言語解析されずに検索) で、かつ 大文字でも小文字でも検索対象になる ようにできる

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

上記精査ありがとうございます!

},
'created_at': {
'type': 'integer'
}
}
}
}
}
create_index_list.append({"name": "tags", "setting": tag_settings})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[質問] ESのmappingなどのリリースって現状どう行なっているのでしょうか?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

あと、このスクリプトをローカルで動かそうとするとendpointの正規表現のところでコケてしまったんですが、既知の問題だったりしますか?
もし既知の問題であれば調べるのにコストかけるよりも聞いてしまいたいってのが背景なので、未知であれば調査します。

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ES に関わるリリースは、ブルー・グリーンデプロイを行っています。
また、 endpoint 箇所でコケる件ですが、もしかしたら実行方法に問題あるかもしれません。
下記のような形で、ローカルIPを引数として渡しているか一度ご確認いただければと思います。

python elasticsearch-setup.py $(curl https://checkip.amazonaws.com/)


for index in create_index_list:
name = index["name"]
if esconfig.check_index_exists(name):
Expand Down
111 changes: 89 additions & 22 deletions src/common/tag_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
import re
import time

Expand All @@ -8,9 +7,7 @@

class TagUtil:
@classmethod
def create_and_count(cls, dynamodb, before_tag_names, after_tag_names):
tag_table = dynamodb.Table(os.environ['TAG_TABLE_NAME'])

def create_and_count(cls, elasticsearch, before_tag_names, after_tag_names):
if before_tag_names is None:
before_tag_names = []

Expand All @@ -19,34 +16,79 @@ def create_and_count(cls, dynamodb, before_tag_names, after_tag_names):

for tag_name in after_tag_names:
if before_tag_names is None or tag_name not in before_tag_names:
if tag_table.get_item(Key={'name': tag_name}).get('Item'):

# 大文字小文字区別せずに存在チェックを行いDB(ES)にすでに存在する値を取得する
tag = cls.__get_item_case_insensitive(elasticsearch, tag_name)

if tag:
# タグが追加された場合カウントを+1する
TagUtil.update_count(tag_table, tag_name, 1)
# タグがDBに存在しない場合は新規作成する
cls.update_count(elasticsearch, tag['name'], 1)
# タグがDB(ES)に存在しない場合は新規作成する
else:
tag_table.put_item(
Item={
'name': tag_name,
'count': 1,
'created_at': int(time.time())
}
)
cls.create_tag(elasticsearch, tag_name)

# タグが外された場合カウントを-1する
for tag_name in before_tag_names:
if tag_name not in after_tag_names:
if tag_table.get_item(Key={'name': tag_name}).get('Item'):
cls.update_count(tag_table, tag_name, -1)
tag = cls.__get_item_case_insensitive(elasticsearch, tag_name)
if tag:
cls.update_count(elasticsearch, tag['name'], -1)

@classmethod
def update_count(cls, elasticsearch, tag_name, num):
update_script = {
'script': {
'source': 'ctx._source.count += params.count',
'lang': 'painless',
'params': {
'count': num
}
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

カウンターの更新には
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
を使っている。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, this operation still means full reindex of the document, it just removes some network roundtrips and reduces chances of version conflicts between the get and the index. The _source field needs to be enabled for this feature to work.

にあるように、一回取得して〜カウントして〜更新して〜、と比べると

  • ラウンドトリップが減る
  • その分整合性がおかしくなる可能性が減る

という話であって整合性が保たれるわけではなく、ダーティリード的なものは起こりうる、というところは注意。

と言ってもDynamoDBのatomic counterも整合性は保証されないので、整合性レベルが低くなるわけではない
https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.AtomicCounters


elasticsearch.update(index='tags', doc_type='tag', id=tag_name, body=update_script)

"""
ここで作成されたtagが検索対象になるまで(__get_item_case_insensitiveの条件として引っかかってくるまで)1sほどかかる
これはESのセグメントマージという仕様によるものでどうしても回避したい場合は `elasticsearch.indices.refresh(index='tags')` をcreate後に行う必要がある
しかし、ESのデフォルト挙動を無理やり変えることになり、返ってパフォーマンス低下が起きる可能性もあるので特に何もしていない
"""
@classmethod
def update_count(cls, table, tag_name, num):
table.update_item(
Key={'name': tag_name},
UpdateExpression='set #attr = #attr + :increment',
ExpressionAttributeNames={'#attr': 'count'},
ExpressionAttributeValues={':increment': num}
def create_tag(cls, elasticsearch, tag_name):
    """Create the tag document for ``tag_name`` with an initial count of 1.

    The document lives in the ``tags`` index and uses ``tag_name`` as its id.
    It is written through the Update API with ``upsert`` rather than a plain
    ``index`` call: a freshly written document is not returned by search
    until the index has been refreshed, so this method can be reached for a
    tag whose document already exists. A plain ``index`` would overwrite that
    document and reset its count to 1; with ``upsert`` the existing count is
    incremented instead and the stored ``created_at`` is left untouched.

    Args:
        elasticsearch: Elasticsearch client used to write the document.
        tag_name: Tag name; stored in the ``name`` field and used as the id.
    """
    tag = {
        'name': tag_name,
        'count': 1,
        'created_at': int(time.time())
    }

    body = {
        # Runs only when a document with this id already exists.
        'script': {
            'source': 'ctx._source.count += params.count',
            'lang': 'painless',
            'params': {
                'count': 1
            }
        },
        # Inserted as-is when no document with this id exists yet.
        'upsert': tag
    }

    elasticsearch.update(index='tags', doc_type='tag', id=tag_name, body=body)

@classmethod
def get_tags_with_name_collation(cls, elasticsearch, tag_names):
    """Replace each tag name with the spelling already stored in Elasticsearch.

    Every name is looked up through ``__get_item_case_insensitive``. When a
    stored tag is found, its ``name`` is used in place of the given name;
    otherwise the given name is kept unchanged. Order and length of the
    input are preserved.

    Args:
        elasticsearch: Elasticsearch client used for the lookups.
        tag_names: List of tag names, or an empty/``None`` value.

    Returns:
        A new list of tag names, or ``tag_names`` itself when it is empty
        or ``None``.
    """
    if not tag_names:
        return tag_names

    results = []

    # Kept as an explicit loop (rather than a comprehension) so that the
    # "stored spelling wins" rule stays easy to read.
    for tag_name in tag_names:
        tag = cls.__get_item_case_insensitive(elasticsearch, tag_name)
        results.append(tag['name'] if tag else tag_name)

    return results
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

リスト内包表記とかでもかけるのだが、上にも書いたようにすこし意図がわかりにくいのであえて冗長に書いている


@staticmethod
def validate_format(tags):
pattern = re.compile(settings.TAG_DENIED_SYMBOL_PATTERN)
Expand All @@ -60,3 +102,28 @@ def validate_format(tags):
for symbol in settings.TAG_ALLOWED_SYMBOLS:
if tag[0] == symbol or tag[-1] == symbol:
raise ValidationError("tags don't support {str} with start and end of character".format(str=symbol))

@classmethod
def __get_item_case_insensitive(cls, elasticsearch, tag_name):
    """Return the stored tag document matching ``tag_name``, or ``None``.

    Sends a ``term`` query on the ``name`` field to the ``tags`` index and
    returns the ``_source`` of the first hit. The query itself does no case
    folding; case-insensitive matching depends on how the ``name`` field is
    mapped in the index.
    """
    term_clause = {'term': {'name': tag_name}}

    response = elasticsearch.search(
        index='tags',
        doc_type='tag',
        body={'query': {'bool': {'must': [term_clause]}}}
    )

    sources = [hit['_source'] for hit in response['hits']['hits']]
    return sources[0] if sources else None
21 changes: 20 additions & 1 deletion src/handlers/me/articles/drafts/publish/handler.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
# -*- coding: utf-8 -*-
import os

import boto3
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

from me_articles_drafts_publish import MeArticlesDraftsPublish

# Clients are created at module level, outside lambda_handler, and passed
# to the handler class on each call.
dynamodb = boto3.resource('dynamodb')
# Request signer for the Elasticsearch service ('es'), built from the AWS
# credentials, session token and region found in the process environment.
# NOTE(review): presumably these are the Lambda execution role's temporary
# credentials -- confirm.
awsauth = AWS4Auth(
os.environ['AWS_ACCESS_KEY_ID'],
os.environ['AWS_SECRET_ACCESS_KEY'],
os.environ['AWS_REGION'],
'es',
session_token=os.environ['AWS_SESSION_TOKEN']
)
# Elasticsearch client for the endpoint named by ELASTIC_SEARCH_ENDPOINT:
# HTTPS on port 443 with certificate verification, every request signed by
# awsauth. AWS4Auth is a requests-style auth object, which is presumably why
# the requests-based connection class is selected.
elasticsearch = Elasticsearch(
hosts=[{'host': os.environ['ELASTIC_SEARCH_ENDPOINT'], 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)


def lambda_handler(event, context):
me_articles_drafts_publish = MeArticlesDraftsPublish(event, context, dynamodb)
me_articles_drafts_publish = MeArticlesDraftsPublish(event, context, dynamodb=dynamodb, elasticsearch=elasticsearch)
return me_articles_drafts_publish.main()
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ def exec_main_proc(self):
':article_status': 'public',
':one': 1,
':topic': self.params['topic'],
':tags': self.params.get('tags')
':tags': TagUtil.get_tags_with_name_collation(self.elasticsearch, self.params.get('tags'))
}
)

try:
TagUtil.create_and_count(self.dynamodb, article_info_before.get('tags'), self.params.get('tags'))
TagUtil.create_and_count(self.elasticsearch, article_info_before.get('tags'), self.params.get('tags'))
except Exception as e:
logging.fatal(e)
traceback.print_exc()
Expand Down
19 changes: 19 additions & 0 deletions src/handlers/me/articles/public/republish/handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
# -*- coding: utf-8 -*-
import os

import boto3
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

from me_articles_public_republish import MeArticlesPublicRepublish

# Clients are created at module level, outside lambda_handler.
dynamodb = boto3.resource('dynamodb')
# Request signer for the Elasticsearch service ('es'), built from the AWS
# credentials, session token and region found in the process environment.
# NOTE(review): presumably these are the Lambda execution role's temporary
# credentials -- confirm.
awsauth = AWS4Auth(
os.environ['AWS_ACCESS_KEY_ID'],
os.environ['AWS_SECRET_ACCESS_KEY'],
os.environ['AWS_REGION'],
'es',
session_token=os.environ['AWS_SESSION_TOKEN']
)
# Elasticsearch client for the endpoint named by ELASTIC_SEARCH_ENDPOINT:
# HTTPS on port 443 with certificate verification, every request signed by
# awsauth. AWS4Auth is a requests-style auth object, which is presumably why
# the requests-based connection class is selected.
elasticsearch = Elasticsearch(
hosts=[{'host': os.environ['ELASTIC_SEARCH_ENDPOINT'], 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)


def lambda_handler(event, context):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __update_article_info(self, article_content_edit):
':eye_catch_url': article_content_edit['eye_catch_url'],
':sync_elasticsearch': 1,
':topic': self.params['topic'],
':tags': self.params.get('tags')
':tags': TagUtil.get_tags_with_name_collation(self.elasticsearch, self.params.get('tags'))
}
)

Expand Down
Loading