forked from foodcloudirl/FoodCam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFoodCam.py
337 lines (304 loc) · 10.4 KB
/
FoodCam.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
import RPi.GPIO as GPIO
import time
from datetime import datetime
import os
import pycurl
from StringIO import StringIO
from urllib import urlencode
import json
import threading
import settings
import socket
import argparse
from picamera import PiCamera
import sys
sys.path.insert(0, '/home/pi/FoodCam/hx711py')
sys.path.insert(0, '/home/pi/FoodCam/rpi_ws281x/python')
from hx711 import HX711
from neopixel import *
# GPIO setup: pins are addressed by their Broadcom (BCM) channel numbers,
# all of which come from the local `settings` module.
GPIO.setmode(GPIO.BCM)
# The push button is an input held high by the internal pull-up resistor.
GPIO.setup(settings.button, GPIO.IN, pull_up_down=GPIO.PUD_UP)
# Four discrete status LEDs, each driven from its own output pin.
GPIO.setup(settings.blue, GPIO.OUT) #blue
GPIO.setup(settings.red, GPIO.OUT) #red
GPIO.setup(settings.amber, GPIO.OUT) #amber
GPIO.setup(settings.green, GPIO.OUT) #green
# Start with every status LED switched off (settings.off is the "off" level).
GPIO.output(settings.blue, settings.off) #blue led off
GPIO.output(settings.red, settings.off) #red led off
GPIO.output(settings.amber, settings.off) #amber led off
GPIO.output(settings.green, settings.off) #green led off
# Open the Pi camera once at import time and leave the preview running;
# capture() reuses this object for every photo.
camera = PiCamera()
camera.start_preview()
# Response-body sinks for the slackTest and mailgun handles below.
# NOTE(review): `buffer` shadows the Python 2 builtin of the same name, and
# nothing in this file ever reads or clears either buffer.
buffer = StringIO()
buffer2 = StringIO()
# Handle pointing at a local snapshot endpoint on port 8080.
# NOTE(review): not referenced anywhere else in this file - confirm whether
# it is still needed.
image_control = pycurl.Curl()
image_control.setopt(image_control.URL,'localhost:8080/0/action/snapshot')
# FoodCam server handle: POST with the API token passed as a query parameter.
# send_server() replaces HTTPHEADER and sets POSTFIELDS before each request.
# NOTE(review): no WRITEDATA is set here, so the response body presumably goes
# to libcurl's default sink (stdout) - confirm.
server = pycurl.Curl()
server.setopt(server.URL,settings.foodcamServerUrl+"?api_token="+settings.foodcamServerKey)
server.setopt(server.HTTPHEADER,['Accept: application/json'])
server.setopt(server.TIMEOUT, 55)
server.setopt(server.POST,1)
# Slack webhook handle.
# NOTE(review): never performed in this file; settings.slackUrl is instead
# forwarded to the server inside send_server()'s payload.
slack = pycurl.Curl()
slack.setopt(slack.URL,settings.slackUrl)
slack.setopt(slack.HTTPHEADER,['Accept: application/json'])
slack.setopt(slack.TIMEOUT, 55)
slack.setopt(slack.POST,1)
# Slack test-channel handle used by ping(); responses are written to `buffer`.
slackTest = pycurl.Curl()
slackTest.setopt(slackTest.URL,settings.slackTestUrl)
slackTest.setopt(slackTest.HTTPHEADER,['Accept: application/json'])
slackTest.setopt(slackTest.POST,1)
slackTest.setopt(slackTest.TIMEOUT, 55)
slackTest.setopt(slackTest.WRITEDATA,buffer)
# Mailgun handle authenticated with settings.mailgunUser; responses are
# written to `buffer2`.
# NOTE(review): never performed in this file.
mailgun = pycurl.Curl()
mailgun.setopt(mailgun.URL,settings.mailgunUrl)
mailgun.setopt(mailgun.USERPWD, settings.mailgunUser)
mailgun.setopt(mailgun.HTTPHEADER,['Accept: application/json'])
mailgun.setopt(mailgun.POST,1)
mailgun.setopt(mailgun.TIMEOUT, 55)
mailgun.setopt(mailgun.WRITEDATA,buffer2)
# Module-level state shared by the functions below.
# Load-cell amplifier, constructed from the data and clock pins in settings.
hx = HX711(settings.weight_data, settings.weight_clock)
# Set to True by the network helpers when a request fails; blink() and ping()
# react to it by flashing the red LED.
network_warning = False
# Create NeoPixel object with appropriate configuration.
# Adafruit_NeoPixel(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL)
leds = Adafruit_NeoPixel(settings.leds_count, settings.leds_pin, 800000, 10, False, 255, 0)
leds.begin()
# While True, blue_spin() keeps rescheduling itself; the solid-colour helpers
# (white/green/amber/red) clear it to stop the animation.
blue_spin_on = True
# Minimum gap between accepted button presses; converted to milliseconds when
# the GPIO event handler is registered at the bottom of the file.
button_debounce = 45 #seconds
def led_on(leds):#array [red,amber,green,blue]
    """Switch on every status LED whose flag in *leds* is truthy.

    leds -- four-item sequence of flags ordered [red, amber, green, blue].
            A falsy entry leaves that LED in its current state.

    Note: the parameter shadows the module-level NeoPixel object `leds`;
    inside this function the name refers only to the flag sequence.
    """
    # Pin numbers are looked up on `settings` by colour name, one at a time,
    # only for the LEDs that are actually requested.
    for position, colour in enumerate(('red', 'amber', 'green', 'blue')):
        if leds[position]:
            GPIO.output(getattr(settings, colour), settings.on)
