-
Notifications
You must be signed in to change notification settings - Fork 11
/
mssqlTools.py
128 lines (105 loc) · 5.88 KB
/
mssqlTools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2017 Jonathan Schultz
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import os
import sys
import subprocess
import tempfile
import shutil
import random
class mssqlAPI(object):
    """Manage databases on a Microsoft SQL Server instance, local or remote.

    When no server is given, every command runs on this machine, which must
    then be running Windows.  Otherwise commands are run on the server through
    ``ssh`` and files are transferred with ``scp``.  Attaching, creating and
    saving databases is delegated to batch files found in the ``helpers``
    directory that sits next to this module.
    """

    # Built-in databases that list() leaves out of its result.
    _SYSTEM_DATABASES = frozenset(['master', 'tempdb', 'model', 'msdb'])

    def _islocal(self):
        """Return True when commands run on this machine rather than over ssh."""
        # A single definition keeps every method in agreement; an empty server
        # name is treated the same as None.
        return not self.server

    def _serverinstance(self):
        """Return the server\\instance name handed to ``sqlcmd -S``."""
        return 'localhost\\' + self.instance

    def _remotetempname(self):
        """Return a randomly numbered file name inside the remote temp directory."""
        return self.tmpdir + r'\mssqltools' + str(random.randint(0, 99999)).zfill(5)

    def _findinstance(self, version):
        """Return the name of the first registered instance matching ``version``.

        The instance names are read from the Windows registry.  Any instance is
        accepted when ``version`` is empty or None.

        Raises:
            RuntimeError: if no registered instance matches.
        """
        regquery = self.executecommand(['reg', 'query', 'HKLM\\Software\\Microsoft\\Microsoft SQL Server\\Instance Names\\SQL']).splitlines()
        # The first line of the output is skipped; the remaining lines are
        # parsed as whitespace-separated name / type / data fields.
        for regqueryline in regquery[1:]:
            regquerydata = regqueryline.split()
            if len(regquerydata) < 3:
                # Blank or malformed line: nothing to parse.
                continue
            instancename = regquerydata[0]
            instanceversion = regquerydata[2].split('.')[0]
            if self.verbosity > 1:
                print("Found SQL server instance " + instancename + " version " + instanceversion, file=sys.stderr)
            if not version or instanceversion == version:
                return instancename
        raise RuntimeError('No suitable SQL server instance found')

    # Function to execute a command either locally or remotely
    def executecommand(self, command):
        """Run ``command`` (a list of words) and return its stripped text output.

        Raises:
            subprocess.CalledProcessError: if the command exits with an error.
        """
        if self._islocal():
            return subprocess.check_output(command, text=True).strip()
        # This quoting of arguments is a bit of a hack but seems to work
        quoted = [('"' + word + '"') if ' ' in word else word for word in command]
        return subprocess.check_output(['ssh', self.sshserver] + quoted, text=True).strip()

    # Function to execute a helper script either locally or remotely
    def executescript(self, script, arglist=None):
        """Run a helper script with ``arglist`` and return its stripped text output.

        For a remote server the script is first copied into the remote temp
        directory and then executed there.

        Raises:
            subprocess.CalledProcessError: if the copy or the script fails.
        """
        arguments = arglist or []
        if self._islocal():
            return subprocess.check_output([self.helperpath + script] + arguments, text=True).strip()
        # Fail here if the upload fails instead of running a missing or stale script.
        subprocess.check_call(['scp', '-q', self.helperpath + script, self.sshserver + ':' + self.tmpdir])
        return subprocess.check_output(['ssh', self.sshserver, self.tmpdir + '\\' + script] + arguments, text=True).strip()

    def __init__(self, server, user='', port=None, instance=None, version=None, verbosity=1):
        """Record the connection settings and discover whatever was left out.

        Args:
            server: Host running SQL Server, reached over ssh; None (or empty)
                means this machine.
            user: Optional ssh user name.
            port: TCP port of the instance; read from the registry when None.
            instance: Instance name; looked up in the registry when None.
            version: Only consulted when ``instance`` is None; restricts the
                lookup to instances whose registry version prefix equals it.
            verbosity: Values above 1 print progress messages to stderr.

        Raises:
            RuntimeError: if no server is given on a non-Windows machine, or
                if no suitable instance is registered.
        """
        self.server = server
        self.user = user
        self.port = port
        self.instance = instance
        self.verbosity = verbosity
        self.helperpath = os.path.dirname(os.path.realpath(__file__)) + os.path.sep + 'helpers' + os.path.sep

        if self._islocal():
            if os.name != 'nt':
                raise RuntimeError("This does not appear to be a Windows machine so --server must be specified.")
        else:
            self.sshserver = ((self.user + '@') if self.user else '') + self.server
            # Ask the remote shell to expand its temp directory variable.
            self.tmpdir = subprocess.check_output(['ssh', self.sshserver, r'echo %tmp%'], text=True).strip()

        if self.instance is None:
            self.instance = self._findinstance(version)

        if self.verbosity > 1:
            print("Using MSSQL instance: " + self.instance, file=sys.stderr)

        if self.port is None:
            regquery = self.executecommand(['reg', 'query', 'HKLM\\SOFTWARE\\Microsoft\\Microsoft SQL Server\\' + self.instance + '\\MSSQLServer\\SuperSocketNetLib\\Tcp']).splitlines()
            # NOTE(review): takes the third field of the second output line as
            # the port - confirm that this is always the port value.
            self.port = int(regquery[1].split()[2])

        if self.verbosity > 1:
            print("Using port: " + str(self.port), file=sys.stderr)

    def attach(self, filename, dbname):
        """Copy ``filename`` to a temporary file and attach it as ``dbname``.

        Raises:
            subprocess.CalledProcessError: if the upload or the helper fails.
        """
        # Generate a filename for the temporary MDB file
        if self._islocal():
            # mkstemp creates the file atomically, unlike the deprecated mktemp.
            handle, mdbFilename = tempfile.mkstemp()
            os.close(handle)
            shutil.copy(filename, mdbFilename)
        else:
            mdbFilename = self._remotetempname()
            subprocess.check_call(['scp', '-q', filename, self.sshserver + ':' + mdbFilename])

        self.executescript('mssqlAttach.bat', [mdbFilename, dbname, self.instance])
        if self.verbosity > 1:
            print("Attached database " + dbname, file=sys.stderr)

    def create(self, dbname):
        """Create a database called ``dbname`` using the helper script."""
        self.executescript('mssqlCreate.bat', [dbname, self.instance])
        if self.verbosity > 1:
            print("Created database " + dbname, file=sys.stderr)

    def detach(self, dbname):
        """Detach the database ``dbname`` from the instance."""
        # NOTE(review): dbname is concatenated into the SQL text unquoted, so
        # it must come from a trusted source.
        self.executecommand(['sqlcmd', '-S', self._serverinstance(), '-Q', 'EXEC sp_detach_db ' + dbname])

    def drop(self, dbname):
        """Drop the database ``dbname`` from the instance."""
        # NOTE(review): dbname is concatenated into the SQL text unquoted, so
        # it must come from a trusted source.
        self.executecommand(['sqlcmd', '-S', self._serverinstance(), '-Q', 'DROP DATABASE ' + dbname])

    def save(self, filename, dbname):
        """Save the database ``dbname`` into the local file ``filename``.

        For a remote server the helper writes to a temporary remote file which
        is then downloaded.

        Raises:
            subprocess.CalledProcessError: if the helper or the download fails.
        """
        if self._islocal():
            self.executescript('mssqlSave.bat', [filename, dbname, self.instance])
        else:
            mdbFilename = self._remotetempname()
            self.executescript('mssqlSave.bat', [mdbFilename, dbname, self.instance])
            # Raise on a failed download rather than report success with no file.
            subprocess.check_call(['scp', '-qT', self.sshserver + ':' + mdbFilename, filename])

    def list(self):
        """Return the names of the databases on the instance, system ones excluded."""
        output = self.executecommand(['sqlcmd', '-W', '-S', self._serverinstance(), '-h', '-1', '-Q', 'SET NOCOUNT ON; SELECT name FROM master.dbo.sysdatabases'])
        # One name per line: splitting on whitespace would break names that
        # contain spaces.
        names = (line.strip() for line in output.splitlines())
        # Ignore the internal databases master, tempdb, model and msdb by name,
        # since the query does not guarantee that they come first.
        return [name for name in names if name and name not in self._SYSTEM_DATABASES]