t/euroc: Add lock for IMU recording queue

This commit is contained in:
Mateo de Mayo 2022-07-31 12:54:16 -03:00 committed by Ryan Pavlik
parent 39f80f2ef5
commit dabf592d59

View file

@ -18,6 +18,7 @@
#include <ctime> #include <ctime>
#include <filesystem> #include <filesystem>
#include <fstream> #include <fstream>
#include <mutex>
#include <string> #include <string>
#include <queue> #include <queue>
#include <iomanip> #include <iomanip>
@ -26,9 +27,12 @@
//! @todo: Now that IMU sinks support groundtruth, we could save it here as well. //! @todo: Now that IMU sinks support groundtruth, we could save it here as well.
using std::lock_guard;
using std::mutex;
using std::ofstream; using std::ofstream;
using std::queue; using std::queue;
using std::string; using std::string;
using std::vector;
using std::filesystem::create_directories; using std::filesystem::create_directories;
struct euroc_recorder struct euroc_recorder
@ -53,6 +57,7 @@ struct euroc_recorder
struct xrt_frame_sink writer_right_sink; struct xrt_frame_sink writer_right_sink;
queue<xrt_imu_sample> imu_queue{}; //!< IMU pushes get saved here and are delayed until left_frame pushes queue<xrt_imu_sample> imu_queue{}; //!< IMU pushes get saved here and are delayed until left_frame pushes
mutex imu_queue_lock{}; //!< Lock for imu_queue
// CSV file handles, ofstream implementation is already buffered. // CSV file handles, ofstream implementation is already buffered.
// Using pointers because of `container_of` // Using pointers because of `container_of`
@ -97,12 +102,21 @@ euroc_recorder_try_mkfiles(struct euroc_recorder *er)
static void static void
euroc_recorder_flush(struct euroc_recorder *er) euroc_recorder_flush(struct euroc_recorder *er)
{ {
// Write queued IMU samples to csv stream. vector<xrt_imu_sample> samples;
{ // Move samples out of imu_queue into vector to minimize mutex contention
lock_guard lock{er->imu_queue_lock};
samples.reserve(er->imu_queue.size());
while (!er->imu_queue.empty()) { while (!er->imu_queue.empty()) {
xrt_imu_sample imu = er->imu_queue.front(); samples.push_back(er->imu_queue.front());
xrt_sink_push_imu(&er->writer_imu_sink, &imu);
er->imu_queue.pop(); er->imu_queue.pop();
} }
}
// Write queued IMU samples to csv stream.
for (xrt_imu_sample &sample : samples) {
xrt_sink_push_imu(&er->writer_imu_sink, &sample);
}
// Flush csv streams. Not necessary, doing it only to increase flush frequency // Flush csv streams. Not necessary, doing it only to increase flush frequency
er->imu_csv->flush(); er->imu_csv->flush();
@ -130,23 +144,22 @@ euroc_recorder_save_frame(euroc_recorder *er, struct xrt_frame *frame, bool is_l
string cam_name = is_left ? "cam0" : "cam1"; string cam_name = is_left ? "cam0" : "cam1";
uint64_t ts = frame->timestamp; uint64_t ts = frame->timestamp;
ofstream *cam_csv = is_left ? er->left_cam_csv : er->right_cam_csv;
*cam_csv << ts << "," << ts << ".png" CSV_EOL;
assert(frame->format == XRT_FORMAT_L8 || frame->format == XRT_FORMAT_R8G8B8); // Only formats supported assert(frame->format == XRT_FORMAT_L8 || frame->format == XRT_FORMAT_R8G8B8); // Only formats supported
auto img_type = frame->format == XRT_FORMAT_L8 ? CV_8UC1 : CV_8UC3; auto img_type = frame->format == XRT_FORMAT_L8 ? CV_8UC1 : CV_8UC3;
string img_path = er->path + "/mav0/" + cam_name + "/data/" + std::to_string(ts) + ".png"; string img_path = er->path + "/mav0/" + cam_name + "/data/" + std::to_string(ts) + ".png";
cv::Mat img{(int)frame->height, (int)frame->width, img_type, frame->data, frame->stride}; cv::Mat img{(int)frame->height, (int)frame->width, img_type, frame->data, frame->stride};
cv::imwrite(img_path, img); cv::imwrite(img_path, img);
ofstream *cam_csv = is_left ? er->left_cam_csv : er->right_cam_csv;
*cam_csv << ts << "," << ts << ".png" CSV_EOL;
} }
extern "C" void extern "C" void
euroc_recorder_save_left(struct xrt_frame_sink *sink, struct xrt_frame *frame) euroc_recorder_save_left(struct xrt_frame_sink *sink, struct xrt_frame *frame)
{ {
euroc_recorder *er = container_of(sink, euroc_recorder, writer_left_sink); euroc_recorder *er = container_of(sink, euroc_recorder, writer_left_sink);
euroc_recorder_save_frame(er, frame, true);
euroc_recorder_flush(er); euroc_recorder_flush(er);
euroc_recorder_save_frame(er, frame, true);
} }
extern "C" void extern "C" void
@ -175,7 +188,10 @@ euroc_recorder_receive_imu(xrt_imu_sink *sink, struct xrt_imu_sample *sample)
return; return;
} }
{
lock_guard lock{er->imu_queue_lock};
er->imu_queue.push(*sample); er->imu_queue.push(*sample);
}
} }
@ -243,9 +259,6 @@ euroc_recorder_create(struct xrt_frame_context *xfctx, const char *record_path,
struct euroc_recorder *er = new euroc_recorder{}; struct euroc_recorder *er = new euroc_recorder{};
er->recording = record_from_start; er->recording = record_from_start;
if (record_from_start) {
euroc_recorder_try_mkfiles(er);
}
struct xrt_frame_node *xfn = &er->node; struct xrt_frame_node *xfn = &er->node;
xfn->break_apart = euroc_recorder_node_break_apart; xfn->break_apart = euroc_recorder_node_break_apart;
@ -264,6 +277,10 @@ euroc_recorder_create(struct xrt_frame_context *xfctx, const char *record_path,
er->path = default_path; er->path = default_path;
} }
if (record_from_start) {
euroc_recorder_try_mkfiles(er);
}
// Setup sink pipeline // Setup sink pipeline
// First, make the public queues that will clone frames in memory so that // First, make the public queues that will clone frames in memory so that