You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

280 lines
9.7 KiB

# License: Apache 2.0. See LICENSE file in root directory.
# Copyright(c) 2021 Intel Corporation. All Rights Reserved.
#test:device D400* !D457
import pyrealsense2 as rs, os, time, tempfile, platform, sys
from rspy import devices, log, test
# Module-level state shared by the helpers and the tests below. Everything starts unset; the profile
# settings are filled in further down, cp/dp by restart_profiles() and the rest by the frame callbacks.
cp = dp = None
color_format = depth_format = None
color_fps = depth_fps = None
color_width = depth_width = None
color_height = depth_height = None
previous_depth_frame_number = -1
previous_color_frame_number = -1
got_frames_rgb = False
got_frames_depth = False

dev = test.find_first_device_or_exit()
depth_sensor = dev.first_depth_sensor()
color_sensor = dev.first_color_sensor()

# The test also checks frame drops, therefore the D400-specific relaxation must apply.
# The following code is borrowed from test-drops-on-set.py and can later be merged/refactored.
product_line = dev.get_info(rs.camera_info.product_line)
# On D400 we allow for a frame counter reset while streaming
is_d400 = product_line == "D400"
# Our KPI is to prevent sequential frame drops, therefore a single frame drop is allowed.
allowed_drops = 1

# Find the wanted profile settings. We use the default settings, except for the color fps where we want the
# lowest value available.

# Step 1: take format/fps/resolution from the first default color profile
for p in color_sensor.profiles:
    if not ( p.is_default() and p.stream_type() == rs.stream.color ):
        continue
    color_format = p.format()
    color_fps = p.fps()
    color_width = p.as_video_stream_profile().width()
    color_height = p.as_video_stream_profile().height()
    break

# Step 2: among color profiles with that same format and resolution, keep the lowest fps
for p in color_sensor.profiles:
    if not ( p.stream_type() == rs.stream.color and p.format() == color_format ):
        continue
    if not ( p.fps() < color_fps ):
        continue
    if p.as_video_stream_profile().width() == color_width and \
            p.as_video_stream_profile().height() == color_height:
        color_fps = p.fps()

# Step 3: take format/fps/resolution from the first default depth profile
for p in depth_sensor.profiles:
    if not ( p.is_default() and p.stream_type() == rs.stream.depth ):
        continue
    depth_format = p.format()
    depth_fps = p.fps()
    depth_width = p.as_video_stream_profile().width()
    depth_height = p.as_video_stream_profile().height()
    break
def color_frame_call_back( frame ):
    """
    Callback for color frames: marks that a color frame was received, hands the frame together with the
    previously seen color frame number to test.check_frame_drops(), and then remembers this frame's number
    for the next call.
    """
    # only the names that get re-assigned here need a global declaration
    global got_frames_rgb, previous_color_frame_number
    got_frames_rgb = True
    test.check_frame_drops( frame, previous_color_frame_number, allowed_drops, is_d400 )
    previous_color_frame_number = frame.get_frame_number()
def depth_frame_call_back( frame ):
    """
    Callback for depth frames: marks that a depth frame was received, hands the frame together with the
    previously seen depth frame number to test.check_frame_drops(), and then remembers this frame's number
    for the next call.
    """
    # only the names that get re-assigned here need a global declaration
    global got_frames_depth, previous_depth_frame_number
    got_frames_depth = True
    test.check_frame_drops( frame, previous_depth_frame_number, allowed_drops, is_d400 )
    previous_depth_frame_number = frame.get_frame_number()
def restart_profiles():
    """
    You can't use the same profile twice, but we need the same profile several times. So this function looks the
    color and depth profiles up again on the current color_sensor / depth_sensor, using the settings stored in
    the module-level color_* / depth_* variables, and stores the results in the globals cp and dp.

    Raises StopIteration if a sensor has no profile with the requested settings.
    """
    # cp/dp are the only names assigned here; everything else is only read
    global cp, dp
    cp = _find_profile( color_sensor, rs.stream.color, color_format, color_fps, color_width, color_height )
    dp = _find_profile( depth_sensor, rs.stream.depth, depth_format, depth_fps, depth_width, depth_height )


def _find_profile( sensor, stream, fmt, fps, width, height ):
    """
    Return the first profile in sensor.profiles whose fps, stream type, format, width and height all equal the
    given values. Raises StopIteration when nothing matches.
    """
    return next( p for p in sensor.profiles
                 if p.fps() == fps
                 and p.stream_type() == stream
                 and p.format() == fmt
                 and p.as_video_stream_profile().width() == width
                 and p.as_video_stream_profile().height() == height )
def stop_pipeline( pipeline ):
    """
    Stop the given pipeline, if there is one.

    A RuntimeError saying the pipeline was never started is ignored; any other exception is reported through
    test.unexpected_exception().
    """
    if not pipeline:
        return
    try:
        pipeline.stop()
    except RuntimeError as err:
        # stopping a pipeline that was never started is not a failure for us
        if str( err ) != "stop() cannot be called before start()":
            test.unexpected_exception()
    except Exception:
        test.unexpected_exception()
def stop_sensor( sensor ):
    """
    Stop and close the given sensor, if there is one and it still reports active streams.

    A RuntimeError saying the UVC device is not streaming is ignored; any other exception raised by stop() is
    reported through test.unexpected_exception(). The sensor is closed afterwards in either case.
    """
    # if the sensor is already closed get_active_streams returns an empty list
    if not sensor or not sensor.get_active_streams():
        return
    try:
        sensor.stop()
    except RuntimeError as err:
        # a sensor that was opened but is not streaming only needs to be closed
        if str( err ) != "stop_streaming() failed. UVC device is not streaming!":
            test.unexpected_exception()
    except Exception:
        test.unexpected_exception()
    sensor.close()
