Via 由 [ ] 提供

优先级队列模式

优先发送到服务,以便具有较高优先级的请求被接收和高于一个较低优先级的更快速地处理请求。这种模式是在应用程序是有用的,它提供不同的服务级别保证或者针对独立客户。

背景和问题

应用程序可以委托给其他服务的具体任务;例如,为了执行后台处理或与其他应用程序或服务的整合。在云中,消息队列通常用于将任务委派给后台处理。在许多情况下,请求由服务接收的顺序是不重要的。然而,在某些情况下,可能需要优先考虑的具体要求。这些要求必须早于较低优先级的其他可能先前已发送由应用程序进行处理。

解决方案

队列通常是先入先出(FIFO)结构,而消费者通常会收到他们发布到队列中的顺序相同的消息。然而,一些消息队列支持优先级的消息传递;应用程序发布一条消息可以分配优先级的消息,并在队列中的消息会自动重新排序,使得具有较高优先级的消息将这些优先级较低的前被接收。图1示出了一个队列,它提供优先权的消息。

图1 - 使用支持消息优先级排队机制

注意: 大多数消息队列的实现支持多个消费者(以下的竞争消费者模式)和消费过程的数量可以按比例增加或减小的需求支配。

在不支持基于优先级的消息队列系统中,一种替代的解决方案是为每一个优先级的独立队列。该应用程序负责将邮件投递到适当的队列。每个队列可以有一个单独的消费者池。高优先级队列可以有更快的硬件比低优先级队列中运行的消费者一个更大的泳池。图2示出了这种方法。

图2 - 使用不同的消息队列为每个优先级

这种策略的变化是有消费者认为检查对高优先级队列中的消息,然后再才开始从低优先级队列中读取消息,如果没有更高优先级的消息都在等待的一个池。还有,使用消费过程的一个池的溶液之间的一些语义差异(或者使用支持不同的优先级或多个队列,每个处理一个单一的优先级消息的消息的单个队列),以及使用多个队列用溶液为每个队列一个单独的游泳池。

在单池的做法,高优先级的消息总是会收到以前低优先级的消息处理。在理论中,具有非常低的优先级的消息可以被不断地取代,并且可能永远不会被处理。在多池的方法,较低优先级的报文将总是被处理,只是不一样迅速的那些更高的优先级的(取决于池和它们具有可用资源的相对大小)。

使用优先级排队机制可提供以下优点:

  • 它允许应用程序以满足必要的可用性或性能优先的业务需求,如提供不同级别的服务,以客户的特定群体。
  • 它可以帮助最大限度地降低运营成本。在单队列的方式,你可以缩减用户的数量,如果有必要的。高优先级消息仍将被首先处理(虽然可能更慢),和低优先级的消息可能会延迟更长。如果您已实现与消费者的每一个单独的队列池多个消息队列的方式,可以减少消费者的池低优先级队列,或者甚至停止所有监听的讯息的消费者暂停处理一些非常低优先级队列这些队列。
  • 在多个消息队列的方法可以帮助划分的基础上处理要求的消息,以最大限度地提高应用程序的性能和可扩展性。例如,重要的任务,可以优先被立即运行,而不太重要的后台任务可以被安排在不太繁忙的时段运行的接收器来处理接收处理。

问题和注意事项

在决定如何实现这个模式时,请考虑以下几点:

  • 定义优先级的解决方案的情况下。例如,“高优先级”可能意味着,信息应该在十秒内进行处理。标识要求处理高优先级的项目,以及其他什么资源必须分配给符合这些标准。
  • 确定是否所有高优先级的项目必须在任何优先级较低的项目之前进行处理。如果该消息是由消费者的一个池被处理,可能有必要提供一种可抢先和暂停正在处理的低优先级消息,如果更高优先级的消息,有一个任务的机制。
  • 在多个队列中的方法,使用该监听所有的队列,而不是一个专门的客户池的每个队列的消费过程的一个池时,消费者必须应用一种算法,以确保它总是从那些从低之前较高优先级的队列提供服务的消息优先级队列。
  • 监视处理的高和低优先级队列中的速度,以确保在这些队列中的消息的预期的速率进行处理。
  • 如果需要,以保证低优先级的消息将被处理时,可能有必要实现与消费者的多个池的多个消息队列的方法。或者,在一个支持消息优先队列,它可能会动态地增加一个排队的消息的优先级,因为它的年龄。然而,该方法依赖于消息队列提供此功能。
  • 使用单独的队列中每个消息优先级最适合有少数明确定义的优先级系统。
  • 消息优先级可以通过系统逻辑决定的。例如,而不是明确的高和低优先级的消息,他们可以被指定为“自费客户”,或“非自费的客户。”根据您的商业模式,你的系统可能会分配更多的资源,从收费处理消息付费用户比非自费的。
  • 有可能是检查队列的消息相关联的金融和处理成本(一些商业邮件系统的消息被发布或检索每次收取一小笔费用,每次一个队列中查询消息)。检查多个队列时,该成本将有所增加。
  • 它可以是能够动态调整的基础上,该池所服务的队列的长度消费者的一个池的大小。欲了解更多信息,请参阅自动缩放指导。

