-
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathInstax-Bluetooth-socket.py
198 lines (166 loc) · 8.07 KB
/
Instax-Bluetooth-socket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#! /usr/bin/env python3
from Types import EventType, InfoType
from struct import pack, unpack_from
import asyncio
from bleak import BleakScanner
import sys
if sys.platform == 'linux':
from InstaxLinux import InstaxLinux as InstaxPlatform
elif sys.platform == 'darwin':
import asyncio
from InstaxMacos import InstaxMacos as InstaxPlatform
else:
raise NotImplementedError(f'Platform ${sys.platform} not implemented')
class InstaxBluetooth(InstaxPlatform):
def __init__(self, deviceAddress=None, deviceName=None, printEnabled=False, printerName=None):
"""
Initialize the InstaxBluetooth class.
printEnabled: by default, actual printing is disabled to prevent misprints.
printerName: if specified, will only connect to a printer with this name.
"""
# super(InstaxPlatform, self).__init__(*args, **kwargs)
self.printEnabled = printEnabled
self.isConnected = False
self.device = None
self.deviceAddress = deviceAddress
self.deviceName = deviceName
self.batteryState = None
self.batteryPercentage = None
self.printsLeft = None
super().__init__()
def enable_printing(self):
""" Enable printing. """
self.printEnabled = True
def disable_printing(self):
""" Disable printing. """
self.printEnabled = False
# async def find_device(self, timeout=0, mode='ANDROID'):
# """" Scan for our device and return it when found """
# print('Looking for instax printer...')
# secondsTried = 0
# while True:
# devices = await BleakScanner.discover(timeout=1)
# for device in devices:
# if (self.deviceName is None and device.name.startswith('INSTAX-')) or \
# device.name == self.deviceName or device.address == self.deviceAddress:
# if device.address.startswith('FA:AB:BC'): # found the IOS endpoint, convert to ANDROID
# device.address = device.address.replace('FA:AB:BC', '88:B4:36')
# device.name = device.name.replace('IOS', 'ANDROID')
# return device
# secondsTried += 1
# if timeout != 0 and secondsTried >= timeout:
# return None
def create_color_payload(self, colorArray, speed, repeat, when):
"""
Create a payload for a color pattern.
colorArray: array of RGB values to use in animation, e.g. [[255, 0, 0], [0, 255, 0], [0, 0, 255]]
speed: time per frame/color. Higher is slower animation
repeat: 0 = don't repeat (so play once), 1-254 = times to repeat, 255 = repeat forever
when: 0 = normal, 1 = on print, 2 = on print completion, 3 = pattern switch
"""
payload = pack('BBBB', when, len(colorArray), speed, repeat)
for color in colorArray:
payload += pack('BBB', color[0], color[1], color[2])
return payload
def send_led_pattern(self, pattern, speed=5, repeat=255, when=0):
""" Send a LED pattern to the Instax printer. """
payload = self.create_color_payload(pattern, speed, repeat, when)
packet = self.create_packet(EventType.LED_PATTERN_SETTINGS, payload)
return self.send_packet(packet)
def prettify_bytearray(self, value):
""" Helper funtion to convert a bytearray to a string of hex values. """
return ' '.join([f'{x:02x}' for x in value])
def create_checksum(self, bytearray):
""" Create a checksum for a given packet. """
return (255 - (sum(bytearray) & 255)) & 255
def create_packet(self, eventType, payload=b''):
""" Create a packet to send to the printer. """
if isinstance(eventType, EventType): # allows passing in an event or a value directly
eventType = eventType.value
header = b'\x41\x62' # Ab from client to printer, Ba from printer to client
opCode = bytes([eventType[0], eventType[1]])
packetSize = pack('>H', 7 + len(payload))
packet = header + packetSize + opCode + payload
packet += pack('B', self.create_checksum(packet))
return packet
def validate_checksum(self, packet):
""" Validate the checksum of a packet. """
return (sum(packet) & 255) == 255
def get_device_state(self):
""" Get device state, like battery level, if it is
charging, number of prints left, etc. """
packet = self.create_packet(EventType.SUPPORT_FUNCTION_INFO, bytes([InfoType.BATTERY_INFO.value]))
resp = self.send_packet(packet)
self.parse_response(resp)
packet = self.create_packet(EventType.SUPPORT_FUNCTION_INFO, bytes([InfoType.PRINTER_FUNCTION_INFO.value]))
resp = self.send_packet(packet)
self.parse_response(resp)
# TODO: printer doesn't seem to respond to this?
# def shut_down(self):
# """ Shut down the printer. """
# packet = self.create_packet(EventType.SHUT_DOWN)
# return self.send_packet(packet)
def image_to_bytes(self, imagePath):
""" Convert an image to a bytearray """
imgdata = None
try:
# TODO: I think returning image.read() already returns bytes so no need for bytearray?
with open(imagePath, "rb") as image:
imgdata = bytearray(image.read())
return imgdata
except Exception as e:
print('Error loading image: ', e)
def print_image(self, imgSrc):
""" print an image. Either pass a path to an image or a bytearray"""
if isinstance(imgSrc, str): # if it's a path, load the image contents
imgData = self.image_to_bytes(imgSrc)
else: # the data passed is the image itself
imgData = imgSrc
printCommands = [
self.create_packet(EventType.PRINT_IMAGE_DOWNLOAD_START, b'\x02\x00\x00\x00\x00\x00' + pack('>H', len(imgData)))
]
# divide image data into chunks of 900 bytes and pad the last chunk with zeroes if needed
imgDataChunks = [imgData[i:i + 900] for i in range(0, len(imgData), 900)]
if len(imgDataChunks[-1]) < 900:
imgDataChunks[-1] = imgDataChunks[-1] + bytes(900 - len(imgDataChunks[-1]))
for index, chunk in enumerate(imgDataChunks):
imgDataChunks[index] = pack('>I', index) + chunk # add chunk number as int (4 bytes)
printCommands.append(self.create_packet(EventType.PRINT_IMAGE_DOWNLOAD_DATA, pack('>I', index) + chunk))
if self.printEnabled:
printCommands.extend([
self.create_packet(EventType.PRINT_IMAGE_DOWNLOAD_END),
self.create_packet(EventType.PRINT_IMAGE),
self.create_packet((0, 2), b'\x02'),
])
else:
print("Printing is disabled. Sending all packets except for PRINT_IMAGE command")
for index, packet in enumerate(printCommands):
print(f'sending image packet {index+1}/{len(printCommands)}')
self.send_packet(packet)
def get_accelerometer(self):
""" Get accelerometer data from the printer. """
packet = self.create_packet(EventType.XYZ_AXIS_INFO)
resp = self.send_packet(packet)
self.parse_response(resp)
def main():
""" Example usage of the Instax-Bluetooth module """
# let the module search for the first instax printer it finds
# instax = InstaxBluetooth()
# or specify your device address to skip searching
instax = InstaxBluetooth(deviceAddress='88:B4:36:4E:20:CE')
# uncomment the next line to enable actual printing
# otherwise it will go through the whole printing process except
# for sending the final 'go print' command
# instax.enable_printing()
instax.connect()
if instax.isConnected:
instax.send_led_pattern([[255, 0, 0], [0, 255, 0], [0, 0, 255]])
# instax.print_image('example.jpg')
instax.get_device_state()
# instax.get_accelerometer()
# instax.print_image('example.jpg')
if __name__ == '__main__':
if sys.platform == 'linux':
main()
elif sys.playform == 'darwin':
asyncio.run(main())