-
Couldn't load subscription status.
- Fork 0
Requesting Images for radio modes #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: radio_modes
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| from .message import Message | ||
| import os | ||
| from .headers import IMAGE_START, IMAGE_MID, IMAGE_END | ||
|
|
||
| class ImageMessage(Message): | ||
| """ | ||
| encodes JPEG files into packets that can be transmitted over RF | ||
| - works for baseline DCT | ||
| You can find out if your JPEG image uses baseline DCT by looking at the start of frame | ||
| bytes. If they are FFC0, it is baseline otherwise it will be FFC2 | ||
| """ | ||
|
|
||
| headers = { | ||
| 0xFF, 0xD8, 0xC0, 0xC2, 0xC4, 0xDA, 0xDB, 0xDD, 0xFE, 0xD9 | ||
| } | ||
|
|
||
| # SIG = bytearray.fromhex("FF") | ||
| SIG = bytearray(1) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is SIG/SOS/EOI? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are specific bytes in the JPEG file format. All JPEG's start and end with the same bytes |
||
| SIG[0] = 0xFF | ||
| # SOI = bytearray.fromhex("D8") # Start of image | ||
| # SOFb = bytearray.fromhex("C0") # Start of frame (baseline DCT) | ||
| # SOFp = bytearray.fromhex("C2") # start of frame (progressive DCT) | ||
| # DHT = bytearray.fromhex("C4") # Define Huffman Tables | ||
| # SOS = bytearray.fromhex("FFDA") # Start of scan | ||
| SOS = bytearray(2) | ||
| SOS[0] = 0xFF | ||
| SOS[1] = 0xDA | ||
| # DQT = bytearray.fromhex("DB") # Define Quntization table | ||
| # DRI = bytearray.fromhex("DD") # Define Restart Interval | ||
| # RST = bytearray.fromhex("D") # Restart | ||
| # FLEX = bytearray.fromhex("E") # Variable | ||
| # CMT = bytearray.fromhex("FE") # Comment | ||
| # EOI = bytearray.fromhex("FFD9") # End of Image | ||
| EOI = bytearray(2) | ||
| EOI[0] = 0xFF | ||
| EOI[1] = 0xD9 | ||
|
|
||
| def __init__(self, filepath, packet_size) -> None: | ||
| self.packet_size = packet_size | ||
| self.filepath = filepath | ||
| self.length = os.stat(filepath)[6] | ||
| self.sent_packet_len = 0 | ||
| self.cursor = 0 | ||
| self.in_scan = False | ||
| self.file_err = False | ||
| self.scan_size = ((self.packet_size - 1) // 64) * 64 | ||
|
|
||
| def packet(self): | ||
| """ | ||
| Packetizes the image into packets of a specified size limit | ||
| Packet 1 | ||
| SOI and JFIF-APP0 | ||
| Packet 2 to packet i | ||
| comment | ||
| packet i + 1 to packet j | ||
| frame, Quntization and huffman tables | ||
| packet j + 1 to k | ||
| image scan | ||
| """ | ||
| next_packet_found = False | ||
| data_len = 0 | ||
|
|
||
| try: | ||
| with open(self.filepath, "rb") as file: | ||
| file.seek(self.cursor) | ||
| data_bytes = file.read(self.packet_size - 1) | ||
| except Exception as e: | ||
| print(f"Error reading from image file: {e}") | ||
| self.file_err = True | ||
|
|
||
| if self.in_scan: | ||
| """ | ||
| Should use 64 byte increments in the image scan section | ||
| """ | ||
| data_len = self.scan_size | ||
| packet = bytearray(self.scan_size + 1) | ||
| packet[1:] = data_bytes[0:data_len] | ||
| else: | ||
| """ | ||
| If we are stil in the header bytes | ||
| """ | ||
| if self.SOS in data_bytes[:2]: | ||
| """ | ||
| If SOS sends just the SOS bytes | ||
| """ | ||
| self.in_scan = True | ||
| next_packet_found = True | ||
| data_len = 2 | ||
| if self.sent_packet_len != 0: | ||
| next_packet_found = True | ||
| data_len = self.sent_packet_len - 1 | ||
| length = len(data_bytes) | ||
| bdr = bytearray(reversed(data_bytes)) | ||
| start = 1 | ||
| while not next_packet_found: | ||
| signal_index = bdr.find(self.SIG, 1, self.packet_size - 1) | ||
| if signal_index == -1: | ||
| """section is larger than packet size""" | ||
| data_len = self.packet_size - 1 | ||
| next_packet_found = True | ||
| if bdr[signal_index - 1] in self.headers: | ||
| data_len = length - signal_index - 1 | ||
| next_packet_found = True | ||
| else: | ||
| start += signal_index | ||
|
|
||
| packet = bytearray(data_len + 1) | ||
| packet[1:] = data_bytes[0:data_len] | ||
| if self.cursor == 0: | ||
| """start packet""" | ||
| packet[0] = IMAGE_START | ||
| elif self.EOI in packet: | ||
| """end packet""" | ||
| packet[0] = IMAGE_END | ||
| else: | ||
| """mid packet""" | ||
| packet[0] = IMAGE_MID | ||
| self.sent_packet_len = data_len + 1 | ||
|
|
||
| return packet, True | ||
|
|
||
| def done(self): | ||
| return (self.length <= self.cursor) or self.file_err | ||
|
|
||
| def ack(self): | ||
| """ | ||
| confirms that we should move to the next packet of info | ||
| """ | ||
| self.cursor += self.sent_packet_len - 1 | ||
| self.sent_packet_len = 0 | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| """The Transmission Queue is a max heap of messages to be transmitted. | ||
| Messages must support the `__lt__`, `__le__`, `__eq__`, `__ge__`, and `__gt__` operators. | ||
| This enables to the max heap to compare messages based on their priority. | ||
| """ | ||
| from .queue import Queue | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you definitely just coppied this from the other queue, but this is probably a bad idea to add all this boiler plate just to make a slightly easier import. At least on the pycubed-mini side we started having memory issues and stuff like this doesn't help. Not at all your fault given this is how it's done else where. I'd either just make it the three lines needed ie: from .queue import Queue
limit = 100
image_queue = Queue(limit)Or leave a issue somewhere to do this later (so we don't forget). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with Losha here, We could make this a much smaller file. |
||
|
|
||
| limit = 100 | ||
| image_queue = Queue(limit) | ||
|
|
||
|
|
||
| def enq(msg): | ||
| """Push a filepath on the image queue | ||
| :param msg: The message to push | ||
| :type msg: string | ||
| """ | ||
| image_queue.enq(msg) | ||
|
|
||
|
|
||
| def peek(): | ||
| """Returns the next filepath to an image to be transmitted | ||
| :return: The next filepath to be transmitted | ||
| :rtype: string | ||
| """ | ||
| return image_queue.peek() | ||
|
|
||
|
|
||
| def pop(): | ||
| """Returns the next filepath to be transmitted and removes it from the transmission queue | ||
| :return: The next fielpath to be transmitted | ||
| :rtype: string | ||
| """ | ||
| return image_queue.deq() | ||
|
|
||
|
|
||
| def empty(): | ||
| """Returns if the transmission queue is empty""" | ||
| return image_queue.empty() | ||
|
|
||
|
|
||
| def clear(): | ||
| """Clears the transmission queue""" | ||
| global image_queue | ||
| image_queue = Queue(limit) | ||
|
|
||
|
|
||
| def size(): | ||
| """Returns the number of messages in the transmission queue""" | ||
| return image_queue.length | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Response is used within the send_command function.