Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6341b3b
added g test
dan-stats-1 Dec 18, 2022
3a13b19
Finished polling detector
dan-stats-1 Dec 22, 2022
f38d4db
finished unit tests for periodogram polling detector
dan-stats-1 Dec 31, 2022
7c705b9
removed unused variables
dan-stats-1 Jan 3, 2023
5dca70d
removed unused import
dan-stats-1 Jan 3, 2023
ba3274a
added additional test for multiple observations at each timestep
dan-stats-1 Jan 3, 2023
6060620
Merge branch 'main' into polling
dan-stats-1 Jan 3, 2023
e82f798
Merge branch 'main' into polling
petebryan Jan 4, 2023
af4b2d4
reformatted files and added tutorial notebook
dan-stats-1 Jan 4, 2023
5a320d1
reformatted to meet black, pylint, mypy and prospector requirements
dan-stats-1 Jan 4, 2023
fec45c6
Merge branch 'polling' of github.com:danielyates2/msticpy into polling
dan-stats-1 Jan 4, 2023
43cdf20
merged with polling branch
dan-stats-1 Jan 5, 2023
04d136f
Added further analysis to the documentation notebook
dan-stats-1 Jan 7, 2023
aaeed7f
reformatted code to conform with linting requirements
dan-stats-1 Jan 8, 2023
847ef3a
Merge branch 'polling' into polling_docs
dan-stats-1 Jan 8, 2023
ffd2b41
Finished polling docs
dan-stats-1 Jan 8, 2023
b151fc6
reformatted with black
dan-stats-1 Jan 8, 2023
23afe29
Merge branch 'main' into polling_docs
petebryan Jan 11, 2023
02ec7e9
Merge branch 'main' into polling_docs
ianhelle Jan 19, 2023
23e6f18
Removed junit xml file
dan-stats-1 Jan 21, 2023
7238d7f
Merge branch 'polling_docs' of github.com:danielyates2/msticpy into p…
dan-stats-1 Jan 21, 2023
67ce6cb
Merge branch 'main' into polling_docs
dan-stats-1 Jan 21, 2023
b2d1802
Merge branch 'main' into polling_docs
ianhelle Jan 23, 2023
06010cc
Merge branch 'main' into polling_docs
ianhelle Jan 23, 2023
1e39831
Merge branch 'main' into polling_docs
ianhelle Feb 3, 2023
69595f8
Added tests and functionality to accept a dataframe
dan-stats-1 Feb 6, 2023
1b18dc1
Merge branch 'polling_docs' of github.com:danielyates2/msticpy into p…
dan-stats-1 Feb 6, 2023
eb2c765
reformatted files to conform to linting requirements
dan-stats-1 Feb 6, 2023
fa7304c
Merge branch 'main' into polling_docs
ianhelle Feb 23, 2023
8a4dbcf
Merge branch 'main' into polling_docs
dan-stats-1 Mar 14, 2023
f0e3dfb
Merge branch 'main' into polling_docs
ianhelle Mar 16, 2023
c95a085
Merge branch 'main' into polling_docs
dan-stats-1 Mar 24, 2023
068c478
Merge branch 'main' into polling_docs
ianhelle Apr 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
459 changes: 459 additions & 0 deletions docs/notebooks/PollingDetection.ipynb

Large diffs are not rendered by default.

229 changes: 229 additions & 0 deletions msticpy/analysis/polling_detection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""
Polling detection module.

This module is used to highlight edges that are highly periodic and likely to be
generated automatically. The periodic edges could be software polling a server for
updates or malware beaconing and checking for instructions.

