RabbitMQ 标准队列

 后端   大苹果   2024-08-11 19:27   869

RabbitMQ 标准队列(Classic Queue)是消息传递系统中最基本的队列类型,消息按先进先出(FIFO)的顺序存储和传递。以下是标准队列的常见应用场景和详细说明:

1. 任务队列(Task Queue)

  • 场景描述:在后台处理任务时,如图像处理、视频转码、数据分析等,需要将任务分配给多个工作节点进行处理,避免阻塞主应用。
  • 应用说明:在任务队列中,每个任务都会被推送到 RabbitMQ 的标准队列中,多个消费者从队列中获取任务并处理,任务处理完毕后删除队列中的消息。
  • 举例
    • 应用:电商网站后台处理订单数据,如生成发货单、发送确认邮件等。这些任务可以异步处理,以提高系统的响应速度。
    • 代码示例(Node.js)
      // 生产者:将任务添加到队列
      const amqp = require('amqplib/callback_api');
      
      amqp.connect('amqp://localhost', function(error0, connection) {
          if (error0) {
              throw error0;
          }
          connection.createChannel(function(error1, channel) {
              if (error1) {
                  throw error1;
              }
              const queue = 'task_queue';
              const msg = 'New Order - Order ID: 12345';
      
              channel.assertQueue(queue, { durable: true });
              channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });
              console.log(" [x] Sent '%s'", msg);
          });
      });
      
      // 消费者:从队列中取出任务并处理
      amqp.connect('amqp://localhost', function(error0, connection) {
          if (error0) {
              throw error0;
          }
          connection.createChannel(function(error1, channel) {
              if (error1) {
                  throw error1;
              }
              const queue = 'task_queue';
      
              channel.assertQueue(queue, { durable: true });
              channel.prefetch(1);
              console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
              channel.consume(queue, function(msg) {
                  console.log(" [x] Received '%s'", msg.content.toString());
                  // 模拟任务处理时间
                  setTimeout(() => {
                      console.log(" [x] Done");
                      channel.ack(msg);
                  }, 1000);
              });
          });
      });
      

2. 异步日志处理

  • 场景描述:在高并发的系统中,记录日志的过程可能会影响系统的性能。通过将日志写入消息队列,再由专门的日志处理服务异步地写入到文件或数据库中,可以减轻主系统的压力。
  • 应用说明:当应用程序需要记录日志时,它将日志消息发送到 RabbitMQ 标准队列中,然后日志服务从队列中取出日志并存储在持久化存储中。
  • 举例
    • 应用:Web 服务器记录用户行为日志,如访问页面、点击按钮等。日志处理程序从队列中读取这些日志并存储到数据库或文件系统中。
    • 代码示例(Node.js)
      // 生产者:将日志消息添加到队列
      const logMessage = 'User clicked on the Home button';
      channel.assertQueue('log_queue', { durable: true });
      channel.sendToQueue('log_queue', Buffer.from(logMessage), { persistent: true });
      console.log(" [x] Sent log: '%s'", logMessage);
      
      // 消费者:从队列中读取日志并存储
      channel.consume('log_queue', function(msg) {
          console.log(" [x] Log received: '%s'", msg.content.toString());
          // 模拟日志写入操作
          fs.appendFile('app.log', msg.content.toString() + '\n', function (err) {
              if (err) throw err;
              console.log('Log saved!');
              channel.ack(msg);
          });
      });
      

3. 异步通知系统

  • 场景描述:在电子商务、社交网络等应用中,经常需要向用户发送通知,如订单更新、消息提醒等。为了避免阻塞主应用,可以使用标准队列进行异步通知处理。
  • 应用说明:将通知任务放入 RabbitMQ 的标准队列中,由专门的服务来处理并发送通知,确保主应用的响应时间不受影响。
  • 举例
    • 应用:电商平台在用户下单成功后发送订单确认短信或邮件。通知服务从队列中取出任务,调用第三方 API 发送短信或邮件。
    • 代码示例(Node.js)
      const notificationMsg = 'Order confirmed: Order ID 12345';
      channel.assertQueue('notification_queue', { durable: true });
      channel.sendToQueue('notification_queue', Buffer.from(notificationMsg), { persistent: true });
      console.log(" [x] Sent notification: '%s'", notificationMsg);
      
      channel.consume('notification_queue', function(msg) {
          console.log(" [x] Notification to send: '%s'", msg.content.toString());
          // 调用第三方API发送通知
          sendNotificationAPI(msg.content.toString(), () => {
              console.log('Notification sent!');
              channel.ack(msg);
          });
      });
      

4. 工作分配(Work Distribution)

  • 场景描述:在分布式系统中,将工作任务分配给多个消费者处理,以充分利用系统资源,实现负载均衡。
  • 应用说明:标准队列允许将任务均匀分配给多个消费者,避免某个消费者过载。
  • 举例
    • 应用:图片处理应用程序,上传的图片需要经过多个步骤处理(如缩放、裁剪、添加水印),每个步骤可以由不同的消费者处理。
    • 代码示例
      const imageTask = 'Resize Image ID: 54321';
      channel.assertQueue('image_task_queue', { durable: true });
      channel.sendToQueue('image_task_queue', Buffer.from(imageTask), { persistent: true });
      console.log(" [x] Sent image task: '%s'", imageTask);
      
      channel.consume('image_task_queue', function(msg) {
          console.log(" [x] Processing image task: '%s'", msg.content.toString());
          // 模拟图片处理
          processImageTask(msg.content.toString(), () => {
              console.log('Image processed!');
              channel.ack(msg);
          });
      });
      

总结

RabbitMQ 标准队列是最常用的队列类型,广泛应用于任务队列、异步日志处理、异步通知系统和工作分配等场景。其简单易用,适用于需要可靠传递和顺序处理的消息系统。在实际应用中,结合 RabbitMQ 的持久化、确认机制等功能,可以实现更可靠的消息传递和处理。