F#のMailboxProcessorで選択的受信
F#にはアクターモデルを実現できるMailboxProcessorクラスが用意されています。
基本的にはキューに積まれたメッセージを順に処理していくものですが、たまに特定のメッセージを先に処理したいときがあります。
今回はMailboxProcessorで選択的に受信するよう実装してみたいと思います。
Erlangでの選択的受信
after 0 を利用して下記の様に記述されます。
important() -> receive {Priority, Message} when Priority > 10 -> [Message | important()] after 0 -> normal() end. normal() -> receive {_, Message} -> [Message | normal()] after 0 -> [] end.
Erlangをまねて実装
TryScanでTimeoutを0に設定することにより、たまったキューの中身を確認しています。
Importantがキューに積まれていた場合は優先的に処理されます。
type Message = | Important | Normal let processor = MailboxProcessor<Message>.Start (fun inbox -> let rec loop() = async { let! importantProcessed = inbox.TryScan ( function | Important -> Some (async { printf "Important\n" }) | _ -> None , 0 ) if importantProcessed.IsNone then let! msg = inbox.Receive() match msg with | Important -> printf "Important\n" | Normal -> printf "Normal\n" do! Async.Sleep 1000 return! loop() } loop() )
下記の様にPostすると、
Normal
Important
Important
Normal
Normal
の順に処理されます。
processor.Post(Normal) System.Threading.Thread.Sleep(100) processor.Post(Normal) processor.Post(Important) processor.Post(Important) processor.Post(Normal)
終了をWaitできるよう実装
おまけで、終了メッセージを送信し、処理が終わるまでWaitできるよう実装してみました。
AsyncReplyChannelとPostAndReplyを利用してWaitを実現しています。
type Message = | Post | Exit of AsyncReplyChannel<unit> let processor = MailboxProcessor<Message>.Start (fun inbox -> let rec loop() = async { let! exited = inbox.TryScan ( function | Exit replyChannel -> async { printf "exit\n" replyChannel.Reply() } |> Some | _ -> None , 0 ) if exited.IsNone then let! msg = inbox.Receive() match msg with | Exit replyChannel -> printf "exit\n" replyChannel.Reply() | Post -> printf "post\n" do! Async.Sleep 1000 return! loop() } loop() )
下記のようにPostすると、
post
exit
exited
の順にコンソールに出力されます。
processor.Post(Post) processor.Post(Post) processor.Post(Post) System.Threading.Thread.Sleep(100) processor.PostAndReply(fun x -> Exit x) printf "exited\n"
実際の利用
TryScanのたびにキューを全部確認しており、またF#ではTaskなどが利用できるため、実際には独自でスレッドとキューを用意したほうが速くてわかりやすいと思います。