def led_off(leds):#array [red,amber,green,blue]
    """Switch off every status LED whose flag in *leds* is truthy.

    leds -- four-item sequence of flags ordered [red, amber, green, blue].
            A falsy entry leaves that LED in its current state.

    Note: the parameter shadows the module-level NeoPixel object `leds`;
    inside this function the name refers only to the flag sequence.
    """
    colour_order = ('red', 'amber', 'green', 'blue')
    index = 0
    while index < len(colour_order):
        if leds[index]:
            # Resolve the pin lazily so unused colours are never looked up.
            GPIO.output(getattr(settings, colour_order[index]), settings.off)
        index += 1
def blue_spin(i = 0):
    """Draw one frame of the rotating blue idle animation and schedule the next.

    i -- index of the brightest ("head") pixel for this frame. Any integer is
         accepted; it is wrapped onto the ring.

    Does nothing when the module flag `blue_spin_on` is False, which is how
    the solid-colour helpers stop the animation. Otherwise a threading.Timer
    re-invokes this function 50 ms later with the head advanced by one pixel.
    """
    if not blue_spin_on:
        return

    pixel_count = leds.numPixels()
    # Wrap the head as well as its neighbours so an out-of-range start index
    # can never address a pixel that does not exist.
    head = i % pixel_count

    # Schedule the next frame first so the frame interval is not stretched by
    # the time spent drawing this one.
    threading.Timer(0.05, blue_spin, [(head + 1) % pixel_count]).start()  # keep spinning every 50ms

    # (offset from head, colour), written in this order so that on very short
    # rings later entries overwrite earlier ones exactly as before.
    frame = (
        (-3, Color(5, 5, 5)),     # dim white: erases the end of the tail
        (-2, Color(5, 20, 50)),   # faint blueish white
        (-1, Color(5, 40, 90)),   # blueish white
        (0, Color(0, 60, 255)),   # blue head
        (1, Color(5, 40, 90)),    # blueish white
        (2, Color(5, 20, 50)),    # faint blueish white
    )
    for offset, colour in frame:
        leds.setPixelColor((head + offset) % pixel_count, colour)
    leds.show()
