1
+ /// Example of kafka-json-processor-generator plugin, in Rust.
2
+ ///
3
+ /// This plugin has the same purpose as ./kjp-generator-generators/static_field.sh.
4
+ /// The following code will generate a processor that adds a static field to the output JSON.
5
+ /// See the test at the end of this file for an example of generated code.
6
+ ///
7
+ /// This executable needs to be run with the following arguments:
8
+ /// `./static_field $function_name field $field_name value $static_value`
9
+ /// Example:
10
+ /// `./static_field static_field_function field '$.hello' value 'Greetings, folks!'`
11
+ ///
12
+ /// It will be activated by placing a compiled executable in the generators directory
13
+ /// (default: ./generators, can be set with --generators-path <GENERATORS_PATH> when using kjp_generator).
14
+
15
+ use kjp_generator_plugin:: { GeneratorError , json_path_to_object_key, JsonFieldName , ProcessorParams , return_generated} ;
16
+ use kjp_generator_plugin:: GeneratorError :: RequiredConfigNotFound ;
17
+
18
+ fn main ( ) {
19
+ return_generated ( static_field) ;
20
+ }
21
+
22
+ /// Generates a processor that adds a static field to JSON
23
+ ///
24
+ /// This generates a function that adds a static field to a JSON from topic.
25
+ /// Available config options:
26
+ /// - "field" - static field name
27
+ /// - "value" - a string to put in this field
28
+ pub fn static_field ( params : ProcessorParams ) -> Result < String , GeneratorError > {
29
+ let function_name = params. function_name ;
30
+ let config = params. config ;
31
+
32
+ let target_field = json_path_to_object_key (
33
+ config. get ( "field" )
34
+ . ok_or_else ( || RequiredConfigNotFound {
35
+ function_name : function_name. to_string ( ) ,
36
+ field_name : "field" . to_string ( ) ,
37
+ description : None ,
38
+ } ) ?
39
+ ) ;
40
+
41
+ let value = config. get ( "value" )
42
+ . ok_or_else ( || RequiredConfigNotFound {
43
+ function_name : function_name. to_string ( ) ,
44
+ field_name : "value" . to_string ( ) ,
45
+ description : None ,
46
+ } ) ?
47
+ . escape_for_json ( ) ;
48
+
49
+ Ok ( FUNCTION_TEMPLATE
50
+ . replace ( FUNCTION_NAME , & function_name)
51
+ . replace ( TARGET_FIELD , & target_field)
52
+ . replace ( VALUE , & value)
53
+ )
54
+ }
55
+
56
+ const FUNCTION_TEMPLATE : & str = r##"
57
+ fn %%FUNCTION_NAME%%(_input: &Value, message: &mut OutputMessage) -> Result<(), ProcessingError> {
58
+ message.insert_val(%%TARGET_FIELD%%, Value::String("%%VALUE%%".to_string()))?;
59
+ Ok(())
60
+ }
61
+ "## ;
62
+
63
+ const FUNCTION_NAME : & str = "%%FUNCTION_NAME%%" ;
64
+ const TARGET_FIELD : & str = "%%TARGET_FIELD%%" ;
65
+ const VALUE : & str = "%%VALUE%%" ;
66
+
67
+ #[ cfg( test) ]
68
+ mod test {
69
+ use std:: collections:: HashMap ;
70
+ use kjp_generator_plugin:: ProcessorParams ;
71
+ use crate :: static_field;
72
+
73
+ #[ test]
74
+ fn should_generate_static_field ( ) {
75
+ let mut config = HashMap :: new ( ) ;
76
+ config. insert ( "field" . to_string ( ) , "$.example[0]" . to_string ( ) ) ;
77
+ config. insert ( "value" . to_string ( ) , "abcdef" . to_string ( ) ) ;
78
+ let result = static_field ( ProcessorParams {
79
+ function_name : "abc1" . to_string ( ) ,
80
+ config,
81
+ } ) ;
82
+ assert_eq ! ( Ok ( r##"
83
+ fn abc1(_input: &Value, message: &mut OutputMessage) -> Result<(), ProcessingError> {
84
+ message.insert_val(&[Key("example".to_string()), Index(0)], Value::String("abcdef".to_string()))?;
85
+ Ok(())
86
+ }
87
+ "## . to_string( ) ) , result) ;
88
+ }
89
+ }
0 commit comments