From be5fdae4f79c479f698b069f214c53d3a34fc44a Mon Sep 17 00:00:00 2001 From: Fredrik Johansson Date: Sat, 6 Nov 2021 19:38:54 +0100 Subject: [PATCH 1/3] Py2 -> Py3 --- dfu-convert | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/dfu-convert b/dfu-convert index 3d8eefd..b014871 100755 --- a/dfu-convert +++ b/dfu-convert @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python # Written by Antonio Galea - 2010/11/18 # Distributed under Gnu LGPL 3.0 @@ -6,7 +6,6 @@ import sys,struct,zlib,os from optparse import OptionParser -from intelhex import IntelHex DEFAULT_DEVICE="0x0483:0xdf11" @@ -16,16 +15,16 @@ def consume(fmt,data,names): n = struct.calcsize(fmt) return named(struct.unpack(fmt,data[:n]),names),data[n:] def cstring(string): - return string.split('\0',1)[0] + return string.split(b'\0',1)[0] def compute_crc(data): return 0xFFFFFFFF & -zlib.crc32(data) -1 def parse(file,dump_images=False): - print 'File: "%s"' % file + print('File: "%s"' % file) data = open(file,'rb').read() crc = compute_crc(data[:-4]) prefix, data = consume('<5sBIB',data,'signature version size targets') - print '%(signature)s v%(version)d, image size: %(size)d, targets: %(targets)d' % prefix + print('%(signature)s v%(version)d, image size: %(size)d, targets: %(targets)d' % prefix) for t in range(prefix['targets']): tprefix, data = consume('<6sBI255s2I',data,'signature altsetting named name size elements') tprefix['num'] = t @@ -33,42 +32,42 @@ def parse(file,dump_images=False): tprefix['name'] = cstring(tprefix['name']) else: tprefix['name'] = '' - print '%(signature)s %(num)d, alt setting: %(altsetting)s, name: "%(name)s", size: %(size)d, elements: %(elements)d' % tprefix + print('%(signature)s %(num)d, alt setting: %(altsetting)s, name: "%(name)s", size: %(size)d, elements: %(elements)d' % tprefix) tsize = tprefix['size'] target, data = data[:tsize], data[tsize:] for e in range(tprefix['elements']): eprefix, target = consume('<2I',target,'address size') eprefix['num'] = e - print ' %(num)d, address: 0x%(address)08x, size: %(size)d' % eprefix + print(' %(num)d, address: 0x%(address)08x, size: %(size)d' % eprefix) esize = eprefix['size'] image, target = target[:esize], target[esize:] if dump_images: out = '%s.target%d.image%d.bin' % (file,t,e) open(out,'wb').write(image) - print ' DUMPED IMAGE TO "%s"' % out + print(' DUMPED IMAGE TO "%s"' % out) if len(target): - print "target %d: PARSE ERROR" % t + print("target %d: PARSE ERROR" % t) suffix = named(struct.unpack('<4H3sBI',data[:16]),'device product vendor dfu ufd len crc') - print 'usb: %(vendor)04x:%(product)04x, device: 0x%(device)04x, dfu: 0x%(dfu)04x, %(ufd)s, %(len)d, 0x%(crc)08x' % suffix + print('usb: %(vendor)04x:%(product)04x, device: 0x%(device)04x, dfu: 0x%(dfu)04x, %(ufd)s, %(len)d, 0x%(crc)08x' % suffix) if crc != suffix['crc']: - print "CRC ERROR: computed crc32 is 0x%08x" % crc + print("CRC ERROR: computed crc32 is 0x%08x" % crc) data = data[16:] if data: - print "PARSE ERROR" + print("PARSE ERROR") def build(file,targets,device=DEFAULT_DEVICE): - data = '' + data = b'' for t,target in enumerate(targets): - tdata = '' + tdata = b'' for image in target: - tdata += struct.pack('<2I',image['address'],len(image['data']))+image['data'] - tdata = struct.pack('<6sBI255s2I','Target',0,1,'ST...',len(tdata),len(target)) + tdata + tdata += struct.pack(b'<2I',image['address'],len(image['data']))+image['data'] + tdata = struct.pack(b'<6sBI255s2I',b'Target',0,1,b'ST...',len(tdata),len(target)) + tdata data += tdata - data = struct.pack('<5sBIB','DfuSe',1,len(data)+11,len(targets)) + data + data = struct.pack(b'<5sBIB',b'DfuSe',1,len(data)+11,len(targets)) + data v,d=map(lambda x: int(x,0) & 0xFFFF, device.split(':',1)) - data += struct.pack('<4H3sB',0,d,v,0x011a,'UFD',16) + data += struct.pack(b'<4H3sB',0,d,v,0x011a,b'UFD',16) crc = compute_crc(data) - data += struct.pack(' Date: Sat, 6 Nov 2021 20:41:02 +0100 Subject: [PATCH 2/3] bug fix: for STM32F042F4, needed more time wait for status and retry if we have usb pipe error enhancement: Scan USB interface for memory layout enhancement: Implement read out program from device enhancement: Implement flash a ordinary bin file (dumped file) enhancement: Erase complete memory if adress omitted --- dfuse-tool.py | 184 +++++++++++++++++++++++++++++++++++---------- dfuse/DfuDevice.py | 50 +++++++++--- dfuse/DfuState.py | 7 +- 3 files changed, 191 insertions(+), 50 deletions(-) diff --git a/dfuse-tool.py b/dfuse-tool.py index c65b50f..2cfc59a 100755 --- a/dfuse-tool.py +++ b/dfuse-tool.py @@ -28,7 +28,7 @@ def list_dfu(args): if usbdev is None: raise ValueError('No STM32 DfuSe device found.') - dfu = dfuse.DfuDevice(usbdev) + dfu = find_device(args) #dfuse.DfuDevice(usbdev) for name, alt in dfu.alternates(): print ("Device: [%.4x:%.4x] Cfg: %d Intf: %d Alt: %d '%s'" % ( \ alt.device.idVendor, \ @@ -37,23 +37,51 @@ def list_dfu(args): alt.bInterfaceNumber, \ alt.bAlternateSetting, \ name)) + mem = dfu.get_mem_layout() + print(" memory with %d pages %d bytes, start address %.8x" + % (mem['pageno'], mem['pagesize'], mem['address'] + )) + states = ["Readable" if mem['readable'] else "", + "Writable" if mem['writable'] else "", + "Erasable" if mem['erasable'] else ""] + print(" device state: %s\n" % ", ".join(states)) def leave_dfu(args): dfu = find_device(args) dfu.leave() status = dfu.get_status() if status[0] > 0: - raise RuntimeError("An error occured. Status: %r" % status) + raise RuntimeError("An error occured. Status: %r %r" % + (status[1], dfuse.DfuState.string(status[1]))) def erase(args): dfu = find_device(args) + mem = dfu.get_mem_layout() + if mem is None: + end = int(args.erase[1], 0) if len(args.erase) > 1 else 1024 + pagesize = 1024 + elif not mem['erasable']: + print("Device not erasable, exits now!") + return + else: + end = (int(args.erase[1], 0) if len(args.erase) > 1 else + mem['address'] + mem['pagesize'] * mem['pageno']) + pagesize = mem['pagesize'] print ("Erasing. Please wait this might be long ...") - dfu.erase(args.erase) - status = dfu.wait_while_state(dfuse.DfuState.DFU_DOWNLOAD_BUSY) - - if status[1] != dfuse.DfuState.DFU_DOWNLOAD_IDLE: - raise RuntimeError("An error occured. Device Status: %r" % status) + addr = int(args.erase[0], 0) if len(args.erase) else mem['address'] + cnt = 0 + while addr < end: + dfu.erase(addr) + dfu.get_status() # must send after erase command page page 16 AN3156 + dfu.wait_while_state(dfuse.DfuState.DFU_DOWNLOAD_BUSY) + print("Erasing page starting at %.8x" % addr) + addr += pagesize + + if dfu.get_status()[1] != dfuse.DfuState.DFU_DOWNLOAD_IDLE: + raise RuntimeError("An error occured. Status: %r %r" % + (status[1], dfuse.DfuState.string(status[1]))) + dfu.clear_status() print ("Done !") @@ -61,6 +89,11 @@ def flash(args): dfufile = args.flash[0] dfu = find_device(args) + mem = dfu.get_mem_layout() + if mem is not None and not mem['writable']: + print("Device not writable, exits now!") + return + if (dfufile.devInfo['vid'] != dfu.dev.idVendor or dfufile.devInfo['pid'] != dfu.dev.idProduct) and not args.force: raise ValueError("Vendor/Product id mismatch: [%.4x:%.4x] (file) [%.4x:%.4x] (device). Trying running with --force" % ( \ dfufile.devInfo['vid'], \ @@ -84,35 +117,97 @@ def flash(args): dfu.erase(image['address']) status = dfu.wait_while_state(dfuse.DfuState.DFU_DOWNLOAD_BUSY) if status[1] != dfuse.DfuState.DFU_DOWNLOAD_IDLE: - raise RuntimeError("An error occured. Device Status: %r" % status) + raise RuntimeError("An error occured. Status: %r %r" % + (status[1], dfuse.DfuState.string(status[1]))) print("Flashing ...") - transfer_size = 1024 - dfu.set_address(image['address']) - status = dfu.wait_while_state(dfuse.DfuState.DFU_DOWNLOAD_BUSY) - if status[1] != dfuse.DfuState.DFU_DOWNLOAD_IDLE: - raise RuntimeError("An error occured. Device Status: %r" % status) - - data = image['data'] - blocks = [data[i:i + transfer_size] for i in range(0, len(data), transfer_size)] - for blocknum, block in enumerate(blocks): - print("Flashing block %r" % blocknum) - dfu.write(blocknum, block) - status = dfu.wait_while_state(dfuse.DfuState.DFU_DOWNLOAD_BUSY) - if status[1] != dfuse.DfuState.DFU_DOWNLOAD_IDLE: - raise RuntimeError("An error occured. Device Status: %r" % status) + transfer_size = mem['pagesize'] if mem else 1024 + _flash(image['address'], transfer_size, image['data'], dfu) print("Done") +def flash_bin(args): + dfu = find_device(args) + mem = dfu.get_mem_layout() + if mem is None: + raise RuntimeError("USB interface description could not be parsed.") + if not mem['writable']: + raise RuntimeError("Device not writable.") + + with open(args.flash_bin[0],'rb') as f: + data = bytearray(f.read()) + if not data: + raise ValueError("File %r could not be read" % args.flash_bin[0]) + + print("Flashing ...") + transfer_size = mem['pagesize'] if mem else 1024 + _flash(mem['address'], transfer_size, data, dfu) - return + print("Done !") +def _flash(address, transfer_size, data, dfu): + dfu.set_address(address) status = dfu.wait_while_state(dfuse.DfuState.DFU_DOWNLOAD_BUSY) - - if status[1] != dfuse.DfuState.DFU_IDLE and status[1] != dfuse.DfuState.DFU_DOWNLOAD_IDLE: - raise RuntimeError("An error occured. Status: %r" % status) + if status[1] != dfuse.DfuState.DFU_DOWNLOAD_IDLE: + raise RuntimeError("An error occured. Status: %r %r" % + (status[1], dfuse.DfuState.string(status[1]))) - print ("Done !") + blocks = [data[i:i + transfer_size] for i in range(0, len(data), transfer_size)] + for blocknum, block in enumerate(blocks): + print("Flashing block %r %rbytes" % (blocknum, (blocknum + 1) * transfer_size)) + dfu.write(blocknum, block) + status = dfu.wait_while_state(dfuse.DfuState.DFU_DOWNLOAD_BUSY) + if status[1] != dfuse.DfuState.DFU_DOWNLOAD_IDLE: + raise RuntimeError("An error occured. Status: %r %r" % + (status[1], dfuse.DfuState.string(status[1]))) + + dfu.clear_status() + +def read(args): + dfu = find_device(args) + mem = dfu.get_mem_layout() + if mem is not None and not mem['readable']: + print("Device not readable, exits now!") + return + + transactions = 0 + maxbytes = mem['pagesize'] * (mem['pageno'] if len(args.read) < 2 else int(args.read[1], 0)) + data = bytearray() + bytes_read = 0 + bytes_to_read = 0 + + print("Copying data from DFU device") + + while bytes_read < maxbytes: + bytes_to_read = mem['pagesize'] + result = dfu.upload(transactions, bytes_to_read) + if not result: + break + elif transactions == 0: + transactions += 2 + continue + elif (len(result) > 0): + data.extend(result) + bytes_read += len(result) + transactions += 1 + print("Read page {0} {1} bytes".format(transactions -3, len(result))) + + if len(result) != bytes_to_read: + break + + status = dfu.get_status() + dfu.clear_status() + if status[1] not in [dfuse.DfuState.DFU_IDLE, dfuse.DfuState.DFU_UPLOAD_IDLE]: + filename = args.read[0]+'.error' + with open(filename,'wb') as f: + f.write(bytearray(data)) + raise RuntimeError("An error occured. Status: %r %r, saved retrieved data to file %r" % + (status[1], dfuse.DfuState.string(status[1]), filename)) + + print('Done, read {0} bytes'.format(bytes_read)) + + with open(args.read[0],'wb') as f: + f.write(bytearray(data)) parser = argparse.ArgumentParser(description="DfuSe flashing util for STM32") @@ -120,7 +215,11 @@ def flash(args): action.add_argument('--list', action='store_true', help='List available DfuSe interfaces') action.add_argument('--leave', action='store_true', help='Leave DFU mode') action.add_argument('--flash', nargs=1, action='store', help='Flash a DfuSe file', metavar='FILE', type=dfuse.DfuFile) -action.add_argument('--erase', action='store', help='Erase page at ADDRESS (must be page aligned)', metavar=('ADDRESS'), type=int) +action.add_argument('--flash-bin', nargs=1, action='store', help='Flash a ordinary bin file', metavar='FILE', type=str) +action.add_argument('--read', nargs='+', action='store', help='Read device memory', metavar=('SAVEFILE', 'PAGES'), type=str) +action.add_argument('--erase', nargs='*', action='store', + help='Erase all or from ADDRESS to page ENDADDR or end of memory (must be page aligned)', + metavar=('ADDRESS', 'ENDADDR')) devinfo = parser.add_argument_group('Device information') devinfo.add_argument('--vid', action='store', type=int, default=0x0483, help='Device\'s USB vendor id, defaults to 0x0483') @@ -134,15 +233,20 @@ def flash(args): args = parser.parse_args() -#try: -if args.list: - list_dfu(args) -elif args.leave: - leave_dfu(args) -elif args.erase is not None: - erase(args) -elif args.flash is not None: - flash(args) -#except Exception as e: -# print(e, file=sys.stderr) -# sys.exit(-1) +try: +#if 1: + if args.list: + list_dfu(args) + elif args.leave: + leave_dfu(args) + elif args.erase is not None: + erase(args) + elif args.flash is not None: + flash(args) + elif args.flash_bin is not None: + flash_bin(args) + elif args.read is not None: + read(args) +except Exception as e: + print(e, file=sys.stderr) + sys.exit(-1) diff --git a/dfuse/DfuDevice.py b/dfuse/DfuDevice.py index 70e1ad7..e27790a 100644 --- a/dfuse/DfuDevice.py +++ b/dfuse/DfuDevice.py @@ -1,5 +1,6 @@ import usb.util import time +import re DFU_REQUEST_SEND = 0x21 DFU_REQUEST_RECEIVE = 0xa1 @@ -32,7 +33,7 @@ def set_alternate(self, intf): self.intf = intf[1] else: self.intf = intf - + self.intf.set_altsetting() def control_msg(self, requestType, request, value, buffer): @@ -40,17 +41,21 @@ def control_msg(self, requestType, request, value, buffer): def detach(self, timeout): return self.control_msg(DFU_REQUEST_SEND, DFU_DETACH, timeout, None) - + def dnload(self, blockNum, data): return self.control_msg(DFU_REQUEST_SEND, DFU_DNLOAD, blockNum, data) - + def upload(self, blockNum, size): - return self.control_msg(DFU_REQUEST_RECEIVE, DFU_UPLOAD, blockNum, size) + try: + return self.control_msg(DFU_REQUEST_RECEIVE, DFU_UPLOAD, blockNum, size) + except usb.core.USBError as e: + if e.args[0] != 32: + raise e def get_status(self): status = self.control_msg(DFU_REQUEST_RECEIVE, DFU_GETSTATUS, 0, 6) return (status[0], status[4], status[1] + (status[2] << 8) + (status[3] << 16), status[5]) - + def clear_status(self): self.control_msg(DFU_REQUEST_SEND, DFU_CLRSTATUS, 0, None) @@ -62,7 +67,7 @@ def set_address(self, ap): def write(self, block, data): return self.dnload(block + 2, data) - + def erase(self, pa): return self.dnload(0x0, [0x41] + address_to_4bytes(pa)) @@ -79,10 +84,37 @@ def wait_while_state(self, state): states = state status = self.get_status() - + while (status[1] in states): - status = self.get_status() + try: + status = self.get_status() + except usb.core.USBError as e: + if e.args[0] != 32: + raise e time.sleep(status[2] / 1000) - + return status + def get_mem_layout(self): + intf = self.intf if not None else self.cfg[(0, 0)] + mem_layout_str = usb.util.get_string(self.dev, intf.iInterface) + + # refer to UM0290 page 31 for this how to interpret string + regex = re.compile(r"^@.*\/((?:0x)?[0-9a-fA-F]+)\/(\d+)[\*x](\d+)(.)(.)\s*$") + match = regex.match(mem_layout_str) + if match is not None: + grps = match.groups() + addr = int(grps[0], 0) + npages = int(grps[1], 10) + pagesz = int(grps[2], 10) + prefix = grps[3] + pagesz = (pagesz * 1024 if prefix.lower() == 'k' else + pagesz * 1024 *1024 if prefix.lower() == 'm' else pagesz) + return {'address':addr, + 'pageno':npages, + 'pagesize': pagesz, + 'readable': grps[4] in ['a', 'c', 'g'], + 'writable': grps[4] in ['d', 'e', 'f', 'g'], + 'erasable': grps[4] in ['b', 'c', 'f', 'g']} + return None + diff --git a/dfuse/DfuState.py b/dfuse/DfuState.py index 035631b..bb58333 100644 --- a/dfuse/DfuState.py +++ b/dfuse/DfuState.py @@ -10,4 +10,9 @@ class DfuState(): DFU_MANIFEST_WAIT_RESET = 0x08 DFU_UPLOAD_IDLE = 0x09 DFU_ERROR = 0x0a - + @staticmethod + def string(nr): + for itm in dir(DfuState): + if nr == DfuState.__dict__[itm]: + return itm + return "" From c36c150b3cc6e2b383c50c29b0f91088669fc6cf Mon Sep 17 00:00:00 2001 From: Fredrik Johansson Date: Sun, 7 Nov 2021 00:17:09 +0100 Subject: [PATCH 3/3] More timeout problems resolved --- dfuse-tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dfuse-tool.py b/dfuse-tool.py index 2cfc59a..ff03634 100755 --- a/dfuse-tool.py +++ b/dfuse-tool.py @@ -73,7 +73,7 @@ def erase(args): cnt = 0 while addr < end: dfu.erase(addr) - dfu.get_status() # must send after erase command page page 16 AN3156 + # must send after erase command page page 16 AN3156 dfu.wait_while_state(dfuse.DfuState.DFU_DOWNLOAD_BUSY) print("Erasing page starting at %.8x" % addr) addr += pagesize