You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

173 lines
5.4 KiB

# License: Apache 2.0. See LICENSE file in root directory.
# Copyright(c) 2023 Intel Corporation. All Rights Reserved.
import os.path
import shutil
import tempfile

import numpy as np
import pyrealsense2 as rs

from rspy import test
def prepare_video_stream(width, height, bpp):
    """Build the depth video-stream description for the software sensor.

    The stream is Z16 depth at 60 fps with index 0 and uid 0. Its intrinsics
    place the principal point at the image centre, use the image dimensions
    as the focal lengths, and carry all-zero Brown-Conrady coefficients.

    :param width: frame width in pixels
    :param height: frame height in pixels
    :param bpp: bytes per pixel of the stream
    :return: the populated rs.video_stream
    """
    # Use the arguments rather than the module-level W/H/BPP so the helper
    # honours whatever geometry the caller asks for.
    depth_intrinsics = rs.intrinsics()
    depth_intrinsics.width = width
    depth_intrinsics.height = height
    depth_intrinsics.ppx = width / 2
    depth_intrinsics.ppy = height / 2
    depth_intrinsics.fx = width
    depth_intrinsics.fy = height
    depth_intrinsics.model = rs.distortion.brown_conrady
    depth_intrinsics.coeffs = [0, 0, 0, 0, 0]

    vs = rs.video_stream()
    vs.type = rs.stream.depth
    vs.index = 0
    vs.uid = 0
    vs.width = width
    vs.height = height
    vs.fps = 60
    vs.bpp = bpp
    vs.fmt = rs.format.z16
    vs.intrinsics = depth_intrinsics
    return vs
def prepare_motion_stream():
    """Build the accelerometer motion-stream description for the software sensor.

    The stream is raw-motion accel data at 200 fps with index 0 and uid 1.
    Its intrinsics are a 3x4 matrix of ones with noise variances of 2 and
    bias variances of 3 on every axis.

    :return: the populated rs.motion_stream
    """
    accel_intrinsics = rs.motion_device_intrinsic()
    accel_intrinsics.data = [[1.0, 1.0, 1.0, 1.0] for _ in range(3)]
    accel_intrinsics.noise_variances = [2] * 3
    accel_intrinsics.bias_variances = [3] * 3

    accel_stream = rs.motion_stream()
    accel_stream.type = rs.stream.accel
    accel_stream.index = 0
    accel_stream.uid = 1
    accel_stream.fps = 200
    accel_stream.fmt = rs.format.motion_raw
    accel_stream.intrinsics = accel_intrinsics
    return accel_stream
def prepare_depth_frame(pixels, bpp, depth_stream_profile):
    """Fill the module-level ``video_frame`` with the synthetic depth image.

    The frame gets timestamp 10000 in the hardware-clock domain and frame
    number 0. The row stride is the module-level width ``W`` times ``bpp``.

    :param pixels: raw image bytes to attach to the frame
    :param bpp: bytes per pixel of the image
    :param depth_stream_profile: profile the frame belongs to
    :return: the module-level ``video_frame``, populated in place
    """
    video_frame.pixels = pixels
    # Honour the caller-supplied bytes-per-pixel instead of the global BPP.
    video_frame.bpp = bpp
    video_frame.stride = W * bpp
    video_frame.timestamp = 10000
    video_frame.domain = rs.timestamp_domain.hardware_clock
    video_frame.frame_number = 0
    video_frame.profile = depth_stream_profile.as_video_stream_profile()
    return video_frame
def prepare_motion_frame(motion_frame_data, motion_stream_profile):
    """Fill the module-level ``motion_frame`` with the synthetic accel sample.

    The frame gets timestamp 20000 in the hardware-clock domain and frame
    number 0.

    :param motion_frame_data: the motion sample to attach to the frame
    :param motion_stream_profile: profile the frame belongs to
    :return: the module-level ``motion_frame``, populated in place
    """
    frame = motion_frame  # same object as the module-level software frame
    frame.data = motion_frame_data
    frame.timestamp = 20000
    frame.domain = rs.timestamp_domain.hardware_clock
    frame.frame_number = 0
    frame.profile = motion_stream_profile.as_motion_stream_profile()
    return frame
def record_frames(filename, sd, sync, video_frame, motion_frame, sensor, stream_profiles):
    """Push one video frame and one motion frame through the sensor under a recorder.

    A recorder is created over ``sd`` with ``filename``, the sensor is opened
    and started into ``sync``, both frames are injected, and the sensor is
    stopped and closed again.

    :param filename: path handed to rs.recorder
    :param sd: the software device being recorded
    :param sync: syncer the sensor streams into
    :param video_frame: software video frame to inject
    :param motion_frame: software motion frame to inject
    :param sensor: the software sensor that owns the streams
    :param stream_profiles: profiles to open on the sensor
    """
    bag_recorder = rs.recorder(filename, sd)

    sensor.open(stream_profiles)
    sensor.start(sync)

    sensor.on_video_frame(video_frame)
    sensor.on_motion_frame(motion_frame)

    sensor.stop()
    sensor.close()

    bag_recorder.pause()
    # Drop the only reference to the recorder before returning; presumably this
    # is what finalizes the file so it can be played back - confirm.
    del bag_recorder
