d/euroc: Refactor player to stream samples more like a real device

Splits the image and IMU producers into their own threads.
This commit is contained in:
Mateo de Mayo 2022-03-03 17:30:55 -03:00 committed by Jakob Bornecrantz
parent 1dbda3d8c8
commit 71694b80fd

View file

@ -19,16 +19,26 @@
#include "euroc_driver.h"
#include <chrono>
#include <stdio.h>
#include <fstream>
#include <future>
#include <thread>
#define EUROC_PLAYER_STR "Euroc Player"
#define CLAMP(X, A, B) (MIN(MAX((X), (A)), (B)))
using std::async;
using std::ifstream;
using std::is_same_v;
using std::launch;
using std::pair;
using std::string;
using std::vector;
typedef std::pair<timepoint_ns, std::string> img_sample;
typedef std::vector<xrt_imu_sample> imu_samples;
typedef std::vector<img_sample> img_samples;
using img_sample = pair<timepoint_ns, string>;
using imu_samples = vector<xrt_imu_sample>;
using img_samples = vector<img_sample>;
enum euroc_player_ui_state
{
@ -75,7 +85,7 @@ struct euroc_player
//! Next frame number to use, index in `left_imgs` and `right_imgs`.
//! Note that this expects that both cameras provide the same amount of frames.
//! Furthermore, it is also expected that their timestamps match.
uint64_t img_seq;
uint64_t img_seq; //!< Next frame number to use, index in `{left, right}_imgs`
uint64_t imu_seq; //!< Next imu sample number to use, index in `imus`
imu_samples *imus; //!< List of all IMU samples read from the dataset
img_samples *left_imgs; //!< List of all image names to read from the dataset
@ -100,6 +110,7 @@ struct euroc_player
bool paused; //!< Whether to pause the playback
bool use_source_ts; //!< If true, use the original timestamps from the dataset
} playback;
timepoint_ns last_pause_ts; //!< Last time the stream was paused
// UI related fields
enum euroc_player_ui_state ui_state;
@ -126,30 +137,31 @@ euroc_player_set_ui_state(struct euroc_player *ep, euroc_player_ui_state state);
//! Returns whether the appropriate data.csv file could be opened
static bool
euroc_player_preload_imu_data(struct euroc_player *ep,
std::string dataset_path,
const string &dataset_path,
imu_samples *samples,
int64_t read_n = -1)
{
std::string csv_filename = dataset_path + "/mav0/imu0/data.csv";
std::ifstream fin{csv_filename};
string csv_filename = dataset_path + "/mav0/imu0/data.csv";
ifstream fin{csv_filename};
if (!fin.is_open()) {
return false;
}
std::string line;
std::getline(fin, line); // Skip header line
constexpr int COLUMN_COUNT = 6; // EuRoC imu columns: ts wx wy wz ax ay az
string line;
getline(fin, line); // Skip header line
bool set_base_ts = ep != NULL;
while (std::getline(fin, line) && read_n-- != 0) {
while (getline(fin, line) && read_n-- != 0) {
timepoint_ns timestamp;
double v[6];
double v[COLUMN_COUNT];
size_t i = 0;
size_t j = line.find(',');
timestamp = std::stoll(line.substr(i, j));
for (size_t k = 0; k < 6; k++) {
timestamp = stoll(line.substr(i, j));
for (size_t k = 0; k < COLUMN_COUNT; k++) {
i = j;
j = line.find(',', i + 1);
v[k] = std::stod(line.substr(i + 1, j));
v[k] = stod(line.substr(i + 1, j));
}
// Save first IMU sample timestamp
@ -168,30 +180,30 @@ euroc_player_preload_imu_data(struct euroc_player *ep,
//! If read_n > 0, read at most that amount of samples
//! Returns whether the appropriate data.csv file could be opened
static bool
euroc_player_preload_img_data(std::string dataset_path, img_samples *samples, bool is_left, int64_t read_n = -1)
euroc_player_preload_img_data(const string &dataset_path, img_samples *samples, bool is_left, int64_t read_n = -1)
{
// Parse image data, assumes data.csv is well formed
std::string cam_name = is_left ? "cam0" : "cam1";
std::string imgs_path = dataset_path + "/mav0/" + cam_name + "/data";
std::string csv_filename = dataset_path + "/mav0/" + cam_name + "/data.csv";
std::ifstream fin{csv_filename};
string cam_name = is_left ? "cam0" : "cam1";
string imgs_path = dataset_path + "/mav0/" + cam_name + "/data";
string csv_filename = dataset_path + "/mav0/" + cam_name + "/data.csv";
ifstream fin{csv_filename};
if (!fin.is_open()) {
return false;
}
std::string line;
std::getline(fin, line); // Skip header line
while (std::getline(fin, line) && read_n-- != 0) {
string line;
getline(fin, line); // Skip header line
while (getline(fin, line) && read_n-- != 0) {
size_t i = line.find(',');
timepoint_ns timestamp = std::stoll(line.substr(0, i));
std::string img_name_tail = line.substr(i + 1);
timepoint_ns timestamp = stoll(line.substr(0, i));
string img_name_tail = line.substr(i + 1);
// Standard euroc datasets use CRLF line endings, so let's remove the extra '\r'
if (img_name_tail.back() == '\r') {
img_name_tail.pop_back();
}
std::string img_name = imgs_path + "/" + img_name_tail;
string img_name = imgs_path + "/" + img_name_tail;
img_sample sample{timestamp, img_name};
samples->push_back(sample);
}
@ -282,10 +294,6 @@ os_monotonic_get_ts()
static timepoint_ns
euroc_player_mapped_ts(struct euroc_player *ep, timepoint_ns ts)
{
if (ep->playback.use_source_ts) {
return ts;
}
timepoint_ns relative_ts = ts - ep->base_ts; // Relative to imu0 first ts
ep->playback.speed = MAX(ep->playback.speed, 1.0 / 256);
double speed = ep->playback.speed;
@ -293,6 +301,16 @@ euroc_player_mapped_ts(struct euroc_player *ep, timepoint_ns ts)
return mapped_ts;
}
//! Same as @ref euroc_player_mapped_ts but only if playback options allow it.
static timepoint_ns
euroc_player_mapped_playback_ts(struct euroc_player *ep, timepoint_ns ts)
{
if (ep->playback.use_source_ts) {
return ts;
}
return euroc_player_mapped_ts(ep, ts);
}
static void
euroc_player_load_next_frame(struct euroc_player *ep, bool is_left, struct xrt_frame *&xf)
{
@ -305,8 +323,8 @@ euroc_player_load_next_frame(struct euroc_player *ep, bool is_left, struct xrt_f
float scale = ep->playback.scale;
// Load image from disk
timepoint_ns timestamp = euroc_player_mapped_ts(ep, sample.first);
std::string img_name = sample.second;
timepoint_ns timestamp = euroc_player_mapped_playback_ts(ep, sample.first);
string img_name = sample.second;
EUROC_TRACE(ep, "%s img t = %ld filename = %s", is_left ? "left" : "right", timestamp, img_name.c_str());
cv::ImreadModes read_mode = allow_color ? cv::IMREAD_ANYCOLOR : cv::IMREAD_GRAYSCALE;
cv::Mat img = cv::imread(img_name, read_mode); // If colored, reads in BGR order
@ -333,35 +351,11 @@ euroc_player_load_next_frame(struct euroc_player *ep, bool is_left, struct xrt_f
xf->source_id = ep->base.source_id;
}
static bool
euroc_player_is_imu_next(struct euroc_player *ep)
{
bool prioritize_imus = ep->playback.send_all_imus_first;
bool more_imus = ep->imu_seq < ep->imus->size();
if (more_imus && prioritize_imus) {
return true;
}
bool more_imgs = ep->img_seq < ep->left_imgs->size();
timepoint_ns imu_ts = more_imus ? ep->imus->at(ep->imu_seq).timestamp_ns : INT64_MAX;
timepoint_ns img_ts = more_imgs ? ep->left_imgs->at(ep->img_seq).first : INT64_MAX;
return imu_ts < img_ts;
}
static void
euroc_player_push_next_sample(struct euroc_player *ep)
euroc_player_push_next_frame(struct euroc_player *ep)
{
bool stereo = ep->playback.stereo;
// Push next IMU sample
if (euroc_player_is_imu_next(ep)) {
xrt_imu_sample sample = ep->imus->at(ep->imu_seq++);
sample.timestamp_ns = euroc_player_mapped_ts(ep, sample.timestamp_ns);
xrt_sink_push_imu(ep->in_sinks.imu, &sample);
return;
}
// Push next frame(s)
struct xrt_frame *left_xf = NULL, *right_xf = NULL;
euroc_player_load_next_frame(ep, true, left_xf);
if (stereo) {
@ -377,41 +371,97 @@ euroc_player_push_next_sample(struct euroc_player *ep)
xrt_sink_push_frame(ep->in_sinks.right, right_xf);
}
// We are now done with the frames, unreference them so
// they can be freed if all consumers are done with them.
xrt_frame_reference(&left_xf, NULL);
xrt_frame_reference(&right_xf, NULL);
snprintf(ep->progress_text, sizeof(ep->progress_text), "Frames %lu/%lu - IMUs %lu/%lu", ep->img_seq,
ep->left_imgs->size(), ep->imu_seq, ep->imus->size());
// Determine how much to sleep until next frame
if (ep->img_seq >= ep->left_imgs->size()) {
return;
}
timepoint_ns next_frame_ts = euroc_player_mapped_ts(ep, ep->left_imgs->at(ep->img_seq).first);
timepoint_ns now = os_monotonic_get_ts();
int32_t frame_delay_ns = MAX(next_frame_ts - now, 0);
os_nanosleep(frame_delay_ns);
}
static bool
euroc_player_is_paused(struct euroc_player *ep)
static void
euroc_player_push_next_imu(struct euroc_player *ep)
{
if (!ep->playback.paused) {
return false;
}
xrt_imu_sample sample = ep->imus->at(ep->imu_seq++);
sample.timestamp_ns = euroc_player_mapped_playback_ts(ep, sample.timestamp_ns);
xrt_sink_push_imu(ep->in_sinks.imu, &sample);
}
timepoint_ns pre_pause = os_monotonic_get_ts();
os_nanosleep(200 * 1000 * 1000);
timepoint_ns pos_pause = os_monotonic_get_ts();
timepoint_ns pause_length = pos_pause - pre_pause;
ep->offset_ts += pause_length;
return true;
template <typename SamplesType>
timepoint_ns
euroc_player_get_next_euroc_ts(struct euroc_player *ep)
{
if constexpr (is_same_v<SamplesType, imu_samples>) {
return ep->imus->at(ep->imu_seq).timestamp_ns;
} else {
return ep->left_imgs->at(ep->img_seq).first;
}
}
template <typename SamplesType>
void
euroc_player_sleep_until_next_sample(struct euroc_player *ep)
{
using std::chrono::nanoseconds;
using timepoint_ch = std::chrono::time_point<std::chrono::steady_clock>;
using std::this_thread::sleep_until;
timepoint_ns next_sample_euroc_ts = euroc_player_get_next_euroc_ts<SamplesType>(ep);
timepoint_ns next_sample_mono_ts = euroc_player_mapped_ts(ep, next_sample_euroc_ts);
timepoint_ch next_sample_chrono_tp{nanoseconds{next_sample_mono_ts}};
sleep_until(next_sample_chrono_tp);
#ifndef NDEBUG
// Complain when we are >1ms late. It can happen due to a busy scheduler.
double oversleep_ms = (os_monotonic_get_ts() - next_sample_mono_ts) / (double)U_TIME_1MS_IN_NS;
if (abs(oversleep_ms) > 1) {
string sample_type_name = "frame";
if constexpr (is_same_v<SamplesType, imu_samples>) {
sample_type_name = "imu";
}
EUROC_DEBUG(ep, "(%s) Woke up %.1fms late", sample_type_name.c_str(), oversleep_ms);
}
#endif
}
//! Based on the SamplesType to stream, return a set of corresponding entities:
//! the samples vector, sequence number, push and sleep functions.
template <typename SamplesType>
auto
euroc_player_get_stream_set(struct euroc_player *ep)
{
constexpr bool is_imu = is_same_v<SamplesType, imu_samples>;
const SamplesType *samples;
if constexpr (is_imu) {
samples = ep->imus;
} else {
samples = ep->left_imgs;
}
uint64_t *sample_seq = is_imu ? &ep->imu_seq : &ep->img_seq;
auto push_next_sample = is_imu ? euroc_player_push_next_imu : euroc_player_push_next_frame;
auto sleep_until_next_sample = euroc_player_sleep_until_next_sample<SamplesType>;
return make_tuple(samples, sample_seq, push_next_sample, sleep_until_next_sample);
}
template <typename SamplesType>
static void
euroc_player_stream_samples(struct euroc_player *ep)
{
// These fields correspond to IMU or frame streams depending on SamplesType
const auto [samples, sample_seq, push_next_sample, sleep_until_next_sample] =
euroc_player_get_stream_set<SamplesType>(ep);
while (*sample_seq < samples->size() && ep->is_running) {
while (ep->playback.paused) {
constexpr int64_t PAUSE_POLL_INTERVAL_NS = 15L * U_TIME_1MS_IN_NS;
os_nanosleep(PAUSE_POLL_INTERVAL_NS);
}
sleep_until_next_sample(ep);
push_next_sample(ep);
}
}
static void *
euroc_player_mainloop(void *ptr)
euroc_player_stream(void *ptr)
{
struct xrt_fs *xfs = (struct xrt_fs *)ptr;
struct euroc_player *ep = euroc_player(xfs);
@ -422,20 +472,24 @@ euroc_player_mainloop(void *ptr)
ep->start_ts = os_monotonic_get_ts();
bool more_imus = ep->imu_seq < ep->imus->size();
bool more_imgs = ep->img_seq < ep->left_imgs->size();
while (ep->is_running && (more_imus || more_imgs)) {
if (euroc_player_is_paused(ep)) {
continue;
// Push all IMU samples now if requested
if (ep->playback.send_all_imus_first) {
while (ep->imu_seq < ep->imus->size()) {
euroc_player_push_next_imu(ep);
}
euroc_player_push_next_sample(ep);
more_imus = ep->imu_seq < ep->imus->size();
more_imgs = ep->img_seq < ep->left_imgs->size();
}
// Launch image and IMU producers
auto serve_imus = async(launch::async, [ep] { euroc_player_stream_samples<imu_samples>(ep); });
auto serve_imgs = async(launch::async, [ep] { euroc_player_stream_samples<img_samples>(ep); });
// Note that the only fields of `ep` being modified in the threads are: img_seq, imu_seq and
// progress_text in single locations, thus no race conditions should occur.
// Wait for the end of both streams
serve_imgs.get();
serve_imus.get();
EUROC_INFO(ep, "Euroc dataset playback finished");
euroc_player_set_ui_state(ep, STREAM_ENDED);
@ -642,7 +696,7 @@ euroc_player_start_btn_cb(void *ptr)
int ret = 0;
ret |= os_thread_helper_init(&ep->play_thread);
ret |= os_thread_helper_start(&ep->play_thread, euroc_player_mainloop, ep);
ret |= os_thread_helper_start(&ep->play_thread, euroc_player_stream, ep);
EUROC_ASSERT(ret == 0, "Thread launch failure");
euroc_player_set_ui_state(ep, STREAM_PLAYING);
@ -653,6 +707,14 @@ euroc_player_pause_btn_cb(void *ptr)
{
struct euroc_player *ep = (struct euroc_player *)ptr;
ep->playback.paused = !ep->playback.paused;
if (ep->playback.paused) {
ep->last_pause_ts = os_monotonic_get_ts();
} else {
time_duration_ns pause_length = os_monotonic_get_ts() - ep->last_pause_ts;
ep->offset_ts += pause_length;
}
euroc_player_set_ui_state(ep, ep->playback.paused ? STREAM_PAUSED : STREAM_PLAYING);
}
@ -681,14 +743,14 @@ euroc_player_setup_gui(struct euroc_player *ep)
u_var_add_log_level(ep, &ep->log_level, "Log level");
u_var_add_gui_header(ep, NULL, "Playback Options");
u_var_add_ro_text(ep, "When using a SLAM system, setting these after start is unlikely to work", "Note");
u_var_add_ro_text(ep, "Set these before starting the stream", "Note");
u_var_add_bool(ep, &ep->playback.stereo, "Stereo (if available)");
u_var_add_bool(ep, &ep->playback.color, "Color (if available)");
u_var_add_f32(ep, &ep->playback.skip_first_s, "First seconds to skip (set at start)");
u_var_add_f32(ep, &ep->playback.skip_first_s, "First seconds to skip");
u_var_add_f32(ep, &ep->playback.scale, "Scale");
u_var_add_f64(ep, &ep->playback.speed, "Speed (set at start)");
u_var_add_bool(ep, &ep->playback.send_all_imus_first, "Send all IMU samples now");
u_var_add_bool(ep, &ep->playback.use_source_ts, "Don't correct timestamps (set at start)");
u_var_add_f64(ep, &ep->playback.speed, "Speed");
u_var_add_bool(ep, &ep->playback.send_all_imus_first, "Send all IMU samples first");
u_var_add_bool(ep, &ep->playback.use_source_ts, "Use original timestamps");
u_var_add_gui_header(ep, NULL, "Streams");
u_var_add_ro_ff_vec3_f32(ep, ep->gyro_ff, "Gyroscope");