You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

397 lines
14 KiB

# License: Apache 2.0. See LICENSE file in root directory.
# Copyright(c) 2021 Intel Corporation. All Rights Reserved.
"""
Brainstem Acroname Hub
See documentation for brainstem here:
https://acroname.com/reference/api/python/index.html
"""
from rspy import log, device_hub
import time
import platform, re
if __name__ == '__main__':
import os, sys, getopt
def usage():
ourname = os.path.basename( sys.argv[0] )
print( 'Syntax: acroname [options]' )
print( ' Control the acroname USB hub' )
print( 'Options:' )
print( ' --enable Enable all ports' )
print( ' --disable Disable all ports' )
print( ' --recycle Recycle all ports' )
print( ' --reset Reset the acroname' )
sys.exit(2)
try:
opts,args = getopt.getopt( sys.argv[1:], '',
longopts = [ 'help', 'recycle', 'enable', 'disable', 'reset' ])
except getopt.GetoptError as err:
print( '-F-', err ) # something like "option -a not recognized"
usage()
if args or not opts:
usage()
# See the end of the file for all the option handling
try:
import brainstem
except ModuleNotFoundError:
log.d( 'no acroname library is available' )
raise
class NoneFoundError( RuntimeError ):
def __init__( self, message = None ):
super().__init__( self, message or 'no Acroname module found' )
class Acroname(device_hub.device_hub):
def __init__(self):
super().__init__()
if discover() is None: # raise an error if there is no hub connected
raise NoneFoundError()
self.hub = None
self.all_hubs = None
def connect(self, reset = False, req_spec = None ):
"""
Connect to the hub. Raises RuntimeError on failure
:param reset: When true, the acroname will be reset as part of the connection process
:param req_spec: Required spec to connect to.
"""
if not self.hub:
self.hub = brainstem.stem.USBHub3p()
if req_spec:
specs = [req_spec]
else:
specs = discover()
spec = specs[0]
result = self.hub.connectFromSpec( spec )
if result != brainstem.result.Result.NO_ERROR:
raise RuntimeError( "failed to connect to Acroname (result={})".format( result ))
elif len(specs) > 1:
log.d( 'connected to', spec )
if reset:
log.d("resetting Acroname...")
result = self.hub.system.reset()
# According to brainstem support:
# Result error is expected, so we do not check it
# * there is also a brainstem internal console print on Linux "error release -4"
# Disconnection is needed after a reset command
self.hub.disconnect()
result = None
for i in range(10):
result = self.hub.connectFromSpec(spec)
if result != brainstem.result.Result.NO_ERROR:
time.sleep(1)
else:
log.d('reconnected')
return
raise RuntimeError("failed to reconnect to Acroname (result={})".format(result))
def is_connected(self):
return self.hub is not None and self.hub.isConnected()
def disconnect(self):
if self.hub:
self.hub.disconnect()
del self.hub
self.hub = None
def all_ports(self):
"""
:return: a list of all possible ports, even if currently unoccupied or disabled
"""
return range(8)
def ports(self):
"""
:return: a list of all ports currently occupied (and enabled)
"""
occupied_ports = []
for port in self.all_ports():
if self._port_power(port) > 0.0:
occupied_ports.append( port )
return occupied_ports
def is_port_enabled(self, port ):
"""
query if the input port number is enabled through Acroname
It doesn't mean the device is enumerated as sometimes if the FW crashed we can have a situation where the port is "Disconnected" and not "OK"
:param port: port number;
:return: True if Acroname enabled this port, False otherwise
"""
return self.port_state( port ) != "Disabled"
def port_state(self, port ):
if port < 0 or port > 7:
raise ValueError( "port number must be [0-7]" )
#
status = self.hub.usb.getPortState( port )
#log.d("getPortState for port", port ,"return", port_state_bitmask_to_string( status.value ))
return self.port_state_bitmask_to_string( status.value )
def port_state_bitmask_to_string(self, bitmask ):
"""
https://acroname.com/reference/devices/usbhub3p/entities/usb.html#usb-port-state
Bit | Port State: Result Bitwise Description
-----------------------------------------------
0 | USB Vbus Enabled - Port is Disabled/Enabled by the Acroname
1 | USB2 Data Enabled
2 | Reserved
3 | USB3 Data Enabled
4:10 | Reserved
11 | USB2 Device Attached
12 | USB3 Device Attached
13:18 | Reserved
19 | USB Error Flag
20 | USB2 Boost Enabled
21:22 | Reserved
23 | Device Attached
24:31 | Reserved
"""
if bitmask == 0: # Port is disabled by Acroname
return "Disabled"
if bitmask == 11: # Port is enabled but no device was detected
return "Disconnected"
if bitmask > 100: # Normally we hope it will cover "Device Attached" use cases (Note, can also be turn on when 'USB Error Flag' is on...but we havn't seen that )
return "OK"
return "Unknown Error ({})".format( bitmask )
def enable_ports(self, ports = None, disable_other_ports = False, sleep_on_change = 0 ):
"""
Set enable state to provided ports
:param ports: List of port numbers; if not provided, enable all ports
:param disable_other_ports: if True, the ports not in the list will be disabled
:param sleep_on_change: Number of seconds to sleep if any change is made
:return: True if no errors found, False otherwise
"""
result = True
changed = False
for port in self.all_ports():
#
if ports is None or port in ports:
if not self.is_port_enabled( port ):
#log.d( "enabling port", port)
action_result = self.hub.usb.setPortEnable( port )
if action_result != brainstem.result.Result.NO_ERROR:
result = False
log.e("Failed to enable port", port)
else:
changed = True
#
elif disable_other_ports:
if self.is_port_enabled( port ):
#log.d("disabling port", port)
action_result = self.hub.usb.setPortDisable( port )
if action_result != brainstem.result.Result.NO_ERROR:
result = False
else:
changed = True
#
if changed and sleep_on_change:
import time
time.sleep( sleep_on_change )
#
return result
def disable_ports(self, ports = None, sleep_on_change = 0 ):
"""
:param ports: List of port numbers; if not provided, disable all ports
:param sleep_on_change: Number of seconds to sleep if any change is made
:return: True if no errors found, False otherwise
"""
result = True
changed = False
for port in self.all_ports():
if ports is None or port in ports:
if self.is_port_enabled( port ):
#log.d("disabling port", port)
action_result = self.hub.usb.setPortDisable( port )
if action_result != brainstem.result.Result.NO_ERROR:
result = False
log.e("Failed to disable port", port)
else:
changed = True
if changed and sleep_on_change:
import time
time.sleep( sleep_on_change )
#
return result
def set_ports_usb2(self, portlist = None, timeout = 100e-3 ):
"""
Set USB ports to USB2
"""
if portlist is None:
portlist = self.ports()
#
self.recycle_ports( portlist, timeout = timeout )
#
for port in portlist:
self.hub.usb.setSuperSpeedDataEnable( port )
self.hub.usb.setHiSpeedDataEnable( port )
self.hub.usb.setSuperSpeedDataDisable( port )
def set_ports_usb3(self, portlist = None, timeout = 100e-3 ):
"""
Set USB ports to support USB3
"""
if portlist is None:
portlist = self.ports()
#
self.recycle_ports( portlist, timeout = timeout )
#
for port in portlist:
self.hub.usb.setSuperSpeedDataEnable( port )
self.hub.usb.setHiSpeedDataEnable( port )
self.hub.usb.setHiSpeedDataDisable( port )
def _port_power(self, port ):
"""
"""
if port < 0 or port > 7:
raise ValueError("port number can be only within 0 and 7 (inclusive)")
#
micro_volt = self.hub.usb.getPortVoltage( port )
micro_curr = self.hub.usb.getPortCurrent( port )
volt = float(micro_volt.value) / 10.0 ** 6
amps = float(micro_curr.value) / 10.0 ** 6
#
return volt * amps
if 'windows' in platform.system().lower():
def get_port_by_location(self, usb_location):
"""
"""
if usb_location:
#
# T265 locations look differently...
match = re.fullmatch(r'Port_#(\d+)\.Hub_#(\d+)', usb_location, re.IGNORECASE)
if match:
# We don't know how to get the port from these yet!
return None # int(match.group(2))
else:
split_location = [int(x) for x in usb_location.split('.')]
# lambda helper to return the last 2 non-zero numbers, used when connecting using an additional hub
# ex: laptop -> hub -> acroname
get_last_two_digits = lambda array: tuple(
reversed(list(reversed([i for i in array if i != 0]))[:2]))
# only the last two digits are necessary
first_index, second_index = get_last_two_digits(split_location)
return get_port_from_usb(first_index, second_index)
else:
def get_port_by_location(self, usb_location):
"""
"""
if not self.all_hubs:
self.all_hubs = set(device_hub.find_all_hubs('24ff')) # 24ff is Acroname VendorID
if usb_location:
#
# Devices connected through an acroname will be in one of two sub-hubs under the acroname main
# hub. Each is a 4-port hub with a different port (4 for ports 0-3, 3 for ports 4-7):
# /: Bus 02.Port 1: Dev 1, Class=root_hub, Driver=xhci_hcd/6p, 10000M
# |__ Port 2: Dev 2, If 0, Class=Hub, Driver=hub/4p, 5000M <--- ACRONAME
# |__ Port 3: Dev 3, If 0, Class=Hub, Driver=hub/4p, 5000M
# |__ Port X: Dev, If...
# |__ Port Y: ...
# |__ Port 4: Dev 4, If 0, Class=Hub, Driver=hub/4p, 5000M
# |__ Port Z: ...
# (above is output from 'lsusb -t')
# For the above acroname at '2-2' (bus 2, port 2), there are at least 3 devices:
# 2-2.3.X
# 2-2.3.Y
# 2-2.4.Z
# Given the two sub-ports (3.X, 3.Y, 4.Z), we can get the port number.
# NOTE: some of our devices are hubs themselves! For example, the SR300 will show as '2-2.3.2.1' --
# we must start a known hub or else the ports we look at are meaningless...
#
for port in self.all_hubs:
if usb_location.startswith(port + '.'):
match = re.search(r'^(\d+)\.(\d+)', usb_location[len(port) + 1:])
if match:
return get_port_from_usb(int(match.group(1)), int(match.group(2)))
specs = None
def discover(retries = 0):
"""
Return all Acroname module specs in a list. Raise NoneFoundError if one is not found!
"""
global specs
# see https://acroname.com/reference/_modules/brainstem/module.html#Module.discoverAndConnect
if specs is None:
log.d('discovering Acroname modules ...')
try:
log.debug_indent()
for i in range(retries + 1):
specs = brainstem.discover.findAllModules(brainstem.link.Spec.USB)
if not specs:
time.sleep(1)
else:
for spec in specs:
log.d( '...', spec )
break
if not specs:
raise NoneFoundError()
finally:
log.debug_unindent()
return specs
def get_port_from_usb(first_usb_index, second_usb_index ):
"""
Based on last two USB location index, provide the port number
"""
acroname_port_usb_map = {(4, 4): 0,
(4, 3): 1,
(4, 2): 2,
(4, 1): 3,
(3, 4): 4,
(3, 3): 5,
(3, 2): 6,
(3, 1): 7,
}
return acroname_port_usb_map[(first_usb_index, second_usb_index)]
if __name__ == '__main__':
acroname = Acroname()
for opt,arg in opts:
if opt in ('--enable'):
acroname.connect()
acroname.enable_ports() # so ports() will return all
elif opt in ('--disable'):
acroname.connect()
acroname.disable_ports()
elif opt in ('--recycle'):
acroname.connect()
acroname.enable_ports() # so ports() will return all
acroname.recycle_ports()
elif opt in ('--reset'):
acroname.connect( reset = True )