何时使用这个模式

这种模式非常适合场景:

  • 系统必须处理可能有不同的侧重点多个任务。
  • 不同的用户或租户应配以不同的优先级。

例子

微软 Azure 不提供经过整理的本地支持邮件自动优先级排队机制。然而,它确实提供了 Azure 的服务总线主题和订阅,支持排队机制,提供邮件过滤,具有多种灵活的功能,使其非常适合用在几乎所有的优先级队列的实现在一起。

一个 Azure 的解决方案,可以实现服务总线话题,其中一个应用程序可以发布消息,以同样的方式作为一个队列。消息可以包含在应用程序定义的自定义属性的形式的元数据。服务总线订阅可以与主题相关联,并且这些订阅可以筛选根据它们的属性信息。当一个应用程序将消息发送到一个主题,该消息被定向到从那里它可以被消费者阅读相应的订阅。消费者的过程可以检索使用相同的语义消息队列(订阅是一个逻辑队列)从一个订阅消息。

图 3 示出了使用的 Azure 服务总线主题和订阅的解决方案

图3 - 实现与 Azure 的服务总线主题和订阅优先级队列

在图3中的应用程序创建多个消息和每个消息与价值分配被称为优先级的自定义属性,无论是高还是低。该应用程序的帖子,这些消息的一个话题。这个主题有两个相关的订阅,这两个滤波器的消息通过检查优先级属性。一位接受认购,其中优先级属性设置为高的消息,而其他接受其中优先级属性设置为低的消息。消费者池读取每个订阅的消息。高优先认购有较大的游泳池,而这些消费者可能会更强大(且昂贵)的计算机上运行有提供比消费者在低优先级池的更多资源。

请注意,没有什么特别的高,低优先级消息在这个例子中指定。这些仅仅是指定为每个消息中的属性的标签,并用于引导消息发送到一个特定的订阅。如果附加的优先级是必需的,它是比较容易地创建进一步的订阅和消费者进程池来处理这些优先级。

在可用于此引导代码时 Queue 解决方案包含这种方法的一个实现。该解决方案包含一个名为 PriorityQueue.High 和 PriorityQueue.Low 两个工作角色的项目。这两个辅助角色继承的类被称为 PriorityWorkerRole 它包含用于连接到一个指定的预订中 OnStart 方法的功能。

该 PriorityQueue.High 和 PriorityQueue.Low 辅助角色连接到不同的预订,他们的配置设置来定义。管理员可以配置每个角色的不同数量要运行;通常会有比 PriorityQueue.Low 工作者角色的 PriorityQueue.High 辅助角色更多的实例。

在 PriorityWorkerRole 类的 Run 方法安排虚拟 ProcessMessage 的方法(在 PriorityWorkerRole 类定义)的队列中接收到的每个消息被执行。下面的代码显示了运行和 ProcessMessage 的方法。在类的 QueueManager,在 PriorityQueue.Shared 项目定义,提供了辅助方法使用的 Azure 服务总线队列。

public class PriorityWorkerRole : RoleEntryPoint  
{  
  private QueueManager queueManager;  
  ...  

  public override void Run()  
  {  
    // Start listening for messages on the subscription.  
    var subscriptionName = CloudConfigurationManager.GetSetting("SubscriptionName");  
    this.queueManager.ReceiveMessages(subscriptionName, this.ProcessMessage);  
    ...;  
  }  
  ...  

  protected virtual async Task ProcessMessage(BrokeredMessage message)  
  {  
    // Simulating processing.  
    await Task.Delay(TimeSpan.FromSeconds(2));  
  }  
}  

该 PriorityQueue.High 和 PriorityQueue.Low 辅助角色既覆盖 ProcessMessage 的方法的默认功能。下面的代码显示了 ProcessMessage 的方法为 PriorityQueue.High 辅助角色。

Copy  

protected override async Task ProcessMessage(BrokeredMessage message)  
{  
  // Simulate message processing for High priority messages.  
  await base.ProcessMessage(message);  
  Trace.TraceInformation("High priority message processed by " +  
    RoleEnvironment.CurrentRoleInstance.Id + " MessageId: " + message.MessageId);  
}  

当一个应用程序将消息发布到与所使用的 PriorityQueue.High 和 PriorityQueue.Low 辅助角色的订阅相关联的主题,它指定了优先使用优先级的自定义属性,如在下面的代码示例。此代码(这是在 PriorityQueue.Sender项目 WorkerRole 类实现),使用的 QueueManager 类的 SendBatchAsync 辅助方法发帖分批的话题。

// Send a low priority batch.   
var lowMessages = new List<BrokeredMessage>();  

for (int i = 0; i < 10; i++)  
{  
  var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };  
  message.Properties["Priority"] = Priority.Low;  
  lowMessages.Add(message);  
}  

this.queueManager.SendBatchAsync(lowMessages).Wait();  
...  

// Send a high priority batch.  
var highMessages = new List<BrokeredMessage>();  

for (int i = 0; i < 10; i++)  
{  
  var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };  
  message.Properties["Priority"] = Priority.High;  
  highMessages.Add(message);  
}  

this.queueManager.SendBatchAsync(highMessages).Wait();