From dabf592d5915316967f92028058358cecafd5210 Mon Sep 17 00:00:00 2001 From: Mateo de Mayo Date: Sun, 31 Jul 2022 12:54:16 -0300 Subject: [PATCH] t/euroc: Add lock for IMU recording queue --- .../auxiliary/tracking/t_euroc_recorder.cpp | 43 +++++++++++++------ 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/src/xrt/auxiliary/tracking/t_euroc_recorder.cpp b/src/xrt/auxiliary/tracking/t_euroc_recorder.cpp index f351c00bc..c28c04a8d 100644 --- a/src/xrt/auxiliary/tracking/t_euroc_recorder.cpp +++ b/src/xrt/auxiliary/tracking/t_euroc_recorder.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -26,9 +27,12 @@ //! @todo: Now that IMU sinks support groundtruth, we could save it here as well. +using std::lock_guard; +using std::mutex; using std::ofstream; using std::queue; using std::string; +using std::vector; using std::filesystem::create_directories; struct euroc_recorder @@ -53,6 +57,7 @@ struct euroc_recorder struct xrt_frame_sink writer_right_sink; queue imu_queue{}; //!< IMU pushes get saved here and are delayed until left_frame pushes + mutex imu_queue_lock{}; //!< Lock for imu_queue // CSV file handles, ofstream implementation is already buffered. // Using pointers because of `container_of` @@ -97,11 +102,20 @@ euroc_recorder_try_mkfiles(struct euroc_recorder *er) static void euroc_recorder_flush(struct euroc_recorder *er) { + vector samples; + + { // Move samples out of imu_queue into vector to minimize mutex contention + lock_guard lock{er->imu_queue_lock}; + samples.reserve(er->imu_queue.size()); + while (!er->imu_queue.empty()) { + samples.push_back(er->imu_queue.front()); + er->imu_queue.pop(); + } + } + // Write queued IMU samples to csv stream. - while (!er->imu_queue.empty()) { - xrt_imu_sample imu = er->imu_queue.front(); - xrt_sink_push_imu(&er->writer_imu_sink, &imu); - er->imu_queue.pop(); + for (xrt_imu_sample &sample : samples) { + xrt_sink_push_imu(&er->writer_imu_sink, &sample); } // Flush csv streams. Not necessary, doing it only to increase flush frequency @@ -130,23 +144,22 @@ euroc_recorder_save_frame(euroc_recorder *er, struct xrt_frame *frame, bool is_l string cam_name = is_left ? "cam0" : "cam1"; uint64_t ts = frame->timestamp; - ofstream *cam_csv = is_left ? er->left_cam_csv : er->right_cam_csv; - *cam_csv << ts << "," << ts << ".png" CSV_EOL; - assert(frame->format == XRT_FORMAT_L8 || frame->format == XRT_FORMAT_R8G8B8); // Only formats supported auto img_type = frame->format == XRT_FORMAT_L8 ? CV_8UC1 : CV_8UC3; string img_path = er->path + "/mav0/" + cam_name + "/data/" + std::to_string(ts) + ".png"; cv::Mat img{(int)frame->height, (int)frame->width, img_type, frame->data, frame->stride}; cv::imwrite(img_path, img); + + ofstream *cam_csv = is_left ? er->left_cam_csv : er->right_cam_csv; + *cam_csv << ts << "," << ts << ".png" CSV_EOL; } extern "C" void euroc_recorder_save_left(struct xrt_frame_sink *sink, struct xrt_frame *frame) { euroc_recorder *er = container_of(sink, euroc_recorder, writer_left_sink); - euroc_recorder_save_frame(er, frame, true); - euroc_recorder_flush(er); + euroc_recorder_save_frame(er, frame, true); } extern "C" void @@ -175,7 +188,10 @@ euroc_recorder_receive_imu(xrt_imu_sink *sink, struct xrt_imu_sample *sample) return; } - er->imu_queue.push(*sample); + { + lock_guard lock{er->imu_queue_lock}; + er->imu_queue.push(*sample); + } } @@ -243,9 +259,6 @@ euroc_recorder_create(struct xrt_frame_context *xfctx, const char *record_path, struct euroc_recorder *er = new euroc_recorder{}; er->recording = record_from_start; - if (record_from_start) { - euroc_recorder_try_mkfiles(er); - } struct xrt_frame_node *xfn = &er->node; xfn->break_apart = euroc_recorder_node_break_apart; @@ -264,6 +277,10 @@ euroc_recorder_create(struct xrt_frame_context *xfctx, const char *record_path, er->path = default_path; } + if (record_from_start) { + euroc_recorder_try_mkfiles(er); + } + // Setup sink pipeline // First, make the public queues that will clone frames in memory so that