离线下载
PDF版 ePub版

郑思愿 · 更新于 2017-08-23 10:00:06

订阅发布机制

两种订阅

Redis 提供两个订阅模式:频道(channel)订阅和 glob-style 模式(pattern)频道订阅。频道订阅容易理解,即CA(client A)向服务器订阅了频道 news,当 CB 向 news 发布消息的时候,CA 便能收到。

glob-style 模式(pattern)频道订阅,需要先解释什么是 glob-style?举一个简单的例子,rm *.jpg linux 下这条命令删除当前目录下所有 jpg 图片,所用到的是 glob-style 模式匹配,你可以将他理解为某种 style 的正则表达式;)

举例,CA(client A)向服务器订阅了频道*.news

  • 当 CB 向 China.news 发布消息的时候,CA 能收到,
  • 当 CB 向 America.news 发布消息的时候,CA 能收到,
  • 当 CB 向 AV.news 发布消息的时候,CA 便能收到。

订阅相关数据结构

struct redisServer 和 struct redisClient 都维护了频道和模式频道,前者维护了所有频道和订阅频道的客户端,后者维护了客户端自己订阅的频道。

struct redisServer {
    ......
    /* Pubsub */
    dict *pubsub_channels; /* Map channels to list of subscribed clients */
    list *pubsub_patterns; /* A list of pubsub_patterns */
    ......
}
typedef struct redisClient {
    ......
    // 用户感兴趣的频道
    dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
    // 用户感兴趣的模式
    list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
    ......
} redisClient;
    // 模式频道数据结构,list *pubsub_patterns 里的每个节点数据都是struct
    // pubsubPattern。
typedef struct pubsubPattern {
    redisClient *client;
    robj *pattern;
} pubsubPattern;

频道订阅是一个 dict,每个 channel 被哈希进相应的桶,每个 channel 对应一个 clients,clients 都订阅了此 channel。当有消息发布的时候,检索 channel,遍历 clients,发布消息。

模式频道订阅是一个 list。当有消息发布的时候,channel 与 glob-style pattern 匹配,发布消息。

订阅过程

两种订阅模式是维护上述两种数据结构的过程,

// 订阅频道
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    struct dictEntry *de;
    list *clients = NULL;
    int retval = 0;
    // redisClient.pubsub_channels 中保存客户端订阅的所有频道,可以查看客户端
    // 订阅了多少频道以及客户端是否订阅某个频道
    // server.pubsub_channels 中保存所有的频道和每个频道的订阅客户端,可以将
    // 消息发布到订阅客户端
    // 将频道加入redisClient.pubsub_channels
    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        // 在服务器负责维护的channel->clients 哈希表中寻找指定的频道
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        // 未找到客户端指定的频道,需要创建
    if (de == NULL) {
        clients = listCreate();
        // 将频道加入server.pubsub_channels
        dictAdd(server.pubsub_channels,channel,clients);
        incrRefCount(channel);
        // 找到客户端指定的频道,直接获取这个频道
    } else {
        clients = dictGetVal(de);
    }
        // 将客户端添加到链表的尾部
        listAddNodeTail(clients,c);
    }
    // 通知客户端
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(
    c->pubsub_patterns));
    return retval;
}
// 订阅模式频道
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded,
or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(redisClient *c, robj *pattern) {
    int retval = 0;
    // redisClient.pubsub_patterns 中保存客户端订阅的所有模式频道,可以查看
    // 客户端订阅了多少频道以及客户端是否订阅某个频道
    // server.pubsub_patterns 中保存所有的模式频道和每个模式频道的订阅客户端
    // ,可以将消息发布到订阅客户端
    // 未订阅模式频道,插入
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        // 将模式频道加入redisClient.pubsub_patterns
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        // 将模式频道加入server.pubsub_patterns
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    // 通知客户端
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(
    c->pubsub_patterns));
    return retval;
}

取消订阅的过程则相反。

消息发布

发布消息的过程则遍历上述两个数据结构(dict 和list),并将消息发布到匹配频道的所有客户端。

// 发布消息
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    listNode *ln;
    listIter li;
    // 发布消息有两个步骤,
    // 指定频道的所有订阅者发布消息
    // 指定模式频道的所有订阅者发布消息
    // 
    // 寻找频道
    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    // 向频道所有订阅者发布信息
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;
        listRewind(list,&li);
    while ((ln = listNext(&li)) != NULL) {
        redisClient *c = ln->value;
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.messagebulk);
        addReplyBulk(c,channel);
        addReplyBulk(c,message);
        receivers++;
    }
}
// 
// 进行glob-style 模式匹配
/* Send to clients listening to matching channels */
if (listLength(server.pubsub_patterns)) {
    listRewind(server.pubsub_patterns,&li);
    channel = getDecodedObject(channel);
    while ((ln = listNext(&li)) != NULL) {
        pubsubPattern *pat = ln->value;
        // 匹配成功,向订阅者发布消息
        if (stringmatchlen((char*)pat->pattern->ptr,
                sdslen(pat->pattern->ptr),
                (char*)channel->ptr,
                sdslen(channel->ptr),0)) {
        addReply(pat->client,shared.mbulkhdr[4]);
        addReply(pat->client,shared.pmessagebulk);
        addReplyBulk(pat->client,pat->pattern);
        addReplyBulk(pat->client,channel);
        addReplyBulk(pat->client,message);
        receivers++;
        }
      }
   decrRefCount(channel);
  }
return receivers;
}

注意, 只要客户端订阅了频道, 除了SUBCRIBE,UNSUBCRIBE,PSUBCRIBE,PSUBCRIBE,就不能执行其他命令。

int processCommand(redisClient *c) {
    ......
    // 在订阅发布模式下,只允许处理SUBSCRIBE 或者UNSUBSCRIBE 命令
    // 从下面的检测条件可以看出:只要存在redisClient.pubsub_channels 或者
    // redisClient.pubsub_patterns,就代表处于订阅发布模式下
    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
        &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed
                "in this context");
        return REDIS_OK;
        }
    ......
}
上一篇: AOF 持久化策略 下一篇: 主从复制