diff options
Diffstat (limited to 'df10ch_setup_pkg/usb1.py')
-rw-r--r-- | df10ch_setup_pkg/usb1.py | 1507 |
1 files changed, 1507 insertions, 0 deletions
diff --git a/df10ch_setup_pkg/usb1.py b/df10ch_setup_pkg/usb1.py new file mode 100644 index 0000000..bc0d8db --- /dev/null +++ b/df10ch_setup_pkg/usb1.py @@ -0,0 +1,1507 @@ +""" +Pythonic wrapper for libusb-1.0. + +This file has been extracted from the python-libusb1 project: + http://github.com/vpelletier/python-libusb1 + +The first thing you must do is to get an "USB context". To do so, create a +LibUSBContext instance. +Then, you can use it to browse available USB devices and open the one you want +to talk to. +At this point, you should have a USBDeviceHandle instance (as returned by +LibUSBContext or USBDevice instances), and you can start exchanging with the +device. + +Features: +- Basic device settings (configuration & interface selection, ...) +- String descriptor lookups (ASCII & unicode), and list supported language + codes +- Synchronous I/O (control, bulk, interrupt) +- Asynchronous I/O (control, bulk, interrupt, isochronous) + Note: Isochronous support is experimental. + See USBPoller, USBTransfer and USBTransferHelper. +""" + +import libusb1 +from ctypes import byref, create_string_buffer, c_int, sizeof, POINTER, \ + create_unicode_buffer, c_wchar, cast, c_uint16, c_ubyte, string_at, \ + addressof, c_void_p, cdll +from cStringIO import StringIO +import sys +from ctypes.util import find_library + +__all__ = ['LibUSBContext', 'USBDeviceHandle', 'USBDevice', + 'USBPoller', 'USBTransfer', 'USBTransferHelper', 'EVENT_CALLBACK_SET'] + +if sys.version_info[:2] >= (2, 6): + if sys.platform == 'win32': + from ctypes import get_last_error as get_errno + else: + from ctypes import get_errno +else: + def get_errno(): + raise NotImplementedError("Your python version doesn't support " + "errno/last_error") + +__libc_name = find_library('c') +if __libc_name is None: + # Of course, will leak memory. + # Should we warn user ? How ? + _free = lambda x: None +else: + _free = getattr(cdll, __libc_name).free +del __libc_name + +# Default string length +# From a comment in libusb-1.0: "Some devices choke on size > 255" +STRING_LENGTH = 255 + +EVENT_CALLBACK_SET = frozenset(( + libusb1.LIBUSB_TRANSFER_COMPLETED, + libusb1.LIBUSB_TRANSFER_ERROR, + libusb1.LIBUSB_TRANSFER_TIMED_OUT, + libusb1.LIBUSB_TRANSFER_CANCELLED, + libusb1.LIBUSB_TRANSFER_STALL, + libusb1.LIBUSB_TRANSFER_NO_DEVICE, + libusb1.LIBUSB_TRANSFER_OVERFLOW, +)) + +DEFAULT_ASYNC_TRANSFER_ERROR_CALLBACK = lambda x: False + +def create_binary_buffer(string_or_len): + # Prevent ctypes from adding a trailing null char. + if isinstance(string_or_len, basestring): + result = create_string_buffer(string_or_len, len(string_or_len)) + else: + result = create_string_buffer(string_or_len) + return result + +class USBTransfer(object): + """ + USB asynchronous transfer control & data. + + All modification methods will raise if called on a submitted transfer. + Methods noted as "should not be called on a submitted transfer" will not + prevent you from reading, but returned value is unspecified. + """ + # Prevent garbage collector from freeing the free function before our + # instances, as we need it to property destruct them. + __libusb_free_transfer = libusb1.libusb_free_transfer + __libusb_cancel_transfer = libusb1.libusb_cancel_transfer + __USBError = libusb1.USBError + __LIBUSB_ERROR_NOT_FOUND = libusb1.LIBUSB_ERROR_NOT_FOUND + __transfer = None + __initialized = False + __submitted = False + __callback = None + __ctypesCallbackWrapper = None + + def __init__(self, handle, iso_packets=0): + """ + You should not instanciate this class directly. + Call "getTransfer" method on an USBDeviceHandle instance to get + instances of this class. + """ + if iso_packets < 0: + raise ValueError('Cannot request a negative number of iso ' + 'packets.') + self.__handle = handle + self.__num_iso_packets = iso_packets + result = libusb1.libusb_alloc_transfer(iso_packets) + if not result: + raise libusb1.USBError('Unable to get a transfer object') + self.__transfer = result + self.__ctypesCallbackWrapper = libusb1.libusb_transfer_cb_fn_p( + self.__callbackWrapper) + + def close(self): + """ + Stop using this transfer. + This removes some references to other python objects, to help garbage + collection. + Raises if called on a submitted transfer. + This does not prevent future reuse of instance (calling one of + "setControl", "setBulk", "setInterrupt" or "setIsochronous" methods + will initialize it properly again), just makes it ready to be + garbage-collected. + It is not mandatory to call it either, if you have no problems with + garbage collection. + """ + if self.__submitted: + raise ValueError('Cannot close a submitted transfer') + self.__initialized = False + self.__callback = None + + def __del__(self): + if self.__transfer is not None: + try: + # If this doesn't raise, we're doomed; transfer was submitted, + # still python decided to garbage-collect this instance. + # Stick to libusb's documentation, and don't free the + # transfer. If interpreter is shutting down, kernel will + # reclaim memory anyway. + # Note: we can't prevent transfer's buffer from being + # garbage-collected as soon as there will be no remaining + # reference to transfer, so a segfault might happen anyway. + # Should we warn user ? How ? + self.cancel() + except self.__USBError, exception: + if exception.value == self.__LIBUSB_ERROR_NOT_FOUND: + # Transfer was not submitted, we can free it. + self.__libusb_free_transfer(self.__transfer) + else: + raise + + def __callbackWrapper(self, transfer_p): + """ + Makes it possible for user-provided callback to alter transfer when + fired (ie, mark transfer as not submitted upon call). + """ + self.__submitted = False + callback = self.__callback + if callback is not None: + callback(self) + + def setCallback(self, callback): + """ + Change transfer's callback. + """ + if self.__submitted: + raise ValueError('Cannot alter a submitted transfer') + self.__callback = callback + + def getCallback(self): + """ + Get currently set callback. + """ + return self.__callback + + def setControl(self, request_type, request, value, index, buffer_or_len, + callback=None, user_data=None, timeout=0): + """ + Setup transfer for control use. + + request_type, request, value, index: See USBDeviceHandle.controlWrite. + buffer_or_len: either a string (when sending data), or expected data + length (when receiving data) + callback: function to call upon event. Called with transfer as + parameter, return value ignored. + user_data: to pass some data to/from callback + timeout: in milliseconds, how long to wait for devices acknowledgement + or data. Set to 0 to disable. + """ + if self.__submitted: + raise ValueError('Cannot alter a submitted transfer') + if isinstance(buffer_or_len, basestring): + length = len(buffer_or_len) + string_buffer = create_binary_buffer( + ' ' * libusb1.LIBUSB_CONTROL_SETUP_SIZE + buffer_or_len) + else: + length = buffer_or_len + string_buffer = create_binary_buffer(length + \ + libusb1.LIBUSB_CONTROL_SETUP_SIZE) + self.__initialized = False + libusb1.libusb_fill_control_setup(string_buffer, request_type, + request, value, index, length) + libusb1.libusb_fill_control_transfer(self.__transfer, self.__handle, + string_buffer, self.__ctypesCallbackWrapper, user_data, timeout) + self.__callback = callback + self.__initialized = True + + def setBulk(self, endpoint, buffer_or_len, callback=None, user_data=None, + timeout=0): + """ + Setup transfer for bulk use. + + endpoint: endpoint to submit transfer to (implicitly sets transfer + direction). + buffer_or_len: either a string (when sending data), or expected data + length (when receiving data) + callback: function to call upon event. Called with transfer as + parameter, return value ignored. + user_data: to pass some data to/from callback + timeout: in milliseconds, how long to wait for devices acknowledgement + or data. Set to 0 to disable. + """ + if self.__submitted: + raise ValueError('Cannot alter a submitted transfer') + string_buffer = create_binary_buffer(buffer_or_len) + self.__initialized = False + libusb1.libusb_fill_bulk_transfer(self.__transfer, self.__handle, + endpoint, string_buffer, sizeof(string_buffer), + self.__ctypesCallbackWrapper, user_data, timeout) + self.__callback = callback + self.__initialized = True + + def setInterrupt(self, endpoint, buffer_or_len, callback=None, + user_data=None, timeout=0): + """ + Setup transfer for interrupt use. + + endpoint: endpoint to submit transfer to (implicitly sets transfer + direction). + buffer_or_len: either a string (when sending data), or expected data + length (when receiving data) + callback: function to call upon event. Called with transfer as + parameter, return value ignored. + user_data: to pass some data to/from callback + timeout: in milliseconds, how long to wait for devices acknowledgement + or data. Set to 0 to disable. + """ + if self.__submitted: + raise ValueError('Cannot alter a submitted transfer') + string_buffer = create_binary_buffer(buffer_or_len) + self.__initialized = False + libusb1.libusb_fill_interrupt_transfer(self.__transfer, self.__handle, + endpoint, string_buffer, sizeof(string_buffer), + self.__ctypesCallbackWrapper, user_data, timeout) + self.__callback = callback + self.__initialized = True + + def setIsochronous(self, endpoint, buffer_or_len, callback=None, + user_data=None, timeout=0, iso_transfer_length_list=None): + """ + Setup transfer for isochronous use. + + endpoint: endpoint to submit transfer to (implicitly sets transfer + direction). + buffer_or_len: either a string (when sending data), or expected data + length (when receiving data) + callback: function to call upon event. Called with transfer as + parameter, return value ignored. + user_data: to pass some data to/from callback + timeout: in milliseconds, how long to wait for devices acknowledgement + or data. Set to 0 to disable. + iso_transfer_length_list: list of individual transfer sizes. If not + provided, buffer_or_len's size will be divided evenly among the + number of ISO transfers given to receive current instance, rounded + down. Providing a list allows overriding this (both the number of + ISO transfers and their individual lengths). + """ + if self.__submitted: + raise ValueError('Cannot alter a submitted transfer') + num_iso_packets = self.__num_iso_packets + if num_iso_packets == 0: + raise TypeError('This transfer canot be used for isochronous I/O. ' + 'You must get another one with a non-zero iso_packets ' + 'parameter.') + string_buffer = create_binary_buffer(buffer_or_len) + buffer_length = sizeof(string_buffer) + if iso_transfer_length_list is None: + iso_length = buffer_length // num_iso_packets + iso_transfer_length_list = [iso_length] * num_iso_packets + configured_iso_packets = len(iso_transfer_length_list) + if configured_iso_packets > num_iso_packets: + raise ValueError('Too many ISO transfer lengths (%i), there are ' + 'only %i ISO transfers available' % (configured_iso_packets, + num_iso_packets)) + if sum(iso_transfer_length_list) > buffer_length: + raise ValueError('ISO transfers too long (%i), there are only ' + '%i bytes available' % (sum(iso_transfer_length_list), + buffer_length)) + transfer_p = self.__transfer + max_iso_packet_size = self.__handle.getDevice().getMaxISOPacketSize( + endpoint) + self.__initialized = False + libusb1.libusb_fill_iso_transfer(transfer_p, self.__handle, + endpoint, string_buffer, buffer_length, configured_iso_packets, + self.__ctypesCallbackWrapper, user_data, timeout) + for length, iso_packet_desc in zip(iso_transfer_length_list, + libusb1.get_iso_packet_list(transfer_p)): + if length <= 0: + raise ValueError('Negative/null length transfers are not ' + 'possible.') + if length > max_iso_packet_size: + raise ValueError('Packet too big (%i) for this endpoint: %i ' + 'bytes max') + iso_packet_desc.length = length + self.__callback = callback + self.__initialized = True + + def getType(self): + """ + Get transfer type. + See libusb1.libusb_transfer_type. + """ + return self.__transfer.contents.type + + def getEndpoint(self): + """ + Get endpoint. + """ + return self.__transfer.contents.endpoint + + def getStatus(self): + """ + Get transfer status. + Should not be called on a submitted transfer. + """ + return self.__transfer.contents.status + + def getActualLength(self): + """ + Get actually transfered data length. + Should not be called on a submitted transfer. + """ + return self.__transfer.contents.actual_length + + def getBuffer(self): + """ + Get data buffer content. + Should not be called on a submitted transfer. + """ + transfer_p = self.__transfer + transfer = transfer_p.contents + if transfer.type == libusb1.LIBUSB_TRANSFER_TYPE_CONTROL: + result = libusb1.libusb_control_transfer_get_data(transfer_p) + else: + result = string_at(transfer.buffer, transfer.length) + return result + + def getISOBufferList(self): + """ + Get individual ISO transfer's buffer. + Returns a list with one item per ISO transfer, with their + individually-configured sizes. + Should not be called on a submitted transfer. + """ + transfer_p = self.__transfer + transfer = transfer_p.contents + if transfer.type != libusb1.LIBUSB_TRANSFER_TYPE_ISOCHRONOUS: + raise TypeError('This method cannot be called on non-iso ' + 'transfers.') + return libusb1.get_iso_packet_buffer_list(transfer_p) + + def getISOSetupList(self): + """ + Get individual ISO transfer's setup. + Returns a list of dicts, each containing an individual ISO transfer + parameters: + - length + - actual_length + - status + (see libusb1's API documentation for their signification) + Should not be called on a submitted transfer (except for 'length' + values). + """ + transfer_p = self.__transfer + transfer = transfer_p.contents + if transfer.type != libusb1.LIBUSB_TRANSFER_TYPE_ISOCHRONOUS: + raise TypeError('This method cannot be called on non-iso ' + 'transfers.') + return [{ + 'length': x.length, + 'actual_length': x.actual_length, + 'status': x.status, + } for x in libusb1.get_iso_packet_list(transfer_p)] + + def setBuffer(self, buffer_or_len): + """ + Replace buffer with a new one. + Allows resizing read buffer and replacing data sent. + Note: resizing is not allowed for isochronous buffer (use + setIsochronous). + Note: disallowed on control transfers (use setControl). + """ + if self.__submitted: + raise ValueError('Cannot alter a submitted transfer') + transfer = self.__transfer.contents + if transfer.type == libusb1.LIBUSB_TRANSFER_TYPE_CONTROL: + raise ValueError('To alter control transfer buffer, use ' + 'setControl') + buff = create_binary_buffer(buffer_or_len) + if transfer.type == libusb1.LIBUSB_TRANSFER_TYPE_ISOCHRONOUS and \ + sizeof(buff) != transfer.length: + raise ValueError('To alter isochronous transfer buffer length, ' + 'use setIsochronous') + transfer.buffer = cast(buff, c_void_p) + transfer.length = sizeof(buff) + + def isSubmitted(self): + """ + Tells if this transfer is submitted and still pending. + """ + return self.__submitted + + def submit(self): + """ + Submit transfer for asynchronous handling. + """ + if self.__submitted: + raise ValueError('Cannot submit a submitted transfer') + if not self.__initialized: + raise ValueError('Cannot submit a transfer until it has been ' + 'initialized') + self.__submitted = True + result = libusb1.libusb_submit_transfer(self.__transfer) + if result: + self.__submitted = False + raise libusb1.USBError(result) + + def cancel(self): + """ + Cancel transfer. + Note: cancellation happens asynchronously, so you must wait for + LIBUSB_TRANSFER_CANCELLED. + """ + result = self.__libusb_cancel_transfer(self.__transfer) + if result: + raise self.__USBError(result) + self.__submitted = False + +class USBTransferHelper(object): + """ + Simplifies subscribing to the same transfer over and over, and callback + handling: + - no need to read event status to execute apropriate code, just setup + different functions for each status code + - just return True instead of calling submit + + Callbacks used in this class must follow the callback API described in + USBTransfer, and are expected to return a boolean: + - True if transfer is to be submitted again (to receive/send more data) + - False otherwise + + Note: as per libusb1 specifications, isochronous transfer global state + might be LIBUSB_TRANSFER_COMPLETED although some individual packets might + have an error status. You can check individual packet status by calling + getISOSetupList on transfer object in your callback. + """ + def __init__(self, transfer=None): + """ + Create a transfer callback dispatcher. + + transfer parameter is deprecated. If provided, it will be equivalent + to: + helper = USBTransferHelper() + transfer.setCallback(helper) + and also allows using deprecated methods on this class (otherwise, + they raise AttributeError). + """ + if transfer is not None: + # Deprecated: to drop + self.__transfer = transfer + transfer.setCallback(self) + self.__event_callback_dict = {} + self.__errorCallback = DEFAULT_ASYNC_TRANSFER_ERROR_CALLBACK + + def submit(self): + """ + Submit the asynchronous read request. + Deprecated. Use submit on transfer. + """ + # Deprecated: to drop + self.__transfer.submit() + + def cancel(self): + """ + Cancel a pending read request. + Deprecated. Use cancel on transfer. + """ + # Deprecated: to drop + self.__transfer.cancel() + + def setEventCallback(self, event, callback): + """ + Set a function to call for a given event. + Possible event identifiers are listed in EVENT_CALLBACK_SET. + """ + if event not in EVENT_CALLBACK_SET: + raise ValueError('Unknown event %r.' % (event, )) + self.__event_callback_dict[event] = callback + + def setDefaultCallback(self, callback): + """ + Set the function to call for event which don't have a specific callback + registered. + The initial default callback does nothing and returns False. + """ + self.__errorCallback = callback + + def getEventCallback(self, event, default=None): + """ + Return the function registered to be called for given event identifier. + """ + return self.__event_callback_dict.get(event, default) + + def __call__(self, transfer): + """ + Callback to set on transfers. + """ + if self.getEventCallback(transfer.getStatus(), self.__errorCallback)( + transfer): + transfer.submit() + + def isSubmited(self): + """ + Returns whether this reader is currently waiting for an event. + Deprecatd. Use isSubmitted on transfer. + """ + # Deprecated: to drop + return self.__transfer.isSubmitted() + +class USBPoller(object): + """ + Class allowing integration of USB event polling in a file-descriptor + monitoring event loop. + """ + def __init__(self, context, poller): + """ + Create a poller for given context. + Warning: it will not check if another poller instance was already + present for that context, and will replace it. + + poller is a polling instance implementing the following methods: + - register(fd, event_flags) + event_flags have the same meaning as in poll API (POLLIN & POLLOUT) + - unregister(fd) + - poll(timeout) + timeout being a float in seconds, or None if there is no timeout. + It must return a list of (descriptor, event) pairs. + Note: USBPoller is itself a valid poller. + """ + self.__context = context + self.__poller = poller + self.__fd_set = set() + context.setPollFDNotifiers(self._registerFD, self._unregisterFD) + for fd, events in context.getPollFDList(): + self._registerFD(fd, events) + + def __del__(self): + self.__context.setPollFDNotifiers(None, None) + + def poll(self, timeout=None): + """ + Poll for events. + timeout can be a float in seconds, or None for no timeout. + Returns a list of (descriptor, event) pairs. + """ + next_usb_timeout = self.__context.getNextTimeout() + if timeout is None: + usb_timeout = next_usb_timeout + elif next_usb_timeout: + usb_timeout = min(next_usb_timeout, timeout) + else: + usb_timeout = timeout + event_list = self.__poller.poll(usb_timeout) + if event_list: + fd_set = self.__fd_set + result = [(x, y) for x, y in event_list if x not in fd_set] + if len(result) != len(event_list): + self.__context.handleEventsTimeout() + else: + result = event_list + self.__context.handleEventsTimeout() + return result + + def register(self, fd, events): + """ + Register an USB-unrelated fd to poller. + Convenience method. + """ + if fd in self.__fd_set: + raise ValueError('This fd is a special USB event fd, it cannot ' + 'be polled.') + self.__poller.register(fd, events) + + def unregister(self, fd): + """ + Unregister an USB-unrelated fd from poller. + Convenience method. + """ + if fd in self.__fd_set: + raise ValueError('This fd is a special USB event fd, it must ' + 'stay registered.') + self.__poller.unregister(fd) + + def _registerFD(self, fd, events, user_data=None): + self.register(fd, events) + self.__fd_set.add(fd) + + def _unregisterFD(self, fd, user_data=None): + self.__fd_set.discard(fd) + self.unregister(fd) + +class USBDeviceHandle(object): + """ + Represents an opened USB device. + """ + __handle = None + __libusb_close = libusb1.libusb_close + + def __init__(self, context, handle, device): + """ + You should not instanciate this class directly. + Call "open" method on an USBDevice instance to get an USBDeviceHandle + instance. + """ + # XXX Context parameter is just here as a hint for garbage collector: + # It must collect USBDeviceHandle instances before their LibUSBContext. + self.__context = context + self.__handle = handle + self.__device = device + + def __del__(self): + self.close() + + def close(self): + """ + Close this handle. If not called explicitely, will be called by + destructor. + """ + handle = self.__handle + if handle is not None: + self.__libusb_close(handle) + self.__handle = None + + def getDevice(self): + """ + Get an USBDevice instance for the device accessed through this handle. + Useful for example to query its configurations. + """ + return self.__device + + def getConfiguration(self): + """ + Get the current configuration number for this device. + """ + configuration = c_int() + result = libusb1.libusb_get_configuration(self.__handle, + byref(configuration)) + if result: + raise libusb1.USBError(result) + return configuration + + def setConfiguration(self, configuration): + """ + Set the configuration number for this device. + """ + result = libusb1.libusb_set_configuration(self.__handle, configuration) + if result: + raise libusb1.USBError(result) + + def claimInterface(self, interface): + """ + Claim (= get exclusive access to) given interface number. Required to + receive/send data. + """ + result = libusb1.libusb_claim_interface(self.__handle, interface) + if result: + raise libusb1.USBError(result) + + def releaseInterface(self, interface): + """ + Release interface, allowing another process to use it. + """ + result = libusb1.libusb_release_interface(self.__handle, interface) + if result: + raise libusb1.USBError(result) + + def setInterfaceAltSetting(self, interface, alt_setting): + """ + Set interface's alternative setting (both parameters are integers). + """ + result = libusb1.libusb_set_interface_alt_setting(self.__handle, + interface, + alt_setting) + if result: + raise libusb1.USBError(result) + + def clearHalt(self, endpoint): + """ + Clear a halt state on given endpoint number. + """ + result = libusb1.libusb_clear_halt(self.__handle, endpoint) + if result: + raise libusb1.USBError(result) + + def resetDevice(self): + """ + Reinitialise current device. + Attempts to restore current configuration & alt settings. + If this fails, will result in a device diconnect & reconnect, so you + have to close current device and rediscover it (notified by a + LIBUSB_ERROR_NOT_FOUND error code). + """ + result = libusb1.libusb_reset_device(self.__handle) + if result: + raise libusb1.USBError(result) + + def kernelDriverActive(self, interface): + """ + Tell whether a kernel driver is active on given interface number. + """ + result = libusb1.libusb_kernel_driver_active(self.__handle, interface) + if result == 0: + is_active = False + elif result == 1: + is_active = True + else: + raise libusb1.USBError(result) + return is_active + + def detachKernelDriver(self, interface): + """ + Ask kernel driver to detach from given interface number. + """ + result = libusb1.libusb_detach_kernel_driver(self.__handle, interface) + if result: + raise libusb1.USBError(result) + + def attachKernelDriver(self, interface): + """ + Ask kernel driver to re-attach to given interface number. + """ + result = libusb1.libusb_attach_kernel_driver(self.__handle, interface) + if result: + raise libusb1.USBError(result) + + def getSupportedLanguageList(self): + """ + Return a list of USB language identifiers (as integers) supported by + current device for its string descriptors. + """ + descriptor_string = create_binary_buffer(STRING_LENGTH) + result = libusb1.libusb_get_string_descriptor(self.__handle, + 0, 0, descriptor_string, sizeof(descriptor_string)) + if result < 0: + if result == libusb1.LIBUSB_ERROR_PIPE: + # From libusb_control_transfer doc: + # control request not supported by the device + return [] + raise libusb1.USBError(result) + length = cast(descriptor_string, POINTER(c_ubyte))[0] + langid_list = cast(descriptor_string, POINTER(c_uint16)) + result = [] + append = result.append + for offset in xrange(1, length / 2): + append(libusb1.libusb_le16_to_cpu(langid_list[offset])) + return result + + def getStringDescriptor(self, descriptor, lang_id): + """ + Fetch description string for given descriptor and in given language. + Use getSupportedLanguageList to know which languages are available. + Return value is an unicode string. + Return None if there is no such descriptor on device. + """ + descriptor_string = create_unicode_buffer( + STRING_LENGTH / sizeof(c_wchar)) + result = libusb1.libusb_get_string_descriptor(self.__handle, + descriptor, lang_id, descriptor_string, sizeof(descriptor_string)) + if result == libusb1.LIBUSB_ERROR_NOT_FOUND: + return None + if result < 0: + raise libusb1.USBError(result) + return descriptor_string.value + + def getASCIIStringDescriptor(self, descriptor): + """ + Fetch description string for given descriptor in first available + language. + Return value is an ASCII string. + Return None if there is no such descriptor on device. + """ + descriptor_string = create_binary_buffer(STRING_LENGTH) + result = libusb1.libusb_get_string_descriptor_ascii(self.__handle, + descriptor, descriptor_string, sizeof(descriptor_string)) + if result == libusb1.LIBUSB_ERROR_NOT_FOUND: + return None + if result < 0: + raise libusb1.USBError(result) + return descriptor_string.value + + # Sync I/O + + def _controlTransfer(self, request_type, request, value, index, data, + length, timeout): + result = libusb1.libusb_control_transfer(self.__handle, request_type, + request, value, index, data, length, timeout) + if result < 0: + raise libusb1.USBError(result) + return result + + def controlWrite(self, request_type, request, value, index, data, + timeout=0): + """ + Synchronous control write. + request_type: request type bitmask (bmRequestType), see libusb1 + constants LIBUSB_TYPE_* and LIBUSB_RECIPIENT_*. + request: request id (some values are standard). + value, index, data: meaning is request-dependent. + timeout: in milliseconds, how long to wait for device acknowledgement. + Set to 0 to disable. + + Returns the number of bytes actually sent. + """ + request_type = (request_type & ~libusb1.USB_ENDPOINT_DIR_MASK) | \ + libusb1.LIBUSB_ENDPOINT_OUT + data = create_binary_buffer(data) + return self._controlTransfer(request_type, request, value, index, data, + sizeof(data), timeout) + + def controlRead(self, request_type, request, value, index, length, + timeout=0): + """ + Synchronous control read. + timeout: in milliseconds, how long to wait for data. Set to 0 to + disable. + See controlWrite for other parameters description. + + Returns received data. + """ + request_type = (request_type & ~libusb1.USB_ENDPOINT_DIR_MASK) | \ + libusb1.LIBUSB_ENDPOINT_IN + data = create_binary_buffer(length) + transferred = self._controlTransfer(request_type, request, value, + index, data, length, timeout) + return data.raw[:transferred] + + def _bulkTransfer(self, endpoint, data, length, timeout): + transferred = c_int() + result = libusb1.libusb_bulk_transfer(self.__handle, endpoint, + data, length, byref(transferred), timeout) + if result: + raise libusb1.USBError(result) + return transferred.value + + def bulkWrite(self, endpoint, data, timeout=0): + """ + Synchronous bulk write. + endpoint: endpoint to send data to. + data: data to send. + timeout: in milliseconds, how long to wait for device acknowledgement. + Set to 0 to disable. + + Returns the number of bytes actually sent. + """ + endpoint = (endpoint & ~libusb1.USB_ENDPOINT_DIR_MASK) | \ + libusb1.LIBUSB_ENDPOINT_OUT + data = create_binary_buffer(data) + return self._bulkTransfer(endpoint, data, sizeof(data), timeout) + + def bulkRead(self, endpoint, length, timeout=0): + """ + Synchronous bulk read. + timeout: in milliseconds, how long to wait for data. Set to 0 to + disable. + See bulkWrite for other parameters description. + + Returns received data. + """ + endpoint = (endpoint & ~libusb1.USB_ENDPOINT_DIR_MASK) | \ + libusb1.LIBUSB_ENDPOINT_IN + data = create_binary_buffer(length) + transferred = self._bulkTransfer(endpoint, data, length, timeout) + return data.raw[:transferred] + + def _interruptTransfer(self, endpoint, data, length, timeout): + transferred = c_int() + result = libusb1.libusb_interrupt_transfer(self.__handle, endpoint, + data, length, byref(transferred), timeout) + if result: + raise libusb1.USBError(result) + return transferred.value + + def interruptWrite(self, endpoint, data, timeout=0): + """ + Synchronous interrupt write. + endpoint: endpoint to send data to. + data: data to send. + timeout: in milliseconds, how long to wait for device acknowledgement. + Set to 0 to disable. + + Returns the number of bytes actually sent. + """ + endpoint = (endpoint & ~libusb1.USB_ENDPOINT_DIR_MASK) | \ + libusb1.LIBUSB_ENDPOINT_OUT + data = create_binary_buffer(data) + return self._interruptTransfer(endpoint, data, sizeof(data), timeout) + + def interruptRead(self, endpoint, length, timeout=0): + """ + Synchronous interrupt write. + timeout: in milliseconds, how long to wait for data. Set to 0 to + disable. + See interruptRead for other parameters description. + + Returns received data. + """ + endpoint = (endpoint & ~libusb1.USB_ENDPOINT_DIR_MASK) | \ + libusb1.LIBUSB_ENDPOINT_IN + data = create_binary_buffer(length) + transferred = self._interruptTransfer(endpoint, data, length, timeout) + return data.raw[:transferred] + + def getTransfer(self, iso_packets=0): + """ + Get an empty transfer for asynchronous use. + iso_packets: the number of isochronous transfer descriptors to + allocate. + """ + return USBTransfer(self.__handle, iso_packets) + +class USBConfiguration(object): + def __init__(self, config): + if not isinstance(config, libusb1.libusb_config_descriptor): + raise TypeError('Unexpected descriptor type.') + self.__config = config + + def getNumInterfaces(self): + return self.__config.bNumInterfaces + + def getConfigurationValue(self): + return self.__config.bConfigurationValue + + def getDescriptor(self): + return self.__config.iConfiguration + + def getAttributes(self): + return self.__config.bmAttributes + + def getMaxPower(self): + return self.__config.MaxPower * 2 + + def getExtra(self): + return libusb1.get_extra(self.__config) + + def iterInterfaces(self): + interface_list = self.__config.interface + for interface_num in xrange(self.getNumInterfaces()): + yield USBInterface(interface_list[interface_num]) + + def __getitem__(self, interface): + if not isinstance(interface, int): + raise TypeError('interface parameter must be an integer') + if not (0 <= interface < self.getNumInterfaces()): + raise IndexError('No such interface: %r' % (interface, )) + return USBInterface(self.__config.interface[interface]) + +class USBInterface(object): + def __init__(self, interface): + if not isinstance(interface, libusb1.libusb_interface): + raise TypeError('Unexpected descriptor type.') + self.__interface = interface + + def getNumSettings(self): + return self.__interface.num_altsetting + + def iterSettings(self): + alt_setting_list = self.__interface.altsetting + for alt_setting_num in xrange(self.getNumSettings()): + yield USBInterfaceSetting(alt_setting_list[alt_setting_num]) + + def __getitem__(self, alt_setting): + if not isinstance(alt_setting, int): + raise TypeError('alt_setting parameter must be an integer') + if not (0 <= alt_setting < self.getNumSettings()): + raise IndexError('No such setting: %r' % (alt_setting, )) + return USBInterfaceSetting(self.__interface.altsetting[alt_setting]) + +class USBInterfaceSetting(object): + def __init__(self, alt_setting): + if not isinstance(alt_setting, libusb1.libusb_interface_descriptor): + raise TypeError('Unexpected descriptor type.') + self.__alt_setting = alt_setting + + def getNumber(self): + return self.__alt_setting.bInterfaceNumber + + def getAlternateSetting(self): + return self.__alt_setting.bAlternateSetting + + def getNumEndpoints(self): + return self.__alt_setting.bNumEndpoints + + def getClass(self): + return self.__alt_setting.bInterfaceClass + + def getSubClass(self): + return self.__alt_setting.bInterfaceSubClass + + def getClassTupple(self): + """ + For convenience: class and subclass are probably often matched + simultaneously. + """ + alt_setting = self.__alt_setting + return (alt_setting.bInterfaceClass, alt_setting.bInterfaceSubClass) + + def getProtocol(self): + return self.__alt_setting.bInterfaceProtocol + + def getDescriptor(self): + return self.__alt_setting.iInterface + + def getExtra(self): + return libusb1.get_extra(self.__alt_setting) + + def iterEndpoints(self): + endpoint_list = self.__alt_setting.endpoint + for endpoint_num in xrange(self.getNumEndpoints()): + yield USBEndPoint(endpoint_list[endpoint_num]) + + def __getitem__(self, endpoint): + if not isinstance(endpoint, int): + raise TypeError('endpoint parameter must be an integer') + if not (0 <= endpoint < self.getNumEndpoints()): + raise ValueError('No such endpoint: %r' % (endpoint, )) + return USBEndPoint(self.__alt_setting.endpoint[endpoint]) + +class USBEndPoint(object): + def __init__(self, endpoint): + if not isinstance(endpoint, libusb1.libusb_endpoint_descriptor): + raise TypeError('Unexpected descriptor type.') + self.__endpoint = endpoint + + def getAddress(self): + return self.__endpoint.bEndpointAddress + + def getAttributes(self): + return self.__endpoint.bmAttributes + + def getMaxPacketSize(self): + return self.__endpoint.wMaxPacketSize + + def getInterval(self): + return self.__endpoint.bInterval + + def getRefresh(self): + return self.__endpoint.bRefresh + + def getSyncAddress(self): + return self.__endpoint.bSynchAddress + + def getExtra(self): + return libusb1.get_extra(self.__endpoint) + +class USBDevice(object): + """ + Represents a USB device. + """ + + __configuration_descriptor_list = None + __libusb_unref_device = libusb1.libusb_unref_device + __libusb_free_config_descriptor = libusb1.libusb_free_config_descriptor + __byref = byref + + def __init__(self, context, device_p): + """ + You should not instanciate this class directly. + Call LibUSBContext methods to receive instances of this class. + """ + # Important: device_p refcount must be incremented before being given + # to this constructor. This class will decrement refcount upon + # destruction. + self.__context = context + self.device_p = device_p + # Fetch device descriptor + device_descriptor = libusb1.libusb_device_descriptor() + result = libusb1.libusb_get_device_descriptor(device_p, + byref(device_descriptor)) + if result: + raise libusb1.USBError(result) + self.device_descriptor = device_descriptor + # Fetch all configuration descriptors + self.__configuration_descriptor_list = [] + append = self.__configuration_descriptor_list.append + for configuration_id in xrange(device_descriptor.bNumConfigurations): + config = libusb1.libusb_config_descriptor_p() + result = libusb1.libusb_get_config_descriptor(device_p, + configuration_id, byref(config)) + if result == libusb1.LIBUSB_ERROR_NOT_FOUND: + # Some devices (ex windows' root hubs) tell they have one + # configuration, but they have no configuration descriptor. + continue + if result: + raise libusb1.USBError(result) + append(config.contents) + + def __del__(self): + self.__libusb_unref_device(self.device_p) + if self.__configuration_descriptor_list is not None: + byref = self.__byref + for config in self.__configuration_descriptor_list: + self.__libusb_free_config_descriptor(byref(config)) + + def __str__(self): + return 'Bus %03i Device %03i: ID %04x:%04x' % ( + self.getBusNumber(), + self.getDeviceAddress(), + self.getVendorID(), + self.getProductID(), + ) + + def reprConfigurations(self): + """ + Get a string representation of device's configurations. + Note: opens the device temporarily. + + Deprecated. This is useless. It doesn't even showcases API usage, as + it accesses privape properties. + """ + out = StringIO() + for config in self.__configuration_descriptor_list: + print >> out, 'Configuration %i: %s' % (config.bConfigurationValue, + self._getASCIIStringDescriptor(config.iConfiguration)) + print >> out, ' Max Power: %i mA' % (config.MaxPower * 2, ) + # TODO: bmAttributes dump + for interface_num in xrange(config.bNumInterfaces): + interface = config.interface[interface_num] + print >> out, ' Interface %i' % (interface_num, ) + for alt_setting_num in xrange(interface.num_altsetting): + altsetting = interface.altsetting[alt_setting_num] + print >> out, ' Alt Setting %i: %s' % (alt_setting_num, + self._getASCIIStringDescriptor(altsetting.iInterface)) + print >> out, ' Class: %02x Subclass: %02x' % \ + (altsetting.bInterfaceClass, + altsetting.bInterfaceSubClass) + print >> out, ' Protocol: %02x' % \ + (altsetting.bInterfaceProtocol, ) + for endpoint_num in xrange(altsetting.bNumEndpoints): + endpoint = altsetting.endpoint[endpoint_num] + print >> out, ' Endpoint %i' % (endpoint_num, ) + print >> out, ' Address: %02x' % \ + (endpoint.bEndpointAddress, ) + attribute_list = [] + transfer_type = endpoint.bmAttributes & \ + libusb1.LIBUSB_TRANSFER_TYPE_MASK + attribute_list.append(libusb1.libusb_transfer_type( + transfer_type + )) + if transfer_type == \ + libusb1.LIBUSB_TRANSFER_TYPE_ISOCHRONOUS: + attribute_list.append(libusb1.libusb_iso_sync_type( + (endpoint.bmAttributes & \ + libusb1.LIBUSB_ISO_SYNC_TYPE_MASK) >> 2 + )) + attribute_list.append(libusb1.libusb_iso_usage_type( + (endpoint.bmAttributes & \ + libusb1.LIBUSB_ISO_USAGE_TYPE_MASK) >> 4 + )) + print >> out, ' Attributes: %s' % \ + (', '.join(attribute_list), ) + print >> out, ' Max Packet Size: %i' % \ + (endpoint.wMaxPacketSize, ) + print >> out, ' Interval: %i' % \ + (endpoint.bInterval, ) + print >> out, ' Refresh: %i' % \ + (endpoint.bRefresh, ) + print >> out, ' Sync Address: %02x' % \ + (endpoint.bSynchAddress, ) + return out.getvalue() + + def iterConfiguations(self): + for config in self.__configuration_descriptor_list: + yield USBConfiguration(config) + + def iterSettings(self): + for config in self.__configuration_descriptor_list: + for interface in USBConfiguration(config).iterInterfaces(): + for setting in interface.iterSettings(): + yield setting + + def getBusNumber(self): + """ + Get device's bus number. + """ + return libusb1.libusb_get_bus_number(self.device_p) + + def getDeviceAddress(self): + """ + Get device's address on its bus. + """ + return libusb1.libusb_get_device_address(self.device_p) + + def getbcdUSB(self): + """ + Get the USB spec version device complies to, in BCD format. + """ + return self.device_descriptor.bcdUSB + + def getDeviceClass(self): + """ + Get device's class id. + """ + return self.device_descriptor.bDeviceClass + + def getDeviceSubClass(self): + """ + Get device's subclass id. + """ + return self.device_descriptor.bDeviceSubClass + + def getDeviceProtocol(self): + """ + Get device's protocol id. + """ + return self.device_descriptor.bDeviceProtocol + + def getMaxPacketSize0(self): + """ + Get device's max packet size for endpoint 0 (control). + """ + return self.device_descriptor.bMaxPacketSize0 + + def getMaxISOPacketSize(self, endpoint): + """ + Get the maximum size for a single isochronous packet for given + endpoint. + """ + result = libusb1.libusb_get_max_iso_packet_size(self.device_p, endpoint) + if result < 0: + raise libusb1.USBError(result) + return result + + def getVendorID(self): + """ + Get device's vendor id. + """ + return self.device_descriptor.idVendor + + def getProductID(self): + """ + Get device's product id. + """ + return self.device_descriptor.idProduct + + def getbcdDevice(self): + """ + Get device's release number. + """ + return self.device_descriptor.bcdDevice + + def getSupportedLanguageList(self): + """ + Get the list of language ids device has string descriptors for. + """ + temp_handle = self.open() + return temp_handle.getSupportedLanguageList() + + def _getStringDescriptor(self, descriptor, lang_id): + if descriptor == 0: + result = None + else: + temp_handle = self.open() + result = temp_handle.getStringDescriptor(descriptor, lang_id) + return result + + def _getASCIIStringDescriptor(self, descriptor): + if descriptor == 0: + result = None + else: + temp_handle = self.open() + result = temp_handle.getASCIIStringDescriptor(descriptor) + return result + + def getManufacturer(self): + """ + Get device's manufaturer name. + Note: opens the device temporarily. + """ + return self._getASCIIStringDescriptor( + self.device_descriptor.iManufacturer) + + def getProduct(self): + """ + Get device's product name. + Note: opens the device temporarily. + """ + return self._getASCIIStringDescriptor(self.device_descriptor.iProduct) + + def getSerialNumber(self): + """ + Get device's serial number. + Note: opens the device temporarily. + """ + return self._getASCIIStringDescriptor( + self.device_descriptor.iSerialNumber) + + def getNumConfigurations(self): + """ + Get device's number of possible configurations. + """ + return self.device_descriptor.bNumConfigurations + + def open(self): + """ + Open device. + Returns an USBDeviceHandle instance. + """ + handle = libusb1.libusb_device_handle_p() + result = libusb1.libusb_open(self.device_p, byref(handle)) + if result: + raise libusb1.USBError(result) + return USBDeviceHandle(self.__context, handle, self) + +class LibUSBContext(object): + """ + libusb1 USB context. + + Provides methods to enumerate & look up USB devices. + Also provides access to global (device-independent) libusb1 functions. + """ + __libusb_exit = libusb1.libusb_exit + __context_p = None + __added_cb = None + __removed_cb = None + __libusb_set_pollfd_notifiers = libusb1.libusb_set_pollfd_notifiers + + def __init__(self): + """ + Create a new USB context. + """ + context_p = libusb1.libusb_context_p() + result = libusb1.libusb_init(byref(context_p)) + if result: + raise libusb1.USBError(result) + self.__context_p = context_p + + def __del__(self): + self.exit() + + def exit(self): + """ + Close (destroy) this USB context. + """ + context_p = self.__context_p + if context_p is not None: + self.__libusb_exit(context_p) + self.__context_p = None + self.__added_cb = None + self.__removed_cb = None + + def getDeviceList(self): + """ + Return a list of all USB devices currently plugged in, as USBDevice + instances. + """ + device_p_p = libusb1.libusb_device_p_p() + libusb_device_p = libusb1.libusb_device_p + device_list_len = libusb1.libusb_get_device_list(self.__context_p, + byref(device_p_p)) + # Instanciate our own libusb_device_p object so we can free + # libusb-provided device list. Is this a bug in ctypes that it doesn't + # copy pointer value (=pointed memory address) ? At least, it's not so + # convenient and forces using such weird code. + result = [USBDevice(self, libusb_device_p(x.contents)) + for x in device_p_p[:device_list_len]] + libusb1.libusb_free_device_list(device_p_p, 0) + return result + + def openByVendorIDAndProductID(self, vendor_id, product_id): + """ + Get the first USB device matching given vendor and product ids. + Returns an USBDeviceHandle instance, or None if no present device + match. + """ + for device in self.getDeviceList(): + if device.getVendorID() == vendor_id and \ + device.getProductID() == product_id: + result = device.open() + break + else: + result = None + return result + + def getPollFDList(self): + """ + Return file descriptors to be used to poll USB events. + You should not have to call this method, unless you are integrating + this class with a polling mechanism. + """ + pollfd_p_p = libusb1.libusb_get_pollfds(self.__context_p) + if not pollfd_p_p: + errno = get_errno() + if errno: + raise OSError(errno) + else: + # Assume not implemented + raise NotImplementedError("Your libusb doesn't seem to " + "implement pollable FDs") + try: + result = [] + append = result.append + fd_index = 0 + while pollfd_p_p[fd_index]: + append((pollfd_p_p[fd_index].contents.fd, + pollfd_p_p[fd_index].contents.events)) + fd_index += 1 + finally: + _free(pollfd_p_p) + return result + + def handleEvents(self): + """ + Handle any pending event (blocking). + See libusb1 documentation for details (there is a timeout, so it's + not "really" blocking). + """ + result = libusb1.libusb_handle_events(self.__context_p) + if result: + raise libusb1.USBError(result) + + def handleEventsTimeout(self, tv=0): + """ + Handle any pending event. + If tv is 0, will return immediately after handling already-pending + events. + Othewire, defines the maximum amount of time to wait for events, in + seconds. + """ + if tv is None: + tv = 0 + tv_s = int(tv) + tv = libusb1.timeval(tv_s, int((tv - tv_s) * 1000000)) + result = libusb1.libusb_handle_events_timeout(self.__context_p, + byref(tv)) + if result: + raise libusb1.USBError(result) + + def setPollFDNotifiers(self, added_cb=None, removed_cb=None, + user_data=None): + """ + Give libusb1 methods to call when it should add/remove file descriptor + for polling. + You should not have to call this method, unless you are integrating + this class with a polling mechanism. + """ + if added_cb is None: + added_cb = POINTER(None) + else: + added_cb = libusb1.libusb_pollfd_added_cb_p(added_cb) + if removed_cb is None: + removed_cb = POINTER(None) + else: + removed_cb = libusb1.libusb_pollfd_removed_cb_p(removed_cb) + self.__added_cb = added_cb + self.__removed_cb = removed_cb + self.__libusb_set_pollfd_notifiers(self.__context_p, added_cb, + removed_cb, user_data) + + def getNextTimeout(self): + """ + Returns the next internal timeout that libusb needs to handle, in + seconds, or None if no timeout is needed. + You should not have to call this method, unless you are integrating + this class with a polling mechanism. + """ + timeval = libusb1.timeval() + result = libusb1.libusb_get_next_timeout(self.__context_p, + byref(timeval)) + if result == 0: + result = None + elif result == 1: + result = timeval.tv_sec + (timeval.tv_usec * 0.000001) + else: + raise libusb1.USBError(result) + return result + + def setDebug(self, level): + """ + Set debugging level. + Note: depending on libusb compilation settings, this might have no + effect. + """ + libusb1.libusb_set_debug(self.__context_p, level) + |