def _fill_ring(red_level, green_level, blue_level):
    """Stop the idle animation and paint every ring pixel one solid colour."""
    global blue_spin_on
    blue_spin_on = False # stop timer
    solid = Color(red_level, green_level, blue_level)
    pixel = 0
    total = leds.numPixels()
    while pixel < total:
        leds.setPixelColor(pixel, solid)
        pixel += 1
    leds.show()


def white():
    """Light the whole ring white and stop the blue spin animation."""
    _fill_ring(255, 255, 255)


def green():
    """Light the whole ring green and stop the blue spin animation."""
    _fill_ring(0, 255, 0)


def amber():
    """Light the whole ring amber and stop the blue spin animation."""
    _fill_ring(255, 200, 0)


def red():
    """Light the whole ring red and stop the blue spin animation."""
    _fill_ring(255, 0, 0)
def ping():
    """Post a heartbeat message to the Slack test webhook and reschedule itself.

    Each call first arms a threading.Timer that calls ping() again in 300
    seconds, then posts a JSON message containing settings.location, this
    device's IP address and the current UTC time through the `slackTest`
    handle.

    Side effects: sets the module flag `network_warning` to True (and flashes
    the red LED) when the request raises pycurl.error or the response status
    is not 200; clears it otherwise.
    """
    global network_warning
    threading.Timer(300.0, ping).start() # run ping function every 300

    stamp = time.strftime('%b %d %Y %H:%M:%S', time.gmtime())

    # The address lookup must never stop the heartbeat itself: fall back to a
    # placeholder if it fails or yields nothing.
    try:
        ip = get_ip_address() or 'unknown'
    except socket.error:
        ip = 'unknown'

    print("button ping: "+ip+", "+stamp+" UTC")

    # Let json do the quoting so a location containing quotes or backslashes
    # cannot produce an invalid payload.
    payload = json.dumps({
        'text': 'ping from '+settings.location+' foodcam ('+ip+'): '+stamp,
    })
    slackTest.setopt(slackTest.POSTFIELDS, payload)

    try:
        slackTest.perform()
    except pycurl.error as e:
        network_warning = True
        error_flash()
        print('Error ping: '+str(e))
        return

    status = slackTest.getinfo(pycurl.RESPONSE_CODE)
    print('Ping to slack test: '+str(status))
    network_warning = (status != 200)
    if network_warning:
        error_flash()
        print("Ping issue: "+str(status))
def blink():
    """Heartbeat on the blue status LED, re-armed via threading.Timer.

    One cycle: blue off, flash red if `network_warning` is set, wait two
    seconds, blue on, then schedule the next cycle 0.1 s later.
    """
    blue_only = [0, 0, 0, 1]

    led_off(blue_only) #blue led off
    if network_warning:
        error_flash()
    time.sleep(2)
    led_on(blue_only) #blue led on

    next_cycle = threading.Timer(0.1, blink)
    next_cycle.start()
def error_flash():
    """Flash the red status LED four times (0.1 s on, 0.1 s off).

    Blocks the calling thread for roughly 0.8 seconds and leaves the red LED
    switched off.
    """
    print('Error flash!!!')
    red_only = [1, 0, 0, 0]
    for _ in range(4):
        led_on(red_only) #red led on
        time.sleep(0.1)
        led_off(red_only) #red led off
        time.sleep(0.1)
def get_ip_address():
    """Return this device's outbound IPv4 address, or '' if it cannot be found.

    A UDP socket is connected to 8.8.8.8:80 and the local address the OS
    chose for it is read back with getsockname().

    Side effects: clears the module flag `network_warning` on success; on a
    socket error sets it, flashes the red LED and prints the error.
    """
    global network_warning
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        probe.connect(("8.8.8.8",80))
        ip_address = probe.getsockname()[0]
    except socket.error as e:
        # Socket calls raise socket.error, not pycurl.error, so this is the
        # exception type that actually occurs when there is no route.
        network_warning = True
        error_flash()
        print('Error ip_address: '+str(e))
        return ''
    finally:
        # Always release the descriptor, including on the failure path.
        probe.close()
    network_warning = False
    return ip_address
