Skip to content

Commit 8362743

Browse files
authored
Merge pull request #9 from Sachin-Kottarathodi/use-case-fraud
Clean up
2 parents 5bd8c16 + 2496c6f commit 8362743

File tree

5 files changed

+226
-40
lines changed

5 files changed

+226
-40
lines changed
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
{
2+
"annotations": {
3+
"list": [
4+
{
5+
"builtIn": 1,
6+
"datasource": "-- Grafana --",
7+
"enable": true,
8+
"hide": true,
9+
"iconColor": "rgba(0, 211, 255, 1)",
10+
"name": "Annotations & Alerts",
11+
"type": "dashboard"
12+
}
13+
]
14+
},
15+
"editable": true,
16+
"gnetId": null,
17+
"graphTooltip": 0,
18+
"id": 3,
19+
"links": [],
20+
"panels": [
21+
{
22+
"datasource": "Redis",
23+
"fieldConfig": {
24+
"defaults": {
25+
"custom": {
26+
"align": null,
27+
"filterable": false
28+
},
29+
"mappings": [],
30+
"thresholds": {
31+
"mode": "absolute",
32+
"steps": [
33+
{
34+
"color": "green",
35+
"value": null
36+
},
37+
{
38+
"color": "red",
39+
"value": 80
40+
}
41+
]
42+
}
43+
},
44+
"overrides": []
45+
},
46+
"gridPos": {
47+
"h": 9,
48+
"w": 12,
49+
"x": 0,
50+
"y": 0
51+
},
52+
"id": 2,
53+
"options": {
54+
"orientation": "horizontal",
55+
"reduceOptions": {
56+
"calcs": [
57+
"mean"
58+
],
59+
"fields": "",
60+
"values": false
61+
},
62+
"showThresholdLabels": false,
63+
"showThresholdMarkers": true
64+
},
65+
"pluginVersion": "7.3.7",
66+
"targets": [
67+
{
68+
"aggregation": "",
69+
"command": "ts.range",
70+
"keyName": "clean",
71+
"query": "",
72+
"refId": "A",
73+
"streaming": false,
74+
"type": "timeSeries"
75+
}
76+
],
77+
"timeFrom": null,
78+
"timeShift": null,
79+
"title": "Clean",
80+
"transparent": true,
81+
"type": "gauge"
82+
},
83+
{
84+
"datasource": "Redis",
85+
"fieldConfig": {
86+
"defaults": {
87+
"custom": {
88+
"align": null,
89+
"filterable": false
90+
},
91+
"mappings": [],
92+
"thresholds": {
93+
"mode": "absolute",
94+
"steps": [
95+
{
96+
"color": "green",
97+
"value": null
98+
},
99+
{
100+
"color": "red",
101+
"value": 0
102+
}
103+
]
104+
}
105+
},
106+
"overrides": []
107+
},
108+
"gridPos": {
109+
"h": 9,
110+
"w": 12,
111+
"x": 12,
112+
"y": 0
113+
},
114+
"id": 3,
115+
"options": {
116+
"orientation": "horizontal",
117+
"reduceOptions": {
118+
"calcs": [
119+
"mean"
120+
],
121+
"fields": "",
122+
"values": false
123+
},
124+
"showThresholdLabels": true,
125+
"showThresholdMarkers": true
126+
},
127+
"pluginVersion": "7.3.7",
128+
"targets": [
129+
{
130+
"aggregation": "",
131+
"command": "ts.range",
132+
"keyName": "click_spam",
133+
"query": "",
134+
"refId": "B",
135+
"streaming": false,
136+
"type": "timeSeries"
137+
},
138+
{
139+
"aggregation": "",
140+
"command": "ts.range",
141+
"keyName": "ip_blacklist",
142+
"query": "",
143+
"refId": "C",
144+
"streaming": false,
145+
"type": "timeSeries"
146+
}
147+
],
148+
"timeFrom": null,
149+
"timeShift": null,
150+
"title": "Fraud Types",
151+
"transparent": true,
152+
"type": "gauge"
153+
}
154+
],
155+
"refresh": "5s",
156+
"schemaVersion": 26,
157+
"style": "dark",
158+
"tags": [],
159+
"templating": {
160+
"list": []
161+
},
162+
"time": {
163+
"from": "now-30m",
164+
"to": "now"
165+
},
166+
"timepicker": {},
167+
"timezone": "",
168+
"title": "Fraud Stats",
169+
"uid": "P0uE4KEMz",
170+
"version": 3
171+
}

use-cases/fraud-detection/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ docker run -e REDIS_HOST='<redis-host>' -e REDIS_PORT=<redis-port> -p 5000:5000
3434

3535
This will start a docker which runs a flask server on port 5000.
3636

37+
3738
## Sample curl:
3839