Currently the only technique available for filtering polling data is the
PeriodogramPollingDetector class.
"""
from collections import Counter
from typing import Optional, Tuple, Union, List

import numpy as np
import numpy.typing as npt
import pandas as pd

from scipy import signal, special

from ..common.utility import export


@export
class PeriodogramPollingDetector:
    """
    Polling detector using the Periodogram to detect strong frequencies.

    Attributes
    ----------
    data: DataFrame
        Dataframe containing the data to be analysed. Must contain a
        column of edges and a column of timestamps. After calling
        ``detect_polling`` it also holds the result columns.

    Methods
    -------
    detect_polling(time_column, groupby)
        Detect strong periodic frequencies

    """

    # Names of the columns that detect_polling adds to ``data``.
    _RESULT_COLUMNS = ["p_value", "dominant_frequency", "dominant_interval"]

    # Above this spectrum length the exact p value is replaced by an
    # approximation (threshold used by the reference implementation).
    _EXACT_P_VALUE_MAX_LENGTH = 700

    def __init__(self, data: pd.DataFrame, copy: bool = False) -> None:
        """
        Create periodogram polling detector.

        Parameters
        ----------
        data: DataFrame
            Dataframe containing the data to be analysed. Must contain a
            column of edges and a column of timestamps

        copy: bool
            A bool to indicate whether to copy the dataframe supplied to data
        """
        if copy:
            self.data = data.copy()
        else:
            self.data = data

    def _g_test(self, pxx: npt.NDArray, exclude_pi: bool) -> Tuple[float, float]:
        """
        Carry out Fisher's g test for periodicity.

        Fisher's g test tests the null hypothesis that the time series is gaussian white noise
        against the alternative that there is a deterministic periodic component[1]

        If the length of the time series is even then the intensity at pi should be excluded

        If the length of the power spectral density estimate is larger than 700 then an
        approximate p value is calculated, otherwise the exact p value is calculated as

            sum_{j=1}^{floor(1/g)} (-1)^(j-1) * C(m, j) * (1 - j*g)^(m-1)

        where g is the test statistic and m the number of frequencies.

        This implementation was taken from the R package GeneCycle[2]

        Parameters
        ----------
        pxx: ArrayLike
            Estimate of the power spectral density

        exclude_pi: bool
            A bool to indicate whether the frequency located at pi (the last element of
            pxx) should be removed.

        Returns
        -------
        Tuple[float, float]
            G test test statistic (NaN if the spectrum carries no power)
            G test P value, clipped to the interval [0, 1]

        References
        ----------
        [1] M. Ahdesmaki, H. Lahdesmaki and O. Yli-Harja, "Robust Fisher's Test for Periodicity
        Detection in Noisy Biological Time Series," 2007 IEEE International Workshop on Genomic
        Signal Processing and Statistics, 2007, pp. 1-4, doi: 10.1109/GENSIPS.2007.4365817.
        [2] https://github.com/cran/GeneCycle/blob/master/R/fisher.g.test.R

        """
        if exclude_pi:
            pxx = pxx[:-1]

        pxx_length = len(pxx)
        total_power = float(np.sum(pxx)) if pxx_length else 0.0

        # A spectrum without power (e.g. a constant series) gives no evidence
        # of periodicity; report p = 1 instead of dividing by zero.
        if not np.isfinite(total_power) or total_power <= 0:
            return float("nan"), 1.0

        test_statistic = float(np.max(pxx)) / total_power

        if pxx_length == 1:
            # A single frequency is trivially the maximum, so g == 1 always.
            return test_statistic, 1.0

        if pxx_length > self._EXACT_P_VALUE_MAX_LENGTH:
            p_value = 1 - (1 - np.exp(-pxx_length * test_statistic)) ** pxx_length
        else:
            upper = int(np.floor(1 / test_statistic))
            p_value = 0.0
            # The sum runs up to and including floor(1/g).
            for j in range(1, upper + 1):
                remainder = 1 - j * test_statistic
                if remainder <= 0:
                    # (1 - j*g)^(m-1) is zero here; skip to avoid log(0).
                    continue
                # Evaluate the term in log space to keep the large binomial
                # coefficient and the small power from over/underflowing.
                p_value += (-1) ** (j - 1) * np.exp(
                    np.log(special.binom(pxx_length, j))
                    + (pxx_length - 1) * np.log(remainder)
                )

        # Cancellation in the alternating sum can push the value slightly
        # outside of the valid range for a probability.
        p_value = float(min(max(p_value, 0.0), 1.0))

        return test_statistic, p_value

    def _detect_polling_arr(
        self,
        timestamps: npt.NDArray,
        process_start: int,
        process_end: int,
        interval: int = 1,
    ) -> Tuple[float, float, float]:
        """
        Carry out periodogram polling detection on an array of timestamps.

        Carries out the procedure outlined in [1] to detect if the arrival times have a strong
        periodic component.
        The procedure estimates the periodogram for the data and passes the results to Fisher's
        G test.

        For more information run PeriodogramPollingDetector._g_test.__doc__

        This code was adapted from [2].

        Parameters
        ----------
        timestamps: ArrayLike
            An array like object containing connection arrival times as timestamps
        process_start: int
            The timestamp representing the start of the counting process (inclusive)
        process_end: int
            The timestamp representing the end of the counting process (exclusive)
        interval: int
            The interval in seconds between observations

        Returns
        -------
        p_val: float
            The p value from Fisher's G test. 1.0 if the counting process contains no
            time steps (process_start >= process_end).
        dominant_frequency: float
            The frequency with the highest power, in cycles per time step (one time step
            is ``interval`` wide). NaN if the counting process contains no time steps.
        dominant_interval: float
            The reciprocal of the dominant frequency, i.e. the period in time steps.
            Infinite if the dominant frequency is zero, NaN if there are no time steps.

        References
        ----------
        [1] Heard, N. A. and Rubin-Delanchy, P. T. G. and Lawson, D. J. (2014) Filtering
        automated polling traffic in computer network flow data. In proceedings of IEEE
        Joint Intelligence and Security Informatics Conference 2014
        [2] https://github.com/fraspass/human_activity/blob/master/fourier.py

        """
        time_steps = np.arange(process_start, process_end, step=interval)
        n_steps = len(time_steps)

        if n_steps == 0:
            # Nothing to analyse (e.g. a single observation): there is no
            # evidence of polling and no dominant frequency.
            return 1.0, float("nan"), float("nan")

        # Number of arrivals observed at each time step.
        counting_process = Counter(timestamps)
        dn_ = np.array([counting_process[t] for t in time_steps])
        dn_star = dn_ - len(timestamps) / n_steps

        freq, pxx = signal.periodogram(dn_star)

        max_pxx_freq = float(freq[np.argmax(pxx)])

        # For an even number of samples the last periodogram ordinate sits at
        # pi and must be excluded from the G test.
        _, p_val = self._g_test(pxx, n_steps % 2 == 0)

        dominant_interval = 1 / max_pxx_freq if max_pxx_freq > 0 else float("inf")

        return p_val, max_pxx_freq, dominant_interval

    def detect_polling(
        self, time_column: str, groupby: Optional[Union[List[str], str]] = None
    ) -> None:
        """
        Detect the time interval which is highly periodic.

        Runs PeriodogramPollingDetector._detect_polling_arr on the time_column and populates a
        p_value column, dominant_frequency column and dominant_interval column in ``self.data``.

        If groupby column(s) are given then PeriodogramPollingDetector._detect_polling_arr is
        run on each group and every row receives the results of its group. In that case
        ``self.data`` is replaced by a new, merged dataframe.

        Parameters
        ----------
        time_column: str
            The name of the column that contains timestamps
        groupby: str or list[str], optional
            Column(s) to group by

        Raises
        ------
        ValueError
            If the dataframe contains no rows.
        """
        if len(self.data) == 0:
            raise ValueError("Cannot detect polling on an empty dataframe")

        # NOTE(review): the end of the counting process is the largest
        # timestamp and is treated as exclusive by _detect_polling_arr, so the
        # final observation is not counted - confirm this is intended.
        if not groupby:
            ts_col = self.data[time_column]
            p_value, freq, interval = self._detect_polling_arr(
                ts_col, ts_col.min(), ts_col.max()
            )

            self.data["p_value"] = p_value
            self.data["dominant_frequency"] = freq
            self.data["dominant_interval"] = interval
            return

        grouped_results = self.data.groupby(groupby).apply(
            lambda group: self._detect_polling_arr(
                group[time_column],
                group[time_column].min(),
                group[time_column].max(),
            )
        )

        grouped_results_df = pd.DataFrame(
            grouped_results.tolist(),
            columns=self._RESULT_COLUMNS,
            index=grouped_results.index,
        ).reset_index()

        # Drop results of a previous run so they are neither used as join
        # keys nor duplicated, then join explicitly on the grouping columns.
        # A left join keeps rows whose group key is missing.
        stale_columns = [
            column for column in self._RESULT_COLUMNS if column in self.data.columns
        ]
        self.data = self.data.drop(columns=stale_columns).merge(
            grouped_results_df, on=groupby, how="left"
        )
Empty file.
44 changes: 44 additions & 0 deletions tests/analysis/polling_detection/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""Polling detection module test fixtures"""
import numpy as np
import pandas as pd
import pytest