def upload_dropbox(filename):
    """Upload /home/pi/<filename> to Dropbox and return a direct image link.

    filename -- file name (no directory) of an image stored in /home/pi.

    Runs dropbox_uploader.sh twice: once to upload the file to the Dropbox
    root and once to request a share link. The text after 'link: ' in the
    share output is returned with its trailing 'dl=0' replaced by 'raw=1'.

    Returns the link string, or None when the link could not be obtained.

    Side effects: clears the module flag `network_warning` on success; on
    failure sets it, flashes the red LED once and prints a diagnostic.
    """
    global network_warning
    share_output = ''  # defined up front so the failure path can inspect it
    try:
        os.system('bash /home/pi/FoodCam/dropbox_uploader.sh upload /home/pi/'+filename+' /')
        time.sleep(1)
        share_output = os.popen('bash /home/pi/FoodCam/dropbox_uploader.sh share /'+filename).read()
        print('dropbox_uploader: '+share_output)
        # IndexError here means the script printed no 'link: ' marker.
        link = share_output.split('link: ')[1].replace('dl=0\n','raw=1')
    except Exception as e:
        # Best-effort boundary: report the problem and let the caller carry
        # on, without swallowing KeyboardInterrupt/SystemExit as a bare
        # except would.
        network_warning = True
        error_flash()
        print('dropbox_uploader issue. '+str(e))
        if share_output == "":
            print('dropbox_uploader empty: '+share_output)
        return None
    network_warning = False
    return link
def send_server(url, weight):
    """POST one capture record to the FoodCam server as JSON.

    url    -- public link to the uploaded image (stringified before sending).
    weight -- measured weight (stringified before sending).

    The payload also carries recipient, location, slackUrl and email_to from
    the `settings` module. The request goes through the shared `server`
    pycurl handle.

    Side effects: clears the module flag `network_warning` when the server
    answers with a 2xx status; sets it and flashes the red LED when the
    request raises pycurl.error or the status is anything else.
    """
    global network_warning
    data = {
        'recipient': settings.recipient,
        'weight': str(weight),
        'location': settings.location,
        'slackUrl': settings.slackUrl,
        'email_to': settings.email_to,
        'image_url': str(url),
    }
    print(data)
    server.setopt(server.HTTPHEADER, [ 'Content-Type: application/json' , 'Accept: application/json'])
    server.setopt(server.POSTFIELDS, json.dumps(data))
    try:
        server.perform()
    except pycurl.error as e:
        network_warning = True
        error_flash()
        print('Error server: '+str(e))
        return

    status = server.getinfo(pycurl.RESPONSE_CODE)
    print('Sent to server: '+str(status))
    # A completed HTTP exchange is not necessarily a success: treat non-2xx
    # answers as a warning, in line with how ping() checks its response.
    network_warning = not (200 <= status < 300)
    if network_warning:
        error_flash()
        print('Server issue: '+str(status))
#weight sensor
def setup_weight():
    """Configure the HX711 load-cell amplifier and zero (tare) the scale.

    Sets MSB/MSB reading format and a reference unit of 170, resets the chip
    and tares it, so whatever is on the scale at this moment becomes zero.
    """
    hx.set_reading_format("MSB", "MSB")
    hx.set_reference_unit(170)#fc unit
    hx.reset()
    hx.tare()
    # Single-argument print() behaves the same on Python 2 and 3 and matches
    # the call style used by the rest of the file.
    print("Zeroing weight done!")
def read_weight():
    """Return the averaged, sign-flipped scale reading divided by 1000.

    Takes five readings from the HX711, power-cycling the chip and pausing
    50 ms after each one, averages them, negates the result and divides it
    by 1000.

    NOTE(review): presumably the raw unit is grams and the result kilograms,
    with the sign flipped because the cell reads negative under load -
    confirm against the hardware.
    NOTE(review): settings.weight_data (the HX711 data pin passed to the
    constructor) is handed to get_weight() as its argument; that looks like
    it was meant to be a sample count - confirm.
    """
    sample_count = 5
    readings = []
    for _ in range(sample_count):
        readings.append(hx.get_weight(settings.weight_data))
        hx.power_down()
        hx.power_up()
        time.sleep(0.05)

    # float() keeps the average from being truncated by integer division when
    # the driver returns ints on Python 2.
    avg_w = -1 * float(sum(readings)) / sample_count
    print('w avg: '+str(avg_w))
    return avg_w / 1000
