|
| 1 | +# Multi Service example |
| 2 | +# |
| 3 | +# 1) starts/stops some number of services. |
| 4 | +# 2) starts/gets result from a task defined in the service. |
| 5 | +# In this case, the task is to generate psudo random primes inefficiently. |
| 6 | +# 3) Given some number of tasks, schedules them on the available services. |
| 7 | +# 4) Displays average execution time for all the tasks in |
| 8 | +# 'micro seconds per task unit'. Normalized because task time is 'random'. |
| 9 | +# |
| 10 | +# See the build() method for configuring the number of services or tasks. |
| 11 | +# |
| 12 | +# DEPENDS ON: |
| 13 | +# |
| 14 | +# A) kivy==master |
| 15 | +# kivy==2.0.0 has an issue, |
| 16 | +# see https://github.com/Android-for-Python/Android-for-Python-Users#service-lifetime |
| 17 | +# B) oscpy ( minimum 0.6.0 ) |
| 18 | +# |
| 19 | +# Based, in part, on https://github.com/tshirtman/kivy_service_osc |
| 20 | +# |
| 21 | +# Source https://github.com/Android-for-Python/Multi-Service-Example |
| 22 | + |
| 23 | + |
| 24 | +from kivy.app import App |
| 25 | +from kivy.lang import Builder |
| 26 | +from kivy.utils import platform |
| 27 | +from kivy.clock import Clock |
| 28 | +from functools import partial |
| 29 | +from oscpy.client import OSCClient |
| 30 | +from oscpy.server import OSCThreadServer |
| 31 | +from time import time, sleep |
| 32 | +from os import cpu_count |
| 33 | + |
| 34 | +if platform == 'android': |
| 35 | + from jnius import autoclass |
| 36 | +elif platform in ('linux', 'linux2', 'macos', 'win'): |
| 37 | + from runpy import run_path |
| 38 | + from threading import Thread |
| 39 | +else: |
| 40 | + raise NotImplementedError("service start not implemented on this platform") |
| 41 | + |
| 42 | +KV = ''' |
| 43 | +BoxLayout: |
| 44 | + orientation: 'vertical' |
| 45 | + BoxLayout: |
| 46 | + size_hint_y: None |
| 47 | + height: '30sp' |
| 48 | + Button: |
| 49 | + text: 'start services' |
| 50 | + on_press: app.start_services() |
| 51 | + Button: |
| 52 | + text: 'start tasks' |
| 53 | + on_press: app.start_tasks() |
| 54 | + Button: |
| 55 | + text: 'stop services' |
| 56 | + on_press: app.stop_services() |
| 57 | +
|
| 58 | + ScrollView: |
| 59 | + Label: |
| 60 | + id: label |
| 61 | + size_hint_y: None |
| 62 | + height: self.texture_size[1] |
| 63 | + text_size: self.size[0], None |
| 64 | +
|
| 65 | +''' |
| 66 | + |
| 67 | +class MultiService(App): |
| 68 | + ### Build |
| 69 | + ################### |
| 70 | + def build(self): |
| 71 | + self.server = server = OSCThreadServer() |
| 72 | + server.listen( |
| 73 | + address=b'localhost', |
| 74 | + port=3002, #### Also hardcoded 3002 in service.py |
| 75 | + default=True, |
| 76 | + ) |
| 77 | + server.bind(b'/result', self.task_finished) |
| 78 | + server.bind(b'/tcip_port', self.save_tcip_port) |
| 79 | + server.bind(b'/echo', self.recieve_echo) |
| 80 | + self.service = None |
| 81 | + self.num_services_ready = 0 |
| 82 | + self.clients = [] |
| 83 | + self.tcpip_ports = [] |
| 84 | + ################# Configure here ################# |
| 85 | + # num services MUST be <= number in buildozer.spec |
| 86 | + # num services approx upper bound is os.cpu_count |
| 87 | + self.num_buildozer_spec_services = 8 |
| 88 | + self.number_of_services = min(6, cpu_count()) |
| 89 | + self.number_of_tasks = 20 |
| 90 | + ################################################## |
| 91 | + self.root = Builder.load_string(KV) |
| 92 | + return self.root |
| 93 | + |
| 94 | + ### Manage Services |
| 95 | + ################### |
| 96 | + def start_services(self): |
| 97 | + if not self.service: |
| 98 | + for i in range(self.number_of_services): |
| 99 | + self.start_service(i) |
| 100 | + |
| 101 | + def start_service(self,id): |
| 102 | + if platform == 'android': |
| 103 | + from android import mActivity |
| 104 | + context = mActivity.getApplicationContext() |
| 105 | + SERVICE_NAME = str(context.getPackageName()) +\ |
| 106 | + '.Service' + 'Worker_' + str(id) |
| 107 | + self.service = autoclass(SERVICE_NAME) |
| 108 | + self.service.start(mActivity,'') |
| 109 | + |
| 110 | + elif platform in ('linux', 'linux2', 'macos', 'win'): |
| 111 | + # Usually 'import multiprocessing' |
| 112 | + # This is for debug of service.py behavior (not performance) |
| 113 | + self.service = Thread( |
| 114 | + target=run_path, |
| 115 | + args=['service.py'], |
| 116 | + kwargs={'run_name': '__main__'}, |
| 117 | + daemon=True |
| 118 | + ) |
| 119 | + self.service.start() |
| 120 | + |
| 121 | + def stop_services(self): |
| 122 | + for client in self.clients: |
| 123 | + client.send_message(b'/stop_service', []) |
| 124 | + self.service = None |
| 125 | + self.clients = [] |
| 126 | + self.tcpip_ports = [] |
| 127 | + self.num_services_ready = 0 |
| 128 | + |
| 129 | + def save_tcip_port(self,message): |
| 130 | + msg = message.decode('utf8') |
| 131 | + if len(self.clients) == self.number_of_services: |
| 132 | + # a service has restarted and reported a tcpip port |
| 133 | + # if it is the same port there is nothing to do |
| 134 | + # else we look for an unresponsive service and replace it. |
| 135 | + if msg not in self.tcpip_ports: |
| 136 | + self.echoes = [] |
| 137 | + for p,c in zip(self.tcpip_ports,self.clients): |
| 138 | + c.send_message(b'/echo',[p.encode('utf8'),]) |
| 139 | + # We dont know how long all the responses will take. |
| 140 | + # Guess 2 sec, this is OK because we wont get any new |
| 141 | + # results from the killed service id |
| 142 | + Clock.schedule_once(partial(self.replace_service,msg),2) |
| 143 | + else: |
| 144 | + self.tcpip_ports.append(msg) |
| 145 | + # Each service listens on its own tcpip port, |
| 146 | + # Make a Client to talk to that service |
| 147 | + self.clients.append(OSCClient(b'localhost',int(msg))) |
| 148 | + # When we get them all |
| 149 | + if len(self.clients) == self.number_of_services: |
| 150 | + self.num_services_ready = self.number_of_services |
| 151 | + self.root.ids.label.text +=\ |
| 152 | + 'Started ' + str(self.number_of_services) + ' services\n' |
| 153 | + |
| 154 | + def recieve_echo(self,message): |
| 155 | + self.echoes.append(message.decode('utf8')) |
| 156 | + |
| 157 | + ### Replace a killed service |
| 158 | + ############################ |
| 159 | + def replace_service(self,msg,dt): |
| 160 | + for p in self.tcpip_ports: |
| 161 | + if p not in self.echoes: |
| 162 | + # replace the port |
| 163 | + id = self.tcpip_ports.index(p) |
| 164 | + self.tcpip_ports[id] = msg |
| 165 | + self.clients[id] = OSCClient(b'localhost',int(msg)) |
| 166 | + # reuse the restarted service |
| 167 | + # the lost result is replaced with a new result |
| 168 | + if self.last_task_number < self.number_of_tasks: |
| 169 | + self.start_task(int(id)) |
| 170 | + return |
| 171 | + |
| 172 | + ### Manage Tasks |
| 173 | + ################### |
| 174 | + def start_tasks(self): |
| 175 | + if self.num_services_ready: |
| 176 | + self.root.ids.label.text +=\ |
| 177 | + 'Started '+str(self.number_of_tasks)+' tasks, wait.' |
| 178 | + self.result_magnitude = 0 |
| 179 | + self.num_results = 0 |
| 180 | + self.last_task_number = 0 |
| 181 | + self.start_time = time() |
| 182 | + for i in range(min(self.number_of_tasks, |
| 183 | + self.num_services_ready, |
| 184 | + self.num_buildozer_spec_services)): |
| 185 | + self.num_services_ready -= 1 |
| 186 | + self.last_task_number += 1 |
| 187 | + self.start_task(i) |
| 188 | + else: |
| 189 | + self.root.ids.label.text += 'No services available\n' |
| 190 | + |
| 191 | + def start_task(self, id): |
| 192 | + self.clients[id].send_message(b'/start_task', |
| 193 | + [str(id).encode('utf8'),]) |
| 194 | + |
| 195 | + def task_finished(self,message): |
| 196 | + id, res = message.decode('utf8').split(',') |
| 197 | + # service available |
| 198 | + self.num_services_ready +=1 |
| 199 | + # collect result |
| 200 | + self.result_magnitude += int(res) |
| 201 | + self.num_results += 1 |
| 202 | + # new task ? |
| 203 | + if self.last_task_number < self.number_of_tasks: |
| 204 | + self.num_services_ready -= 1 |
| 205 | + self.last_task_number += 1 |
| 206 | + self.start_task(int(id)) |
| 207 | + self.display_result(id, res) |
| 208 | + |
| 209 | + ### Display results |
| 210 | + ################### |
| 211 | + def display_result(self, id, res): |
| 212 | + if self.root: |
| 213 | + #self.root.ids.label.text += ' ' + id + ' ' + res + '\n' |
| 214 | + self.root.ids.label.text += '.' |
| 215 | + if self.number_of_tasks == self.num_results: |
| 216 | + self.root.ids.label.text += '\n' |
| 217 | + # the tasks have different execution times |
| 218 | + # a task unit is 'execution time'/'prime value' |
| 219 | + msg = str(round((time() - self.start_time)*1000000/\ |
| 220 | + self.result_magnitude)) |
| 221 | + msg += ' micro seconds per normalized prime\n' |
| 222 | + self.root.ids.label.text += msg |
| 223 | + |
| 224 | +if __name__ == '__main__': |
| 225 | + MultiService().run() |
0 commit comments