Skip to content

Commit c5eecac

Browse files
authored
Merge pull request #2172 from creideiki/sql-updates
SQL: add MSSQL support and allow storing only a subset of event fields
2 parents 9635e47 + 0b45ea6 commit c5eecac

File tree

5 files changed

+61
-13
lines changed

5 files changed

+61
-13
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ CHANGELOG
2020
- `intelmq.lib.message`:
2121
- Fix and pre-compile the regular expression for harmonization key names and also check keys in the `extra.` namespace (PR#2059 by Sebastian Wagner, fixes #1807).
2222
- `intelmq.lib.bot.SQLBot` was replaced by an SQLMixin in `intelmq.lib.mixins.SQLMixin`. The Generic DB Lookup Expert bot and the SQLOutput bot were updated accordingly.
23+
- Added support for MSSQL (PR#2171 by Karl-Johan Karlsson).
24+
- Added optional reconnect delay parameter (PR#2171 by Karl-Johan Karlsson).
2325
- Added an ExpertBot class - it should be used by all expert bots as a parent class
2426
- Introduced a module for IntelMQ related datatypes `intelmq.lib.datatypes` which for now only contains an Enum listing the four bot types
2527
- Added a `bottype` attribute to CollectorBot, ParserBot, ExpertBot, OutputBot

docs/user/bots.rst

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4006,7 +4006,7 @@ SQL
40064006
* `lookup:` no
40074007
* `public:` yes
40084008
* `cache (redis db):` none
4009-
* `description:` SQL is the bot responsible to send events to a PostgreSQL or SQLite Database, e.g. the IntelMQ :doc:`eventdb`
4009+
* `description:` SQL is the bot responsible to send events to a PostgreSQL, SQLite, or MSSQL Database, e.g. the IntelMQ :doc:`eventdb`
40104010
* `notes`: When activating autocommit, transactions are not used: http://initd.org/psycopg/docs/connection.html#connection.autocommit
40114011
40124012
**Configuration Parameters**
@@ -4015,15 +4015,17 @@ The parameters marked with 'PostgreSQL' will be sent to libpq via psycopg2. Chec
40154015
40164016
* `autocommit`: `psycopg's autocommit mode <http://initd.org/psycopg/docs/connection.html?#connection.autocommit>`_, optional, default True
40174017
* `connect_timeout`: Database connect_timeout, optional, default 5 seconds
4018-
* `engine`: 'postgresql' or 'sqlite'
4019-
* `database`: PostgreSQL database or SQLite file
4020-
* `host`: PostgreSQL host
4018+
* `engine`: 'postgresql', 'sqlite', or 'mssql'
4019+
* `database`: Database or SQLite file
4020+
* `host`: Database host
40214021
* `jsondict_as_string`: save JSONDict fields as JSON string, boolean. Default: true (like in versions before 1.1)
4022-
* `port`: PostgreSQL port
4023-
* `user`: PostgreSQL user
4024-
* `password`: PostgreSQL password
4025-
* `sslmode`: PostgreSQL sslmode, can be `'disable'`, `'allow'`, `'prefer'` (default), `'require'`, `'verify-ca'` or `'verify-full'`. See postgresql docs: https://www.postgresql.org/docs/current/static/libpq-connect.html#libpq-connect-sslmode
4022+
* `port`: Database port
4023+
* `user`: Database user
4024+
* `password`: Database password
4025+
* `sslmode`: Database sslmode, can be `'disable'`, `'allow'`, `'prefer'` (default), `'require'`, `'verify-ca'` or `'verify-full'`. See postgresql docs: https://www.postgresql.org/docs/current/static/libpq-connect.html#libpq-connect-sslmode
40264026
* `table`: name of the database table into which events are to be inserted
4027+
* `fields`: list of fields to read from the event. If None, read all fields
4028+
* `reconnect_delay`: number of seconds to wait before reconnecting in case of an error
40274029
40284030
**PostgreSQL**
40294031
@@ -4099,6 +4101,10 @@ Then, set the `database` parameter to the `your-db.db` file path.
40994101
41004102
.. _intelmq.bots.outputs.stomp.output:
41014103
4104+
**MSSQL**
4105+
4106+
For MSSQL support, the library `pymssql>=2.2` is required.
4107+
41024108
STOMP
41034109
^^^^^
41044110

intelmq/bots/outputs/sql/REQUIREMENTS.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
# SPDX-License-Identifier: AGPL-3.0-or-later
33

44
psycopg2-binary>=2.5.5
5+
pymssql>=2.2

intelmq/bots/outputs/sql/output.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
from intelmq.lib.mixins import SQLMixin
1818

1919

20+
def itemgetter_tuple(*items):
21+
def g(obj):
22+
return tuple(obj[item] for item in items)
23+
return g
24+
25+
2026
class SQLOutputBot(OutputBot, SQLMixin):
2127
"""Send events to a PostgreSQL or SQLite database"""
2228
autocommit = True
@@ -29,16 +35,21 @@ class SQLOutputBot(OutputBot, SQLMixin):
2935
sslmode = "require"
3036
table = 'events'
3137
user = "intelmq"
38+
fields = None
3239

3340
def init(self):
3441
super().init()
3542

3643
def process(self):
3744
event = self.receive_message().to_dict(jsondict_as_string=self.jsondict_as_string)
3845

39-
keys = '", "'.join(event.keys())
40-
values = list(event.values())
41-
fvalues = len(values) * f'{self.format_char}, '
46+
key_names = self.fields
47+
if key_names is None:
48+
key_names = event.keys()
49+
valid_keys = [key for key in key_names if key in event]
50+
keys = '", "'.join(valid_keys)
51+
values = itemgetter_tuple(*valid_keys)(event)
52+
fvalues = len(values) * '{0}, '.format(self.format_char)
4253
query = ('INSERT INTO {table} ("{keys}") VALUES ({values})'
4354
''.format(table=self.table, keys=keys, values=fvalues[:-2]))
4455

intelmq/lib/mixins/sql.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
"""
88
from intelmq.lib import exceptions
99

10+
from time import sleep
11+
1012

1113
class SQLMixin:
1214
"""
@@ -19,10 +21,12 @@ class SQLMixin:
1921

2022
POSTGRESQL = "postgresql"
2123
SQLITE = "sqlite"
24+
MSSQL = "mssql"
2225
_default_engine = "postgresql"
2326
engine = None
2427
# overwrite the default value from the OutputBot
2528
message_jsondict_as_string = True
29+
reconnect_delay = 0
2630

2731
def __init__(self, *args, **kwargs):
2832
self._init_sql()
@@ -33,7 +37,8 @@ def _init_sql(self):
3337
self.logger.debug("Running SQL Mixin initialization.")
3438
self._engine_name = getattr(self, 'engine', self._default_engine).lower()
3539
engines = {SQLMixin.POSTGRESQL: (self._init_postgresql, "%s"),
36-
SQLMixin.SQLITE: (self._init_sqlite, "?")}
40+
SQLMixin.SQLITE: (self._init_sqlite, "?"),
41+
SQLMixin.MSSQL: (self._init_mssql, "%s")}
3742
for key, val in engines.items():
3843
if self._engine_name == key:
3944
val[0]()
@@ -48,7 +53,7 @@ def _connect(self, engine, connect_args: dict, autocommitable: bool = False):
4853

4954
try:
5055
self.con = self._engine.connect(**connect_args)
51-
if autocommitable: # psycopg2 has it, sqlite3 has not
56+
if autocommitable: # psycopg2 and mssql has it, sqlite3 has not
5257
self.con.autocommit = getattr(self, 'autocommit', True) # True prevents deadlocks
5358
self.cur = self.con.cursor()
5459
except (self._engine.Error, Exception):
@@ -86,6 +91,23 @@ def _init_sqlite(self):
8691
}
8792
)
8893