def compare_frames(recorded_depth, recorded_accel):
    """Check the played-back frames against the frames that were injected.

    Expected values come from the module-level ``pixels``, ``video_frame``,
    ``motion_frame_data`` and ``motion_frame``. Every mismatch is reported
    through ``test.check_equal``.

    :param recorded_depth: depth frame read back from the recording
    :param recorded_accel: accel frame read back from the recording
    """
    # Flatten the depth image and reinterpret it as bytes so it lines up with
    # the uint8 ``pixels`` buffer.
    depth_image = np.asarray(recorded_depth.as_depth_frame().get_data())
    recorded_bytes = np.hstack(depth_image).view(dtype=np.uint8)
    for idx, expected_byte in enumerate(pixels):
        test.check_equal(expected_byte, recorded_bytes[idx])

    test.check_equal(video_frame.frame_number, recorded_depth.get_frame_number())
    test.check_equal(video_frame.domain, recorded_depth.get_frame_timestamp_domain())
    test.check_equal(video_frame.timestamp, recorded_depth.get_timestamp())

    recorded_sample = recorded_accel.as_motion_frame().get_motion_data()
    for axis in ("x", "y", "z"):
        test.check_equal(getattr(motion_frame_data, axis), getattr(recorded_sample, axis))

    test.check_equal(motion_frame.frame_number, recorded_accel.get_frame_number())
    test.check_equal(motion_frame.domain, recorded_accel.get_frame_timestamp_domain())
    test.check_equal(motion_frame.timestamp, recorded_accel.get_timestamp())
def play_frames(filename):
    """Play back a recording and compare its last depth and accel frames.

    The file is loaded as a playback device with real-time mode disabled, and
    its first sensor is opened with all of its profiles and started into a
    syncer. Framesets are drained until ``try_wait_for_frames`` reports
    failure, remembering the most recent depth and accel frames, which are
    then passed to ``compare_frames``.

    :param filename: path of the recording to load
    """
    ctx = rs.context()
    player_dev = ctx.load_device(filename)
    player_dev.set_real_time(False)
    player_sync = rs.syncer()
    s = player_dev.query_sensors()[0]
    s.open(s.get_stream_profiles())
    s.start(player_sync)

    # Empty frames are the fallback if a stream never shows up in the playback.
    recorded_depth = rs.frame()
    recorded_accel = rs.frame()

    try:
        success, fset = player_sync.try_wait_for_frames()
        while success:
            # Look each stream up once and keep it only when the frameset has it.
            depth = fset.first_or_default(rs.stream.depth)
            if depth:
                recorded_depth = depth
            accel = fset.first_or_default(rs.stream.accel)
            if accel:
                recorded_accel = accel
            success, fset = player_sync.try_wait_for_frames()

        compare_frames(recorded_depth, recorded_accel)
    finally:
        # Always release the playback sensor, even if the loop or the
        # comparison raised.
        s.stop()
        s.close()
################################################################################################
test.start("Record software-device")

# Geometry of the synthetic depth image. The helper functions above also read
# W, H and BPP as module-level globals.
W = 640
H = 480
BPP = 2

# Scratch directory for the recording; removed again in the `finally` below so
# repeated runs do not pile up stale .bag files.
temp_dir = tempfile.mkdtemp()
try:
    filename = os.path.join(temp_dir, "recording.bag")

    # These two frames are filled in place by prepare_depth_frame /
    # prepare_motion_frame and read back by compare_frames.
    video_frame = rs.software_video_frame()
    motion_frame = rs.software_motion_frame()

    # W*H*BPP bytes, every one set to 100.
    pixels = np.full(W * H * BPP, 100, dtype=np.uint8)

    motion_frame_data = rs.vector()
    motion_frame_data.x = 1.0
    motion_frame_data.y = 2.0
    motion_frame_data.z = 3.0

    sd = rs.software_device()
    sensor = sd.add_sensor("Synthetic")

    vs = prepare_video_stream(W, H, BPP)
    depth_stream_profile = sensor.add_video_stream(vs).as_video_stream_profile()

    motion_stream = prepare_motion_stream()
    motion_stream_profile = sensor.add_motion_stream(motion_stream)

    sync = rs.syncer()
    stream_profiles = [depth_stream_profile, motion_stream_profile]

    video_frame = prepare_depth_frame(pixels, BPP, depth_stream_profile)
    motion_frame = prepare_motion_frame(motion_frame_data, motion_stream_profile)

    record_frames(filename, sd, sync, video_frame, motion_frame, sensor, stream_profiles)
    play_frames(filename)
finally:
    # Best-effort cleanup: a file that is still held open must not turn a
    # passing test into a failure.
    shutil.rmtree(temp_dir, ignore_errors=True)

test.finish()
################################################################################################
test.print_results_and_exit()