Rust에서 Thread safety 확보하기


개요

그냥 문서 끝에 가면 구현해놓은 데모 있으니까 그거 보세요

문제

Thread safety를 지원하지 않는 C API 라이브러리가 주어졌을 때, 어떻게 하면 Rust에서 이 API를 안전하게 공유할 수 있을까?

  • Llama.cpp의 경우 Thread safety를 아직 지원하지 않는다고 한다.

가정

  • 라이브러리를 수정하진 않는다 - 그냥 임의의 C/C++ 라이브러리에 대해 일반적으로 동작하는 방법이 필요하다
  • Rust로만 구현

문제 - 요구사항

간단히 말하면 아래와 같은 코드가 있을 때 Goal::incr을 호출하고 그 결과를 받아올 방법이 필요하다.

#[derive(Debug)]
struct Goal {
    inner: *mut i32,
}

impl Goal {
    fn incr(&mut self, msg: &str) -> String {
        unsafe {
            *self.inner += 1;
        }
        format!("Hello, {}", msg)
    }
}

뭐가 문제인데?

Rust 컴파일러는 Goal의 필드를 보고 이것이 !Send라고 간주한다. safe rust에서 아무 생각없이 쓰던 async move 등의 syntax가 모두 먹통이 되어버린다는 뜻. 잘 생각해보면 말이 되는데,

  1. 여러 thread에서 동시에 incr을 호출하면 race condition이 발생해서 의도한 대로 동작하지 않을 수 있다.
  2. 현재 실행하는 Thread에 뭔가 context 정보가 할당되어 있을 수 있는데, Rust 관점에서 그걸 알 수 없으니 다른 Thread로 해당 작업을 옮겨서 계속 실행하는게 안전하다는 보장이 없다.

반대로 말하면 컴파일러가 찡찡대지 않을 수 있도록 코드를 잘 구현하는 과정에서 race condition도 해결할 수 있고 thread context 문제도 해결할 수 있게 된다는 말이다.

대충의 설계

Thread safety가 없다고 하니까, 그냥 전담 thread를 만들고 거기서만 돌리면 되는 것 아닌가?

  1. Thread를 하나 만든다.
  2. 그 Thread 안에서 Goal을 생성한다.
  3. 외부에서 해당 Thread와 통신한다.
  4. 해결?!

통신 - 작업 Thread에게 뭔가 보내려면 어떻게 하지?

전에는 Rust에 channel이라는 게 있는 걸 알고는 있었는데 언제 왜 쓰는지는 잘 몰랐다. 그런데 !Send 때문에 지지고 볶다 보니까 channel이 이 목적에 쓰기 딱 좋은 것 같다는 생각이다.

진짜 통신용 소켓을 만들어도 되기야 했겠지만, 그럴려면 주고받는 메시지의 serialization까지 생각해야 하는 번거로운 작업이 되지 않나. Channel은 그냥 뭔가 주고받을 수 있다는 것만 구현하면 된다.

채널 구현체가 여러 가지 있어서 좀 결정장애가 오긴 하는데, 일단 뭐가 됐든 작동하기만 하는 녀석을 쓰고 나중에 고치지 뭐…

작업 Thread의 결과는 어떻게 돌려받을까?

하여튼 대충의 abstraction이 worker에게 뭔가 작업을 주고 그 결과를 나중에 돌려받는 방식이 되었는데, 작업을 돌려받을 때도 마찬가지로 channel을 쓰면 된다. channel의 sender는 얼마든지 다른 쓰레드에게 전송할 수 있다는 사실에 감사하십시오 휴먼

데모

use futures::{channel::mpsc::{unbounded, UnboundedSender}, future::{self, join_all}, StreamExt as _};

#[derive(Debug)]
struct Goal {
    inner: *mut i32,
}

impl Goal {
    fn incr(&mut self, msg: &str) -> String {
        unsafe {
            *self.inner += 1;
        }
        format!("Hello, {}", msg)
    }
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = unbounded::<(String, UnboundedSender<String>)>();
    // Can see send into worker thread can happen
    let holder_init = 42u32;
    std::thread::spawn(move || {
        let mut holder = Box::new(holder_init);
        let mut obj = Goal { inner: holder.as_mut() as *const _ as *mut _};
        futures::executor::block_on(receiver.for_each(|t| {
            let (msg, cb) = t;
            let resp = obj.incr(&msg);
            cb.unbounded_send(resp).expect("should send");
            future::ready(())
        }));
        // Can check no race condition happened
        // holder should be used here, otherwise Goal.inner will be dangling
        println!("service close: {}", holder);
    });
    
    let mut handles = vec![];
    for i in 0..1000 {
        let s2 = sender.clone();
        let handle = tokio::spawn(async move {
            let (client, client_cb) = unbounded();
            s2.unbounded_send((format!("world {}", i), client)).expect("should send");
            client_cb.for_each(|v| { dbg!(v); future::ready(()) }).await;
        });
        handles.push(handle);
    }
    
    join_all(handles).await;
    // sender will be dropped here, making the service to be closed
}

결론

일단 되긴 된다. 이제 LSP에서 llama로 이것저것 제안하는 코드만 짜면 되는데 뭘 제안하게 시키지…? 연관문서라도 좀 정리하게 시켜볼까?