Crystal로 알아보는 Fiber concurrency


동시성 프로그래밍은 재미있지만 구조나 사이즈에 따라 어려움을 동반합니다. 개인적으로도 도구 작성 시 자주 사용하기 때문에 제가 진행하는 프로젝트에서 자주 볼 수 있고, 블로그에도 Goroutine과 Sync, Ruby Concurrency 등 매년 동시성 관련해서 글을 작성한 것 같습니다.

Go, Ruby에 이어 올해는 Crystal로 풀어볼까 합니다. 오늘은 Fiber에 대해 정리해봅니다.

Fiber

Computer Science(CS)에서 Fiber는 동시성을 위한 경량 스레드(Thread) 의미합니다. 스레드와 비슷하지만 큰 차이가 있는데요, 바로 Context Switching의 영역과 멀티 태스킹의 방식입니다.

  • Fiber:
    • Context Switching: User Space
    • Multitasking: cooperative multitasking (협력적 멀티태스킹)
  • Thread:
    • Context Switching: Kenel
    • Multitasking: preemptive multitasking (선점형 멀티태스킹)

뭔가 비교하고 보니 서로 완전 반대되는 기술처럼 보이지만, 사실 Fiber는 Thread 하위에서 동작합니다. 그렇기 때문에 동일 스레드 아래에서 동작하는 Fiber는 Race condition의 위험성이 적습니다.

Cooperative vs Preemptive

먼저 Cooperative multitasking (협력적 멀티태스킹)과 Preemptive multitasking (선점형 멀티태스킹)에 대한 차이를 살펴보는게 좋습니다.

Cooperative Multitasking은 각 프로세스나 작업이 제어권을 가지고 있습니다. 그래서 프로그램이 명시적으로 다른 작업에게 양보해야 Context Switching이 발생합니다. 반대로 Preemptive Multitasking은 OS가 제어권을 가지며 시분할로 각 작업이 공평하게 실행되도록 처리합니다.

Coroutine

Coroutine은 프로그램이 실행되는 동안 일시 중단되고 다시 시작할 수 있는 함수 또는 루틴을 의미합니다. 언어에서 기능적으로 제공할 수도 있고, 직접 구현하여 패턴으로 사용할 수도 있습니다.

Scheduler

Fiber에서 Scheduler는 실행할 작업을 결정하고 조정하는 역할을 수행합니다. Fiber는 비동기적으로 동직하기 때문에 Scheduler를 통해 언제 실행될지 관리가 필요합니다. 이를 통해서 자원을 좀 더 효율적으로 활용할 수 있게 됩니다.

# Crystal의 runtime에는 Fiber를 위한 Scheduler가 내장되어 있습니다. 

def resume_event : Crystal::EventLoop::Event
  @resume_event ||= Crystal::Scheduler.event_loop.create_resume_event(self)
end

def timeout_event : Crystal::EventLoop::Event
  @timeout_event ||= Crystal::Scheduler.event_loop.create_timeout_event(self)
end

Fiber in languages

Fiber를 기본적으로 제공하지 않는 언어도 많습니다. 언어 자체에서 지원하는 것은 대표적으로 Ruby/Crystal의 Fiber, Go는 Goroutine 정도가 있습니다. 다만 Coroutine 패턴이나 Fiber는 라이브러리 등으로 제공될 수 있으니 참고 부탁드립니다.

Utilization of Fibers

Fiber는 보통 다중 작업 처리에서 사용됩니다. Worker 패턴으로 구성하여 여러가지 작업을 빠르게 처리하거나, 비동기 작업에서도 용이하게 사용할 수 있습니다.

In crystal

Spawn

Crystal에선 spawn이란 예약어를 통해 함수를 Fiber로 실행할 수 있습니다.

def func_a
  puts "this is func A"
end

spawn func_a
# #

Yield

Yield를 통해 대기하고 있는 다른 Fiber에게 스왑할 수 있도록 Scheduler를 양보할 수 있습니다.

counter = 0
spawn name: "status" do
  loop do
    puts "Status: #{counter}"
    sleep(2.seconds)
  end
end

while counter < Int32::MAX
  counter += 1
  if counter % 1_000_000 == 0
    # Without this, there would never be an opportunity to resume the status fiber
    Fiber.yield
  end
end

효율적인 동작을 위해선 Fiber의 작업이 끝난 시점 등에서 명시적으로 yield를 수행하여 다른 Fiber의 실행 시간을 보장해주는 것이 좋습니다.

My case

아래는 Noir의 detector 코드입니다. Dir.glob로 읽은 디렉토리 및 리스트를 루프돌면서 spawn을 통해 Fiber를 생성하여 병렬로 처리합니다.

# https://github.com/noir-cr/noir/blob/main/src/detector/detector.cr

begin
    Dir.glob("#{base_path}/**/*") do |file|
      spawn do
        begin
          next if File.directory?(file)
          content = File.read(file, encoding: "utf-8", invalid: :skip)

          detector_list.each do |detector|
            if detector.detect(file, content)
              techs << detector.name
            end
          end
        rescue e : File::NotFoundError
          logger.debug "File not found: #{file}"
        end
      end
      Fiber.yield
    end
  rescue e
    logger.debug e
  end

병렬 처리라 파일이 많더라도 굉장히 빠르게 처리합니다. 다만 위 구조는 파일 수 만큼 Fiber를 생성하기 때문에 그다지 좋은 처리라곤 생각하지 않습니다. 오히려 Worker pool 패턴이 더 안정적일 것 같습니다.

// 참고
// Golang의 worker poll 패턴

concurrency := 10
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
    wg.Add(1)
    go func() {
        for word := range wordlists {
            fmt.Println(word)
        }
        wg.Done()
    }()
}

for j := 0; j < 100; j++ {
    wordlists <- strconv.Itoa(j)
}
close(wordlists)
wg.Wait()

References



Source link