#button press
def capture(channel):
    """Button-press handler: weigh, photograph, upload and report one item.

    channel -- GPIO channel that triggered the event (only logged).

    Sequence and indicators:
      1. Red LED / white ring: read the weight and take the photo.
      2. Amber LED / amber ring: upload the photo and post to the server.
      3. Green LED / green ring: hold until the debounce window has nearly
         elapsed, then delete the local photo.
    Afterwards the status LEDs are off and, when settings.leds_enabled is
    set, the blue idle animation is restarted. That now also happens when the
    upload fails, so the ring is not left amber.

    A failed weight reading is reported and recorded as 0 rather than
    aborting the capture. If the upload yields no link, nothing is sent to
    the server and the local photo is kept on disk.
    """
    global blue_spin_on
    started = time.time()
    led_on([1,0,0,0]) #red led on
    if settings.leds_enabled:
        white()
    print('Button Pressed, channel '+str(channel))
    time.sleep(1.1)

    # Default used when the scale cannot be read, so the later send_server()
    # call never hits an unbound name.
    weight = 0
    try:
        reading = read_weight()
        weight = reading if reading > 0 else 0  # clamp negative readings
        print('Weight captured: '+str(round(weight, 2)))
    except Exception as e:
        print('Error capturing weight: '+str(e))

    filename = 'foodcam_image_'+datetime.now().strftime("%Y-%m-%d_%H:%M:%S")+'.jpg'
    image_path = '/home/pi/'+filename
    print(filename)
    camera.capture(image_path)
    print('A photo has been taken')
    time.sleep(0.6)
    led_off([1,0,0,0]) #red led off
    led_on([0,1,0,0]) #amber led on
    if settings.leds_enabled:
        amber()

    url = upload_dropbox(filename)
    if url:
        send_server(url, weight)
        led_off([0,1,0,0]) #amber led off
        led_on([0,0,1,0]) #green led on
        if settings.leds_enabled:
            green()
        capture_time = int(time.time() - started)
        # Stay green until about a second before the GPIO bounce window ends,
        # or for half a second if the capture already took longer than that.
        if capture_time < (button_debounce-1):
            bounce_time = (button_debounce-1) - capture_time
        else:
            bounce_time = 0.5
        print('Capture time: '+str(capture_time)+'. Green for '+str(bounce_time)+' seconds.')
        time.sleep(bounce_time)
        os.remove(image_path)
        led_off([0,0,1,0]) #green led off
    else:
        error_flash()
        print('No image url, ending capture')
        led_off([0,1,0,0]) #amber led off

    # Return the ring to its idle animation on both outcomes.
    if settings.leds_enabled:
        blue_spin_on = True
        blue_spin()
# Start-up sequence, run once when the script is executed.
# Zero the scale with whatever is currently on it.
setup_weight()
# Start the idle ring animation.
# NOTE(review): unlike capture(), this call is not guarded by
# settings.leds_enabled - confirm that is intended.
blue_spin() # Blue wipe
# Register capture() for button presses (falling edge, because the input is
# pulled up). bouncetime is given in milliseconds.
# bounce must be greater than time to upload image and send to slack/email, eg 30 seconds
GPIO.add_event_detect(settings.button, GPIO.FALLING, callback=capture, bouncetime= (button_debounce*1000) )
# NOTE(review): nothing after this point blocks the main thread, so the
# process appears to stay alive only through the Timer threads started by
# blue_spin() - confirm how the script is launched and kept running.
# Disabled GPIO clean-up hook, kept for reference:
#def exit():
# GPIO.cleanup() #Clean up GPIO on CTRL+C exit