You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
280 lines
9.7 KiB
280 lines
9.7 KiB
2 months ago
|
# License: Apache 2.0. See LICENSE file in root directory.
|
||
|
# Copyright(c) 2021 Intel Corporation. All Rights Reserved.
|
||
|
|
||
|
#test:device D400* !D457
|
||
|
|
||
|
import pyrealsense2 as rs, os, time, tempfile, platform, sys
|
||
|
from rspy import devices, log, test
|
||
|
|
||
|
cp = dp = None
|
||
|
color_format = depth_format = None
|
||
|
color_fps = depth_fps = None
|
||
|
color_width = depth_width = None
|
||
|
color_height = depth_height = None
|
||
|
previous_depth_frame_number = -1
|
||
|
previous_color_frame_number = -1
|
||
|
got_frames_rgb = False
|
||
|
got_frames_depth = False
|
||
|
|
||
|
dev = test.find_first_device_or_exit()
|
||
|
depth_sensor = dev.first_depth_sensor()
|
||
|
color_sensor = dev.first_color_sensor()
|
||
|
|
||
|
# The test also checks frame drops, therefore D400-specific relaxation must apply
|
||
|
# The follow code is borrowed fro test-drops-on-set.py and later can be merged/refactored
|
||
|
product_line = dev.get_info(rs.camera_info.product_line)
|
||
|
is_d400 = False
|
||
|
if product_line == "D400":
|
||
|
is_d400 = True # Allow for frame counter reset while streaming
|
||
|
|
||
|
# Our KPI is to prevent sequential frame drops, therefore single frame drop is allowed.
|
||
|
allowed_drops = 1
|
||
|
|
||
|
# finding the wanted profile settings. We want to use default settings except for color fps where we want
|
||
|
# the lowest value available
|
||
|
for p in color_sensor.profiles:
|
||
|
if p.is_default() and p.stream_type() == rs.stream.color:
|
||
|
color_format = p.format()
|
||
|
color_fps = p.fps()
|
||
|
color_width = p.as_video_stream_profile().width()
|
||
|
color_height = p.as_video_stream_profile().height()
|
||
|
break
|
||
|
for p in color_sensor.profiles:
|
||
|
if p.stream_type() == rs.stream.color and p.format() == color_format and \
|
||
|
p.fps() < color_fps and\
|
||
|
p.as_video_stream_profile().width() == color_width and \
|
||
|
p.as_video_stream_profile().height() == color_height:
|
||
|
color_fps = p.fps()
|
||
|
for p in depth_sensor.profiles:
|
||
|
if p.is_default() and p.stream_type() == rs.stream.depth:
|
||
|
depth_format = p.format()
|
||
|
depth_fps = p.fps()
|
||
|
depth_width = p.as_video_stream_profile().width()
|
||
|
depth_height = p.as_video_stream_profile().height()
|
||
|
break
|
||
|
|
||
|
def color_frame_call_back( frame ):
|
||
|
global previous_color_frame_number
|
||
|
global is_d400
|
||
|
global allowed_drops
|
||
|
global got_frames_rgb
|
||
|
got_frames_rgb = True
|
||
|
test.check_frame_drops( frame, previous_color_frame_number, allowed_drops, is_d400 )
|
||
|
previous_color_frame_number = frame.get_frame_number()
|
||
|
|
||
|
def depth_frame_call_back( frame ):
|
||
|
global previous_depth_frame_number
|
||
|
global is_d400
|
||
|
global allowed_drops
|
||
|
global got_frames_depth
|
||
|
|
||
|
got_frames_depth = True
|
||
|
test.check_frame_drops( frame, previous_depth_frame_number, allowed_drops, is_d400 )
|
||
|
previous_depth_frame_number = frame.get_frame_number()
|
||
|
|
||
|
def restart_profiles():
|
||
|
"""
|
||
|
You can't use the same profile twice, but we need the same profile several times. So this function resets the
|
||
|
profiles with the given parameters to allow quick profile creation
|
||
|
"""
|
||
|
global cp, dp, color_sensor, depth_sensor
|
||
|
global color_format, color_fps, color_width, color_height
|
||
|
global depth_format, depth_fps, depth_width, depth_height
|
||
|
cp = next( p for p in color_sensor.profiles if p.fps() == color_fps
|
||
|
and p.stream_type() == rs.stream.color
|
||
|
and p.format() == color_format
|
||
|
and p.as_video_stream_profile().width() == color_width
|
||
|
and p.as_video_stream_profile().height() == color_height )
|
||
|
|
||
|
dp = next( p for p in depth_sensor.profiles if p.fps() == depth_fps
|
||
|
and p.stream_type() == rs.stream.depth
|
||
|
and p.format() == p.format() == depth_format
|
||
|
and p.as_video_stream_profile().width() == depth_width
|
||
|
and p.as_video_stream_profile().height() == depth_height )
|
||
|
|
||
|
def stop_pipeline( pipeline ):
|
||
|
if pipeline:
|
||
|
try:
|
||
|
pipeline.stop()
|
||
|
except RuntimeError as rte:
|
||
|
# if the error Occurred because the pipeline wasn't started we ignore it
|
||
|
if str( rte ) != "stop() cannot be called before start()":
|
||
|
test.unexpected_exception()
|
||
|
except Exception:
|
||
|
test.unexpected_exception()
|
||
|
|
||
|
def stop_sensor( sensor ):
|
||
|
if sensor:
|
||
|
# if the sensor is already closed get_active_streams returns an empty list
|
||
|
if sensor.get_active_streams():
|
||
|
try:
|
||
|
sensor.stop()
|
||
|
except RuntimeError as rte:
|
||
|
if str( rte ) != "stop_streaming() failed. UVC device is not streaming!":
|
||
|
test.unexpected_exception()
|
||
|
except Exception:
|
||
|
test.unexpected_exception()
|
||
|
sensor.close()
|
||
|
|
||
|
# create temporary folder to record to that will be deleted automatically at the end of the script
|
||
|
# (requires that no files are being held open inside this directory. Important to not keep any handle open to a file
|
||
|
# in this directory, any handle as such must be set to None)
|
||
|
temp_dir = tempfile.TemporaryDirectory( prefix='recordings_' )
|
||
|
file_name = temp_dir.name + os.sep + 'rec.bag'
|
||
|
|
||
|
################################################################################################
|
||
|
test.start("Trying to record and playback using pipeline interface")
|
||
|
|
||
|
cfg = pipeline = None
|
||
|
try:
|
||
|
# creating a pipeline and recording to a file
|
||
|
pipeline = rs.pipeline()
|
||
|
cfg = rs.config()
|
||
|
cfg.enable_record_to_file( file_name )
|
||
|
pipeline.start( cfg )
|
||
|
time.sleep(3)
|
||
|
pipeline.stop()
|
||
|
# we create a new pipeline and use it to playback from the file we just recoded to
|
||
|
pipeline = rs.pipeline()
|
||
|
cfg = rs.config()
|
||
|
cfg.enable_device_from_file(file_name)
|
||
|
pipeline.start(cfg)
|
||
|
# if the record-playback worked we will get frames, otherwise the next line will timeout and throw
|
||
|
pipeline.wait_for_frames()
|
||
|
except Exception:
|
||
|
test.unexpected_exception()
|
||
|
finally: # we must remove all references to the file so we can use it again in the next test
|
||
|
cfg = None
|
||
|
stop_pipeline( pipeline )
|
||
|
|
||
|
test.finish()
|
||
|
|
||
|
################################################################################################
|
||
|
test.start("Trying to record and playback using sensor interface")
|
||
|
|
||
|
recorder = depth_sensor = color_sensor = playback = None
|
||
|
try:
|
||
|
dev = test.find_first_device_or_exit()
|
||
|
recorder = rs.recorder( file_name, dev )
|
||
|
depth_sensor = dev.first_depth_sensor()
|
||
|
color_sensor = dev.first_color_sensor()
|
||
|
|
||
|
restart_profiles()
|
||
|
|
||
|
depth_sensor.open( dp )
|
||
|
depth_sensor.start( lambda f: None )
|
||
|
color_sensor.open( cp )
|
||
|
color_sensor.start( lambda f: None )
|
||
|
|
||
|
time.sleep(3)
|
||
|
|
||
|
recorder.pause()
|
||
|
recorder = None
|
||
|
color_sensor.stop()
|
||
|
color_sensor.close()
|
||
|
depth_sensor.stop()
|
||
|
depth_sensor.close()
|
||
|
|
||
|
color_filters = [f.get_info(rs.camera_info.name) for f in color_sensor.get_recommended_filters()]
|
||
|
depth_filters = [f.get_info(rs.camera_info.name) for f in depth_sensor.get_recommended_filters()]
|
||
|
|
||
|
test.check( len(color_filters) > 0 )
|
||
|
test.check( len(depth_filters) > 0 )
|
||
|
|
||
|
ctx = rs.context()
|
||
|
playback = ctx.load_device( file_name )
|
||
|
|
||
|
depth_sensor = playback.first_depth_sensor()
|
||
|
color_sensor = playback.first_color_sensor()
|
||
|
|
||
|
playback_color_filters = [f.get_info(rs.camera_info.name) for f in color_sensor.get_recommended_filters()]
|
||
|
playback_depth_filters = [f.get_info(rs.camera_info.name) for f in depth_sensor.get_recommended_filters()]
|
||
|
|
||
|
test.check_equal_lists( playback_color_filters, color_filters )
|
||
|
test.check_equal_lists( playback_depth_filters, depth_filters )
|
||
|
|
||
|
restart_profiles()
|
||
|
|
||
|
depth_sensor.open( dp )
|
||
|
depth_sensor.start( depth_frame_call_back )
|
||
|
color_sensor.open( cp )
|
||
|
color_sensor.start( color_frame_call_back )
|
||
|
|
||
|
time.sleep(3)
|
||
|
|
||
|
# if record and playback worked we will receive frames, the callback functions will be called and got-frames
|
||
|
# will be True. If the record and playback failed it will be false
|
||
|
test.check( got_frames_depth )
|
||
|
test.check( got_frames_rgb )
|
||
|
except Exception:
|
||
|
test.unexpected_exception()
|
||
|
finally: # we must remove all references to the file so we can use it again in the next test
|
||
|
stop_sensor( depth_sensor )
|
||
|
depth_sensor = None
|
||
|
stop_sensor( color_sensor )
|
||
|
color_sensor = None
|
||
|
if recorder:
|
||
|
recorder = None
|
||
|
if playback:
|
||
|
playback = None
|
||
|
|
||
|
test.finish()
|
||
|
|
||
|
#####################################################################################################
|
||
|
test.start("Trying to record and playback using sensor interface with syncer")
|
||
|
|
||
|
try:
|
||
|
sync = rs.syncer()
|
||
|
dev = test.find_first_device_or_exit()
|
||
|
recorder = rs.recorder( file_name, dev )
|
||
|
depth_sensor = dev.first_depth_sensor()
|
||
|
color_sensor = dev.first_color_sensor()
|
||
|
|
||
|
restart_profiles()
|
||
|
|
||
|
depth_sensor.open( dp )
|
||
|
depth_sensor.start( sync )
|
||
|
color_sensor.open( cp )
|
||
|
color_sensor.start( sync )
|
||
|
|
||
|
time.sleep(3)
|
||
|
|
||
|
recorder.pause()
|
||
|
recorder = None
|
||
|
color_sensor.stop()
|
||
|
color_sensor.close()
|
||
|
depth_sensor.stop()
|
||
|
depth_sensor.close()
|
||
|
|
||
|
ctx = rs.context()
|
||
|
playback = ctx.load_device( file_name )
|
||
|
|
||
|
depth_sensor = playback.first_depth_sensor()
|
||
|
color_sensor = playback.first_color_sensor()
|
||
|
|
||
|
restart_profiles()
|
||
|
|
||
|
depth_sensor.open( dp )
|
||
|
depth_sensor.start( sync )
|
||
|
color_sensor.open( cp )
|
||
|
color_sensor.start( sync )
|
||
|
|
||
|
# if the record-playback worked we will get frames, otherwise the next line will timeout and throw
|
||
|
sync.wait_for_frames()
|
||
|
except Exception:
|
||
|
test.unexpected_exception()
|
||
|
finally: # we must remove all references to the file so the temporary folder can be deleted
|
||
|
stop_sensor( depth_sensor )
|
||
|
depth_sensor = None
|
||
|
stop_sensor( color_sensor )
|
||
|
color_sensor = None
|
||
|
if recorder:
|
||
|
recorder = None
|
||
|
if playback:
|
||
|
playback = None
|
||
|
|
||
|
test.finish()
|
||
|
|
||
|
#############################################################################################
|
||
|
test.print_results_and_exit()
|