# Seed NumPy's global random number generator when this module is imported;
# the fixtures below reseed with the same value before drawing their data.
np.random.seed(10)


@pytest.fixture()
def periodic_data():
    """
    Build a dataframe for one edge whose arrivals contain a periodic component.

    One day of one-second time steps is simulated. A time step is kept as an
    observation when the sum of a Poisson(1.5) draw and the non-negative part
    of an integer-truncated sine wave (amplitude 10, frequency
    0.01666666666666 cycles per step) is non-zero.

    Returns
    -------
    DataFrame
        Columns ``edges`` (always "periodic_edge") and ``timestamps``.
    """
    # Reseed the global generator so the fixture is reproducible.
    np.random.seed(10)

    n_seconds = 86400
    first_ts = 1669852800
    seconds = np.arange(0, n_seconds)

    background = np.random.poisson(1.5, n_seconds)

    frequency = 0.01666666666666
    wave = (10 * np.sin(2 * np.pi * frequency * seconds)).astype("int")
    # Only the positive half of the wave contributes arrivals.
    burst = np.where(wave < 0, 0, wave)

    active = (burst + background).astype("bool")
    observed = np.arange(first_ts, first_ts + n_seconds)[active]

    return pd.DataFrame({"edges": "periodic_edge", "timestamps": observed})


@pytest.fixture()
def non_periodic_data():
    """
    Build a dataframe for one edge whose arrivals have no periodic component.

    One day of one-second time steps is simulated. A time step is kept as an
    observation when its Poisson(1.5) draw is non-zero.

    Returns
    -------
    DataFrame
        Columns ``edges`` (always "non_periodic_edge") and ``timestamps``.
    """
    # Reseed the global generator so the fixture is reproducible.
    np.random.seed(10)

    n_seconds = 86400
    first_ts = 1669852800

    active = np.random.poisson(1.5, n_seconds).astype("bool")
    observed = np.arange(first_ts, first_ts + n_seconds)[active]

    return pd.DataFrame({"edges": "non_periodic_edge", "timestamps": observed})
Loading