Fixes for CI (#30)
Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #30
dagardner-nv authored Jan 27, 2023
1 parent e8d8632 commit 94c9e51
Showing 4 changed files with 311 additions and 225 deletions.
115 changes: 62 additions & 53 deletions asset-clustering/training-tuning-inference/data_preprocessing.py
@@ -1,40 +1,53 @@
import cudf
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import bz2
import logging
import time
import click
import numpy as np
from utils import *
from collections import defaultdict
from itertools import chain

import click
import numpy as np
from utils import compute_diff_source_logon_cnt
from utils import compute_eventid_cnt
from utils import compute_eventid_cnt_source
from utils import compute_logins_with_loghostuname
from utils import compute_username_cnt
from utils import compute_username_domain_cnt
from utils import get_fnames
from utils import logon_types
from utils import read_wls

import cudf

VALID_LOGON_TYPES = {0, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12}

# List of tuples (EventID, feature name), where a feature name denotes
# frequency of corresp. EventID, by asset appearing in LogHost field.
EVENTID_CNTFEAT = [
(4624, 'total_logins_cnt'),
(4625, 'accnt_fail_logon_cnt'),
(4634, 'total_logoff_cnt'),
(4647, 'total_user_initi_logoff_cnt'),
(4648, 'logon_explicit_cred_frm_cnt'),
(4672, 'spl_pvlgs'),
(4776, 'domain_ctr_validate_cnt'),
(4802, 'scrnsaver_invok_cnt'),
(4803, 'scrnsaver_dismiss_cnt')]
EVENTID_CNTFEAT = [(4624, 'total_logins_cnt'), (4625, 'accnt_fail_logon_cnt'), (4634, 'total_logoff_cnt'),
(4647, 'total_user_initi_logoff_cnt'), (4648, 'logon_explicit_cred_frm_cnt'), (4672, 'spl_pvlgs'),
(4776, 'domain_ctr_validate_cnt'), (4802, 'scrnsaver_invok_cnt'), (4803, 'scrnsaver_dismiss_cnt')]
# (4768, 'TGT_req_cnt'), (4769, 'TGS_req_cnt')
# 4768 & 4769 not used since 100% of LogHost for 4768,4769 is ActiveDirectory

# EVENTIDFORSOURCE_CNTFEAT & EVENTIDFORDEST_CNTFEAT are similar to EVENTID_CNTFEAT
# except that they corresp. to frequency of an EventID, by asset, appearing in
# Source & Destination fields resp.
EVENTIDFORSOURCE_CNTFEAT = [
(4624, 'total_logins_src_cnt'),
(4625, 'accnt_fail_logon_src_cnt'),
(4768, 'TGT_req_src_cnt'),
(4769, 'TGS_req_src_cnt'),
(4776, 'domain_ctr_validate_src_cnt')
]
EVENTIDFORSOURCE_CNTFEAT = [(4624, 'total_logins_src_cnt'), (4625, 'accnt_fail_logon_src_cnt'),
(4768, 'TGT_req_src_cnt'), (4769, 'TGS_req_src_cnt'), (4776, 'domain_ctr_validate_src_cnt')]
EVENTIDFORDEST_CNTFEAT = [(4648, 'logon_explicit_cred_to_cnt')]


@@ -61,7 +74,7 @@ def host_aggr(df, host, uniq_values_dict, count_cols):
newhosts = newhosts - set(host.index.to_pandas())
newhosts.discard(None)

frac_cols = ['uname_other_compacnt_login_frac','uname_that_compacnt_login_frac']
frac_cols = ['uname_other_compacnt_login_frac', 'uname_that_compacnt_login_frac']
newhost = cudf.DataFrame({'LogHost': newhosts}).set_index('LogHost')
newhost[count_cols] = 0
newhost[frac_cols] = 0.0
@@ -77,26 +90,28 @@
df = df.loc[(df['Source'].isna()) | (df['Destination'].isna())]
if numrows < df.shape[0]:
logging.debug("Filtering Rows if SOURCE & DESTINATION neq NA")
logging.debug("Removed {} ROWS".format(numrows-df.shape[0]))
logging.debug("Removed {} ROWS".format(numrows - df.shape[0]))

host = compute_logins_with_loghostuname(df, host, login_eventids=[4624,])
host = compute_logins_with_loghostuname(df, host, login_eventids=[
4624,
])
host = logon_types(df, host, VALID_LOGON_TYPES)
host, uniq_values_dict = compute_diff_source_logon_cnt(df, host, uniq_values_dict)
host, uniq_values_dict = compute_username_cnt(df, host, uniq_values_dict)
host, uniq_values_dict = compute_username_domain_cnt(df, host, uniq_values_dict)

for evtuple in EVENTID_CNTFEAT:
evid, ev_str = evtuple
host = compute_eventid_cnt(df , evid, ev_str, host)
host = compute_eventid_cnt(df, evid, ev_str, host)

for evtuple in EVENTIDFORSOURCE_CNTFEAT:
evid, ev_str = evtuple
host = compute_eventid_cnt_source(df , evid, ev_str, host)
host = compute_eventid_cnt_source(df, evid, ev_str, host)
host[count_cols] = host[count_cols].fillna(value=0, inplace=False)
host['uname_other_compacnt_login_frac'] = host['uname_other_compacnt_login_cnt']/host['total_logins_cnt']
host['uname_other_compacnt_login_frac'] = host['uname_other_compacnt_login_cnt'] / host['total_logins_cnt']
host['uname_other_compacnt_login_frac'] = host['uname_other_compacnt_login_frac'].replace(np.inf, -1.)

host['uname_that_compacnt_login_frac'] = host['uname_that_compacnt_login_cnt']/host['total_logins_cnt']
host['uname_that_compacnt_login_frac'] = host['uname_that_compacnt_login_cnt'] / host['total_logins_cnt']
host['uname_that_compacnt_login_frac'] = host['uname_that_compacnt_login_frac'].replace(np.inf, -1.)

return host, uniq_values_dict
@@ -125,11 +140,7 @@ def initialize_hostdf():
count_cols += ['uname_other_compacnt_login_cnt', 'uname_that_compacnt_login_cnt']
host = cudf.DataFrame(columns=['LogHost']).set_index('LogHost')

uniq_values_dict = {
'Sources': defaultdict(set),
'Unames': defaultdict(set),
'UserDomains': defaultdict(set)
}
uniq_values_dict = {'Sources': defaultdict(set), 'Unames': defaultdict(set), 'UserDomains': defaultdict(set)}
return host, uniq_values_dict, count_cols


@@ -164,41 +175,40 @@ def read_process_data(wls_files, readsize=1000000, max_lines=1e15):
df_wls = read_wls(current_block, file_path=False)
host_df, uniq_vals_dict = host_aggr(df_wls, host_df, uniq_vals_dict, count_cols)

total_lines += len(current_block)/1000000
total_lines += len(current_block) / 1000000
iter_ += 1

if iter_ % 10000 == 0:
proc_speed = 1000.0*total_lines / (time.time() - t0)
logging.info(
'{:.3f}M Lines, {:.2f}K/sec'.format(total_lines, proc_speed))
logging.debug('host shape:{}'.format(hostdf.shape))
if total_lines*1e6 > max_lines:
logging.info("Breaking for loop. total_lines={}>{}".format(total_lines, max_lines))
break
proc_speed = 1000.0 * total_lines / (time.time() - t0)
logging.info('{:.3f}M Lines, {:.2f}K/sec'.format(total_lines, proc_speed))
logging.debug('host shape:{}'.format(host_df.shape))
if total_lines * 1e6 > max_lines:
logging.info("Breaking for loop. total_lines={}>{}".format(total_lines, max_lines))
break
fi.close()
return hostdf
return host_df


@click.command()
@click.option('--debug', is_flag=True)
@click.option('--data_range', default='day-01-day-01',
help='Range of dates for which wls files need to be read and preprocessed. '\
'For example, data_range=day-01-day-03 reads wls_day-01.bz2, wls_day-02.bz2 '\
'and wls_day-03.bz2, preprocesses them and prepares a combined dataset.')
@click.option('--data_range',
default='day-01-day-01',
help=('Range of dates for which wls files need to be read and preprocessed. '
'For example, data_range=day-01-day-03 reads wls_day-01.bz2, wls_day-02.bz2 '
'and wls_day-03.bz2, preprocesses them and prepares a combined dataset.'))
def run(**kwargs):
global dataset_path
debug_mode = kwargs['debug']
logging.basicConfig(level=logging.DEBUG, datefmt='%m%d-%H%M',
format='%(asctime)s: %(message)s')
logging.basicConfig(level=logging.DEBUG, datefmt='%m%d-%H%M', format='%(asctime)s: %(message)s')
dataset_path = '../datasets/'
ipfile_suffix = kwargs['data_range']
if debug_mode:
max_lines = 5e6
readsize = 32768*32
readsize = 32768 * 32
opfile_suffix = '_{:d}Mlines'.format(int(max_lines / 1e6))
else:
max_lines = 1e15
readsize = 32768*32*30
readsize = 32768 * 32 * 30
opfile_suffix = '_' + ipfile_suffix
logger_fname = 'logs/dataprocess_{}.log'.format(ipfile_suffix)
fh = logging.FileHandler(filename=logger_fname, mode='a')
@@ -207,8 +217,7 @@ def run(**kwargs):
logging.getLogger().addHandler(fh)
print("Logging in {}".format(logger_fname))

logging.info("DataProcess for WLS files {}. Read Size:{}MB\n\n".format(
ipfile_suffix, readsize//2**20))
logging.info("DataProcess for WLS files {}. Read Size:{}MB\n\n".format(ipfile_suffix, readsize // 2**20))
wls_files = get_fnames(dataset_path, ipfile_suffix)
host_df = read_process_data(wls_files, readsize, max_lines)
logging.debug("Number of hosts:{}".format(host_df.shape[0]))
@@ -217,4 +226,4 @@

if __name__ == '__main__':

run()
run()
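
For context on the features this script builds: each (EventID, feature name) tuple in EVENTID_CNTFEAT maps a Windows event ID to a per-host occurrence count keyed on the LogHost field. The snippet below is an illustrative sketch only, not part of this commit; it restates that aggregation with pandas in place of cudf, and the function name eventid_counts and the toy events frame are assumptions made for the example.

import pandas as pd

# Subset of the (EventID, feature name) pairs defined in data_preprocessing.py.
EVENTID_CNTFEAT = [(4624, 'total_logins_cnt'), (4625, 'accnt_fail_logon_cnt')]

def eventid_counts(events):
    # 'events' holds one row per log record with 'LogHost' and 'EventID' columns.
    host = pd.DataFrame(index=events['LogHost'].dropna().unique())
    for evid, col in EVENTID_CNTFEAT:
        # Count how often each host appears in LogHost for this EventID.
        host[col] = events.loc[events['EventID'] == evid].groupby('LogHost').size()
    return host.fillna(0).astype(int)

toy = pd.DataFrame({'LogHost': ['Comp001', 'Comp001', 'Comp002'],
                    'EventID': [4624, 4625, 4624]})
print(eventid_counts(toy))  # Comp001 -> 1 login, 1 failed logon; Comp002 -> 1 login
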
69 changes: 47 additions & 22 deletions asset-clustering/training-tuning-inference/inference.py
@@ -1,25 +1,48 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import logging
import pickle

import click
from utils import compute_chars
from utils import normalize_host_data


@click.command()
@click.option('--model', default='dbscan', help='Clustering method to use.'\
' Valid choices are \'kmeans\' or \'dbscan\'. Default is \'dbscan\'.'\
'The corresponding model pickle file will be read from the relative'\
'path \'../models/ \'.')
@click.option('--data_fname', default='host_agg_data_day-11_day-15.csv',\
help='Name of the preprocessed csv dataset to perform inference on. The given'\
'file name will be read from the relative path \'../datasets/ \'')
@click.option('--num_days', default=5.0, help='Number of days worth of data used'\
'in preparing the dataset. Used to normalize the features.')
@click.option('--compute_cluster_chars', is_flag=True, help='Boolean flag. If '\
'not provided, the script just performs inference and outputs the cluster sizes.'\
'If provided, additionally analyzes for the top salient features of each cluster'\
'and prints the analysis to stdout.')
@click.option('--model',
default='dbscan',
help=('Clustering method to use. '
'Valid choices are \'kmeans\' or \'dbscan\'. Default is \'dbscan\'. '
'The corresponding model pickle file will be read from the relative '
'path \'../models/ \'.'))
@click.option('--data_fname',
default='host_agg_data_day-11_day-15.csv',
help=('Name of the preprocessed csv dataset to perform inference on. The given '
'file name will be read from the relative path \'../datasets/ \''))
@click.option('--num_days',
default=5.0,
help=('Number of days worth of data used '
'in preparing the dataset. Used to normalize the features.'))
@click.option('--compute_cluster_chars',
is_flag=True,
help=('Boolean flag. If '
'not provided, the script just performs inference and outputs the cluster sizes. '
'If provided, additionally analyzes for the top salient features of each cluster '
'and prints the analysis to stdout.'))
def run(**kwargs):
dataset_path = '../datasets/'
model_path = '../models/'
@@ -30,38 +53,40 @@ def run(**kwargs):
assert model in ['kmeans', 'dbscan'], \
"Valid choices for model are kmeans or dbscan"

data_path = dataset_path + kwargs['data_fname']
data_path = dataset_path + kwargs['data_fname']
df, df_norm = normalize_host_data(data_path)

if model=='dbscan':
if model == 'dbscan':
fname = model_path + 'dbscan_eps0.0005.pkl'
clust = "cluster_dbscan_eps0.0005_minkp1"

dbsc_model, pca, pca_dims = pickle.load(open(fname, "rb"))
df_pca = pca.transform(df_norm).iloc[:,:pca_dims]
df_pca = pca.transform(df_norm).iloc[:, :pca_dims]
df[clust] = dbsc_model.fit_predict(df_pca)

elif model=='kmeans':
elif model == 'kmeans':
fname = model_path + 'kmeans_16clusts.pkl'
clust = "cluster_KM_16"

kmeans_model, pca, pca_dims = pickle.load(open(fname, "rb"))
df_pca = pca.transform(df_norm).iloc[:,:pca_dims]
df_pca = pca.transform(df_norm).iloc[:, :pca_dims]
df[clust] = kmeans_model.predict(df_pca)

print("Cluster Size:\n{}".format(df[clust].value_counts()))

if compute_cluster_chars:
cluster_chars = compute_chars(df, clust, cluster_id=0, num_days=num_days)
compute_chars(df, clust, cluster_id=0, num_days=num_days)

return


if __name__ == '__main__':
dt = datetime.date.today()
logger_fname = 'logs/inference.log'.format(dt.strftime('%d%m%y'))
logger_fname = 'logs/inference.log'
print("Logging in {}".format(logger_fname))
logging.basicConfig(level=logging.DEBUG, filename=logger_fname,
filemode='a', format='%(asctime)s: %(message)s',
logging.basicConfig(level=logging.DEBUG,
filename=logger_fname,
filemode='a',
format='%(asctime)s: %(message)s',
datefmt='%m%d-%H%M')
run()
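
The inference path above follows one pattern for both models: load a (model, pca, pca_dims) tuple from a pickle, project the normalized features through the stored PCA, keep only the first pca_dims components, and predict cluster labels from that projection. The sketch below is illustrative only and swaps in scikit-learn and NumPy for the cuDF/cuML objects used by the real scripts; the file name toy_kmeans.pkl and the random data are assumptions.

import pickle

import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA

# Build and persist a toy (model, pca, pca_dims) tuple mirroring the pickle layout.
X = np.random.RandomState(0).rand(200, 10)
pca = PCA(n_components=4).fit(X)
pca_dims = 2
kmeans = KMeans(n_clusters=3, n_init=10, random_state=0).fit(pca.transform(X)[:, :pca_dims])
with open('toy_kmeans.pkl', 'wb') as f:
    pickle.dump((kmeans, pca, pca_dims), f)

# Inference: load the tuple, project, keep the first pca_dims columns, predict.
with open('toy_kmeans.pkl', 'rb') as f:
    model, pca, pca_dims = pickle.load(f)
labels = model.predict(pca.transform(X)[:, :pca_dims])
print(np.bincount(labels))  # cluster sizes, analogous to value_counts() in inference.py
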