diff --git a/api/src/misc/utils.js b/api/src/misc/utils.js index fd497d18..e15690b0 100644 --- a/api/src/misc/utils.js +++ b/api/src/misc/utils.js @@ -29,3 +29,7 @@ export function splitFilenameExtension(filename) { return [ parts.join('.'), ext ] } } + +export function zip(a, b) { + return a.map((value, i) => [ value, b[i] ]); +} diff --git a/api/src/stream/internal.js b/api/src/stream/internal.js index 7d8bf4c9..8c94c485 100644 --- a/api/src/stream/internal.js +++ b/api/src/stream/internal.js @@ -7,7 +7,7 @@ const CHUNK_SIZE = BigInt(8e6); // 8 MB const min = (a, b) => a < b ? a : b; async function* readChunks(streamInfo, size) { - let read = 0n; + let read = 0n, chunksSinceTransplant = 0; while (read < size) { if (streamInfo.controller.signal.aborted) { throw new Error("controller aborted"); @@ -22,6 +22,16 @@ async function* readChunks(streamInfo, size) { signal: streamInfo.controller.signal }); + if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) { + chunksSinceTransplant = 0; + try { + await streamInfo.transplant(streamInfo.dispatcher); + continue; + } catch {} + } + + chunksSinceTransplant++; + const expected = min(CHUNK_SIZE, size - read); const received = BigInt(chunk.headers['content-length']); diff --git a/api/src/stream/manage.js b/api/src/stream/manage.js index 3323ce5d..ebb5c6c7 100644 --- a/api/src/stream/manage.js +++ b/api/src/stream/manage.js @@ -9,6 +9,7 @@ import { env } from "../config.js"; import { closeRequest } from "./shared.js"; import { decryptStream, encryptStream } from "../misc/crypto.js"; import { hashHmac } from "../security/secrets.js"; +import { zip } from "../misc/utils.js"; // optional dependency const freebind = env.freebindCIDR && await import('freebind').catch(() => {}); @@ -40,7 +41,7 @@ export function createStream(obj) { audioFormat: obj.audioFormat, isHLS: obj.isHLS || false, - originalRequest: obj.parameters + originalRequest: obj.originalRequest }; // FIXME: this is now a Promise, but it is not awaited @@ -101,6 +102,7 @@ export function createInternalStream(url, obj = {}) { controller, dispatcher, isHLS: obj.isHLS, + transplant: obj.transplant }); let streamLink = new URL('/itunnel', `http://127.0.0.1:${env.tunnelPort}`); @@ -116,13 +118,17 @@ export function createInternalStream(url, obj = {}) { return streamLink.toString(); } -export function destroyInternalStream(url) { +function getInternalTunnelId(url) { url = new URL(url); if (url.hostname !== '127.0.0.1') { return; } - const id = url.searchParams.get('id'); + return url.searchParams.get('id'); +} + +export function destroyInternalStream(url) { + const id = getInternalTunnelId(url); if (internalStreamCache.has(id)) { closeRequest(getInternalStream(id)?.controller); @@ -130,9 +136,68 @@ export function destroyInternalStream(url) { } } +const transplantInternalTunnels = function(tunnelUrls, transplantUrls) { + if (tunnelUrls.length !== transplantUrls.length) { + return; + } + + for (const [ tun, url ] of zip(tunnelUrls, transplantUrls)) { + const id = getInternalTunnelId(tun); + const itunnel = getInternalStream(id); + + if (!itunnel) continue; + itunnel.url = url; + } +} + +const transplantTunnel = async function (dispatcher) { + if (this.pendingTransplant) { + await this.pendingTransplant; + return; + } + + let finished; + this.pendingTransplant = new Promise(r => finished = r); + + try { + const handler = await import(`../processing/services/${this.service}.js`); + const response = await handler.default({ + ...this.originalRequest, + dispatcher + }); + + if (!response.urls) { + return; + } + + response.urls = [response.urls].flat(); + if (this.originalRequest.isAudioOnly && response.urls.length > 1) { + response.urls = [response.urls[1]]; + } else if (this.originalRequest.isAudioMuted) { + response.urls = [response.urls[0]]; + } + + const tunnels = [this.urls].flat(); + if (tunnels.length !== response.urls.length) { + return; + } + + transplantInternalTunnels(tunnels, response.urls); + } + catch {} + finally { + finished(); + delete this.pendingTransplant; + } +} + function wrapStream(streamInfo) { const url = streamInfo.urls; + if (streamInfo.originalRequest) { + streamInfo.transplant = transplantTunnel.bind(streamInfo); + } + if (typeof url === 'string') { streamInfo.urls = createInternalStream(url, streamInfo); } else if (Array.isArray(url)) {