You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
126 lines
4.9 KiB
126 lines
4.9 KiB
# License: Apache 2.0. See LICENSE file in root directory.
|
|
# Copyright(c) 2023 Intel Corporation. All Rights Reserved.
|
|
|
|
#temporary fix to prevent the test from running on Win_SH_Py_DDS_CI
|
|
#test:donotrun:dds
|
|
|
|
import pyrealsense2 as rs
|
|
from rspy import test, repo
|
|
import os.path
|
|
import time
|
|
################################################################################################
|
|
|
|
|
|
def playback_callback(status):
|
|
global playback_status
|
|
playback_status = status
|
|
|
|
|
|
def validate_ppf_results(result_frame_data, reference_frame_data):
|
|
result_bytearray_frame_data, result_profile = result_frame_data
|
|
reference_bytearray_frame_data, reference_profile = reference_frame_data
|
|
|
|
test.check_equal(result_profile.width(), reference_profile.width())
|
|
test.check_equal(result_profile.height(), reference_profile.height())
|
|
test.check_equal_lists(result_bytearray_frame_data,reference_bytearray_frame_data)
|
|
|
|
|
|
def process_frame(frame, frame_source):
|
|
global pre_processed_frames_map
|
|
sensor_name = rs.sensor.from_frame(frame).get_info(rs.camera_info.name)
|
|
if sensor_name in pre_processed_frames_map:
|
|
pre_processed_frames_map[sensor_name].append(frame)
|
|
else:
|
|
pre_processed_frames_map[sensor_name] = [frame]
|
|
|
|
if len(pre_processed_frames_map[sensor_name]) == 2:
|
|
frameset = frame_source.allocate_composite_frame(pre_processed_frames_map[sensor_name])
|
|
fs_processed = process_frame_callback(frameset.as_frameset())
|
|
fs_processed_data = bytearray(fs_processed.get_data())
|
|
fs_processed_profile = fs_processed.get_profile().as_video_stream_profile()
|
|
frames_data_map[sensor_name] = (fs_processed_data, fs_processed_profile)
|
|
|
|
|
|
def load_ref_frames_to_map(frame):
|
|
if len(frames_data_map) < len(sensors):
|
|
sensor_name = rs.sensor.from_frame(frame).get_info(rs.camera_info.name)
|
|
frames_data_map[sensor_name] = (bytearray(frame.get_data()), frame.get_profile().as_video_stream_profile())
|
|
|
|
|
|
def get_frames(callback):
|
|
for s in sensors:
|
|
s.open(s.get_stream_profiles())
|
|
|
|
for s in sensors:
|
|
s.start(callback)
|
|
|
|
while playback_status != rs.playback_status.stopped:
|
|
time.sleep(0.25)
|
|
|
|
for s in sensors:
|
|
s.stop()
|
|
|
|
for s in sensors:
|
|
s.close()
|
|
|
|
|
|
def playback_file(file, callback):
|
|
global playback_status, sensors
|
|
playback_status = None
|
|
filename = os.path.join(repo.build, 'unit-tests', 'recordings', file)
|
|
ctx = rs.context()
|
|
|
|
dev = ctx.load_device(filename)
|
|
dev.set_real_time(False)
|
|
dev.set_status_changed_callback(playback_callback)
|
|
|
|
sensors = dev.query_sensors()
|
|
|
|
get_frames(callback)
|
|
|
|
|
|
def compare_processed_frames_vs_recorded_frames(file):
|
|
# we need processing_block in order to have frame_source.
|
|
# frame_source is used to composite frames (by calling allocate_composite_frames function).
|
|
global frames_data_map, pre_processed_frames_map, sensors
|
|
frame_processor = rs.processing_block(process_frame)
|
|
frames_data_map = {}
|
|
pre_processed_frames_map = {}
|
|
sensors = []
|
|
playback_file('all_combinations_depth_color.bag', lambda frame: (frame_processor.invoke(frame)))
|
|
processed_frames_data_list = []
|
|
for sf in frames_data_map:
|
|
processed_frames_data_list.append(frames_data_map[sf])
|
|
|
|
frames_data_map = {}
|
|
playback_file(file, lambda frame: load_ref_frames_to_map(frame))
|
|
ref_frame_data_list = []
|
|
for sf in frames_data_map:
|
|
ref_frame_data_list.append(frames_data_map[sf])
|
|
|
|
test.check_equal(len(processed_frames_data_list),len(ref_frame_data_list))
|
|
|
|
for i in range(len(processed_frames_data_list)):
|
|
validate_ppf_results(processed_frames_data_list[i], ref_frame_data_list[i])
|
|
|
|
################################################################################################
|
|
with test.closure("Test align depth to color from recording"):
|
|
align = rs.align(rs.stream.color)
|
|
process_frame_callback = lambda fs: align.process(fs).first_or_default(rs.stream.depth)
|
|
|
|
compare_processed_frames_vs_recorded_frames("[aligned_2c]_all_combinations_depth_color.bag")
|
|
################################################################################################
|
|
with test.closure("Test align color to depth from recording"):
|
|
align = rs.align(rs.stream.depth)
|
|
process_frame_callback = lambda fs: align.process(fs).first_or_default(rs.stream.color)
|
|
|
|
compare_processed_frames_vs_recorded_frames("[aligned_2d]_all_combinations_depth_color.bag")
|
|
################################################################################################
|
|
with test.closure("Test point cloud from recording"):
|
|
pc = rs.pointcloud()
|
|
process_frame_callback = lambda fs: pc.calculate(fs.get_depth_frame())
|
|
|
|
compare_processed_frames_vs_recorded_frames("[pointcloud]_all_combinations_depth_color.bag")
|
|
################################################################################################
|
|
test.print_results_and_exit()
|