diff --git a/api/src/core/api.js b/api/src/core/api.js index 153f2ca6..364764c0 100644 --- a/api/src/core/api.js +++ b/api/src/core/api.js @@ -8,18 +8,19 @@ import jwt from "../security/jwt.js"; import stream from "../stream/stream.js"; import match from "../processing/match.js"; -import { env, isCluster, setTunnelPort } from "../config.js"; +import { env } from "../config.js"; import { extract } from "../processing/url.js"; -import { Green, Bright, Cyan } from "../misc/console-text.js"; +import { Bright, Cyan } from "../misc/console-text.js"; import { hashHmac } from "../security/secrets.js"; import { createStore } from "../store/redis-ratelimit.js"; import { randomizeCiphers } from "../misc/randomize-ciphers.js"; import { verifyTurnstileToken } from "../security/turnstile.js"; import { friendlyServiceName } from "../processing/service-alias.js"; -import { verifyStream, getInternalStream } from "../stream/manage.js"; +import { verifyStream } from "../stream/manage.js"; import { createResponse, normalizeRequest, getIP } from "../processing/request.js"; import * as APIKeys from "../security/api-keys.js"; import * as Cookies from "../processing/cookie/manager.js"; +import { setupTunnelHandler } from "./itunnel.js"; const git = { branch: await getBranch(), @@ -263,6 +264,15 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => { } }) + app.use('/tunnel', cors({ + methods: ['GET'], + exposedHeaders: [ + 'Estimated-Content-Length', + 'Content-Disposition' + ], + ...corsConfig, + })); + app.get('/tunnel', apiTunnelLimiter, async (req, res) => { const id = String(req.query.id); const exp = String(req.query.exp); @@ -292,31 +302,7 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => { } return stream(res, streamInfo); - }) - - const itunnelHandler = (req, res) => { - if (!req.ip.endsWith('127.0.0.1')) { - return res.sendStatus(403); - } - - if (String(req.query.id).length !== 21) { - return res.sendStatus(400); - } - - const streamInfo = getInternalStream(req.query.id); - if (!streamInfo) { - return res.sendStatus(404); - } - - streamInfo.headers = new Map([ - ...(streamInfo.headers || []), - ...Object.entries(req.headers) - ]); - - return stream(res, { type: 'internal', ...streamInfo }); - }; - - app.get('/itunnel', itunnelHandler); + }); app.get('/', (_, res) => { res.type('json'); @@ -378,17 +364,5 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => { } }); - if (isCluster) { - const istreamer = express(); - istreamer.get('/itunnel', itunnelHandler); - const server = istreamer.listen({ - port: 0, - host: '127.0.0.1', - exclusive: true - }, () => { - const { port } = server.address(); - console.log(`${Green('[✓]')} cobalt sub-instance running on 127.0.0.1:${port}`); - setTunnelPort(port); - }); - } + setupTunnelHandler(); } diff --git a/api/src/core/itunnel.js b/api/src/core/itunnel.js new file mode 100644 index 00000000..fea4cb76 --- /dev/null +++ b/api/src/core/itunnel.js @@ -0,0 +1,61 @@ +import stream from "../stream/stream.js"; +import { getInternalTunnel } from "../stream/manage.js"; +import { setTunnelPort } from "../config.js"; +import { Green } from "../misc/console-text.js"; +import express from "express"; + +const validateTunnel = (req, res) => { + if (!req.ip.endsWith('127.0.0.1')) { + res.sendStatus(403); + return; + } + + if (String(req.query.id).length !== 21) { + res.sendStatus(400); + return; + } + + const streamInfo = getInternalTunnel(req.query.id); + if (!streamInfo) { + res.sendStatus(404); + return; + } + + return streamInfo; +} + +const streamTunnel = (req, res) => { + const streamInfo = validateTunnel(req, res); + if (!streamInfo) { + return; + } + + streamInfo.headers = new Map([ + ...(streamInfo.headers || []), + ...Object.entries(req.headers) + ]); + + return stream(res, { type: 'internal', ...streamInfo }); +} + +export const setupTunnelHandler = () => { + const tunnelHandler = express(); + + tunnelHandler.get('/itunnel', streamTunnel); + + // fallback + tunnelHandler.use((_, res) => res.sendStatus(400)); + // error handler + tunnelHandler.use((_, __, res, ____) => res.socket.end()); + + + const server = tunnelHandler.listen({ + port: 0, + host: '127.0.0.1', + exclusive: true + }, () => { + const { port } = server.address(); + console.log(`${Green('[✓]')} internal tunnel handler running on 127.0.0.1:${port}`); + setTunnelPort(port); + }); +} diff --git a/api/src/processing/services/bilibili.js b/api/src/processing/services/bilibili.js index b47b0bc2..47932711 100644 --- a/api/src/processing/services/bilibili.js +++ b/api/src/processing/services/bilibili.js @@ -58,7 +58,8 @@ async function com_download(id) { return { urls: [video.baseUrl, audio.baseUrl], audioFilename: `bilibili_${id}_audio`, - filename: `bilibili_${id}_${video.width}x${video.height}.mp4` + filename: `bilibili_${id}_${video.width}x${video.height}.mp4`, + isHLS: true }; } diff --git a/api/src/stream/internal-hls.js b/api/src/stream/internal-hls.js index 55634c71..e4416288 100644 --- a/api/src/stream/internal-hls.js +++ b/api/src/stream/internal-hls.js @@ -1,5 +1,6 @@ import HLS from "hls-parser"; import { createInternalStream } from "./manage.js"; +import { request } from "undici"; function getURL(url) { try { @@ -55,8 +56,11 @@ function transformMediaPlaylist(streamInfo, hlsPlaylist) { const HLS_MIME_TYPES = ["application/vnd.apple.mpegurl", "audio/mpegurl", "application/x-mpegURL"]; -export function isHlsResponse (req) { - return HLS_MIME_TYPES.includes(req.headers['content-type']); +export function isHlsResponse(req, streamInfo) { + return HLS_MIME_TYPES.includes(req.headers['content-type']) + // bluesky's cdn responds with wrong content-type for the hls playlist, + // so we enforce it here until they fix it + || (streamInfo.service === 'bsky' && streamInfo.url.endsWith('.m3u8')); } export async function handleHlsPlaylist(streamInfo, req, res) { @@ -71,3 +75,59 @@ export async function handleHlsPlaylist(streamInfo, req, res) { res.send(hlsPlaylist); } + +async function getSegmentSize(url, config) { + const segmentResponse = await request(url, { + ...config, + throwOnError: true + }); + + if (segmentResponse.headers['content-length']) { + segmentResponse.body.dump(); + return +segmentResponse.headers['content-length']; + } + + // if the response does not have a content-length + // header, we have to compute it ourselves + let size = 0; + + for await (const data of segmentResponse.body) { + size += data.length; + } + + return size; +} + +export async function probeInternalHLSTunnel(streamInfo) { + const { url, headers, dispatcher, signal } = streamInfo; + + // remove all falsy headers + Object.keys(headers).forEach(key => { + if (!headers[key]) delete headers[key]; + }); + + const config = { headers, dispatcher, signal, maxRedirections: 16 }; + + const manifestResponse = await fetch(url, config); + + const manifest = HLS.parse(await manifestResponse.text()); + if (manifest.segments.length === 0) + return -1; + + const segmentSamples = await Promise.all( + Array(5).fill().map(async () => { + const manifestIdx = Math.floor(Math.random() * manifest.segments.length); + const randomSegment = manifest.segments[manifestIdx]; + if (!randomSegment.uri) + throw "segment is missing URI"; + + const segmentSize = await getSegmentSize(randomSegment.uri, config) / randomSegment.duration; + return segmentSize; + }) + ); + + const averageBitrate = segmentSamples.reduce((a, b) => a + b) / segmentSamples.length; + const totalDuration = manifest.segments.reduce((acc, segment) => acc + segment.duration, 0); + + return averageBitrate * totalDuration; +} diff --git a/api/src/stream/internal.js b/api/src/stream/internal.js index 7d8bf4c9..2cfc990c 100644 --- a/api/src/stream/internal.js +++ b/api/src/stream/internal.js @@ -1,7 +1,7 @@ import { request } from "undici"; import { Readable } from "node:stream"; import { closeRequest, getHeaders, pipe } from "./shared.js"; -import { handleHlsPlaylist, isHlsResponse } from "./internal-hls.js"; +import { handleHlsPlaylist, isHlsResponse, probeInternalHLSTunnel } from "./internal-hls.js"; const CHUNK_SIZE = BigInt(8e6); // 8 MB const min = (a, b) => a < b ? a : b; @@ -96,10 +96,7 @@ async function handleGenericStream(streamInfo, res) { res.status(fileResponse.statusCode); fileResponse.body.on('error', () => {}); - // bluesky's cdn responds with wrong content-type for the hls playlist, - // so we enforce it here until they fix it - const isHls = isHlsResponse(fileResponse) - || (streamInfo.service === "bsky" && streamInfo.url.endsWith('.m3u8')); + const isHls = isHlsResponse(fileResponse, streamInfo); for (const [ name, value ] of Object.entries(fileResponse.headers)) { if (!isHls || name.toLowerCase() !== 'content-length') { @@ -133,3 +130,40 @@ export function internalStream(streamInfo, res) { return handleGenericStream(streamInfo, res); } + +export async function probeInternalTunnel(streamInfo) { + try { + const signal = AbortSignal.timeout(3000); + const headers = { + ...Object.fromEntries(streamInfo.headers || []), + ...getHeaders(streamInfo.service), + host: undefined, + range: undefined + }; + + if (streamInfo.isHLS) { + return probeInternalHLSTunnel({ + ...streamInfo, + signal, + headers + }); + } + + const response = await request(streamInfo.url, { + method: 'HEAD', + headers, + dispatcher: streamInfo.dispatcher, + signal, + maxRedirections: 16 + }); + + if (response.statusCode !== 200) + throw "status is not 200 OK"; + + const size = +response.headers['content-length']; + if (isNaN(size)) + throw "content-length is not a number"; + + return size; + } catch {} +} diff --git a/api/src/stream/manage.js b/api/src/stream/manage.js index 79b5c1db..10d25384 100644 --- a/api/src/stream/manage.js +++ b/api/src/stream/manage.js @@ -68,10 +68,20 @@ export function createStream(obj) { return streamLink.toString(); } -export function getInternalStream(id) { +export function getInternalTunnel(id) { return internalStreamCache.get(id); } +export function getInternalTunnelFromURL(url) { + url = new URL(url); + if (url.hostname !== '127.0.0.1') { + return; + } + + const id = url.searchParams.get('id'); + return getInternalTunnel(id); +} + export function createInternalStream(url, obj = {}) { assert(typeof url === 'string'); @@ -124,7 +134,7 @@ export function destroyInternalStream(url) { const id = url.searchParams.get('id'); if (internalStreamCache.has(id)) { - closeRequest(getInternalStream(id)?.controller); + closeRequest(getInternalTunnel(id)?.controller); internalStreamCache.delete(id); } } diff --git a/api/src/stream/shared.js b/api/src/stream/shared.js index 65af03f0..ec06339d 100644 --- a/api/src/stream/shared.js +++ b/api/src/stream/shared.js @@ -1,5 +1,7 @@ import { genericUserAgent } from "../config.js"; import { vkClientAgent } from "../processing/services/vk.js"; +import { getInternalTunnelFromURL } from "./manage.js"; +import { probeInternalTunnel } from "./internal.js"; const defaultHeaders = { 'user-agent': genericUserAgent @@ -47,3 +49,40 @@ export function pipe(from, to, done) { from.pipe(to); } + +export async function estimateTunnelLength(streamInfo, multiplier = 1.1) { + let urls = streamInfo.urls; + if (!Array.isArray(urls)) { + urls = [ urls ]; + } + + const internalTunnels = urls.map(getInternalTunnelFromURL); + if (internalTunnels.some(t => !t)) + return -1; + + const sizes = await Promise.all(internalTunnels.map(probeInternalTunnel)); + const estimatedSize = sizes.reduce( + // if one of the sizes is missing, let's just make a very + // bold guess that it's the same size as the existing one + (acc, cur) => cur <= 0 ? acc * 2 : acc + cur, + 0 + ); + + if (isNaN(estimatedSize) || estimatedSize <= 0) { + return -1; + } + + return Math.floor(estimatedSize * multiplier); +} + +export function estimateAudioMultiplier(streamInfo) { + if (streamInfo.audioFormat === 'wav') { + return 1411 / 128; + } + + if (streamInfo.audioCopy) { + return 1; + } + + return streamInfo.audioBitrate / 128; +} diff --git a/api/src/stream/stream.js b/api/src/stream/stream.js index a6d41200..6de52793 100644 --- a/api/src/stream/stream.js +++ b/api/src/stream/stream.js @@ -10,20 +10,20 @@ export default async function(res, streamInfo) { return await stream.proxy(streamInfo, res); case "internal": - return internalStream(streamInfo, res); + return await internalStream(streamInfo, res); case "merge": - return stream.merge(streamInfo, res); + return await stream.merge(streamInfo, res); case "remux": case "mute": - return stream.remux(streamInfo, res); + return await stream.remux(streamInfo, res); case "audio": - return stream.convertAudio(streamInfo, res); + return await stream.convertAudio(streamInfo, res); case "gif": - return stream.convertGif(streamInfo, res); + return await stream.convertGif(streamInfo, res); } closeResponse(res); diff --git a/api/src/stream/types.js b/api/src/stream/types.js index 0a4e2d47..1e270df8 100644 --- a/api/src/stream/types.js +++ b/api/src/stream/types.js @@ -6,7 +6,7 @@ import { create as contentDisposition } from "content-disposition-header"; import { env } from "../config.js"; import { destroyInternalStream } from "./manage.js"; import { hlsExceptions } from "../processing/service-config.js"; -import { getHeaders, closeRequest, closeResponse, pipe } from "./shared.js"; +import { getHeaders, closeRequest, closeResponse, pipe, estimateTunnelLength, estimateAudioMultiplier } from "./shared.js"; const ffmpegArgs = { webm: ["-c:v", "copy", "-c:a", "copy"], @@ -29,7 +29,7 @@ const convertMetadataToFFmpeg = (metadata) => { for (const [ name, value ] of Object.entries(metadata)) { if (metadataTags.includes(name)) { - args.push('-metadata', `${name}=${value.replace(/[\u0000-\u0009]/g, "")}`); + args.push('-metadata', `${name}=${value.replace(/[\u0000-\u0009]/g, "")}`); // skipcq: JS-0004 } else { throw `${name} metadata tag is not supported.`; } @@ -98,7 +98,7 @@ const proxy = async (streamInfo, res) => { } } -const merge = (streamInfo, res) => { +const merge = async (streamInfo, res) => { let process; const shutdown = () => ( killProcess(process), @@ -112,7 +112,7 @@ const merge = (streamInfo, res) => { try { if (streamInfo.urls.length !== 2) return shutdown(); - const format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; + const format = streamInfo.filename.split('.').pop(); let args = [ '-loglevel', '-8', @@ -152,6 +152,7 @@ const merge = (streamInfo, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo)); pipe(muxOutput, res, shutdown); @@ -162,7 +163,7 @@ const merge = (streamInfo, res) => { } } -const remux = (streamInfo, res) => { +const remux = async (streamInfo, res) => { let process; const shutdown = () => ( killProcess(process), @@ -196,7 +197,7 @@ const remux = (streamInfo, res) => { args.push('-bsf:a', 'aac_adtstoasc'); } - let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; + let format = streamInfo.filename.split('.').pop(); if (format === "mp4") { args.push('-movflags', 'faststart+frag_keyframe+empty_moov') } @@ -215,6 +216,7 @@ const remux = (streamInfo, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo)); pipe(muxOutput, res, shutdown); @@ -225,7 +227,7 @@ const remux = (streamInfo, res) => { } } -const convertAudio = (streamInfo, res) => { +const convertAudio = async (streamInfo, res) => { let process; const shutdown = () => ( killProcess(process), @@ -284,6 +286,13 @@ const convertAudio = (streamInfo, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + res.setHeader( + 'Estimated-Content-Length', + await estimateTunnelLength( + streamInfo, + estimateAudioMultiplier(streamInfo) * 1.1 + ) + ); pipe(muxOutput, res, shutdown); res.on('finish', shutdown); @@ -292,7 +301,7 @@ const convertAudio = (streamInfo, res) => { } } -const convertGif = (streamInfo, res) => { +const convertGif = async (streamInfo, res) => { let process; const shutdown = () => (killProcess(process), closeResponse(res)); @@ -321,6 +330,7 @@ const convertGif = (streamInfo, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo, 60)); pipe(muxOutput, res, shutdown); diff --git a/web/i18n/en/button.json b/web/i18n/en/button.json index 1ea7fb41..f066049e 100644 --- a/web/i18n/en/button.json +++ b/web/i18n/en/button.json @@ -16,5 +16,6 @@ "save": "save", "export": "export", "yes": "yes", - "no": "no" + "no": "no", + "clear": "clear" } diff --git a/web/i18n/en/queue.json b/web/i18n/en/queue.json new file mode 100644 index 00000000..f709b44b --- /dev/null +++ b/web/i18n/en/queue.json @@ -0,0 +1,6 @@ +{ + "title": "processing queue", + "stub": "nothing in the queue yet, only two of us.\ntry {{ value }} something!", + "stub.download": "downloading", + "stub.remux": "remuxing" +} diff --git a/web/i18n/en/settings.json b/web/i18n/en/settings.json index c450b4b9..c0a4f1df 100644 --- a/web/i18n/en/settings.json +++ b/web/i18n/en/settings.json @@ -113,6 +113,10 @@ "advanced.data": "data management", + "advanced.duck": "local processing", + "advanced.duck.title": "process everything on device", + "advanced.duck.description": "very wip, may cause critical issues or not work at all. this toggle will probably be gone by release.", + "processing.community": "community instances", "processing.enable_custom.title": "use a custom processing server", "processing.enable_custom.description": "cobalt will use a custom processing instance if you choose to. even though cobalt has some security measures in place, we are not responsible for any damages done via a community instance, as we have no control over them.\n\nplease be mindful of what instances you use and make sure they're hosted by people you trust.", diff --git a/web/src/components/misc/Meowbalt.svelte b/web/src/components/misc/Meowbalt.svelte index 26ad14b3..929c63aa 100644 --- a/web/src/components/misc/Meowbalt.svelte +++ b/web/src/components/misc/Meowbalt.svelte @@ -1,6 +1,5 @@ + +
+ {#if renderPopover} + + {/if} +
+ + diff --git a/web/src/components/misc/SectionHeading.svelte b/web/src/components/misc/SectionHeading.svelte index e3b1e635..8741030f 100644 --- a/web/src/components/misc/SectionHeading.svelte +++ b/web/src/components/misc/SectionHeading.svelte @@ -8,6 +8,7 @@ export let title: string; export let sectionId: string; export let beta = false; + export let nolink = false; export let copyData = ""; const sectionURL = `${$page.url.origin}${$page.url.pathname}#${sectionId}`; @@ -32,18 +33,20 @@ {/if} - + {#if !nolink} + + {/if} diff --git a/web/src/components/queue/ProcessingQueueItem.svelte b/web/src/components/queue/ProcessingQueueItem.svelte new file mode 100644 index 00000000..6c43a6cf --- /dev/null +++ b/web/src/components/queue/ProcessingQueueItem.svelte @@ -0,0 +1,156 @@ + + +
+
+
+
+ +
+ + {filename} + +
+
+
+
+
{id}: {status}
+
+
+ + +
+
+ + diff --git a/web/src/components/queue/ProcessingQueueStub.svelte b/web/src/components/queue/ProcessingQueueStub.svelte new file mode 100644 index 00000000..5a4afef7 --- /dev/null +++ b/web/src/components/queue/ProcessingQueueStub.svelte @@ -0,0 +1,44 @@ + + +
+ + + {$t("queue.stub", { + value: $t(`queue.stub.${randomAction()}`), + })} + +
+ + diff --git a/web/src/components/queue/ProcessingStatus.svelte b/web/src/components/queue/ProcessingStatus.svelte new file mode 100644 index 00000000..7acd4a2b --- /dev/null +++ b/web/src/components/queue/ProcessingStatus.svelte @@ -0,0 +1,133 @@ + + + + + diff --git a/web/src/components/save/SupportedServices.svelte b/web/src/components/save/SupportedServices.svelte index 6dcb9244..adf63ef5 100644 --- a/web/src/components/save/SupportedServices.svelte +++ b/web/src/components/save/SupportedServices.svelte @@ -1,18 +1,21 @@ @@ -49,7 +40,7 @@
- {#if renderPopover} -
-
- {#if loaded} - {#each services as service} -
{service}
- {/each} - {:else} - {#each { length: 17 } as _} - - {/each} - {/if} -
-
- {$t("save.services.disclaimer")} -
+ +
+ {#if loaded} + {#each services as service} +
{service}
+ {/each} + {:else} + {#each { length: 17 } as _} + + {/each} + {/if}
- {/if} +
+ {$t("save.services.disclaimer")} +
+