Rust에서 Thread safety 확보하기
개요
그냥 문서 끝에 가면 구현해놓은 데모 있으니까 그거 보세요
문제
Thread safety를 지원하지 않는 C API 라이브러리가 주어졌을 때, 어떻게 하면 Rust에서 이 API를 안전하게 공유할 수 있을까?
- Llama.cpp의 경우 Thread safety를 아직 지원하지 않는다고 한다.
가정
- 라이브러리를 수정하진 않는다 - 그냥 임의의 C/C++ 라이브러리에 대해 일반적으로 동작하는 방법이 필요하다
- Rust로만 구현
문제 - 요구사항
간단히 말하면 아래와 같은 코드가 있을 때 Goal::incr
을 호출하고 그 결과를 받아올 방법이 필요하다.
#[derive(Debug)]
struct Goal {
inner: *mut i32,
}
impl Goal {
fn incr(&mut self, msg: &str) -> String {
unsafe {
*self.inner += 1;
}
format!("Hello, {}", msg)
}
}
뭐가 문제인데?
Rust 컴파일러는 Goal
의 필드를 보고 이것이 !Send
라고 간주한다. safe rust에서 아무 생각없이 쓰던 async move
등의 syntax가 모두 먹통이 되어버린다는 뜻. 잘 생각해보면 말이 되는데,
- 여러 thread에서 동시에 incr을 호출하면 race condition이 발생해서 의도한 대로 동작하지 않을 수 있다.
- 현재 실행하는 Thread에 뭔가 context 정보가 할당되어 있을 수 있는데, Rust 관점에서 그걸 알 수 없으니 다른 Thread로 해당 작업을 옮겨서 계속 실행하는게 안전하다는 보장이 없다.
반대로 말하면 컴파일러가 찡찡대지 않을 수 있도록 코드를 잘 구현하는 과정에서 race condition도 해결할 수 있고 thread context 문제도 해결할 수 있게 된다는 말이다.
대충의 설계
Thread safety가 없다고 하니까, 그냥 전담 thread를 만들고 거기서만 돌리면 되는 것 아닌가?
- Thread를 하나 만든다.
- 그 Thread 안에서
Goal
을 생성한다. - 외부에서 해당 Thread와 통신한다.
- 해결?!
통신 - 작업 Thread에게 뭔가 보내려면 어떻게 하지?
전에는 Rust에 channel이라는 게 있는 걸 알고는 있었는데 언제 왜 쓰는지는 잘 몰랐다. 그런데 !Send
때문에 지지고 볶다 보니까 channel이 이 목적에 쓰기 딱 좋은 것 같다는 생각이다.
진짜 통신용 소켓을 만들어도 되기야 했겠지만, 그럴려면 주고받는 메시지의 serialization까지 생각해야 하는 번거로운 작업이 되지 않나. Channel은 그냥 뭔가 주고받을 수 있다는 것만 구현하면 된다.
채널 구현체가 여러 가지 있어서 좀 결정장애가 오긴 하는데, 일단 뭐가 됐든 작동하기만 하는 녀석을 쓰고 나중에 고치지 뭐…
작업 Thread의 결과는 어떻게 돌려받을까?
하여튼 대충의 abstraction이 worker에게 뭔가 작업을 주고 그 결과를 나중에 돌려받는 방식이 되었는데, 작업을 돌려받을 때도 마찬가지로 channel을 쓰면 된다. channel의 sender는 얼마든지 다른 쓰레드에게 전송할 수 있다는 사실에 감사하십시오 휴먼
데모
use futures::{channel::mpsc::{unbounded, UnboundedSender}, future::{self, join_all}, StreamExt as _};
#[derive(Debug)]
struct Goal {
inner: *mut i32,
}
impl Goal {
fn incr(&mut self, msg: &str) -> String {
unsafe {
*self.inner += 1;
}
format!("Hello, {}", msg)
}
}
#[tokio::main]
async fn main() {
let (sender, receiver) = unbounded::<(String, UnboundedSender<String>)>();
// Can see send into worker thread can happen
let holder_init = 42u32;
std::thread::spawn(move || {
let mut holder = Box::new(holder_init);
let mut obj = Goal { inner: holder.as_mut() as *const _ as *mut _};
futures::executor::block_on(receiver.for_each(|t| {
let (msg, cb) = t;
let resp = obj.incr(&msg);
cb.unbounded_send(resp).expect("should send");
future::ready(())
}));
// Can check no race condition happened
// holder should be used here, otherwise Goal.inner will be dangling
println!("service close: {}", holder);
});
let mut handles = vec![];
for i in 0..1000 {
let s2 = sender.clone();
let handle = tokio::spawn(async move {
let (client, client_cb) = unbounded();
s2.unbounded_send((format!("world {}", i), client)).expect("should send");
client_cb.for_each(|v| { dbg!(v); future::ready(()) }).await;
});
handles.push(handle);
}
join_all(handles).await;
// sender will be dropped here, making the service to be closed
}
결론
일단 되긴 된다. 이제 LSP에서 llama로 이것저것 제안하는 코드만 짜면 되는데 뭘 제안하게 시키지…? 연관문서라도 좀 정리하게 시켜볼까?