F#のMailboxProcessorで選択的受信

F#にはアクターモデルを実現できるMailboxProcessorクラスが用意されています。

基本的にはキューに積まれたメッセージを順に処理していくものですが、たまに特定のメッセージを先に処理したいときがあります。

今回はMailboxProcessorで選択的に受信するよう実装してみたいと思います。

Erlangでの選択的受信

after 0 を利用して下記の様に記述されます。

important() ->
    receive
        {Priority, Message} when Priority > 10 ->
            [Message | important()]
    after 0 ->
        normal()
    end.
 
normal() ->
    receive
        {_, Message} ->
            [Message | normal()]
    after 0 ->
        []
    end.


Erlangをまねて実装

TryScanでTimeoutを0に設定することにより、たまったキューの中身を確認しています。

Importantがキューに積まれていた場合は優先的に処理されます。

type Message =
    | Important
    | Normal

let processor =
    MailboxProcessor<Message>.Start
        (fun inbox ->
            let rec loop() = 
                async {
                    let! importantProcessed = 
                        inbox.TryScan
                            ( function
                              | Important -> Some (async { printf "Important\n" })
                              | _         -> None
                            , 0
                            )
                    if  importantProcessed.IsNone then
                        let! msg = inbox.Receive()
                        match msg with
                        | Important -> printf "Important\n"
                        | Normal    -> printf "Normal\n"

                    do! Async.Sleep 1000

                    return! loop()
                }
            loop()
        )

下記の様にPostすると、
Normal
Important
Important
Normal
Normal
の順に処理されます。

processor.Post(Normal)
System.Threading.Thread.Sleep(100)

processor.Post(Normal)
processor.Post(Important)
processor.Post(Important)
processor.Post(Normal)


終了をWaitできるよう実装

おまけで、終了メッセージを送信し、処理が終わるまでWaitできるよう実装してみました。

AsyncReplyChannelとPostAndReplyを利用してWaitを実現しています。

type Message =
    | Post
    | Exit of AsyncReplyChannel<unit>

let processor =
    MailboxProcessor<Message>.Start
        (fun inbox ->
            let rec loop() = 
                async {
                    let! exited = 
                        inbox.TryScan
                            ( function
                              | Exit replyChannel -> 
                                async { 
                                    printf "exit\n"
                                    replyChannel.Reply() 
                                }                                
                                |> Some
                              | _  -> None
                            , 0
                            )
                    if exited.IsNone then
                        let! msg = inbox.Receive()
                        match msg with
                        | Exit replyChannel -> 
                            printf "exit\n"
                            replyChannel.Reply()
                        | Post -> 
                            printf "post\n"
                            do! Async.Sleep 1000
                            return! loop()
                }
            loop()
        )

下記のようにPostすると、
post
exit
exited
の順にコンソールに出力されます。

processor.Post(Post)
processor.Post(Post)
processor.Post(Post)
System.Threading.Thread.Sleep(100)
processor.PostAndReply(fun x -> Exit x)
printf "exited\n"


実際の利用

TryScanのたびにキューを全部確認しており、またF#ではTaskなどが利用できるため、実際には独自でスレッドとキューを用意したほうが速くてわかりやすいと思います。