@@ -91,20 +91,22 @@ def subscribe_event(self, subscribe_channel='__keyevent@0__:expired', handler='s
91
91
pubsub_client .subscribe (subscribe_channel )
92
92
for message in pubsub_client .listen ():
93
93
expired_key = self .get_key (message ['data' ])
94
- if expired_key .startswith ("emails_" ):
95
- shadow_key = '_%s' % expired_key
96
- try :
97
- if shadow_key :
98
- expired_key_value = self .redis_client .get (shadow_key )
99
- if expired_key_value :
100
- expired_key_value = json .dumps (expired_key_value .decode ('utf-8' ))
101
- expired_key_json = json .loads (expired_key_value )
102
- if expired_key_json :
103
- self .send_to_sqs (expired_key_json )
104
- except Exception as e :
105
- print (e )
94
+ shadow_key = '_%s' % expired_key
95
+ try :
106
96
if shadow_key :
107
- self .redis_client .delete (shadow_key )
97
+ expired_key_value = self .redis_client .get (shadow_key )
98
+ if expired_key_value :
99
+ expired_key_value = json .dumps (expired_key_value .decode ('utf-8' ))
100
+ expired_key_json = json .loads (expired_key_value )
101
+ if expired_key_json :
102
+ if expired_key .startswith ("emails_" ):
103
+ self .send_to_sqs (expired_key_json )
104
+ elif expired_key .startswith ("checkpoint_dependency_" ):
105
+ self .send_to_redis_tasks (expired_key_json )
106
+ except Exception as e :
107
+ print (e )
108
+ if shadow_key :
109
+ self .redis_client .delete (shadow_key )
108
110
except Exception as e :
109
111
print (e )
110
112
@@ -165,3 +167,12 @@ def send_to_sqs(self, msg):
165
167
print (' -- Sent to SQS -- ' )
166
168
except Exception as e :
167
169
print (e )
170
+
171
+
172
+ def send_to_redis_tasks (self , msg ):
173
+ try :
174
+ channel_name = 'dependency_execute'
175
+ self .redis_client .publish (channel_name , json .dumps (msg ))
176
+ print (' -- Sent to SQS -- ' )
177
+ except Exception as e :
178
+ print (e )
0 commit comments