Skip to content

Commit

Permalink
[remote-udaf](sample) add some python demo (apache#11760)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenlinzhong authored Aug 16, 2022
1 parent f3f1bbc commit 8a3ee91
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 4 deletions.
129 changes: 128 additions & 1 deletion samples/doris-demo/remote-udaf-python-demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,131 @@ under the License.
# Run

`python function_server_demo.py 9000`
`9000` is the port that the server will listen on
`9000` is the port that the server will listen on

# Demo


```
//create one table such as table2
CREATE TABLE `table2` (
`event_day` date NULL,
`siteid` int(11) NULL DEFAULT "10",
`citycode` smallint(6) NULL,
`visitinfo` varchar(1024) NULL DEFAULT "",
`pv` varchar(1024) REPLACE NULL DEFAULT "0"
) ENGINE=OLAP
AGGREGATE KEY(`event_day`, `siteid`, `citycode`, `visitinfo`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`siteid`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
//import some data
MySQL [test_db]> select * from table2;
+------------+--------+----------+------------------------------------+------+
| event_day | siteid | citycode | visitinfo | pv |
+------------+--------+----------+------------------------------------+------+
| 2017-07-03 | 8 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 |
| 2017-07-03 | 37 | 12 | {"ip":"192.168.0.3","source":"pc"} | 81 |
| 2017-07-03 | 67 | 16 | {"ip":"192.168.0.2","source":"pc"} | 79 |
| 2017-07-03 | 101 | 11 | {"ip":"192.168.0.5","source":"pc"} | 65 |
| 2017-07-03 | 32 | 15 | {"ip":"192.168.0.1","source":"pc"} | 188 |
| 2017-07-03 | 103 | 12 | {"ip":"192.168.0.5","source":"pc"} | 123 |
| 2017-07-03 | 104 | 16 | {"ip":"192.168.0.5","source":"pc"} | 79 |
| 2017-07-03 | 3 | 12 | {"ip":"192.168.0.3","source":"pc"} | 123 |
| 2017-07-03 | 3 | 15 | {"ip":"192.168.0.2","source":"pc"} | 188 |
| 2017-07-03 | 13 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 |
| 2017-07-03 | 53 | 12 | {"ip":"192.168.0.2","source":"pc"} | 123 |
| 2017-07-03 | 1 | 11 | {"ip":"192.168.0.1","source":"pc"} | 65 |
| 2017-07-03 | 7 | 16 | {"ip":"192.168.0.4","source":"pc"} | 79 |
| 2017-07-03 | 102 | 15 | {"ip":"192.168.0.5","source":"pc"} | 188 |
| 2017-07-03 | 105 | 12 | {"ip":"192.168.0.5","source":"pc"} | 81 |
+------------+--------+----------+------------------------------------+------+
```

### 1. find most visit top 3 ip
```
MySQL [test_db]> CREATE AGGREGATE FUNCTION rpc_count_visit_info(varchar(1024)) RETURNS varchar(1024) PROPERTIES (
"TYPE"="RPC",
"OBJECT_FILE"="127.0.0.1:9000",
"update_fn"="rpc_count_visit_info_update",
"merge_fn"="rpc_count_visit_info_merge",
"finalize_fn"="rpc_count_visit_info_finalize"
);
MySQL [test_db]> select rpc_count_visit_info(visitinfo) from table2;
+--------------------------------------------+
| rpc_count_visit_info(`visitinfo`) |
+--------------------------------------------+
| 192.168.0.5:6 192.168.0.2:3 192.168.0.1:3 |
+--------------------------------------------+
1 row in set (0.036 sec)
MySQL [test_db]> select citycode, rpc_count_visit_info(visitinfo) from table2 group by citycode;
+----------+--------------------------------------------+
| citycode | rpc_count_visit_info(`visitinfo`) |
+----------+--------------------------------------------+
| 15 | 192.168.0.2:1 192.168.0.1:1 192.168.0.5:1 |
| 11 | 192.168.0.1:2 192.168.0.5:1 |
| 12 | 192.168.0.5:3 192.168.0.3:2 192.168.0.2:1 |
| 16 | 192.168.0.2:1 192.168.0.4:1 192.168.0.5:1 |
+----------+--------------------------------------------+
4 rows in set (0.050 sec)
```
### 2. sum pv
```
CREATE AGGREGATE FUNCTION rpc_sum(bigint) RETURNS bigint PROPERTIES (
"TYPE"="RPC",
"OBJECT_FILE"="127.0.0.1:9700",
"update_fn"="rpc_sum_update",
"merge_fn"="rpc_sum_merge",
"finalize_fn"="rpc_sum_finalize"
);
MySQL [test_db]> select citycode, rpc_sum(pv) from table2 group by citycode;
+----------+---------------+
| citycode | rpc_sum(`pv`) |
+----------+---------------+
| 15 | 564 |
| 11 | 195 |
| 12 | 612 |
| 16 | 237 |
+----------+---------------+
4 rows in set (0.067 sec)
MySQL [test_db]> select rpc_sum(pv) from table2;
+---------------+
| rpc_sum(`pv`) |
+---------------+
| 1608 |
+---------------+
1 row in set (0.030 sec)
```

### 3. avg pv

```
CREATE AGGREGATE FUNCTION rpc_avg(int) RETURNS double PROPERTIES (
"TYPE"="RPC",
"OBJECT_FILE"="127.0.0.1:9000",
"update_fn"="rpc_avg_update",
"merge_fn"="rpc_avg_merge",
"finalize_fn"="rpc_avg_finalize"
);
MySQL [test_db]> select citycode, rpc_avg(pv) from table2 group by citycode;
+----------+---------------+
| citycode | rpc_avg(`pv`) |
+----------+---------------+
| 15 | 188 |
| 11 | 65 |
| 12 | 102 |
| 16 | 79 |
+----------+---------------+
4 rows in set (0.039 sec)
MySQL [test_db]> select rpc_avg(pv) from table2;
+---------------+
| rpc_avg(`pv`) |
+---------------+
| 107.2 |
+---------------+
1 row in set (0.028 sec)
```
73 changes: 70 additions & 3 deletions samples/doris-demo/remote-udaf-python-demo/function_server_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import types_pb2
import sys
import time;
import json



class FunctionServerDemo(function_service_pb2_grpc.PFunctionServiceServicer):
Expand All @@ -36,7 +38,6 @@ def fn_call(self, request, context):
status = types_pb2.PStatus()
status.status_code = 0
response.status.CopyFrom(status)

if request.function_name == "rpc_sum_update":
result = types_pb2.PValues()
result.has_null = False
Expand Down Expand Up @@ -107,8 +108,8 @@ def fn_call(self, request, context):
for i in range(args_len):
total += request.args[i].double_value[0]
size += request.args[i].int32_value[0]
result.add_double.append(total)
result.add_int32.append(size)
result.double_value.append(total)
result.int32_value.append(size)
response.result.append(result)

if request.function_name == "rpc_avg_finalize":
Expand All @@ -122,6 +123,72 @@ def fn_call(self, request, context):
avg = total / size
result.double_value.append(avg)
response.result.append(result)
if request.function_name == "rpc_count_visit_info_update":
result = types_pb2.PValues()
result.has_null = False
result_type = types_pb2.PGenericType()
result_type.id = types_pb2.PGenericType.STRING
result.type.CopyFrom(result_type)
size = len(request.args[0].string_value)
currentMap=dict()
if request.HasField("context"):
context = request.context.function_context.args_data[0].string_value[0]
currentMap = json.loads(context)
for i in range(size):
s = request.args[0].string_value[i]
mapInfo = json.loads(s)
ip=mapInfo['ip']
if currentMap.has_key(ip):
last_val=currentMap[ip]
last_val+=1
currentMap[ip] = last_val
else:
currentMap[ip] = 1
json_dict = json.dumps(currentMap)
result.string_value.append(json_dict)
response.result.append(result)

if request.function_name == "rpc_count_visit_info_merge":
result = types_pb2.PValues()
result.has_null = False
result_type = types_pb2.PGenericType()
result_type.id = types_pb2.PGenericType.STRING
result.type.CopyFrom(result_type)

context1 = request.args[0].string_value[0]
currentMap1 = json.loads(context1)
context2 = request.args[1].string_value[0]
currentMap2 = json.loads(context2)
for ip,num in currentMap2.items():
if currentMap1.has_key(ip):
currentMap1[ip] = currentMap1[ip] + num
else:
currentMap1[ip] = num
json_dict = json.dumps(currentMap1)
result.string_value.append(json_dict)
response.result.append(result)

if request.function_name == "rpc_count_visit_info_finalize":
result = types_pb2.PValues()
result.has_null = False
result_type = types_pb2.PGenericType()
result_type.id = types_pb2.PGenericType.STRING
result.type.CopyFrom(result_type)

context = request.context.function_context.args_data[0].string_value[0]
currentMap = json.loads(context)
sortedMap=sorted(currentMap.items(), key = lambda kv:(kv[1], kv[0]),reverse=True)
resultMap=dict()
topN=3
if len(sortedMap) < topN:
topN = len(sortedMap)
finalResult=""
print(sortedMap)
for i in range(topN):
ip=sortedMap[i][0]
finalResult +=ip +":"+str(sortedMap[i][1]) +" "
result.string_value.append(finalResult)
response.result.append(result)
return response

def check_fn(self, request, context):
Expand Down

0 comments on commit 8a3ee91

Please sign in to comment.