@@ -124,28 +124,37 @@ static int fw_unix_create(struct flb_in_fw_config *ctx)
124124static int in_fw_collect (struct flb_input_instance * ins ,
125125 struct flb_config * config , void * in_context )
126126{
127+ int state_backup ;
127128 struct flb_connection * connection ;
128129 struct fw_conn * conn ;
129130 struct flb_in_fw_config * ctx ;
130131
131132 ctx = in_context ;
132133
134+ state_backup = ctx -> state ;
135+ ctx -> state = FW_INSTANCE_STATE_ACCEPTING_CLIENT ;
136+
133137 connection = flb_downstream_conn_get (ctx -> downstream );
134138
135139 if (connection == NULL ) {
136140 flb_plg_error (ctx -> ins , "could not accept new connection" );
141+ ctx -> state = state_backup ;
137142
138143 return -1 ;
139144 }
140145
141146 if (!config -> is_ingestion_active ) {
142147 flb_downstream_conn_release (connection );
148+ ctx -> state = state_backup ;
149+
143150 return -1 ;
144151 }
145152
146153 if (ctx -> is_paused ) {
147154 flb_downstream_conn_release (connection );
148155 flb_plg_trace (ins , "TCP connection will be closed FD=%i" , connection -> fd );
156+ ctx -> state = state_backup ;
157+
149158 return -1 ;
150159 }
151160
@@ -154,9 +163,17 @@ static int in_fw_collect(struct flb_input_instance *ins,
154163 conn = fw_conn_add (connection , ctx );
155164 if (!conn ) {
156165 flb_downstream_conn_release (connection );
166+ ctx -> state = state_backup ;
167+
157168 return -1 ;
158169 }
159170
171+ ctx -> state = state_backup ;
172+
173+ if (ctx -> state == FW_INSTANCE_STATE_PAUSED ) {
174+ fw_conn_del_all (ctx );
175+ }
176+
160177 return 0 ;
161178}
162179
@@ -263,6 +280,7 @@ static int in_fw_init(struct flb_input_instance *ins,
263280 return -1 ;
264281 }
265282
283+ ctx -> state = FW_INSTANCE_STATE_RUNNING ;
266284 ctx -> coll_fd = -1 ;
267285 ctx -> ins = ins ;
268286 mk_list_init (& ctx -> connections );
@@ -386,7 +404,10 @@ static void in_fw_pause(void *data, struct flb_config *config)
386404 return ;
387405 }
388406
389- fw_conn_del_all (ctx );
407+ if (ctx -> state == FW_INSTANCE_STATE_RUNNING ) {
408+ fw_conn_del_all (ctx );
409+ }
410+
390411 ctx -> is_paused = FLB_TRUE ;
391412 ret = pthread_mutex_unlock (& ctx -> conn_mutex );
392413 if (ret != 0 ) {
@@ -406,6 +427,8 @@ static void in_fw_pause(void *data, struct flb_config *config)
406427 if (config -> is_ingestion_active == FLB_FALSE ) {
407428 fw_conn_del_all (ctx );
408429 }
430+
431+ ctx -> state = FW_INSTANCE_STATE_PAUSED ;
409432}
410433
411434static void in_fw_resume (void * data , struct flb_config * config ) {
@@ -427,6 +450,8 @@ static void in_fw_resume(void *data, struct flb_config *config) {
427450 flb_plg_error (ctx -> ins , "cannot unlock collector mutex" );
428451 return ;
429452 }
453+
454+ ctx -> state = FW_INSTANCE_STATE_RUNNING ;
430455 }
431456}
432457
0 commit comments