# Create a temporary folder to record into; it is deleted automatically at the end of the script.
# (This requires that no file inside the directory is still held open: any handle to a file in this
# directory must be released, i.e. set to None, before then.)
temp_dir = tempfile.TemporaryDirectory( prefix='recordings_' )
file_name = os.path.join( temp_dir.name, 'rec.bag' )
################################################################################################
test.start("Trying to record and playback using pipeline interface")

cfg = pipeline = None
try:
    # Stage 1: record a few seconds of streaming into the file
    cfg = rs.config()
    cfg.enable_record_to_file( file_name )
    pipeline = rs.pipeline()
    pipeline.start( cfg )
    time.sleep(3)
    pipeline.stop()

    # Stage 2: a brand new pipeline plays back the file we just recorded
    cfg = rs.config()
    cfg.enable_device_from_file(file_name)
    pipeline = rs.pipeline()
    pipeline.start(cfg)
    # frames arrive only if record + playback worked; otherwise this call times out and throws
    pipeline.wait_for_frames()
except Exception:
    test.unexpected_exception()
finally:
    # every reference to the file must be dropped so the next test can use it again
    cfg = None
    stop_pipeline( pipeline )

test.finish()
################################################################################################
test.start("Trying to record and playback using sensor interface")

recorder = depth_sensor = color_sensor = playback = None
try:
    # Stage 1: record from the live device; the frames themselves are of no interest here
    dev = test.find_first_device_or_exit()
    recorder = rs.recorder( file_name, dev )
    depth_sensor = dev.first_depth_sensor()
    color_sensor = dev.first_color_sensor()

    restart_profiles()

    depth_sensor.open( dp )
    depth_sensor.start( lambda f: None )
    color_sensor.open( cp )
    color_sensor.start( lambda f: None )

    time.sleep(3)

    recorder.pause()
    recorder = None
    color_sensor.stop()
    color_sensor.close()
    depth_sensor.stop()
    depth_sensor.close()

    # Remember the names of the recommended filters of the live sensors; both lists must be non-empty
    color_filters = [flt.get_info(rs.camera_info.name) for flt in color_sensor.get_recommended_filters()]
    depth_filters = [flt.get_info(rs.camera_info.name) for flt in depth_sensor.get_recommended_filters()]
    test.check( len(color_filters) > 0 )
    test.check( len(depth_filters) > 0 )

    # Stage 2: load the recording as a playback device
    ctx = rs.context()
    playback = ctx.load_device( file_name )
    depth_sensor = playback.first_depth_sensor()
    color_sensor = playback.first_color_sensor()

    # the playback sensors must report the same recommended filters as the live ones
    playback_color_filters = [flt.get_info(rs.camera_info.name) for flt in color_sensor.get_recommended_filters()]
    playback_depth_filters = [flt.get_info(rs.camera_info.name) for flt in depth_sensor.get_recommended_filters()]
    test.check_equal_lists( playback_color_filters, color_filters )
    test.check_equal_lists( playback_depth_filters, depth_filters )

    restart_profiles()

    depth_sensor.open( dp )
    depth_sensor.start( depth_frame_call_back )
    color_sensor.open( cp )
    color_sensor.start( color_frame_call_back )

    time.sleep(3)

    # If record and playback worked, the callbacks were called and set the got-frames flags to True;
    # if they failed the flags are still False
    test.check( got_frames_depth )
    test.check( got_frames_rgb )
except Exception:
    test.unexpected_exception()
finally:
    # every reference to the file must be dropped so the next test can use it again
    stop_sensor( depth_sensor )
    depth_sensor = None
    stop_sensor( color_sensor )
    color_sensor = None
    recorder = None
    playback = None

test.finish()
#####################################################################################################
test.start("Trying to record and playback using sensor interface with syncer")

# start from a clean slate, same as the previous test, so the cleanup below never sees stale handles
recorder = depth_sensor = color_sensor = playback = None
try:
    # Stage 1: record from the live device, streaming into a syncer
    sync = rs.syncer()
    dev = test.find_first_device_or_exit()
    recorder = rs.recorder( file_name, dev )
    depth_sensor = dev.first_depth_sensor()
    color_sensor = dev.first_color_sensor()

    restart_profiles()

    depth_sensor.open( dp )
    depth_sensor.start( sync )
    color_sensor.open( cp )
    color_sensor.start( sync )

    time.sleep(3)

    recorder.pause()
    recorder = None
    color_sensor.stop()
    color_sensor.close()
    depth_sensor.stop()
    depth_sensor.close()

    # Stage 2: load the recording as a playback device
    ctx = rs.context()
    playback = ctx.load_device( file_name )
    depth_sensor = playback.first_depth_sensor()
    color_sensor = playback.first_color_sensor()

    restart_profiles()

    # Use a fresh syncer for playback: the one above was fed live frames while recording and may still be
    # holding a frameset, which wait_for_frames() below could return even if playback delivered nothing
    # NOTE(review): depends on the syncer keeping queued framesets after the sensors stop - confirm
    sync = rs.syncer()

    depth_sensor.open( dp )
    depth_sensor.start( sync )
    color_sensor.open( cp )
    color_sensor.start( sync )

    # if the record-playback worked we will get frames, otherwise the next line will timeout and throw
    sync.wait_for_frames()
except Exception:
    test.unexpected_exception()
finally:
    # every reference to the file must be dropped so the temporary folder can be deleted
    stop_sensor( depth_sensor )
    depth_sensor = None
    stop_sensor( color_sensor )
    color_sensor = None
    recorder = None
    playback = None

test.finish()

#############################################################################################
test.print_results_and_exit()