CouchDB로 materialized view를 만들어보자


서론

이전 글에서 말했듯 CouchDB를 이용해서 조금 일반적이진 않은 collection을 만들어보려고 하는데, 간단히 말하자면 DB 레벨에서 동작하는 엑셀 함수라고 할 수 있겠다. 즉, 어떤 Cell의 값을 =A1+B1이라고 하면 A1과 B1의 값을 더한 값이 그 Cell의 값으로 표시하는 거 말이다.

설계

문서의 정의

일단 이런 류의 “다른 값을 참조해서 값을 계산하는 문서”를 다음과 같이 정의한다 치자.

export interface DerivedDocument {
    type: "derived";
    // 이 문서가 참조하는 다른 문서의 목록
    deps: string[];
    // 다른 문서의 값들이 주어졌을 때 새 값을 계산하는 Javascript 함수를
    // `.toString()`한 값 
    // 예) function(doc1, doc2) { return (doc1.value * doc2.value); }
    func: string;
    // 계산된 결과값
    value: object;
}

색인 설계

CouchDB 자체적으로는 이런 기능을 제공하지 않으니, 임의로 change monitor를 두고 deps에 해당하는 문서가 변경되었을 때 해당 함수를 실행하는 것으로 하자. 이를 위해서는 문서가 변경될 때마다 해당 문서를 참조하는 다른 문서의 목록을 읽어야 하니 거기에 해당하는 인덱스를 만들어야 한다.

간단히 말해, deps[i]를 key로 해서, 문서의 id 목록을 얻을 수 있으면 되겠구먼?

db.insert({
    _id: "_design/signal",
    views: {
        deps: {
            map: function (doc: DerivedDocument & IdentifiedDocument) {
                if (doc.type === "derived") {
                    doc.deps.forEach(dep => emit([dep, doc._id], [...doc.deps, doc.func]));
                }
            }.toString()
        },
    }
});

변경사항 감시

마지막으로, change stream을 따라가다가 변경된 문서에 대해 해당 문서를 참조하는 다른 문서들을 갱신해주는 데몬을 만들면 된다. 변경된 문서 목록을 받고 처리해서 업데이트된 값을 써야하니까 RTT에 따라 latency는 꽤나 높아지게 된다. (심심치않게 변경되기 이전의 값을 읽는 경우가 생긴다는 뜻)

export async function evaluate(docid: string): Promise<void> {
    const doc = await db.get(docid);
    if (doc.type !== "derived") {
        console.error("Not a derived document", doc);
        return;
    }
    const dep_docs = await Promise.all(doc.deps.map(dep => db.get(dep)));
    if (!doc.func) {
        return;
    }
    const script = new vm.Script("(" + doc.func + ")(..." + JSON.stringify(dep_docs) + ")");
    const result = script.runInNewContext();
    db.insert({
        ...doc,
        value: result,
    }).catch((err) => {
        if (err.statusCode === 409) {
            return evaluate(docid);
        }
        throw err;
    });
}
    
async function main() {
    let last_seq: any = undefined;
    while (true) {
        const changes = await db.changes({
            feed: "longpoll",
            limit: 10,
            since: last_seq,
        });
        
        for (const change of changes.results) {
            await db.view("signal", "deps", {
                start_key: [change.id],
                endkey: [change.id, "\uffff"],
            }).then((body) => {
                // console.log(`dependency of ${change.id}`, body);
                body.rows.forEach((row) => {
                    // Async fire and forget
                    evaluate(row.id);
                });
            });
        }
        
        last_seq = changes.last_seq;
    }
}

main();

결론

되긴 되고 transitive dependency에 대해서도 어쨌든 언젠가 업데이트 되긴 된다.

다만 임의의 갯수의 문서에 대해 변경사항을 반영하는 부분을 만들고 싶은데, 그건 다음에 해 보기로 하자.