From 49e85efe23ff5953e9d88ed395108c443af45ca5 Mon Sep 17 00:00:00 2001 From: wukko Date: Fri, 31 Mar 2023 11:20:49 +0600 Subject: [PATCH] catch connection resets in streamables and other error handling improvements --- src/cobalt.js | 24 +++++++++++++++++----- src/modules/stream/types.js | 40 ++++++++++++++++++++++++------------- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/cobalt.js b/src/cobalt.js index 4ca68ec9..ed791a33 100644 --- a/src/cobalt.js +++ b/src/cobalt.js @@ -38,6 +38,7 @@ if (fs.existsSync('./.env') && process.env.selfURL && process.env.streamSalt && keyGenerator: (req, res) => sha256(req.ip.replace('::ffff:', ''), process.env.streamSalt), handler: (req, res, next, opt) => { res.status(429).json({ "status": "error", "text": loc(languageCode(req), 'ErrorRateLimit') }); + return; } }); const apiLimiterStream = rateLimit({ @@ -48,6 +49,7 @@ if (fs.existsSync('./.env') && process.env.selfURL && process.env.streamSalt && keyGenerator: (req, res) => sha256(req.ip.replace('::ffff:', ''), process.env.streamSalt), handler: (req, res, next, opt) => { res.status(429).json({ "status": "error", "text": loc(languageCode(req), 'ErrorRateLimit') }); + return; } }); @@ -73,10 +75,17 @@ if (fs.existsSync('./.env') && process.env.selfURL && process.env.streamSalt && try { JSON.parse(buf); if (buf.length > 720) throw new Error(); - if (String(req.header('Content-Type')) !== "application/json") res.status(500).json({ 'status': 'error', 'text': 'invalid content type header' }) - if (String(req.header('Accept')) !== "application/json") res.status(500).json({ 'status': 'error', 'text': 'invalid accept header' }) + if (String(req.header('Content-Type')) !== "application/json") { + res.status(400).json({ 'status': 'error', 'text': 'invalid content type header' }); + return; + } + if (String(req.header('Accept')) !== "application/json") { + res.status(400).json({ 'status': 'error', 'text': 'invalid accept header' }); + return; + } } catch(e) { - res.status(500).json({ 'status': 'error', 'text': 'invalid json body.' }) + res.status(400).json({ 'status': 'error', 'text': 'invalid json body.' }); + return; } } })); @@ -100,8 +109,10 @@ if (fs.existsSync('./.env') && process.env.selfURL && process.env.streamSalt && j = apiJSON(0, { t: loc(lang, 'ErrorCantProcess') }); } res.status(j.status).json(j.body); + return; } catch (e) { - res.status(500).json({ 'status': 'error', 'text': loc(languageCode(req), 'ErrorCantProcess') }) + res.status(500).json({ 'status': 'error', 'text': loc(languageCode(req), 'ErrorCantProcess') }); + return; } }); @@ -112,11 +123,13 @@ if (fs.existsSync('./.env') && process.env.selfURL && process.env.streamSalt && case 'stream': if (req.query.p) { res.status(200).json({ "status": "continue" }); + return; } else if (req.query.t && req.query.h && req.query.e) { stream(res, ip, req.query.t, req.query.h, req.query.e); } else { let j = apiJSON(0, { t: "no stream id" }) res.status(j.status).json(j.body); + return; } break; case 'onDemand': @@ -144,7 +157,8 @@ if (fs.existsSync('./.env') && process.env.selfURL && process.env.streamSalt && break; } } catch (e) { - res.status(500).json({ 'status': 'error', 'text': loc(languageCode(req), 'ErrorCantProcess') }) + res.status(500).json({ 'status': 'error', 'text': loc(languageCode(req), 'ErrorCantProcess') }); + return; } }); app.get("/api", (req, res) => { diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index 959ab495..a4eb233f 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -16,19 +16,22 @@ export function streamDefault(streamInfo, res) { isStream: true }); stream.pipe(res).on('error', () => { - res.end(); + res.destroy(); }); stream.on('error', () => { - res.end(); + res.destroy(); + }); + stream.on('aborted', () => { + res.destroy(); }); } catch (e) { - res.end(); + res.destroy(); } } export function streamLiveRender(streamInfo, res) { try { if (streamInfo.urls.length !== 2) { - res.end(); + res.destroy(); return; } let audio = got.get(streamInfo.urls[1], { isStream: true }); @@ -54,15 +57,24 @@ export function streamLiveRender(streamInfo, res) { res.setHeader('Content-Disposition', `attachment; filename="${streamInfo.filename}"`); res.on('error', () => { ffmpegProcess.kill(); - res.end(); + res.destroy(); }); ffmpegProcess.stdio[4].pipe(res).on('error', () => { ffmpegProcess.kill(); - res.end(); - });; + res.destroy(); + }); audio.pipe(ffmpegProcess.stdio[3]).on('error', () => { ffmpegProcess.kill(); - res.end(); + res.destroy(); + }); + + audio.on('error', () => { + ffmpegProcess.kill(); + res.destroy(); + }); + audio.on('aborted', () => { + ffmpegProcess.kill(); + res.destroy(); }); ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); @@ -72,11 +84,11 @@ export function streamLiveRender(streamInfo, res) { res.on('close', () => ffmpegProcess.kill()); ffmpegProcess.on('error', () => { ffmpegProcess.kill(); - res.end(); + res.destroy(); }); } catch (e) { - res.end(); + res.destroy(); } } export function streamAudioOnly(streamInfo, res) { @@ -119,10 +131,10 @@ export function streamAudioOnly(streamInfo, res) { res.on('close', () => ffmpegProcess.kill()); ffmpegProcess.on('error', () => { ffmpegProcess.kill(); - res.end(); + res.destroy(); }); } catch (e) { - res.end(); + res.destroy(); } } export function streamVideoOnly(streamInfo, res) { @@ -154,9 +166,9 @@ export function streamVideoOnly(streamInfo, res) { res.on('close', () => ffmpegProcess.kill()); ffmpegProcess.on('error', () => { ffmpegProcess.kill(); - res.end(); + res.destroy(); }); } catch (e) { - res.end(); + res.destroy(); } }