94+
def _init_mssql(self):
95+
try:
96+
import pymssql
97+
except ImportError:
98+
raise exceptions.MissingDependencyError("pymssql")
99+
100+
self._connect(pymssql,
101+
{"server": self.host,
102+
"user": self.user,
103+
"password": self.password,
104+
"database": self.database,
105+
"login_timeout": getattr(self, 'connect_timeout', 5),
106+
"port": self.port,
107+
"as_dict": True
108+
},
109+
autocommitable=True)
110+
89111
def execute(self, query: str, values: tuple, rollback=False):
90112
try:
91113
self.logger.debug('Executing %r.', (query, values))
@@ -102,13 +124,19 @@ def execute(self, query: str, values: tuple, rollback=False):
102124
except self._engine.OperationalError:
103125
self.logger.exception('Executed rollback command '
104126
'after failed query execution.')
127+
if self.reconnect_delay > 0:
128+
sleep(self.reconnect_delay)
105129
self._init_sql()
106130
except Exception:
107131
self.logger.exception('Cursor has been closed, connecting '
108132
'again.')
133+
if self.reconnect_delay > 0:
134+
sleep(self.reconnect_delay)
109135
self._init_sql()
110136
else:
111137
self.logger.exception('Database connection problem, connecting again.')
138+
if self.reconnect_delay > 0:
139+
sleep(self.reconnect_delay)
112140
self._init_sql()
113141
else:
114142
return True

0 commit comments

Comments
 (0)