Skip to content

Eeprom write and network programming #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 128 additions & 27 deletions arduinobootloader/arduinobootloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
arduino and wiring protocols. In turn, they are a subset of the
STK500 V1 and V2 protocols respectively.
'''
import select
import socket
from os import environ

# From Kivy source code: On Android sys.platform returns 'linux2',
Expand All @@ -30,27 +32,27 @@
RESP_STK_IN_SYNC = 0x14
"""Start message of the Stk500v1"""

AVR_ATMEL_CPUS = {0x1E9608: ["ATmega640", (128*2), 1024],
0x1E9802: ["ATmega2561", (128*2), 1024],
0x1E9801: ["ATmega2560", (128*2), 1024],
0x1E9703: ["ATmega1280", (128*2), 512],
0x1E9705: ["ATmega1284P", (128*2), 512],
0x1E9704: ["ATmega1281", (128*2), 512],
0x1E9782: ["AT90USB1287", (128 * 2), 512],
0x1E9702: ["ATmega128", (128*2), 512],
0x1E9602: ["ATmega64", (128*2), 256],
0x1E9502: ["ATmega32", (64*2), 256],
0x1E9403: ["ATmega16", (64*2), 128],
0x1E9307: ["ATmega8", (32 * 2), 128],
0x1E930A: ["ATmega88", (32*2), 128],
0x1E9406: ["ATmega168", (64*2), 256],
0x1E950F: ["ATmega328P", (64*2), 256],
0x1E9514: ["ATmega328", (64*2), 256],
0x1E9404: ["ATmega162", (64*2), 128],
0x1E9402: ["ATmega163", (64*2), 128],
0x1E9405: ["ATmega169", (64*2), 128],
0x1E9306: ["ATmega8515", (32*2), 128],
0x1E9308: ["ATmega8535", (32*2), 128]}
AVR_ATMEL_CPUS = {0x1E9608: ["ATmega640", (128*2), 1024, 8, 512],
0x1E9802: ["ATmega2561", (128*2), 1024, 8, 512],
0x1E9801: ["ATmega2560", (128*2), 1024, 8, 512],
0x1E9703: ["ATmega1280", (128*2), 512, 8, 512],
0x1E9705: ["ATmega1284P", (128*2), 512, 8, 512],
0x1E9704: ["ATmega1281", (128*2), 512, 8, 512],
0x1E9782: ["AT90USB1287", (128 * 2), 512, 8, 512],
0x1E9702: ["ATmega128", (128*2), 512, 8, 512],
0x1E9602: ["ATmega64", (128*2), 256, 8, 256],
0x1E9502: ["ATmega32", (64*2), 256, 4, 256],
0x1E9403: ["ATmega16", (64*2), 128, 4, 128],
0x1E9307: ["ATmega8", (32 * 2), 128, 4, 128],
0x1E930A: ["ATmega88", (32*2), 128, 4, 128],
0x1E9406: ["ATmega168", (64*2), 256, 4, 128],
0x1E950F: ["ATmega328P", (64*2), 256, 4, 256],
0x1E9514: ["ATmega328", (64*2), 256, 4, 256],
0x1E9404: ["ATmega162", (64*2), 128, 4, 128],
0x1E9402: ["ATmega163", (64*2), 128, 0, 128],
0x1E9405: ["ATmega169", (64*2), 128, 4, 128],
0x1E9306: ["ATmega8515", (32*2), 128, 4, 128],
0x1E9308: ["ATmega8535", (32*2), 128, 0, 128]}

"""
Dictionary with the list of Atmel AVR 8 CPUs used by Arduino boards.
Expand Down Expand Up @@ -85,6 +87,12 @@
CMD_READ_FLASH_ISP = 0x14
"""Read the flash of the Stk500v2 Protocol"""

CMD_PROGRAM_EEPROM_ISP = 0x15
"""Write the EEPROM of the Stk500v2 Protocol"""

CMD_READ_EEPROM_ISP = 0x16
"""READ the EEPROM of the Stk500v2 Protocol"""

CMD_LEAVE_PROGMODE_ISP = 0x11
"""Leave the programmer mode of the Stk500v2 Protocol"""

Expand All @@ -106,6 +114,71 @@
CPU_SIG3 = 2
"""Cpu signature part 3"""


class SocketWrapper(object):
def __init__(self, host, port):
self._host = host
self._port = port
self._socket = None
self._connected = False
self._timeout = 1
self.connect()

def __del__(self):
if self._socket:
self._socket.close()

def connect(self):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect((self._host, self._port))
self._socket.settimeout(0)
self._connected = True

@property
def timeout(self):
return self._timeout

@timeout.setter
def timeout(self, value):
self._timeout = value

@property
def is_open(self):
return self._connected

def read(self, size):
init_time = time.time()
elapsed = time.time() - init_time
data = b''
while len(data) < size and elapsed <= self.timeout:
select_timeout = self.timeout - elapsed
if select_timeout < 0:
select_timeout = 0
rin, _, _ = select.select([self._socket], [], [], select_timeout)
if self._socket in rin:
d = self._socket.recv(size-len(data))
if d:
data += d
else:
break
elapsed = time.time() - init_time
return data

def write(self, buffer):
return self._socket.sendall(buffer)

def close(self):
self._socket.close()
self._connected = False

def reset_input_buffer(self):
if self._socket:
self._socket.close()
del self._socket
self._connected = False
self.connect()


class ArduinoBootloader(object):
"""Contains the two inner classes that support the Stk500 V1 and V2 protocols
for comunicate with arduino bootloaders.
Expand All @@ -119,6 +192,8 @@ def __init__(self, *args, **kwargs):
self._cpu_name = ""
self._cpu_page_size = 0
self._cpu_pages = 0
self._eeprom_page_size = 0
self._eeprom_pages = 0
self._programmer_name = ""
self._programmer = None

Expand Down Expand Up @@ -167,6 +242,24 @@ def cpu_pages(self):
"""
return self._cpu_pages

@property
def eeprom_page_size(self):
"""EEProm pages

:setter: pages
:type: int
"""
return self._eeprom_page_size

@property
def eeprom_pages(self):
"""EEProm pages

:setter: pages
:type: int
"""
return self._eeprom_pages

@property
def programmer_name(self):
"""Name given by Atmel to its programmers, for example (ISP_V2).
Expand Down Expand Up @@ -207,11 +300,15 @@ def _is_cpu_signature(self, signature):
self._cpu_name = list_cpu[0]
self._cpu_page_size = list_cpu[1]
self._cpu_pages = list_cpu[2]
self._eeprom_page_size = list_cpu[3]
self._eeprom_pages = list_cpu[4]
return True
except KeyError:
self._cpu_name = "signature: {:06x}".format(signature)
self._cpu_page_size = 0
self._cpu_pages = 0
self._eeprom_page_size = 0
self._eeprom_pages = 0
return False

def _find_device_port(self):
Expand Down Expand Up @@ -260,7 +357,11 @@ def open(self, port=None, speed=115200):
if self.device:
self.device.USB_READ_TIMEOUT_MILLIS = 1000
else:
self.device = serial.Serial(port, speed, 8, 'N', 1, timeout=1)
if port[0:4] == 'net:':
port = port[4:]
self.device = SocketWrapper(port, speed)
else:
self.device = serial.Serial(port, speed, 8, 'N', 1, timeout=1)

self.port = port

Expand Down Expand Up @@ -601,8 +702,8 @@ def write_memory(self, buffer, address, flash=True):
msg[1] = (buff_len & 0xFF)
msg.extend(buffer)
"""The seven bytes preceding the data are not used."""
if self._send_command(CMD_PROGRAM_FLASH_ISP, msg):
return self._recv_answer(CMD_PROGRAM_FLASH_ISP)
if self._send_command(CMD_PROGRAM_FLASH_ISP if flash else CMD_PROGRAM_EEPROM_ISP, msg):
return self._recv_answer(CMD_PROGRAM_FLASH_ISP if flash else CMD_PROGRAM_EEPROM_ISP)
return False

def read_memory(self, address, count, flash=True):
Expand All @@ -622,8 +723,8 @@ def read_memory(self, address, count, flash=True):
msg[0] = ((count >> 8) & 0xFF)
msg[1] = (count & 0xFF)
"""The third byte is not used"""
if self._send_command(CMD_READ_FLASH_ISP, msg):
if self._recv_answer(CMD_READ_FLASH_ISP):
if self._send_command(CMD_READ_FLASH_ISP if flash else CMD_READ_EEPROM_ISP, msg):
if self._recv_answer(CMD_READ_FLASH_ISP if flash else CMD_READ_EEPROM_ISP):
"""The end of data is marked with STATUS_OK"""
if self._answer[-1] == STATUS_CMD_OK:
del self._answer[-1]
Expand Down Expand Up @@ -782,4 +883,4 @@ def _read_headear(self):
head = bytearray(self._ab.device.read(4))
if len(head) == 4 and head[3] == TOKEN and self._sequence_number == head[0]:
return head
return None
return None
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pyserial>=3.4
pyserial>=3.4
IntelHex>=2.3
progressbar>=2.5
48 changes: 28 additions & 20 deletions scripts/arduinoflash.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
It is used to write / verify and read the flash memory of an Arduino board.
The input / output file format is Intel Hexadecimal."""

VERSION = '0.2.0'
VERSION = '0.3.0'

import argparse
import sys
Expand All @@ -18,6 +18,8 @@
group = parser.add_mutually_exclusive_group()
parser.add_argument("filename", help="filename in hexadecimal Intel format")
parser.add_argument("--version", action="store_true", help="script version")
parser.add_argument("-e", "--eeprom", action="store_true", help="program eeprom")
parser.add_argument("-d", "--device", help="specify the device. Use net: for TCP connection")
parser.add_argument("-b", "--baudrate", type=int, required=True, help="old bootolader (57600) Optiboot (115200)")
parser.add_argument("-p", "--programmer", required=True, help="programmer version - Nano (Stk500v1) Mega (Stk500v2)")
group.add_argument("-r", "--read", action="store_true", help="read the cpu flash memory")
Expand Down Expand Up @@ -51,7 +53,7 @@ def exit_by_error(msg):
sys.exit(0)


if prg.open(speed=args.baudrate):
if prg.open(port=args.device, speed=args.baudrate):
print("AVR device initialized and ready to accept instructions")
address = 0
if not prg.board_request():
Expand All @@ -65,6 +67,7 @@ def exit_by_error(msg):

print("cpu name: {}".format(ab.cpu_name))

page_size = ab.cpu_page_size if not args.eeprom else ab.eeprom_page_size
if args.update:
print("reading input file: {}".format(args.filename))

Expand All @@ -75,13 +78,13 @@ def exit_by_error(msg):
except (AddressOverlapError, HexRecordError):
exit_by_error(msg="error, file format")

print("writing flash: {} bytes".format(ih.maxaddr()))
bar = progressbar.ProgressBar(max_value=ih.maxaddr(), prefix="writing ")
bar.start(init=True)
for address in range(0, ih.maxaddr(), ab.cpu_page_size):
buffer = ih.tobinarray(start=address, size=ab.cpu_page_size)
if not prg.write_memory(buffer, address):
exit_by_error(msg="writing flash memory")
print("writing {}: {} bytes".format("flash" if not args.eeprom else "eeprom", ih.maxaddr()))
bar = progressbar.ProgressBar(maxval=ih.maxaddr())
bar.start()
for address in range(0, ih.maxaddr(), page_size):
buffer = ih.tobinarray(start=address, size=page_size)
if not prg.write_memory(buffer, address, flash=not args.eeprom):
exit_by_error(msg="writing {} memory".format("flash" if not args.eeprom else "eeprom"))

bar.update(address)

Expand All @@ -91,26 +94,31 @@ def exit_by_error(msg):

if args.update:
max_address = ih.maxaddr()
print("reading and verifying flash memory")
print("reading and verifying {} memory".format("flash" if not args.eeprom else "eeprom"))

elif args.read:
max_address = int(ab.cpu_page_size * ab.cpu_pages)
print("reading flash memory")
if not args.eeprom:
max_address = int(ab.cpu_page_size * ab.cpu_pages)
else:
max_address = int(ab.eeprom_page_size * ab.eeprom_pages)
print("reading {} memory".format("flash" if not args.eeprom else "eeprom"))
else:
max_address = 0
page_size = 0

bar = progressbar.ProgressBar(max_value=max_address, prefix="reading ")
bar.start(init=True)
bar = progressbar.ProgressBar(maxval=max_address)
bar.start()

for address in range(0, max_address, ab.cpu_page_size):
read_buffer = prg.read_memory(address, ab.cpu_page_size)
for address in range(0, max_address, page_size):
read_buffer = prg.read_memory(address, page_size, flash=not args.eeprom)
if read_buffer is None:
exit_by_error(msg="reading flash memory")
exit_by_error(msg="reading {} memory".format("flash" if not args.eeprom else "eeprom"))

if args.update:
if read_buffer != ih.tobinarray(start=address, size=ab.cpu_page_size):
if read_buffer != ih.tobinarray(start=address, size=page_size):
exit_by_error(msg="file not match")
elif args.read:
for i in range(0, ab.cpu_page_size):
for i in range(0, page_size):
dict_hex[address + i] = read_buffer[i]

bar.update(address)
Expand All @@ -125,7 +133,7 @@ def exit_by_error(msg):
except FileNotFoundError:
exit_by_error(msg="the file cannot be created")

print("\nflash done, thank you")
print("\nprogram done, thank you")

prg.leave_bootloader()
prg.close()
Expand Down