Via 由 [ ] 提供

Redis 数据迁移

Redis 提供在线数据迁移的能力,把自身的数据往其他 Redis 服务器上迁移。如果需要将部分数据迁移到另一台Redis 服务器上,这个命令会非常有用。

redis migraiton 的实现比较简单。首先将需要迁移的命令打包好,发送到指定的 Redis 服务器上,回复ok 后则删除本地的键值对。

这里面用了前面讲到的 rio:读写对象既可以是文件也可以是内存,只需要安装相应的读写函数即可。这里不难理解。

网络传输部分,用到了 Redis 内部的 syncio 模块,syncio 即同步 io,每读/写入一部分数据会用 IO 多路复用的技术等待下一次可读写/的机会。在 migrateCommand() 的实现中,先用非阻塞的方式建立一个连接,接着将打包好的迁移数据发送到目标 Redis 服务器上,并等待目标 Redis 服务器的相应。

下面通过 migrateCommand() 来了解数据迁移是如何实现的:

/* MIGRATE host port key dbid timeout */
void migrateCommand(redisClient *c) {
    int fd;
    long timeout;
    long dbid;
    long long ttl = 0, expireat;
    robj *o;
    rio cmd, payload;
    // 准备需要迁移的数据,这个数据可以由客户端来指定
    ......
    // 建立一个非阻塞连接
    /* Connect */
    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
    atoi(c->argv[2]->ptr));
    if (fd == -1) {
        addReplyErrorFormat(c,"Can't connect to target node: %s",
        server.neterr);
        return;
    }
    // 等待建立成功
    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
        addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
        return;
    }
    // rio 的读写可以对应的是文件读写,也可以是内存的读写,只需要安装相应的读写
    // 函数
    // 初始化一块空的sds buffer
    /* Create RESTORE payload and generate the protocol to call the command. */
    rioInitWithBuffer(&cmd,sdsempty());
    // 把需要迁移的数据打包追加到
    ......
    // 可以指定过期时间
    expireat = getExpire(c->db,c->argv[3]);
    if (expireat != -1) {
        ttl = expireat-mstime();
    if (ttl < 1) ttl = 1;
    }
    // 生成restore 命令
    ......
    // 生成包含 Redis 版本和校验字段的 payload
    /* Finally the last argument that is the serailized object payload
    * in the DUMP format. */
    createDumpPayload(&payload,o);
    // 写入到迁移内容中
    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
    sdslen(payload.io.buffer.ptr)));
    sdsfree(payload.io.buffer.ptr);
    // 将迁移数据送往目标 Redis 服务器
    /* Tranfer the query to the other node in 64K chunks. */
    {
    sds buf = cmd.io.buffer.ptr;
    size_t pos = 0, towrite;
    int nwritten = 0;
    // 最多只传送64K
    while ((towrite = sdslen(buf)-pos) > 0) {
        towrite = (towrite > (64*1024) ? (64*1024) : towrite);
        // 同步写
        nwritten = syncWrite(fd,buf+pos,towrite,timeout);
    if (nwritten != (signed)towrite) goto socket_wr_err;
        pos += nwritten;
        }
    }
    // 读取目标 Redis 服务器的回复
    /* Read back the reply. */
    {
    char buf1[1024];
    char buf2[1024];
    /* Read the two replies */
    if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_rd_err;
    if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
        goto socket_rd_err;
    if (buf1[0] == '-' || buf2[0] == '-') {
        addReplyErrorFormat(c,"Target instance replied with error: %s",
        (buf1[0] == '-') ? buf1+1 : buf2+1);
    } else {
        // 回复内容正常,说明数据已经迁移成功,删除原始 Redis 服务器的key-value
        robj *aux;
        dbDelete(c->db,c->argv[3]);
        signalModifiedKey(c->db,c->argv[3]);
        addReply(c,shared.ok);
        server.dirty++;
        // 将变更更新到从机,并写入AOF 文件
        /* Translate MIGRATE as DEL for replication/AOF. */
        aux = createStringObject("DEL",3);
        rewriteClientCommandVector(c,2,aux,c->argv[3]);
        decrRefCount(aux);
        }
    }
    // 一些清理工作和错误处理
    ......
}

migrateCommand() 只是数据迁移的一部分代码,目标机器还要负责将数据存储到目标机器上,有兴趣可以参考 restoreCommand() 的实现,基本上和 migrateCommand() 是逆过来的。

上一篇: Redis 监视器 下一篇: Redis 集群(上)