3940
```
40-
curl --location --request POST 'localhost:5000' \
41+
curl --request POST 'localhost:5000' \
4142
--header 'Content-Type: application/json' \
4243
--data-raw '{
4344
"device_id": "111-000-000",
Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
class Constants:
22

33
IP_CUCKOO_FILTER_NAME = "ip_cf"
4-
AD_STACK_WINDOW = 10 * 1000 # default window in seconds
5-
AD_STACK_THRESHOLD = 2
6-
CLEAN_STREAM_NAME = 'clean_stream'
7-
FRAUD_STREAM_NAME = 'fraud_stream'
8-
CLEAN_TS = "clean_ts"
9-
FRAUD_TS = "fraud_ts"
4+
CLICK_SPAM_WINDOW_IN_SEC = 10
5+
CLICK_SPAM_THRESHOLD = 2
6+
STREAM_NAME = "data_stream"
7+
CLEAN = "clean"
8+
FRAUD = "fraud"
9+
CLICK_SPAM = "click_spam"
10+
IP_BLACKLIST = "ip_blacklist"
11+
Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import time
2-
import json
2+
import os
33

44
from singleton_decorator import singleton
55

@@ -11,38 +11,40 @@
1111
class FraudChecks:
1212

1313
def __init__(self):
14-
pass
14+
self.click_spam_window_in_sec = int(os.getenv("CLICK_SPAM_WINDOW_IN_SEC", Constants.CLICK_SPAM_WINDOW_IN_SEC)) * 1000
15+
self.click_spam_threshold = int(os.getenv("CLICK_SPAM_THRESHOLD", Constants.CLICK_SPAM_THRESHOLD))
1516

1617
def check_fraud(self, data):
17-
if self.ip_fraud(data):
18-
return 'Fraud IP'
19-
if self.ad_stack(data):
20-
return 'Ad Stacked'
21-
return 'Clean Event'
18+
data['status'] = Constants.CLEAN
19+
data['fraud_type'] = Constants.CLEAN
20+
data['ts'] = int(time.time()*1000)
21+
if self.ip_fraud(data) or self.click_spam(data):
22+
pass
23+
self.publish(data)
24+
return data['fraud_type']
2225

2326
def ip_fraud(self, data):
2427
exists = RedisConn().bloom().cfExists(Constants.IP_CUCKOO_FILTER_NAME, data['ip'])
25-
self.publish(data, "Clean" if not exists else "Fraud")
28+
if exists:
29+
data['fraud_type'] = Constants.IP_BLACKLIST
30+
data['status'] = Constants.FRAUD
31+
2632
return exists
2733

28-
def ad_stack(self, data):
34+
def click_spam(self, data):
2935
##
3036
# 'key' of each sorted is the device_id received.
31-
# Ad stacked is True if the count in the range of scores (which is timestamp of event) -
37+
# click spam is True if the count in the range of scores (which is timestamp of event) -
3238
# - is greater than a threshold.
3339
#
3440
##
35-
ts = int(time.time() * 1000)
36-
is_ad_stacked = False
37-
member = json.dumps({'device_id': data['device_id'], 'transaction_id': data['transaction_id'], 'ts': ts})
38-
RedisConn().redis().zadd(data.get('device_id'), {member: ts})
39-
count = RedisConn().redis().zcount(data.get('device_id'), ts - Constants.AD_STACK_WINDOW, ts)
40-
if count > Constants.AD_STACK_THRESHOLD:
41-
is_ad_stacked = True
42-
self.publish(data, "Fraud" if is_ad_stacked else "Clean")
43-
return is_ad_stacked
44-
45-
def publish(self, data, status):
46-
data['status'] = status
47-
stream = Constants.CLEAN_STREAM_NAME if status == 'Clean' else Constants.FRAUD_STREAM_NAME
48-
RedisConn().redis().xadd(stream, data, id='*')
41+
is_click_spammed = False
42+
count = RedisConn().redis().zcount(data.get('device_id'), data['ts'] - self.click_spam_window_in_sec, data['ts'])
43+
if count >= self.click_spam_threshold:
44+
is_click_spammed = True
45+
data['fraud_type'] = Constants.CLICK_SPAM
46+
data['status'] = Constants.FRAUD
47+
return is_click_spammed
48+
49+
def publish(self, data):
50+
RedisConn().redis().xadd(Constants.STREAM_NAME, data, id='*')
Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from gearsclient import GearsRemoteBuilder as GearsBuilder
2-
from gearsclient import execute
3-
42

53
from redis_conn import RedisConn
6-
from constants import Constants
4+
5+
import os
6+
import json
7+
import redis
8+
from redistimeseries.client import Client
79

810

911
class Setup:
@@ -12,15 +14,23 @@ def init(self):
1214
self.__register_gears()
1315

1416
def __register_gears(self):
15-
# Todo: need better way to check if gears is registered.
1617
redis_conn = RedisConn().redis()
1718
is_reg = redis_conn.get("gears_registered")
1819
if is_reg and int(is_reg) == 1:
1920
# Gears already registered
2021
return
2122

22-
GearsBuilder(reader='StreamReader', r=redis_conn).foreach(lambda x: execute("TS.INCRBY", "clean_ts", 1))\
23-
.register(Constants.CLEAN_STREAM_NAME)
24-
GearsBuilder(reader='StreamReader', r=redis_conn).foreach(lambda x: execute("TS.INCRBY", "fraud_ts", 1))\
25-
.register(Constants.FRAUD_STREAM_NAME)
26-
redis_conn.set("gears_registered", 1)
23+
def stream_handler(item):
24+
data = item['value']
25+
member = json.dumps(
26+
{'device_id': data['device_id'],
27+
'transaction_id': data['transaction_id'],
28+
'ts': data['ts'],
29+
})
30+
redis.Redis().zadd(data.get('device_id'), {member: data['ts']})
31+
Client().incrby(data['fraud_type'], 1)
32+
33+
GearsBuilder(reader='StreamReader', r=redis_conn, requirements=["redis", "redistimeseries"]).foreach(stream_handler).register('data_stream')
34+
# To avoid multiple gears from being registered for single use case, set this when register is done,
35+
# unset this if you want to re-register the Gear when application runs again.
36+
redis_conn.set("gears_registered", 1)

0 commit comments

Comments
 (0)