読者です 読者をやめる 読者になる 読者になる

Azureの小ネタ (改)

~Azureネタを中心に、色々とその他の技術的なことなどを~

Azure SDK for Node.js で Queueを操作する

適材適所なのでNode.jsで、同じ事をする必要はありませんが実験として。

よくあるキューのパターン

以下は、ありがちなキューをポーリングするサンプル。メッセージがあれば連続して取り出すし、無ければ10秒間スリープする。

  while (true)
            {
                Console.WriteLine("polling...");
                var msg = q.GetMessage(TimeSpan.FromSeconds(30));
                if (msg != null)
                {
                    Console.WriteLine("get message");
                    Console.WriteLine(msg.AsString);
                    Console.WriteLine("delete message");
                    q.DeleteMessage(msg);
                }
                else
                {
                    Console.WriteLine("message is null");
                    System.Threading.Thread.Sleep(10000);
                }
            }

Node.js で実装する

whileで書くってのは違うだろうというか、色々ブロックしてしまうのでNodeの作法ではないと思われ。ここではsetIntervalを使って10秒毎にタイマを起動してみる。また、.NETと違い、getMessageメソッドは存在しないが、getMessagesの既定動作だと、1つしかメッセージを取得しない。取得されたメッセージは配列に格納されている。

以下、実装したサンプル。ネストが深くなってしまうのはしょうがないのですが、連続して取得する部分は再帰でごまかしす。

var azure = require('azure');

var queueService = azure.createQueueService();
var queueName = 'samplequeue';

var getMessages = function() {

  // この引数だと、1個しかメッセージを取得しない。Timeout は30秒。

  queueService.getMessages(queueName, function(error, msgs) {
    console.log('polling...');
    if(!error && msgs.length > 0) {
      console.log('get messages (%d)', msgs.length);
      console.log(msgs[0]);
      for(var i = 0;i < msgs.length; i++) {
        queueService.deleteMessage(queueName,
                                   msgs[0].messageid,
                                   mgs[0].popreceipt,
                                   function(error){
          if(!error){
            // Message deleted
            console.log('delete message');
          }
        });
      }
      // メッセージがあったら取得
      getMessages();
    }
  });
}
                          
// キューの作成                           
queueService.createQueueIfNotExists(queueName, function(error) {
  if(!error) {
    console.log('queue was created.');
  
    // 10秒間隔でポーリング
    setInterval(getMessages, 10000);
  }
});                                    

以下結果。

polling...
get messages (1)
{ messageid: 'af74d330-5f65-43e1-804a-e465be3cd3d9',
  insertiontime: 'Wed, 09 May 2012 14:25:03 GMT',
  expirationtime: 'Wed, 16 May 2012 14:25:03 GMT',
  dequeuecount: '1',
  popreceipt: 'gC8x7wb8zggBAAAA',
  timenextvisible: 'Wed, 09 May 2012 14:27:01 GMT',
  messagetext: 'Hello World 6' }
delete message
polling...
get messages (1)
{ messageid: 'fabbf505-10ba-48cb-9c6a-cc5c46b7d3d7',
  insertiontime: 'Wed, 09 May 2012 14:25:03 GMT',
  expirationtime: 'Wed, 16 May 2012 14:25:03 GMT',
  dequeuecount: '1',
  popreceipt: 'UAwp9Qb8zggBAAAA',
  timenextvisible: 'Wed, 09 May 2012 14:27:11 GMT',
  messagetext: 'Hello World 7' }
delete message

このような処理で他の処理をブロックしないのか、再帰しすぎてスタックオーバーフローとかしないのかが良く分からない。Storageエミュレータ上で1000個程度メッセージを入れたところで問題はなかったが。

ふとコードを読み返してみて、再帰部分をsetTimeoutしたほうが、非同期で他の処理をブロックしないのかなと思ったり。

      // メッセージがあったら取得
      setTimeout(getMessages,0);

もっとスマートな書き方があるような気もしますが...