Published 2015-09-13 15:50:50 | Source: compiled from the web
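This article introduces Disque's core data structures and closes with an analysis of how the ADDJOB command is implemented, to show how Disque works.
Because of time constraints, this chapter covers only the most important parts of the Disque source and gives only a basic introduction to how the cluster and the commands work. Even so, it should serve as a reasonable starting guide for readers who want to dig deeper into the Disque source.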
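Disque reuses a large amount of Redis's low-level code, including the data structures, the event library, the networking layer, and the server main loop.
This is not the first time antirez has done this: the earlier Redis Sentinel also reuses much of Redis's code and, on top of it, provides a different set of commands for monitoring Redis servers. From a code perspective, Redis Sentinel is essentially a modified Redis.
In other words, both Disque and Redis Sentinel use some of Redis's subsystems as a framework, the networking layer in particular.
For these reasons, anyone familiar with the Redis code should find the Disque code fairly easy to get into.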
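For readers who already know Redis, the files to focus on when reading the Disque code are:
- queue.h & queue.c: Disque's queue implementation.
- job.h & job.c: Disque's job implementation.
- disque.h & disque.c: the Disque server process, the counterpart of redis.h and redis.c in Redis, with the corresponding modifications.
- cluster.h & cluster.c: the Disque cluster, which modifies the Redis cluster in places but works in the same basic way.
Disque's server state is represented by the disque.h/disqueServer structure, whose jobs and queues attributes record all the jobs and all the queues stored in the server: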
    struct disqueServer {

        // Other attributes...

        /* Jobs & Queues */

        // jobs is a dictionary holding every job stored in the server.
        // Each key is a job ID and each value is a job.h/job structure.
        dict *jobs;     /* Main jobs hash table, by job ID. */

        // queues is also a dictionary, holding every queue stored in the server.
        // Each key is a queue name and each value is a queue.h/queue structure.
        dict *queues;   /* Main queues hash table, by queue name. */

        // Other attributes...
    };
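To make the role of the two dictionaries concrete, here is a minimal sketch. It is not Disque's code: `toy_server`, `toy_job`, `toy_queue_entry`, the fixed capacity, the 8-byte ID length, and the linear-search lookups are all hypothetical stand-ins for Redis's `dict` type and the real structures. It only illustrates the idea of indexing jobs by ID and queues by name.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_ID_LEN 8   /* hypothetical; Disque uses JOB_ID_LEN */
#define TOY_MAX 16     /* hypothetical fixed capacity */

typedef struct { char id[TOY_ID_LEN]; const char *queue; const char *body; } toy_job;
typedef struct { const char *name; size_t len; } toy_queue_entry;

typedef struct {
    toy_job jobs[TOY_MAX];           size_t numjobs;   /* stands in for dict *jobs */
    toy_queue_entry queues[TOY_MAX]; size_t numqueues; /* stands in for dict *queues */
} toy_server;

/* Find a queue by name, creating it on first use. Returns NULL when full. */
static toy_queue_entry *toy_queue_get(toy_server *s, const char *name) {
    for (size_t i = 0; i < s->numqueues; i++)
        if (strcmp(s->queues[i].name, name) == 0) return &s->queues[i];
    if (s->numqueues == TOY_MAX) return NULL;
    toy_queue_entry *q = &s->queues[s->numqueues++];
    q->name = name;
    q->len = 0;
    return q;
}

/* Look a job up by its fixed-length ID. Returns NULL when not found. */
static toy_job *toy_job_lookup(toy_server *s, const char *id) {
    for (size_t i = 0; i < s->numjobs; i++)
        if (memcmp(s->jobs[i].id, id, TOY_ID_LEN) == 0) return &s->jobs[i];
    return NULL;
}

/* Index a new job by ID and count it in its queue.
 * Returns 0 on success, -1 on a duplicate ID or when out of space. */
static int toy_job_register(toy_server *s, const char *id,
                            const char *queue, const char *body) {
    assert(strlen(id) == TOY_ID_LEN);
    if (s->numjobs == TOY_MAX || toy_job_lookup(s, id) != NULL) return -1;
    toy_queue_entry *q = toy_queue_get(s, queue);
    if (q == NULL) return -1;
    toy_job *j = &s->jobs[s->numjobs++];
    memcpy(j->id, id, TOY_ID_LEN);
    j->queue = q->name;
    j->body = body;
    q->len++;
    return 0;
}
```

Registering a second job with the same ID is rejected, which is the same situation the "ID already existing" branch of the ADDJOB code guards against.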
Next, let's look at the definitions of the queue structure and the job structure.
Every queue in Disque is defined by a queue.h/queue structure:
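    typedef struct queue {

        // The name of the queue
        robj *name;      /* Queue name as a string object. */

        // A skiplist holding every job that has been put into the queue.
        // Jobs are kept in order; the comment on job->ctime in
        // addjobCommand notes that ctime is the key used to sort queues.
        skiplist *sl;    /* The skiplist with the queued jobs. */

        // Other attributes...

    } queue;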
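The effect of keeping queued jobs in an ordered structure can be sketched as follows. This is an illustration only: a sorted singly linked list stands in for the skiplist, `toy_node`, `toy_ordered_queue`, `toy_enqueue` and `toy_dequeue` are hypothetical names, and the sort key is `ctime`, which the comment in addjobCommand names as the value used to sort queues.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for a queued job: just the sort key and a label. */
typedef struct toy_node {
    uint64_t ctime;
    int label;
    struct toy_node *next;
} toy_node;

typedef struct { toy_node *head; size_t len; } toy_ordered_queue;

/* Insert while keeping ascending ctime order; equal keys keep insertion
 * order. Returns 0 on success, -1 if memory allocation fails. */
static int toy_enqueue(toy_ordered_queue *q, uint64_t ctime, int label) {
    assert(q != NULL);
    toy_node *n = malloc(sizeof(*n));
    if (n == NULL) return -1;
    n->ctime = ctime;
    n->label = label;
    toy_node **link = &q->head;
    while (*link != NULL && (*link)->ctime <= ctime) link = &(*link)->next;
    n->next = *link;
    *link = n;
    q->len++;
    return 0;
}

/* Remove the oldest job and return its label, or -1 if the queue is empty. */
static int toy_dequeue(toy_ordered_queue *q) {
    assert(q != NULL);
    toy_node *n = q->head;
    if (n == NULL) return -1;
    int label = n->label;
    q->head = n->next;
    free(n);
    q->len--;
    return label;
}
```

A skiplist gives the same ordering with logarithmic expected insertion cost; the linked list here trades that away for brevity.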
With the queue structure covered, let's see how Disque represents each job.
Every job in Disque is defined by a job.h/job structure:
    typedef struct job {

        // Job ID
        char id[JOB_ID_LEN];    /* Job ID. */

        // Other attributes...

        // The queue the job belongs to
        robj *queue;            /* Job queue name. */

        // The content of the job
        sds body;               /* Body, or NULL if job is just an ACK. */

        // Other attributes...

    } job;
A Disque cluster works very much like a Redis cluster. Unlike Redis, however, which can run in either standalone mode or cluster mode, Disque always runs in cluster mode: as soon as a Disque server starts, it is already a node of a Disque cluster.
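The Disque cluster code lives in cluster.h and cluster.c, and its mechanics differ little from those of Redis's cluster mode. The cluster's main job is to propagate and replicate jobs across nodes: when a node executes a user's ADDJOB command and adds a new job to its own internal queue, it also propagates that job to other nodes in the cluster, either synchronously or asynchronously (synchronous by default; the ASYNC option selects asynchronous). The propagation itself is done by sending cluster messages.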
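The choice between the two modes can be sketched as a pair of small functions. They mirror two expressions from the addjobCommand code shown later in this article (`replicate > 1 && !async` and `extrepl ? replicate : replicate-1`); the function names, the enum, and the use of `bool` are hypothetical and exist only for this illustration.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { TOY_REPLY_NOW, TOY_BLOCK_FOR_ACKS } toy_reply_mode;

/* Synchronous replication blocks the client until other nodes confirm.
 * It applies only when more than one copy is wanted and ASYNC was not given. */
static toy_reply_mode toy_addjob_reply_mode(long long replicate, bool async) {
    assert(replicate >= 1);
    return (replicate > 1 && !async) ? TOY_BLOCK_FOR_ACKS : TOY_REPLY_NOW;
}

/* Number of other nodes that must receive the job. When the receiving node
 * cannot keep a copy itself (extrepl), every copy has to live elsewhere. */
static long long toy_additional_nodes(long long replicate, bool extrepl) {
    assert(replicate >= 1);
    return extrepl ? replicate : replicate - 1;
}
```

With REPLICATE 3 and no ASYNC option the client is blocked and two other nodes are contacted; with REPLICATE 1 nothing is sent and the reply is immediate.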
The following are the cluster message data structures that the Disque cluster uses to propagate jobs, job IDs, and job requests:
    // Propagates jobs together with their content
    /* This data section is used in different message types where we need to
     * transmit one or multiple full jobs.
     *
     * Used by: ADDJOB, YOURJOBS. */
    typedef struct {
        uint32_t numjobs;   /* Number of jobs stored here. */
        uint32_t datasize;  /* Number of bytes following to describe jobs. */
        /* The variable data section here is composed of 4 bytes little endian
         * prefixed length + serialized job data for each job:
         * [4 bytes len] + [serialized job] + [4 bytes len] + [serialized job] ...
         * For a total of exactly 'datasize' bytes. */
        unsigned char jobs_data[8]; /* Defined as 8 bytes just for alignment. */
    } clusterMsgDataJob;

    // Propagates a job ID
    /* This data section is used when we need to send just a job ID.
     *
     * Used by: GOTJOB, SETACK, and many more. */
    typedef struct {
        char id[JOB_ID_LEN];
        uint32_t aux;       /* Optional field:
                               For SETACK: Number of nodes that may have this
                               message.
                               For QUEUEJOB: Delay starting from msg reception. */
    } clusterMsgDataJobID;

    // Asks other nodes to send jobs from a given queue
    /* This data section is used by NEEDJOBS to specify in what queue we need
     * a job, and how many jobs we request. */
    typedef struct {
        uint32_t count;     /* How many jobs we request. */
        uint32_t qnamelen;  /* Queue name total length. */
        char qname[8];      /* Defined as 8 bytes just for alignment. */
    } clusterMsgDataNeedJobs;
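The `jobs_data` layout described in the comment above (a 4-byte little-endian length followed by the serialized job, repeated) can be sketched as below. This is an illustration of the framing only: `toy_put_job` and `toy_get_job` are hypothetical helpers, not Disque functions, and the payloads are opaque bytes rather than Disque's real job serialization.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Append one [4-byte little-endian length][payload] record at *off.
 * Returns 0 on success, -1 if the record would not fit in cap bytes. */
static int toy_put_job(unsigned char *buf, size_t cap, size_t *off,
                       const void *payload, uint32_t len) {
    assert(buf != NULL && off != NULL && payload != NULL);
    if (*off > cap || cap - *off < 4 || cap - *off - 4 < len) return -1;
    unsigned char *p = buf + *off;
    p[0] = (unsigned char)(len & 0xff);
    p[1] = (unsigned char)((len >> 8) & 0xff);
    p[2] = (unsigned char)((len >> 16) & 0xff);
    p[3] = (unsigned char)((len >> 24) & 0xff);
    memcpy(p + 4, payload, len);
    *off += 4 + (size_t)len;
    return 0;
}

/* Read the record that starts at *off inside a section of 'datasize' bytes.
 * On success sets *payload and *len, advances *off and returns 0.
 * Returns -1 when the section is exhausted or the record is truncated. */
static int toy_get_job(const unsigned char *buf, size_t datasize, size_t *off,
                       const unsigned char **payload, uint32_t *len) {
    assert(buf != NULL && off != NULL && payload != NULL && len != NULL);
    if (*off > datasize || datasize - *off < 4) return -1;
    const unsigned char *p = buf + *off;
    uint32_t n = (uint32_t)p[0] | ((uint32_t)p[1] << 8) |
                 ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
    if (datasize - *off - 4 < n) return -1;
    *payload = p + 4;
    *len = n;
    *off += 4 + (size_t)n;
    return 0;
}
```

A receiver would call `toy_get_job` `numjobs` times and expect the offset to land exactly on `datasize`, matching the "for a total of exactly 'datasize' bytes" rule in the structure comment.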
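Having briefly covered Disque's main data structures, let's analyze the implementation of the ADDJOB command and trace its execution to see how Disque creates a job, stores it, and propagates it to other nodes: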
    void addjobCommand(client *c) {

        // Variable declarations
        long long replicate = server.cluster->size > 3 ? 3 : server.cluster->size;
        long long ttl = 3600*24;
        long long retry = -1;
        long long delay = 0;
        long long maxlen = 0; /* Max queue length for job to be accepted. */
        mstime_t timeout;
        int j, retval;
        int async = 0;  /* Asynchronous request? */
        int extrepl = getMemoryWarningLevel() > 0; /* Replicate externally? */
        static uint64_t prev_ctime = 0;

        // Argument parsing
        /* Parse args. */
        for (j = 4; j < c->argc; j++) {
            char *opt = c->argv[j]->ptr;
            int lastarg = j == c->argc-1;
            if (!strcasecmp(opt,"replicate") && !lastarg) {
                retval = getLongLongFromObject(c->argv[j+1],&replicate);
                if (retval != DISQUE_OK || replicate <= 0 || replicate > 65535) {
                    addReplyError(c,"REPLICATE must be between 1 and 65535");
                    return;
                }
                j++;
            } else if (!strcasecmp(opt,"ttl") && !lastarg) {
                retval = getLongLongFromObject(c->argv[j+1],&ttl);
                if (retval != DISQUE_OK || ttl <= 0) {
                    addReplyError(c,"TTL must be a number > 0");
                    return;
                }
                j++;
            } else if (!strcasecmp(opt,"retry") && !lastarg) {
                retval = getLongLongFromObject(c->argv[j+1],&retry);
                if (retval != DISQUE_OK || retry < 0) {
                    addReplyError(c,"RETRY time must be a non negative number");
                    return;
                }
                j++;
            } else if (!strcasecmp(opt,"delay") && !lastarg) {
                retval = getLongLongFromObject(c->argv[j+1],&delay);
                if (retval != DISQUE_OK || delay < 0) {
                    addReplyError(c,"DELAY time must be a non negative number");
                    return;
                }
                j++;
            } else if (!strcasecmp(opt,"maxlen") && !lastarg) {
                retval = getLongLongFromObject(c->argv[j+1],&maxlen);
                if (retval != DISQUE_OK || maxlen <= 0) {
                    addReplyError(c,"MAXLEN must be a positive number");
                    return;
                }
                j++;
            } else if (!strcasecmp(opt,"async")) {
                async = 1;
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }

        /* Parse the timeout argument. */
        if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_MILLISECONDS)
            != DISQUE_OK) return;

        /* REPLICATE > 1 and RETRY set to 0 does not make sense, why to replicate
         * the job if it will never try to be re-queued if case the job processing
         * is not acknowledged? */
        if (replicate > 1 && retry == 0) {
            addReplyError(c,"With RETRY set to 0 please explicitly set "
                            "REPLICATE to 1 (at-most-once delivery)");
            return;
        }

        /* DELAY greater or equal to TTL is silly. */
        if (delay >= ttl) {
            addReplyError(c,"The specified DELAY is greater than TTL. Job refused "
                            "since would never be delivered");
            return;
        }

        /* When retry is not specified, it defaults to 1/10 of the TTL. */
        if (retry == -1) {
            retry = ttl/10;
            if (retry == 0) retry = 1;
        }

        /* Check if REPLICATE can't be honoured at all. */
        int additional_nodes = extrepl ? replicate : replicate-1;

        if (additional_nodes > server.cluster->reachable_nodes_count) {
            if (extrepl &&
                additional_nodes-1 == server.cluster->reachable_nodes_count)
            {
                addReplySds(c,
                    sdsnew("-NOREPL Not enough reachable nodes "
                           "for the requested replication level, since I'm unable "
                           "to hold a copy of the message for memory usage "
                           "problems.\r\n"));
            } else {
                addReplySds(c,
                    sdsnew("-NOREPL Not enough reachable nodes "
                           "for the requested replication level\r\n"));
            }
            return;
        }

        // Check whether the queue has already reached its maximum length
        /* If maxlen was specified, check that the local queue len is
         * within the requested limits. */
        if (maxlen && queueNameLength(c->argv[1]) > (unsigned long) maxlen) {
            addReplySds(c,
                sdsnew("-MAXLEN Queue is already longer than "
                       "the specified MAXLEN count\r\n"));
            return;
        }

        // Create a new job
        /* Create a new job. */
        job *job = createJob(NULL,JOB_STATE_WAIT_REPL,ttl);  // create the job
        job->queue = c->argv[1];    // record the name of the job's queue
        incrRefCount(c->argv[1]);
        job->repl = replicate;

        /* If no external replication is used, add myself to the list of nodes
         * that have a copy of the job. */
        if (!extrepl)
            dictAdd(job->nodes_delivered,myself->name,myself);

        // Set the job's attributes
        /* Job ctime is milliseconds * 1000000. Jobs created in the same
         * millisecond gets an incremental ctime. The ctime is used to sort
         * queues, so we have some weak sorting semantics for jobs: non-requeued
         * jobs are delivered roughly in the order they are added into a given
         * node. */
        job->ctime = mstime()*1000000;
        if (job->ctime <= prev_ctime) job->ctime = prev_ctime+1;
        prev_ctime = job->ctime;

        job->etime = server.unixtime + ttl;
        job->delay = delay;
        job->retry = retry;
        job->body = sdsdup(c->argv[2]->ptr);

        /* Set the next time the job will be queued. Note that once we call
         * enqueueJob() the first time, this will be set to 0 (never queue
         * again) for jobs that have a zero retry value (at most once jobs). */
        if (delay) {
            job->qtime = server.mstime + delay*1000;
        } else {
            /* This will be updated anyway by enqueueJob(). */
            job->qtime = server.mstime + retry*1000;
        }

        /* Register the job locally in all the cases but when the job
         * is externally replicated and asynchronous replicated at the same
         * time: in this case we don't want to take a local copy at all. */
        if (!(async && extrepl) && registerJob(job) == DISQUE_ERR) {
            /* A job ID with the same name? Practically impossible but
             * let's handle it to trap possible bugs in a cleaner way. */
            serverLog(DISQUE_WARNING,"ID already existing in ADDJOB command!");
            freeJob(job);
            addReplyError(c,"Internal error creating the job, check server logs");
            return;
        }

        /* For replicated messages where ASYNC option was not asked, block
         * the client, and wait for acks. Otherwise if no synchronous replication
         * is used, or ASYNC option was enabled, we just queue the job and
         * return to the client ASAP.
         *
         * Note that for REPLICATE > 1 and ASYNC the replication process is
         * best effort. */
        if (replicate > 1 && !async) {
            // Propagate this job to other nodes synchronously
            c->bpop.timeout = timeout;
            c->bpop.job = job;
            c->bpop.added_node_time = server.mstime;
            blockClient(c,DISQUE_BLOCKED_JOB_REPL);
            setJobAssociatedValue(job,c);
            /* Create the nodes_confirmed dictionary only if we actually need
             * it for synchronous replication. It will be released later
             * when we move away from JOB_STATE_WAIT_REPL. */
            job->nodes_confirmed = dictCreate(&clusterNodesDictType,NULL);
            /* Confirm itself as an acknowledged receiver if this node will
             * retain a copy of the job. */
            if (!extrepl) dictAdd(job->nodes_confirmed,myself->name,myself);
        } else {
            // Propagate this job asynchronously, or not at all when REPLICATE is 1
            if (job->delay == 0) {
                if (!extrepl) enqueueJob(job); /* Will change the job state. */
            } else {
                /* Delayed jobs that don't wait for replication can move
                 * forward to ACTIVE state ASAP, and get scheduled for
                 * queueing. */
                job->state = JOB_STATE_ACTIVE;
                updateJobAwakeTime(job,0);
            }
            addReplyJobID(c,job);
            AOFLoadJob(job);
        }

        // Propagate this job to other nodes...
        /* If the replication factor is > 1, send REPLJOB messages to REPLICATE-1
         * nodes. */
        if (additional_nodes > 0)
            clusterReplicateJob(job, additional_nodes, async);

        /* If the job is asynchronously and externally replicated at the same time,
         * send a QUEUE message ASAP to one random node, and delete the job from
         * this node right now. */
        if (async && extrepl) {
            dictEntry *de = dictGetRandomKey(job->nodes_delivered);
            if (de) {
                clusterNode *n = dictGetVal(de);
                clusterSendEnqueue(n,job,job->delay);
            }
            /* We don't have to unregister the job since we did not registered
             * it if it's async + extrepl. */
            freeJob(job);
        }
    }
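Three small rules buried in the function above are easy to miss: the default replication factor, the RETRY default with its cross-option checks, and the strictly increasing `ctime`. The sketch below restates them as standalone functions. The function names and the `toy_check` enum are hypothetical; the arithmetic follows the code above, except that the previous `ctime` is passed in by the caller instead of being kept in a function-local `static`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef enum {
    TOY_OK,
    TOY_ERR_RETRY_ZERO_REPLICATED, /* REPLICATE > 1 together with RETRY 0 */
    TOY_ERR_DELAY_GE_TTL           /* the job would expire before delivery */
} toy_check;

/* Default replication factor: the cluster size, capped at 3. */
static long long toy_default_replicate(long long cluster_size) {
    return cluster_size > 3 ? 3 : cluster_size;
}

/* Apply the cross-option checks, then fill in the RETRY default.
 * *retry == -1 means "not specified" and becomes ttl/10, at least 1. */
static toy_check toy_check_options(long long replicate, long long ttl,
                                   long long delay, long long *retry) {
    assert(retry != NULL);
    if (replicate > 1 && *retry == 0) return TOY_ERR_RETRY_ZERO_REPLICATED;
    if (delay >= ttl) return TOY_ERR_DELAY_GE_TTL;
    if (*retry == -1) {
        *retry = ttl / 10;
        if (*retry == 0) *retry = 1;
    }
    return TOY_OK;
}

/* ctime is milliseconds * 1000000; a job created in the same millisecond
 * as the previous one gets the previous value plus one, so the sequence
 * is strictly increasing. *prev is caller-owned state. */
static uint64_t toy_next_ctime(uint64_t now_ms, uint64_t *prev) {
    assert(prev != NULL);
    uint64_t ctime = now_ms * 1000000;
    if (ctime <= *prev) ctime = *prev + 1;
    *prev = ctime;
    return ctime;
}
```

With the default TTL of 3600*24 seconds, an unspecified RETRY therefore becomes 8640 seconds.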
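In summary, the ADDJOB command does the following, as the annotations in the code above mark out:
- Parse and validate the options supplied by the user.
- Create a job structure and record the job information supplied by the user.
- Register the job locally and, unless the client has to wait for synchronous replication, put it into the queue.
- Propagate the job to other nodes in the cluster.
Overall, Disque does not change much of Redis, and the newly added parts are not hard to follow. If you want a closer look at this new distributed job queue implementation, don't hesitate: go and read the Disque code!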