Skip to content

Commit

Permalink
当mongodb sql长度大于4000个字符时,使用load方法执行sql;小于4000个字符时,直接执行sql,这样可以减少磁盘交换,…
Browse files Browse the repository at this point in the history
…节约archery性能
  • Loading branch information
czxin788 committed Jun 1, 2022
1 parent 70dfa35 commit c9f16f5
Showing 1 changed file with 42 additions and 33 deletions.
75 changes: 42 additions & 33 deletions sql/engines/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,39 +260,48 @@ def exec_cmd(self, sql, db_name=None, slave_ok=''):
if self.user and self.password and self.port and self.host:
msg = ""
auth_db = self.instance.db_name or 'admin'
# 因为用mongo load方法执行js脚本,所以需要重新改写一下sql,以便回显js执行结果
sql = 'var result = ' + sql + '\nprintjson(result);'
# 因为要知道具体的临时文件位置,所以用了NamedTemporaryFile模块
with tempfile.NamedTemporaryFile(suffix=".js", prefix="mongo_", dir='/tmp/', delete=True) as fp:
fp.write(sql.encode('utf-8'))
fp.seek(0) # 把文件指针指向第一个,这样内容才能落到磁盘文件上
try:
if not sql.startswith('var host='): #在master节点执行的情况
cmd = "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}load('{tempfile}')\nEOF".format(
mongo=mongo, uname=self.user, password=self.password, host=self.host, port=self.port,
db_name=db_name, sql=sql, auth_db=auth_db, slave_ok=slave_ok, tempfile=fp.name)
else:
cmd = "{mongo} --quiet -u {user} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\nrs.slaveOk();{sql}\nEOF".format(
mongo=mongo, user=self.user, password=self.password, host=self.host, port=self.port, db_name=db_name, sql=sql, auth_db=auth_db)
p = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
re_msg = []
for line in iter(p.stdout.read, ''):
re_msg.append(line)
# 因为返回的line中也有可能带有换行符,因此需要先全部转换成字符串
__msg = '\n'.join(re_msg)
_re_msg = []
for _line in __msg.split('\n'):
if not _re_msg and re.match('WARNING.*', _line):
# 第一行可能是WARNING语句,因此跳过
continue
_re_msg.append(_line)

msg = '\n'.join(_re_msg)
except Exception as e:
logger.warning(f"mongo语句执行报错,语句:{sql}{e}错误信息{traceback.format_exc()}")
sql_len = len(sql)
is_load = False # 默认不使用load方法执行mongodb sql语句
try:
if not sql.startswith('var host=') and sql_len > 4000: # 在master节点执行的情况,如果sql长度大于4000,就采取load js的方法
# 因为用mongo load方法执行js脚本,所以需要重新改写一下sql,以便回显js执行结果
sql = 'var result = ' + sql + '\nprintjson(result);'
# 因为要知道具体的临时文件位置,所以用了NamedTemporaryFile模块
fp = tempfile.NamedTemporaryFile(suffix=".js", prefix="mongo_", dir='/tmp/', delete=True)
fp.write(sql.encode('utf-8'))
fp.seek(0) # 把文件指针指向开始,这样写的sql内容才能落到磁盘文件上
cmd = "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}load('{tempfile_}')\nEOF".format(
mongo=mongo, uname=self.user, password=self.password, host=self.host, port=self.port,
db_name=db_name, sql=sql, auth_db=auth_db, slave_ok=slave_ok, tempfile_=fp.name)
is_load = True # 标记使用了load方法,用来在finally里面判断是否需要强制删除临时文件
elif not sql.startswith('var host=') and sql_len < 4000: # 在master节点执行的情况, 如果sql长度小于4000,就直接用mongo shell执行,减少磁盘交换,节省性能
cmd = "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}printjson({sql})\nEOF".format(
mongo=mongo, uname=self.user, password=self.password, host=self.host, port=self.port, db_name=db_name, sql=sql, auth_db=auth_db, slave_ok=slave_ok)
else:
cmd = "{mongo} --quiet -u {user} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\nrs.slaveOk();{sql}\nEOF".format(
mongo=mongo, user=self.user, password=self.password, host=self.host, port=self.port, db_name=db_name, sql=sql, auth_db=auth_db)
p = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
re_msg = []
for line in iter(p.stdout.read, ''):
re_msg.append(line)
# 因为返回的line中也有可能带有换行符,因此需要先全部转换成字符串
__msg = '\n'.join(re_msg)
_re_msg = []
for _line in __msg.split('\n'):
if not _re_msg and re.match('WARNING.*', _line):
# 第一行可能是WARNING语句,因此跳过
continue
_re_msg.append(_line)

msg = '\n'.join(_re_msg)
except Exception as e:
logger.warning(f"mongo语句执行报错,语句:{sql}{e}错误信息{traceback.format_exc()}")
finally:
if is_load:
fp.close()
return msg

def get_master(self):
Expand Down

0 comments on commit c9f16f5

Please sign in to comment.