동영상 스트림 DVR


개요

(참고로 여기 적는 글은 대충 절반쯤 Copilot이 적으란 대로 적는것 같다)

스트리밍을 보면서 동시에 저장도 하고 싶었다. 그래서 만들었다.

목표

  • 입력으로는 m3u8 파일을 받는다. 근데 파일 내용은 재생 도중에 계속 업데이트된다.
  • 출력으로는 m3u8 파일을 내보낸다
    • m3u8은 주기적으로 갱신된다
    • m3u8에 들어있는 chunk 파일들은 로컬에 저장한 값을 활용한다

Phase 1: m3u8 다운로더

처음에는 그냥 인터넷 스트림을 다운로드 받는 것에만 집중하고, 재생은… 그냥 브라우저 창 하나 더 열어서 바로 재생하기로 했다.

입력 부분

목표: m3u8을 받아서 chunk 파일들을 다운받는다. chunk 파일들은 로컬에 저장한다.

처음에는 Python으로 간단하게 짜 봤다.

#%%
import requests
import m3u8
from urllib.parse import urljoin

BASE = "<<redacted>>"

#%%
while True:
  resp = requests.get(BASE, headers={
      "User-Agent": "<<redacted>>",
      "Referer": "<<redacted>>"
  })
  print(resp.status_code, len(resp.content))
  if resp.status_code != 200:
    break

  lst = m3u8.parse(resp.content.decode('utf-8'))

  for it in lst['segments']:
    print(it['uri'], urljoin(BASE, it['uri']))
    part = requests.get(urljoin(BASE, it['uri']), headers={
        "User-Agent": "<<redacted>>",
        "Referer": "<<redacted>>"
    })
    with open(it['uri'], 'wb') as f:
        f.write(part.content)

이렇게 짜 봤더니, chunk를 다운로드 받는 부분이 싱글 쓰레드로 돌아가다 보니 재생속도를 따라가지 못하고 중간에 chunk를 계속 날려먹게 되더라. 멀티쓰레드 다운로드를 해야 하는데, 파이썬 멀티쓰레드는 쓰레드를 만들고 관리하는게 너무 귀찮아서 그냥 포기했다. Javascript를 쓰기로 했다.

const base = "<<redacted>>"

const history: Record<string, number> = {};
const sink = Deno.stdout;

const loop1 = setInterval(async () => {
  const resp = await fetch(base, {
    headers: {
      "User-Agent": "<<redacted>>",
      "Referer": "<<redacted>>"
    }
  });
  console.error(resp.status, JSON.stringify(resp.headers));
  if (resp.status !== 200) {
    clearInterval(loop1);
    return;
  }
  const content = await resp.text();
  content.split("\n").filter(v => !v.startsWith("#")).forEach(async (v) => {
    const url = new URL(v, base);
    if (history[url.href]) return;
    history[url.href] = 1;
    const chunk = await fetch(url.href, {
      headers: {
        "User-Agent": "<<redacted>>",
        "Referer": "<<redacted>>",
      }
    });
    console.error(v, chunk.status);
    if (v === '') return;
    const buf = new Uint8Array(await chunk.arrayBuffer());
    await Deno.writeFile(v, buf);
  });
}, 8523);

주기적으로 m3u8 파일을 갱신하면서, history에 없는 새로운 chunk가 발견되면 다운로드 받는 방식이다. history에 계속 쓰기만 하기 때문에 chunk가 1억개 넘어가면 성능 문제가 생길수도 있으니 적당히 청소를 해 주면 좋겠지만… 우린 그정도 스케일이 필요 없다.

Phase 2: Naive m3u8 생성

다운받은 chunk 파일들을 그냥 m3u8에 목록으로 저장만 하면 되는 줄 알았다.

ls *.ts > playlist.m3u8

당연히 이런식으론 안된다. 몇가지 문제가 있다.

  • 저장받은 파일명이 사전순서가 아님
  • .ts 확장자는 스트리밍 chunk에도 쓰이지만 typescript 코드로도 쓰지롱;;
  • 재생중에 뚝뚝 끊김. 원본에는 이러지 않았는데?

뚝뚝 끊기는 문제의 경우, 알고보니 m3u8에 들어가는 메타데이터를 충실히 채워야 하는 문제였다. 다음 내용들을 추가해줘야 했다.

왠지 기본으로 넣어줘야 할 것 같은 것들:

  • #EXTM3U
  • #EXT-X-VERSION:3

자동 갱신을 위해 필요한 것들:

  • #EXT-X-TARGETDURATION:8
  • #EXT-X-MEDIA-SEQUENCE:0: chunk의 순서를 알려줘야 플레이어가 자동으로 다음 chunk를 요청한다.
  • #EXT-X-ALLOW-CACHE:YES

그리고 개별 chunk의 길이를 알려줘야 플레이어에서 뚝뚝 끊기는 문제가 사라진다:

  • #EXTINF:8.000000

Phase 3: 데이터베이스

