Azureの小ネタ (改)

~Azureネタを中心に、色々とその他の技術的なことなどを~

Queue の多重取得シナリオ

Windows Azure Applianceの発表もあってか、Azure界隈が賑わってますね。
そろそろ、Queueネタもつきてきそうな今日この頃、以下からQueueネタ
を拝借。というか、Queueの仕様を見ていた当初から持っていたことなんですが。

http://blog.smarx.com/posts/deleting-windows-azure-queue-messages-handling-exceptions

メッセージをデキューするときに、VisibilityTimeOutを設定できます。
デフォルトは30秒だったと思いますが。通常は、この時間内にメッセージを
処理し、削除しないといけません。

なんからの原因で、処理が遅延し削除する前にタイムアウトしてしまうと、
再びQueue上にメッセージが現れ、ほかのロールにメッセージを取られて
しまうことになります。

  1. インスタンス #1が、メッセージを取得して、処理開始。
  2. Visibility Timeout の期限がきれ、メッセージは再びQueueに復活
  3. インスタンス #2が、メッセージを取得して、処理を開始
  4. インスタンス #1が、処理を終了しメッセージを削除しようとするが例外

やっつけで再現プログラムを書いてみました。LinqPadで。
以下の画面通り StorageClientException がC1側で発生します。
C2にメッセージを取られちゃったからです。

もう少し、例外をLinqPadで参照してみます。
StorageClientException.ExtendedErrorInformationプロパティがあって、
ここのErrorCode == "MessageNotFound" で切り分ければいいようです。

ちょっと例外クラスが手を抜きすぎでは? って感じもするのですが。
ちゃんとEnumで定義して欲しいですね。

以下、全ソース。 開発ストレージを使っているので、
LinqPadにコピペして実行できるはずです。

void Main()
{
    var account = CloudStorageAccount.DevelopmentStorageAccount;
    var queueClient  = account.CreateCloudQueueClient();	
    var queue = queueClient.GetQueueReference("work2");
    queue.CreateIfNotExist();
    queue.AddMessage(new CloudQueueMessage("Hello " + DateTime.Now));
        
    var consumer1  = new Thread(new ThreadStart(delegate() {
        Console.WriteLine ("Consumer1 start.");
        try { 
            var msg = queue.GetMessage(TimeSpan.FromSeconds(5));
            Console.WriteLine ("C1 got message." + msg.AsString);
            Thread.Sleep(TimeSpan.FromSeconds(20));
            Console.WriteLine ("C1 delete message.");
            queue.DeleteMessage(msg.Id, msg.PopReceipt);
            Console.WriteLine ("C1 deleted message.");
        } 
        catch (StorageClientException e)
        {
            Console.WriteLine ("Exception in C1 = " + e.ExtendedErrorInformation.ErrorCode);
            e.Dump();
        }

    }));
   
   
    var consumer2  = new Thread(new ThreadStart(delegate() {
        Console.WriteLine ("Consumer2 start.");
        try {
        Thread.Sleep(TimeSpan.FromSeconds(10));
        var msg = queue.GetMessage(TimeSpan.FromSeconds(5));
        Console.WriteLine ("C2 got message." + msg.AsString);
        Thread.Sleep(TimeSpan.FromSeconds(20));
        Console.WriteLine ("C2 delete message");
        queue.DeleteMessage(msg.Id, msg.PopReceipt);
        Console.WriteLine ("C2 deleted message");
        } catch (Exception e)
        {
            Console.WriteLine ("exception occured in t2");
            Console.WriteLine (e);
        }
    }));
    consumer1.Start();
    consumer2.Start();
    consumer1.Join();
    consumer2.Join();
    Console.WriteLine ("Done");
}