이제 파일 순서를 어딘가에 저장해야 하는데, 이미 런타임부터 Deno(javascript)와 Python으로 파편화가 되어버렸으니, 그냥 메모리에 써서 교환하기는 물 건너갔다. 여러 프로세스에서 동시에 읽고 쓸 수 있는 데이터베이스를 쓰기로 했는데, 간편하게 sqlite3을 써 봤다.

Typescript에서 쓰는 부분:

import { DB } from "https://deno.land/x/sqlite@v3.7.0/mod.ts";

const DBNAME = "db.sqlite3";

// Caller should be responsible for closing the connection.
export function connect(): DB {
  return new DB(DBNAME);
}

export function create_table(db: DB): void {
  db.query(`
    CREATE TABLE IF NOT EXISTS "streams" (
      "id" INTEGER PRIMARY KEY AUTOINCREMENT,
      "source" TEXT NOT NULL,
      "chunk" TEXT NOT NULL);`);
}

export function insert(db: DB, source: string, chunk: string): void {
  db.query(`INSERT INTO "streams" ("source", "chunk") VALUES (?, ?);`, [source, chunk]);
}

Python에서 읽는 부분:

#%%
# Import the sqlite3 module
import sqlite3

DB = "db.sqlite3"

#%%
def select():
  # Create a connection to the database
  conn = sqlite3.connect(DB)

  # Create a cursor
  cursor = conn.cursor()

  # Select all rows
  cursor.execute('''
    SELECT id, source, chunk FROM streams
  ''')

  # Fetch all rows
  rows = cursor.fetchall()

  # Close the connection
  conn.close()

  return rows

Phase 4: 스트리밍 다운로더

스트리밍을 하려면 chunk 순서를 지켜야 하니, 바꿔 말하자면 원본 m3u8의 chunk 목록을 멀티쓰레드로 받으면서도, 입력 순서대로 처리해야 한다는 소리다. 여러 방법이 있을 수 있겠지만 나는 이렇게 구현했다.

import { connect, insert } from './db.ts';

const base = "<<redacted>>";

const history: Record<string, number> = {};
let last: Promise<void> = Promise.resolve();
const db = connect(); // This connection is not closed during runtime...

const loop1 = setInterval(async () => {
  const resp = await fetch(base, {
    headers: {
      "User-Agent": "<<redacted>>",
      "Referer": "<<redacted>>"
    }
  });
  console.error(resp.status, JSON.stringify(resp.headers));
  if (resp.status !== 200) {
    clearInterval(loop1);
    return;
  }
  const content = await resp.text();
  for (const v of content.split("\n").filter(v => !v.startsWith("#"))) {
    const url = new URL(v, base);
    if (history[url.href]) continue;
    history[url.href] = 1;
    const chunkPromise = fetch(url.href, { // no await
      headers: {
        "User-Agent": "<<redacted>>",
        "Referer": "<<redacted>>"
      }
    });

    last = last.then(async () => {
      const chunk = await chunkPromise;
      console.error(v, chunk.status);
      if (v === '') return;
      const buf = new Uint8Array(await chunk.arrayBuffer());
      await Deno.writeFile(v, buf);
      insert(db, base, v);
    });
  }
}, 6312);

즉, 일단 chunkPromise를 만들어서 다운로드를 동시에 시작해둔 다음에, last promise에서 순서대로 sqlite3 데이터베이스에 기록하도록 했다. 이렇게 하면 chunk 순서를 지키면서도, 다운로드는 동시에 진행된다. 조금 더 개선하자면 다운로드 결과를 일단 파일에 쓰는 부분을 chunkPromise에 함께 넣으면 chunk 데이터를 메모리에 들고 있는 상황을 최소화할 수 있을 것이다…만 역시나 그건 나중에.

Phase 5: m3u8 생성기

Naive 생성기에서 호되게 배웠기 때문에 제대로 된 생성기는 어찌저찌 구현할 수 있었다.

"""
Generate m3u8 from the file list
"""

#%%
import ffmpeg
import time
from m3u8 import M3U8
from m3u8.model import Segment, SegmentList

from db import select

#%%
while True:
  rows = select()

  res = M3U8()
  res.version = 3
  res.target_duration = 3
  res.allow_cache = 'YES'
  res.media_sequence = rows[0][0]
  lst = []
  for row in rows:
    chunk = f"{row[2]}"
    lst.append(Segment(chunk, duration=ffmpeg.probe(chunk)['streams'][0]['duration']))
  res.segments = SegmentList(lst)
  res.dump('playlist1.m3u8')

  time.sleep(10)

뜬금없이 ffmpeg probe를 쓰게 됐는데, EXTINF 메타데이터를 얻기 위해서다. 물론 원본 m3u8에서 가져온 값을 쓰면 더 효율적이겠지만…

오늘의 교훈: 얻은 데이터는 요모조모 사용할 곳이 꼭 있기 마련이다. 생각없이 바이든 나중에 고생한다.

그리고 probe는 내부적으로 ffprobe를 실행하는 것이라 느릴 수밖에 없다. 데이터베이스에 캐시하면 더 좋겠지만… 역시나 시간 부족으로 찍!

결론

되면 한다.