上一篇文章, 我们从框架层面, 主要介绍了 Redis 的启动过程, 以及主要的命令处理流程逻辑. 这些更多是些通用的道理, 而要细致地了解 Redis, 则需要深入更多细节.
今天我们从稍微深入一些的角度, 来看看命令执行过程中的几个重要方法, 深入理解一下 Redis 的魅力所在.
首先, 我们通过上一章知道, processCommand 是其业务主要入口, 我们再来回顾下:
- // server.c
- /* If this function gets called we already read a whole
- * command, arguments are in the client argv/argc fields.
- * processCommand() execute the command or prepare the
- * server for a bulk read from the client.
- *
- * If C_OK is returned the client is still alive and valid and
- * other operations can be performed by the caller. Otherwise
- * if C_ERR is returned the client was destroyed (i.e. after QUIT).
- *
- * The body is a chain of guard checks: the first check that fails
- * replies with an error and returns. Only when every check passes is
- * the command queued (inside MULTI) or executed through call(). */
- int processCommand(client *c) {
- /* The QUIT command is handled separately. Normal command procs will
- * go through checking for replication and QUIT will cause trouble
- * when FORCE_REPLICATION is enabled and would be implemented in
- * a regular command proc. */
- // QUIT: reply +OK and set CLIENT_CLOSE_AFTER_REPLY, i.e. the server
- // closes the connection once the reply has been sent (the client does
- // not have to close it itself).
- if (!strcasecmp(c->argv[0]->ptr,"quit")) {
- addReply(c,shared.ok);
- c->flags |= CLIENT_CLOSE_AFTER_REPLY;
- return C_ERR;
- }
- /* Now lookup the command and check ASAP about trivial error conditions
- * such as wrong arity, bad command name and so forth. */
- // Resolve the command by its name (argv[0]); remember it as lastcmd too.
- c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
- // Reject unknown commands and wrong argument counts immediately.
- // arity > 0: exactly `arity` arguments; arity < 0: at least `-arity`
- // arguments. argc counts the command name itself (argv[0]).
- // NOTE(review): flagTransaction() is not shown here; presumably it marks
- // an open MULTI transaction as failed — confirm.
- if (!c->cmd) {
- flagTransaction(c);
- addReplyErrorFormat(c,"unknown command '%s'",
- (char*)c->argv[0]->ptr);
- return C_OK;
- } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
- (c->argc < -c->cmd->arity)) {
- flagTransaction(c);
- addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
- c->cmd->name);
- return C_OK;
- }
- /* Check if the user is authenticated */
- // When requirepass is set, every command except AUTH is refused until
- // the client has authenticated.
- if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
- {
- flagTransaction(c);
- addReply(c,shared.noautherr);
- return C_OK;
- }
- /* If cluster is enabled perform the cluster redirection here.
- * However we don't perform the redirection if:
- * 1) The sender of this command is our master.
- * 2) The command has no key arguments. */
- // Cluster mode: redirect the client to the node getNodeByQuery() picks
- // for the command's keys, unless the sender is our master (directly or
- // through a Lua script it is running) or the command has no keys.
- // Note how many client properties are packed into the flags bitfield.
- if (server.cluster_enabled &&
- !(c->flags & CLIENT_MASTER) &&
- !(c->flags & CLIENT_LUA &&
- server.lua_caller->flags & CLIENT_MASTER) &&
- !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
- {
- int hashslot;
- // Cluster not in the OK state: reply with a cluster-down redirection.
- if (server.cluster->state != CLUSTER_OK) {
- flagTransaction(c);
- clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
- return C_OK;
- } else {
- int error_code;
- clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
- // Keys are not served by this node (or no node could be chosen).
- if (n == NULL || n != server.cluster->myself) {
- flagTransaction(c);
- clusterRedirectClient(c,n,hashslot,error_code);
- return C_OK;
- }
- }
- }
- /* Handle the maxmemory directive.
- *
- * First we try to free some memory if possible (if there are volatile
- * keys in the dataset). If there are not the only thing we can do
- * is returning an error. */
- // maxmemory configured: try to free memory first; if that fails,
- // commands flagged CMD_DENYOOM are rejected with an OOM error.
- if (server.maxmemory) {
- int retval = freeMemoryIfNeeded();
- /* freeMemoryIfNeeded may flush slave output buffers. This may result
- * into a slave, that may be the active client, to be freed. */
- if (server.current_client == NULL) return C_ERR;
- /* It was impossible to free enough memory, and the command the client
- * is trying to execute is denied during OOM conditions? Error. */
- if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
- flagTransaction(c);
- addReply(c, shared.oomerr);
- return C_OK;
- }
- }
- /* Don't accept write commands if there are problems persisting on disk
- * and if this is a master instance. */
- // On a master (masterhost == NULL), refuse write commands and PING while
- // the last BGSAVE failed (with stop-writes-on-bgsave-error enabled and
- // save points configured) or the last AOF write failed.
- if (((server.stop_writes_on_bgsave_err &&
- server.saveparamslen > 0 &&
- server.lastbgsave_status == C_ERR) ||
- server.aof_last_write_status == C_ERR) &&
- server.masterhost == NULL &&
- (c->cmd->flags & CMD_WRITE ||
- c->cmd->proc == pingCommand))
- {
- flagTransaction(c);
- if (server.aof_last_write_status == C_OK)
- addReply(c, shared.bgsaveerr);
- else
- addReplySds(c,
- sdscatprintf(sdsempty(),
- "-MISCONF Errors writing to the AOF file: %s\r\n",
- strerror(server.aof_last_write_errno)));
- return C_OK;
- }
- /* Don't accept write commands if there are not enough good slaves and
- * user configured the min-slaves-to-write option. */
- // Master with min-slaves-to-write configured: refuse writes while fewer
- // slaves than required have an acceptable replication lag.
- if (server.masterhost == NULL &&
- server.repl_min_slaves_to_write &&
- server.repl_min_slaves_max_lag &&
- c->cmd->flags & CMD_WRITE &&
- server.repl_good_slaves_count < server.repl_min_slaves_to_write)
- {
- flagTransaction(c);
- addReply(c, shared.noreplicaserr);
- return C_OK;
- }
- /* Don't accept write commands if this is a read only slave. But
- * accept write commands if this is our master. */
- // A read-only slave refuses writes, except those coming from its master.
- if (server.masterhost && server.repl_slave_ro &&
- !(c->flags & CLIENT_MASTER) &&
- c->cmd->flags & CMD_WRITE)
- {
- addReply(c, shared.roslaveerr);
- return C_OK;
- }
- /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
- // A client in Pub/Sub mode may only run (P)SUBSCRIBE, (P)UNSUBSCRIBE and
- // PING (QUIT was already handled at the top of this function).
- if (c->flags & CLIENT_PUBSUB &&
- c->cmd->proc != pingCommand &&
- c->cmd->proc != subscribeCommand &&
- c->cmd->proc != unsubscribeCommand &&
- c->cmd->proc != psubscribeCommand &&
- c->cmd->proc != punsubscribeCommand) {
- addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
- return C_OK;
- }
- /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
- * we are a slave with a broken link with master. */
- // Slave whose link to the master is not connected: when
- // slave-serve-stale-data is disabled, only CMD_STALE commands may run.
- if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
- server.repl_serve_stale_data == 0 &&
- !(c->cmd->flags & CMD_STALE))
- {
- flagTransaction(c);
- addReply(c, shared.masterdownerr);
- return C_OK;
- }
- /* Loading DB? Return an error if the command has not the
- * CMD_LOADING flag. */
- // While the dataset is being loaded, only commands flagged CMD_LOADING
- // are accepted; everything else gets a loading error.
- if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
- addReply(c, shared.loadingerr);
- return C_OK;
- }
- /* Lua script too slow? Only allow a limited number of commands. */
- // A Lua script exceeded its time limit: only AUTH, REPLCONF, SHUTDOWN with
- // an argument starting with 'n' (NOSAVE) and SCRIPT with an argument
- // starting with 'k' (KILL) are accepted.
- if (server.lua_timedout &&
- c->cmd->proc != authCommand &&
- c->cmd->proc != replconfCommand &&
- !(c->cmd->proc == shutdownCommand &&
- c->argc == 2 &&
- tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
- !(c->cmd->proc == scriptCommand &&
- c->argc == 2 &&
- tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
- {
- flagTransaction(c);
- addReply(c, shared.slowscripterr);
- return C_OK;
- }
- /* Exec the command */
- if (c->flags & CLIENT_MULTI &&
- c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
- c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
- {
- // Inside MULTI: queue the command (EXEC/DISCARD/MULTI/WATCH excluded).
- queueMultiCommand(c);
- addReply(c,shared.queued);
- } else {
- // Not in a transaction: execute the command right away.
- call(c,CMD_CALL_FULL);
- // Remember the replication offset after this command (client.woff).
- c->woff = server.master_repl_offset;
- // If keys that clients are blocked on (BLPOP & co) became ready during
- // this command, serve those blocked clients now.
- if (listLength(server.ready_keys))
- handleClientsBlockedOnLists();
- }
- return C_OK;
- }
零, Redis 中的几个关键数据结构
1. redisServer
redisServer 是 Redis 中最大的全局变量, 负责保存客户端, 配置信息, db 等等各种重要信息. 各函数之间也隐式地通过 redisServer 进行通信. 它定义在 server.h 中, 而实例化则在 server.c 中.
- /* Global server state. A single instance, `server`, is defined right after
- * this struct; clients, configuration, DBs, persistence, replication and
- * statistics are all reached through it. */
- struct redisServer {
- /* General */
- pid_t pid; /* Main process pid. */
- char *configfile; /* Absolute config file path, or NULL */
- char *executable; /* Absolute executable file path. */
- char **exec_argv; /* Executable argv vector (copy). */
- int hz; /* serverCron() calls frequency in hertz */
- redisDb *db; /* Databases array (presumably dbnum entries, see dbnum) */
- dict *commands; /* Command table */
- dict *orig_commands; /* Command table before command renaming. */
- aeEventLoop *el;
- unsigned lruclock:LRU_BITS; /* Clock for LRU eviction */
- int shutdown_asap; /* SHUTDOWN needed ASAP */
- int activerehashing; /* Incremental rehash in serverCron() */
- char *requirepass; /* Pass for AUTH command, or NULL */
- char *pidfile; /* PID file path */
- int arch_bits; /* 32 or 64 depending on sizeof(long) */
- int cronloops; /* Number of times the cron function run */
- char runid[CONFIG_RUN_ID_SIZE+1]; /* ID always different at every exec. */
- int sentinel_mode; /* True if this instance is a Sentinel. */
- /* Networking */
- int port; /* TCP listening port */
- int tcp_backlog; /* TCP listen() backlog */
- char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
- int bindaddr_count; /* Number of addresses in server.bindaddr[] */
- char *unixsocket; /* UNIX socket path */
- mode_t unixsocketperm; /* UNIX socket permission */
- int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
- int ipfd_count; /* Used slots in ipfd[] */
- int sofd; /* Unix socket file descriptor */
- int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */
- int cfd_count; /* Used slots in cfd[] */
- list *clients; /* List of active clients */
- list *clients_to_close; /* Clients to close asynchronously */
- list *clients_pending_write; /* Clients with output to write or a write handler to install. */
- list *slaves, *monitors; /* List of slaves and MONITORs */
- client *current_client; /* Current client, only used on crash report */
- int clients_paused; /* True if clients are currently paused */
- mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
- char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
- dict *migrate_cached_sockets;/* MIGRATE cached sockets */
- uint64_t next_client_id; /* Next client unique ID. Incremental. */
- int protected_mode; /* Don't accept external connections. */
- /* RDB / AOF loading information */
- int loading; /* We are loading data from disk if true */
- off_t loading_total_bytes;
- off_t loading_loaded_bytes;
- time_t loading_start_time;
- off_t loading_process_events_interval_bytes;
- /* Fast pointers to often looked up command */
- struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
- *rpopCommand, *sremCommand, *execCommand;
- /* Fields used only for stats */
- time_t stat_starttime; /* Server start time */
- long long stat_numcommands; /* Number of processed commands */
- long long stat_numconnections; /* Number of connections received */
- long long stat_expiredkeys; /* Number of expired keys */
- long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */
- long long stat_keyspace_hits; /* Number of successful lookups of keys */
- long long stat_keyspace_misses; /* Number of failed lookups of keys */
- size_t stat_peak_memory; /* Max used memory record */
- long long stat_fork_time; /* Time needed to perform latest fork() */
- double stat_fork_rate; /* Fork rate in GB/sec. */
- long long stat_rejected_conn; /* Clients rejected because of maxclients */
- long long stat_sync_full; /* Number of full resyncs with slaves. */
- long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
- long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */
- list *slowlog; /* SLOWLOG list of commands */
- long long slowlog_entry_id; /* SLOWLOG current entry ID */
- long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
- unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
- size_t resident_set_size; /* RSS sampled in serverCron(). */
- long long stat_net_input_bytes; /* Bytes read from network. */
- long long stat_net_output_bytes; /* Bytes written to network. */
- /* The following two are used to track instantaneous metrics, like
- * number of operations per second, network traffic. */
- struct {
- long long last_sample_time; /* Timestamp of last sample in ms */
- long long last_sample_count;/* Count in last sample */
- long long samples[STATS_METRIC_SAMPLES];
- int idx;
- } inst_metric[STATS_METRIC_COUNT];
- /* Configuration */
- int verbosity; /* Loglevel in redis.conf */
- int maxidletime; /* Client timeout in seconds */
- int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
- int active_expire_enabled; /* Can be disabled for testing purposes. */
- size_t client_max_querybuf_len; /* Limit for client query buffer length */
- int dbnum; /* Total number of configured DBs */
- int supervised; /* 1 if supervised, 0 otherwise. */
- int supervised_mode; /* See SUPERVISED_* */
- int daemonize; /* True if running as a daemon */
- clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
- /* AOF persistence */
- int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
- int aof_fsync; /* Kind of fsync() policy */
- char *aof_filename; /* Name of the AOF file */
- int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
- int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
- off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
- off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
- off_t aof_current_size; /* AOF current size. */
- int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
- pid_t aof_child_pid; /* PID if rewriting process */
- list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
- sds aof_buf; /* AOF buffer, written before entering the event loop */
- int aof_fd; /* File descriptor of currently selected AOF file */
- int aof_selected_db; /* Currently selected DB in AOF */
- time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
- time_t aof_last_fsync; /* UNIX time of last fsync() */
- time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
- time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
- int aof_lastbgrewrite_status; /* C_OK or C_ERR */
- unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */
- int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */
- int aof_last_write_status; /* C_OK or C_ERR */
- int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
- int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
- /* AOF pipes used to communicate between parent and child during rewrite. */
- int aof_pipe_write_data_to_child;
- int aof_pipe_read_data_from_parent;
- int aof_pipe_write_ack_to_parent;
- int aof_pipe_read_ack_from_child;
- int aof_pipe_write_ack_to_child;
- int aof_pipe_read_ack_from_parent;
- int aof_stop_sending_diff; /* If true stop sending accumulated diffs
- to child process. */
- sds aof_child_diff; /* AOF diff accumulator child side. */
- /* RDB persistence */
- long long dirty; /* Changes to DB from the last save */
- long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
- pid_t rdb_child_pid; /* PID of RDB saving child */
- struct saveparam *saveparams; /* Save points array for RDB */
- int saveparamslen; /* Number of saving points */
- char *rdb_filename; /* Name of RDB file */
- int rdb_compression; /* Use compression in RDB? */
- int rdb_checksum; /* Use RDB checksum? */
- time_t lastsave; /* Unix time of last successful save */
- time_t lastbgsave_try; /* Unix time of last attempted bgsave */
- time_t rdb_save_time_last; /* Time used by last RDB save run. */
- time_t rdb_save_time_start; /* Current RDB save start time. */
- int rdb_child_type; /* Type of save by active child. */
- int lastbgsave_status; /* C_OK or C_ERR */
- int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
- int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
- int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
- /* Propagation of commands in AOF / replication */
- redisOpArray also_propagate; /* Additional command to propagate. */
- /* Logging */
- char *logfile; /* Path of log file */
- int syslog_enabled; /* Is syslog enabled? */
- char *syslog_ident; /* Syslog ident */
- int syslog_facility; /* Syslog facility */
- /* Replication (master) */
- int slaveseldb; /* Last SELECTed DB in replication output */
- long long master_repl_offset; /* Global replication offset */
- int repl_ping_slave_period; /* Master pings the slave every N seconds */
- char *repl_backlog; /* Replication backlog for partial syncs */
- long long repl_backlog_size; /* Backlog circular buffer size */
- long long repl_backlog_histlen; /* Backlog actual data length */
- long long repl_backlog_idx; /* Backlog circular buffer current offset */
- long long repl_backlog_off; /* Replication offset of first byte in the
- backlog buffer. */
- time_t repl_backlog_time_limit; /* Time without slaves after the backlog
- gets released. */
- time_t repl_no_slaves_since; /* We have no slaves since that time.
- Only valid if server.slaves len is 0. */
- int repl_min_slaves_to_write; /* Min number of slaves to write. */
- int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
- int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
- int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
- int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
- /* Replication (slave) */
- char *masterauth; /* AUTH with this password with master */
- char *masterhost; /* Hostname of master */
- int masterport; /* Port of master */
- int repl_timeout; /* Timeout after N seconds of master idle */
- client *master; /* Client that is master for this slave */
- client *cached_master; /* Cached master to be reused for PSYNC. */
- int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
- int repl_state; /* Replication status if the instance is a slave */
- off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
- off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
- off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
- int repl_transfer_s; /* Slave -> Master SYNC socket */
- int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
- char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
- time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
- int repl_serve_stale_data; /* Serve stale data when link is down? */
- int repl_slave_ro; /* Slave is read only? */
- time_t repl_down_since; /* Unix time at which link with master went down */
- int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
- int slave_priority; /* Reported in INFO and used by Sentinel. */
- char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */
- long long repl_master_initial_offset; /* Master PSYNC offset. */
- int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
- /* Replication script cache. */
- dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
- list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */
- unsigned int repl_scriptcache_size; /* Max number of elements. */
- /* Synchronous replication. */
- list *clients_waiting_acks; /* Clients waiting in WAIT command. */
- int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
- /* Limits */
- unsigned int maxclients; /* Max number of simultaneous clients */
- unsigned long long maxmemory; /* Max number of memory bytes to use */
- int maxmemory_policy; /* Policy for key eviction */
- int maxmemory_samples; /* Precision of random sampling */
- /* Blocked clients */
- unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
- list *unblocked_clients; /* list of clients to unblock before next loop */
- list *ready_keys; /* List of readyList structures for BLPOP & co */
- /* Sort parameters - qsort_r() is only available under BSD so we
- * have to take this state global, in order to pass it to sortCompare() */
- int sort_desc;
- int sort_alpha;
- int sort_bypattern;
- int sort_store;
- /* Zip structure config, see redis.conf for more information */
- size_t hash_max_ziplist_entries;
- size_t hash_max_ziplist_value;
- size_t set_max_intset_entries;
- size_t zset_max_ziplist_entries;
- size_t zset_max_ziplist_value;
- size_t hll_sparse_max_bytes;
- /* List parameters */
- int list_max_ziplist_size;
- int list_compress_depth;
- /* time cache */
- time_t unixtime; /* Unix time sampled every cron cycle. */
- long long mstime; /* Like 'unixtime' but with milliseconds resolution. */
- /* Pubsub */
- dict *pubsub_channels; /* Map channels to list of subscribed clients */
- list *pubsub_patterns; /* A list of pubsub_patterns */
- int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
- xor of NOTIFY_... flags. */
- /* Cluster */
- int cluster_enabled; /* Is cluster enabled? */
- mstime_t cluster_node_timeout; /* Cluster node timeout. */
- char *cluster_configfile; /* Cluster auto-generated config file name. */
- struct clusterState *cluster; /* State of the cluster */
- int cluster_migration_barrier; /* Cluster replicas migration barrier. */
- int cluster_slave_validity_factor; /* Slave max data age for failover. */
- int cluster_require_full_coverage; /* If true, put the cluster down if
- there is at least an uncovered slot.*/
- char *cluster_announce_ip; /* IP address to announce on cluster bus. */
- int cluster_announce_port; /* base port to announce on cluster bus. */
- int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
- /* Scripting */
- lua_State *lua; /* The Lua interpreter. We use just one for all clients */
- client *lua_client; /* The "fake client" to query Redis from Lua */
- client *lua_caller; /* The client running EVAL right now, or NULL */
- dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
- mstime_t lua_time_limit; /* Script timeout in milliseconds */
- mstime_t lua_time_start; /* Start time of script, milliseconds time */
- int lua_write_dirty; /* True if a write command was called during the
- execution of the current script. */
- int lua_random_dirty; /* True if a random command was called during the
- execution of the current script. */
- int lua_replicate_commands; /* True if we are doing single commands repl. */
- int lua_multi_emitted;/* True if we already propagated MULTI. */
- int lua_repl; /* Script replication flags for redis.set_repl(). */
- int lua_timedout; /* True if we reached the time limit for script
- execution. */
- int lua_kill; /* Kill the script if true. */
- int lua_always_replicate_commands; /* Default replication type. */
- /* Lazy free */
- int lazyfree_lazy_eviction;
- int lazyfree_lazy_expire;
- int lazyfree_lazy_server_del;
- /* Latency monitor */
- long long latency_monitor_threshold;
- dict *latency_events;
- /* Assert & bug reporting */
- char *assert_failed;
- char *assert_file;
- int assert_line;
- int bug_report_start; /* True if bug report header was already logged. */
- int watchdog_period; /* Software watchdog period in ms. 0 = off */
- /* System hardware info */
- size_t system_memory_size; /* Total memory in system as reported by OS */
- };
- // server.c: the single global instance of the server state
- struct redisServer server; /* server global state */
2. redisObject
Java 有"万事万物皆对象"的说法, 而 Redis 中也可以用"一切皆 redisObject"来描述, 可以说 redisObject 是 Redis 中最通用的数据结构.
- /* Generic object wrapper used for Redis values (client argv entries are
- * robj too). type/encoding/lru are bit-fields packed together. */
- typedef struct redisObject {
- // Value type: a bit-field 4 bits wide (not 4 bytes)
- unsigned type:4;
- // Encoding (internal representation): a bit-field 4 bits wide
- unsigned encoding:4;
- // LRU time: a bit-field LRU_BITS bits wide (not bytes)
- unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
- // Reference count; once it drops to 0 the object is no longer used
- int refcount;
- // Pointer to the actual payload, whose layout depends on type/encoding
- void *ptr;
- } robj;
3. redisDb
redisDb 是 Redis 作为数据库的主要存储模型, 承载了所有的业务数据存储. 单从这点来说, 要实现一个数据库貌似很简单, 但实际却是很难.
- /* One logical database; a client works on the one it SELECTed (client.db). */
- typedef struct redisDb {
- // The keyspace: a key -> value dict. The value's data type (string, hash,
- // list, set, zset) is only known after the key has been looked up.
- dict *dict; /* The keyspace for this DB */
- // Expire table: a dict (not a queue) of the keys that have a timeout set
- dict *expires; /* Timeout of keys with a timeout set */
- dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
- dict *ready_keys; /* Blocked keys that received a PUSH */
- dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
- struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */
- // Index of this database. The number of databases is server.dbnum
- // (the article notes the default is 16; raise dbnum for more databases).
- int id; /* Database ID */
- long long avg_ttl; /* Average TTL, just for stats */
- } redisDb;
4. client
每一个客户端连接, 就是一个 client 实例, 其中包含许多全局引用信息. 比如解析完客户端请求之后, 会把参数, 数据库指针都放到 client 中.
- /* Per-connection state: one client instance per connected client. The parsed
- * request (argc/argv), the selected DB and the reply buffers live here. */
- typedef struct client {
- uint64_t id; /* Client incremental unique ID. */
- // Socket file descriptor of this connection
- int fd; /* Client socket. */
- // DB currently selected by this client; its commands operate on this DB
- redisDb *db; /* Pointer to currently SELECTed DB. */
- int dictid; /* ID of the currently SELECTed DB. */
- robj *name; /* As set by CLIENT SETNAME. */
- // Raw query buffer and the parsed arguments of the current request
- sds querybuf; /* Buffer we use to accumulate client queries. */
- size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
- int argc; /* Num of arguments of current command. */
- robj **argv; /* Arguments of current command. */
- // cmd: command being executed; lastcmd: last command executed
- struct redisCommand *cmd, *lastcmd; /* Last command executed. */
- int reqtype; /* Request protocol type: PROTO_REQ_* */
- int multibulklen; /* Number of multi bulk arguments left to read. */
- long bulklen; /* Length of bulk argument in multi bulk request. */
- list *reply; /* List of reply objects to send to the client. */
- unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
- size_t sentlen; /* Amount of bytes already sent in the current
- buffer or object being sent. */
- time_t ctime; /* Client creation time. */
- time_t lastinteraction; /* Time of the last interaction, used for timeout */
- time_t obuf_soft_limit_reached_time;
- int flags; /* Client flags: CLIENT_* macros. */
- // Whether the client has passed AUTH (checked when requirepass is set)
- int authenticated; /* When requirepass is non-NULL. */
- // Replication-related fields
- int replstate; /* Replication state if this is a slave. */
- int repl_put_online_on_ack; /* Install slave write handler on ACK. */
- int repldbfd; /* Replication DB file descriptor. */
- off_t repldboff; /* Replication DB file offset. */
- off_t repldbsize; /* Replication DB file size. */
- sds replpreamble; /* Replication DB preamble. */
- long long reploff; /* Replication offset if this is our master. */
- long long repl_ack_off; /* Replication ack offset, if this is a slave. */
- long long repl_ack_time;/* Replication ack time, if this is a slave. */
- long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
- copying this slave output buffer
- should use. */
- char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */
- int slave_listening_port; /* As configured with: SLAVECONF listening-port */
- int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
- multiState mstate; /* MULTI/EXEC state */
- int btype; /* Type of blocking op if CLIENT_BLOCKED. */
- blockingState bpop; /* blocking state */
- long long woff; /* Last write global replication offset. */
- list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
- dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
- list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
- sds peerid; /* Cached peer ID. */
- /* Response buffer */
- int bufpos;
- char buf[PROTO_REPLY_CHUNK_BYTES];
- } client;
一, 命令查找 dict
当一个请求发到 Redis 服务器后, 我们将其数据解析出来, 自然先要明白命令是哪个, 然后才知道如何处理它. 我们看一下, Redis 是如何查找具体的处理命令的?
- // server.c: map a command name to its redisCommand descriptor.
- // This is just a hash-map (dict) lookup in server.commands, which
- // populateCommandTable() fills at startup. NULL means "no such command".
- struct redisCommand *lookupCommand(sds name) {
- struct redisCommand *cmd = dictFetchValue(server.commands, name);
- return cmd;
- }
- // dict.c: return the value stored under key, or NULL when the key is absent.
- // A stored NULL value and a missing key look the same to the caller.
- void *dictFetchValue(dict *d, const void *key) {
- dictEntry *entry = dictFind(d,key);
- if (entry == NULL) return NULL;
- return dictGetVal(entry);
- }
- // dict.c: locate the entry for key, or return NULL if it is not present.
- // A chained hash-table lookup with one twist: while an incremental rehash
- // is in progress a key may live in either ht[0] or ht[1], so both tables
- // are searched; otherwise only ht[0] is examined.
- // Side effect: during a rehash, one rehash step may be performed first.
- dictEntry *dictFind(dict *d, const void *key)
- {
- dictEntry *cur;
- unsigned int hash, bucket, t;
- /* No table allocated at all: nothing can be found. */
- if (d->ht[0].size == 0) return NULL;
- if (dictIsRehashing(d)) _dictRehashStep(d);
- hash = dictHashKey(d, key);
- for (t = 0; t < 2; t++) {
- bucket = hash & d->ht[t].sizemask;
- // Walk the collision chain of this bucket.
- for (cur = d->ht[t].table[bucket]; cur != NULL; cur = cur->next) {
- if (dictCompareKeys(d, key, cur->key)) return cur;
- }
- // Without a rehash in progress ht[1] is unused: stop after ht[0].
- if (!dictIsRehashing(d)) break;
- }
- return NULL;
- }
- // dict.h: a dict is being rehashed iff rehashidx != -1. rehashidx == -1 means
- // NO rehash is in progress; otherwise it is the index of the next ht[0] bucket
- // to migrate (dictRehash advances it one bucket at a time).
- #define dictIsRehashing(d) ((d)->rehashidx != -1)
- // dict.c: one incremental rehash step, piggy-backed on lookups and updates.
- /* Perform a single rehash step (dictRehash(d,1)) unless a safe iterator is
- * bound to the dictionary (d->iterators != 0): moving entries between the
- * two hash tables while such an iterator is mid-scan could make it miss or
- * duplicate elements.
- *
- * Lookup and insert paths (dictFind, dictAddRaw) call this, so the table
- * migrates from ht[0] to ht[1] gradually while the dict is in use. */
- static void _dictRehashStep(dict *d) {
- // Bounded work: at most one bucket moved per call.
- if (d->iterators != 0) return;
- dictRehash(d,1);
- }
- // dict.c: incremental rehash, moving up to n buckets from ht[0] to ht[1].
- /* Performs N steps of incremental rehashing. Returns 1 if there are still
- * keys to move from the old to the new hash table, otherwise 0 is returned.
- *
- * Note that a rehashing step consists in moving a bucket (that may have more
- * than one key as we use chaining) from the old to the new hash table, however
- * since part of the hash table may be composed of empty spaces, it is not
- * guaranteed that this function will rehash even a single bucket, since it
- * will visit at max N*10 empty buckets in total, otherwise the amount of
- * work it does would be unbound and the function may block for a long time. */
- int dictRehash(dict *d, int n) {
- // Visit at most n*10 EMPTY buckets (not elements), so that one call never
- // blocks for long on a sparse table.
- int empty_visits = n*10; /* Max number of empty buckets to visit. */
- if (!dictIsRehashing(d)) return 0;
- // Move up to n non-empty buckets; stop early once ht[0] holds no entries.
- while(n-- && d->ht[0].used != 0) {
- dictEntry *de, *nextde;
- /* Note that rehashidx can't overflow as we are sure there are more
- * elements because ht[0].used != 0 */
- assert(d->ht[0].size> (unsigned long)d->rehashidx);
- // Resume at rehashidx and skip empty buckets; give up for this call
- // (returning 1, "more to do") once the empty-bucket budget is spent.
- while(d->ht[0].table[d->rehashidx] == NULL) {
- d->rehashidx++;
- if (--empty_visits == 0) return 1;
- }
- de = d->ht[0].table[d->rehashidx];
- /* Move all the keys in this bucket from the old to the new hash HT */
- // Relink every entry of this bucket's chain into ht[1].
- while(de) {
- unsigned int h;
- nextde = de->next;
- /* Get the index in the new hash table */
- h = dictHashKey(d, de->key) & d->ht[1].sizemask;
- // Insert at the head of the target bucket's chain (handles collisions).
- de->next = d->ht[1].table[h];
- d->ht[1].table[h] = de;
- d->ht[0].used--;
- d->ht[1].used++;
- de = nextde;
- }
- d->ht[0].table[d->rehashidx] = NULL;
- // This bucket is done; the next step starts at the following bucket.
- d->rehashidx++;
- }
- /* Check if we already rehashed the whole table... */
- if (d->ht[0].used == 0) {
- zfree(d->ht[0].table);
- // Everything migrated: free the old table, promote ht[1] to ht[0],
- // reset ht[1] and mark the rehash as finished (rehashidx = -1).
- d->ht[0] = d->ht[1];
- _dictReset(&d->ht[1]);
- d->rehashidx = -1;
- return 0;
- }
- /* More to rehash... */
- // 1 means entries are still left in ht[0]; more rehash steps are needed.
- return 1;
- }
综上可以看出, 命令的查找过程其实就是一次 hash 字典的查找. 比较特别的优化是: rehash 是渐进式进行的, 每次只迁移一小部分数据, 从而尽可能减小停顿时间; rehash 期间同时存在两个 hash 表 (ht[0] 和 ht[1]), 查找时两个表都会检查, 迁移完成后再由 ht[1] 取代 ht[0], 以此保证数据的完整性.
hash 表是个很有用的数据结构. 上面是将其用于命令的查找, 而后面 kv 的查找同样使用了 dict 这种结构, 所以后续解析其他命令时, 只需关注其特有的处理方式即可.
二, 溯源: 命令的添加
要想实现如上的查找, 我们有必要了解下其是在何时添加的. 下面, 我们来看看, 这些命令是如何写入到 server.commands 中的.
- // server.c: load the hard-coded redisCommandTable into server.commands and
- // server.orig_commands. (The article says this runs from initServerConfig();
- // the caller is not shown here.)
- /* Populates the Redis Command Table starting from the hard coded list
- * we have on top of server.c. Each command's sflags string is turned into
- * CMD_* bits in its flags field; an unknown flag character is fatal. */
- void populateCommandTable(void) {
- int numcommands = sizeof(redisCommandTable)/sizeof(redisCommandTable[0]);
- int idx;
- for (idx = 0; idx < numcommands; idx++) {
- struct redisCommand *cmd = &redisCommandTable[idx];
- char *flag;
- int added, orig_added;
- // Every flag character sets exactly one bit.
- for (flag = cmd->sflags; *flag != '\0'; flag++) {
- switch(*flag) {
- case 'w': cmd->flags |= CMD_WRITE; break;
- case 'r': cmd->flags |= CMD_READONLY; break;
- case 'm': cmd->flags |= CMD_DENYOOM; break;
- case 'a': cmd->flags |= CMD_ADMIN; break;
- case 'p': cmd->flags |= CMD_PUBSUB; break;
- case 's': cmd->flags |= CMD_NOSCRIPT; break;
- case 'R': cmd->flags |= CMD_RANDOM; break;
- case 'S': cmd->flags |= CMD_SORT_FOR_SCRIPT; break;
- case 'l': cmd->flags |= CMD_LOADING; break;
- case 't': cmd->flags |= CMD_STALE; break;
- case 'M': cmd->flags |= CMD_SKIP_MONITOR; break;
- case 'k': cmd->flags |= CMD_ASKING; break;
- case 'F': cmd->flags |= CMD_FAST; break;
- default: serverPanic("Unsupported command flag"); break;
- }
- }
- // Register the command in the live table (subject to rename-command).
- added = dictAdd(server.commands, sdsnew(cmd->name), cmd);
- /* Populate an additional dictionary that will be unaffected
- * by rename-command statements in redis.conf. */
- orig_added = dictAdd(server.orig_commands, sdsnew(cmd->name), cmd);
- serverAssert(added == DICT_OK && orig_added == DICT_OK);
- }
- }
- // dict.c: inserting into a dict (this goes a little beyond what the command
- // lookup path above needed).
- /* Add an element to the target hash table */
- /* Returns DICT_OK on success. Returns DICT_ERR, without storing val, when
- * dictAddRaw() returns NULL: the key already exists, or the table could
- * not be expanded. Existing keys are never overwritten. */
- int dictAdd(dict *d, void *key, void *val)
- {
- dictEntry *entry = dictAddRaw(d,key);
- if (!entry) return DICT_ERR;
- // Store the value in the new entry (dictSetVal uses the type's valDup hook when set).
- dictSetVal(d, entry, val);
- return DICT_OK;
- }
- // Since we are here anyway, let's walk through the whole dict insertion path
- // now so we don't have to come back to it later.
- // dict.c: insert key into the dict and return the newly created dictEntry.
- /* Low level add. This function adds the entry but instead of setting
- * a value returns the dictEntry structure to the user, that will make
- * sure to fill the value field as he wishes.
- *
- * This function is also directly exposed to the user API to be called
- * mainly in order to store non-pointers inside the hash value, example:
- *
- * entry = dictAddRaw(dict,mykey);
- * if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
- *
- * Return values:
- *
- * If key already exists NULL is returned.
- * NULL is also returned when _dictKeyIndex() fails because the table
- * could not be expanded.
- * If key was added, the hash entry is returned to be manipulated by the caller.
- */
- dictEntry *dictAddRaw(dict *d, void *key)
- {
- int index;
- dictEntry *entry;
- dictht *ht;
- // As in dictFetchValue: if a rehash is in progress, first migrate one
- // more step so the rehash cost is spread across normal operations.
- if (dictIsRehashing(d)) _dictRehashStep(d);
- /* Get the index of the new element, or -1 if
- * the element already exists. */
- // -1 means "do not insert": either the key is already present or the
- // expansion failed. dictAddRaw therefore only ever adds new keys; it
- // never modifies an existing entry.
- if ((index = _dictKeyIndex(d, key)) == -1)
- return NULL;
- /* Allocate the memory and store the new entry.
- * Insert the element in top, with the assumption that in a database
- * system it is more likely that recently added entries are accessed
- * more frequently. */
- // While rehashing, new entries always go into ht[1], so ht[0] only shrinks.
- ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
- // Each bucket is a singly linked list; push the new entry at its head.
- entry = zmalloc(sizeof(*entry));
- entry->next = ht->table[index];
- ht->table[index] = entry;
- ht->used++;
- /* Set the hash entry fields. */
- // Store the key in the entry. dictSetKey is a do { } while(0) macro that
- // stores a keyDup() copy when the dict type provides that hook.
- dictSetKey(d, entry, key);
- return entry;
- }
- // dict.c: compute the bucket index where a new key should be inserted.
- /* Returns the index of a free slot that can be populated with
- * a hash entry for the given 'key'.
- * If the key already exists, -1 is returned.
- *
- * Note that if we are in the process of rehashing the hash table, the
- * index is always returned in the context of the second (new) hash table. */
- /* "Free slot" means the bucket whose chain the caller prepends to; that
- * bucket may already hold other keys. -1 is also returned when
- * _dictExpandIfNeeded() fails. */
- static int _dictKeyIndex(dict *d, const void *key)
- {
- unsigned int h, idx, table;
- dictEntry *he;
- /* Expand the hash table if needed */
- // Grow the table (which may start an incremental rehash) when the load
- // factor requires it.
- if (_dictExpandIfNeeded(d) == DICT_ERR)
- return -1;
- /* Compute the key hash value */
- h = dictHashKey(d, key);
- // Scan ht[0] and, only while a rehash is in progress, ht[1] as well:
- // during a rehash an existing key may live in either table.
- for (table = 0; table <= 1; table++) {
- idx = h & d->ht[table].sizemask;
- /* Search if this slot does not already contain the given key */
- he = d->ht[table].table[idx];
- while(he) {
- if (dictCompareKeys(d, key, he->key))
- return -1;
- he = he->next;
- }
- if (!dictIsRehashing(d)) break;
- }
- // idx was computed for the last table scanned: ht[1] when rehashing,
- // ht[0] otherwise, matching the table dictAddRaw() inserts into.
- return idx;
- }
- // Growth policy: decide whether the dict must be expanded before an insert.
- // dict.c,
- /* Expand the hash table if needed */
- /* Returns DICT_OK when no expansion is needed or it succeeded, otherwise
- * whatever dictExpand() returns. */
- static int _dictExpandIfNeeded(dict *d)
- {
- /* Incremental rehashing already in progress. Return. */
- if (dictIsRehashing(d)) return DICT_OK;
- /* If the hash table is empty expand it to the initial size. */
- // Per the original note DICT_HT_INITIAL_SIZE is 4 (defined in dict.h,
- // not shown here).
- if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
- /* If we reached the 1:1 ratio, and we are allowed to resize the hash
- * table (global setting) or we should avoid it but the ratio between
- * elements/buckets is over the "safe" threshold, we resize doubling
- * the number of buckets. */
- // Request used*2 buckets; dictExpand() then passes this through
- // _dictNextPower(), presumably rounding up to a power of two.
- if (d->ht[0].used>= d->ht[0].size &&
- (dict_can_resize ||
- d->ht[0].used/d->ht[0].size> dict_force_resize_ratio))
- {
- return dictExpand(d, d->ht[0].used*2);
- }
- return DICT_OK;
- }
- // dict.c: expanding only allocates a new, empty dictht; the entries are
- // moved into it later, step by step, by the incremental rehash.
- /* Expand or create the hash table */
- /* Returns DICT_ERR if a rehash is already running, if size is smaller than
- * the number of stored elements, or if the resulting size equals the
- * current one; DICT_OK otherwise. */
- int dictExpand(dict *d, unsigned long size)
- {
- dictht n; /* the new hash table */
- // realsize comes from _dictNextPower(size) -- presumably the smallest
- // power of two >= size (not shown here). It must be a power of two for
- // sizemask = realsize-1 to work as the bucket mask used by "h & sizemask".
- unsigned long realsize = _dictNextPower(size);
- /* the size is invalid if it is smaller than the number of
- * elements already inside the hash table */
- if (dictIsRehashing(d) || d->ht[0].used> size)
- return DICT_ERR;
- /* Rehashing to the same table size is not useful. */
- if (realsize == d->ht[0].size) return DICT_ERR;
- /* Allocate the new hash table and initialize all pointers to NULL */
- n.size = realsize;
- n.sizemask = realsize-1;
- n.table = zcalloc(realsize*sizeof(dictEntry*));
- n.used = 0;
- /* Is this the first initialization? If so it's not really a rehashing
- * we just set the first hash table so that it can accept keys. */
- if (d->ht[0].table == NULL) {
- d->ht[0] = n;
- return DICT_OK;
- }
- /* Prepare a second hash table for incremental rehashing */
- // Install the new table as ht[1] and set rehashidx = 0 to mark a rehash
- // as in progress; subsequent dict operations then migrate buckets from
- // ht[0] to ht[1] a few at a time.
- d->ht[1] = n;
- d->rehashidx = 0;
- return DICT_OK;
- }
- // dict.h: dictSetKey / dictSetVal store a key / value into a dictEntry.
- // When the dict type provides a keyDup / valDup hook, the stored item is the
- // copy returned by that hook; otherwise the given pointer is stored as-is.
- // The do { ... } while(0) wrapper makes each macro behave as one statement,
- // so it is safe in an unbraced if/else and requires a trailing semicolon.
- // Every argument is parenthesized ((entry) used to be bare, which broke for
- // arguments such as `base + i`). Note that `d` is still evaluated several
- // times, so do not pass arguments with side effects.
- #define dictSetKey(d, entry, _key_) do { \
- if ((d)->type->keyDup) \
- (entry)->key = (d)->type->keyDup((d)->privdata, (_key_)); \
- else \
- (entry)->key = (_key_); \
- } while(0)
- #define dictSetVal(d, entry, _val_) do { \
- if ((d)->type->valDup) \
- (entry)->v.val = (d)->type->valDup((d)->privdata, (_val_)); \
- else \
- (entry)->v.val = (_val_); \
- } while(0)
以上, 自然就是一个 hash 表插入数据过程, 然后 server.commands 就有数据了, 请求进来自然就可以处理了.
三, 命令集的定义
在 server.c 的头部, 有一个数组 redisCommandTable, 专门用于定义各个命令的处理方法, 并最终由 populateCommandTable() 加载到 server.commands 中.
- /* Our command table.
- *
- * Every entry is composed of the following fields:
- *
- * name: a string representing the command name.
- * function: pointer to the C function implementing the command.
- * arity: number of arguments, it is possible to use -N to say >= N
- * sflags: command flags as string. See below for a table of flags.
- * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
- * get_keys_proc: an optional function to get key arguments from a command.
- * This is only used when the following three fields are not
- * enough to specify what arguments are keys.
- * first_key_index: first argument that is a key
- * last_key_index: last argument that is a key (negative values count
- * from the end of argv, e.g. -1 is the last argument)
- * key_step: step to get all the keys from first to last argument. For instance
- * in MSET the step is two since arguments are key,val,key,val,...
- * microseconds: microseconds of total execution time for this command.
- * calls: total number of calls of this command.
- *
- * The flags, microseconds and calls fields are computed by Redis and should
- * always be set to zero.
- *
- * Command flags are expressed using strings where every character represents
- * a flag. Later the populateCommandTable() function will take care of
- * populating the real 'flags' field using this characters.
- *
- * This is the meaning of the flags:
- *
- * w: write command (may modify the key space).
- * r: read command (will never modify the key space).
- * m: may increase memory usage once called. Don't allow if out of memory.
- * a: admin command, like SAVE or SHUTDOWN.
- * p: Pub/Sub related command.
- * f: force replication of this command, regardless of server.dirty.
- * (populateCommandTable() has no case for 'f' and would panic on it,
- * so no entry below may use it.)
- * s: command not allowed in scripts.
- * R: random command. Command is not deterministic, that is, the same command
- * with the same arguments, with the same key space, may have different
- * results. For instance SPOP and RANDOMKEY are two random commands.
- * S: Sort command output array if called from script, so that the output
- * is deterministic.
- * l: Allow command while loading the database.
- * t: Allow command while a slave has stale data but is not allowed to
- * serve this data. Normally no command is accepted in this condition
- * but just a few.
- * M: Do not automatically propagate the command on MONITOR.
- * k: Perform an implicit ASKING for this command, so the command will be
- * accepted in cluster mode if the slot is marked as 'importing'.
- * F: Fast command: O(1) or O(log(N)) command that should never delay
- * its execution as long as the kernel scheduler is giving us time.
- * Note that commands that may trigger a DEL as a side effect (like SET)
- * are not fast commands.
- */
- struct redisCommand redisCommandTable[] = {
- {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
- {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
- {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
- {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
- {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
- {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
- {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
- {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
- {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
- {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
- {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
- {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
- {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
- {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
- {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
- {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
- {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
- {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
- {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
- {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
- {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
- {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
- {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
- // BRPOP key [key ...] timeout has the same layout as BLPOP: the keys run
- // from argv[1] to argv[argc-2], so last_key must be -2 (it was 1, which
- // reported only the first key).
- {"brpop",brpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
- {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
- {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
- {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
- {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
- {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
- {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
- {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
- {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
- {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
- {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
- {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
- {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
- {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"spop",spopCommand,-2,"wRsF",0,NULL,1,1,1,0,0},
- {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
- {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
- {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
- {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
- {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
- {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
- {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
- {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
- {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
- {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
- {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
- {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
- {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
- {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
- {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
- {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
- {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
- {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
- {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
- {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
- {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
- {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
- {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
- {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
- {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
- {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
- {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
- {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
- {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
- {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0},
- {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
- {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
- {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0},
- {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0},
- {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
- {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
- {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
- {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
- {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
- {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
- {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
- {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
- {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
- {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
- {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
- {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
- {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
- {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
- {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
- {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
- {"select",selectCommand,2,"rlF",0,NULL,0,0,0,0,0},
- {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
- {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
- {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
- {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
- {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
- {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
- {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
- {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
- {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
- {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
- {"auth",authCommand,2,"rsltF",0,NULL,0,0,0,0,0},
- {"ping",pingCommand,-1,"rtF",0,NULL,0,0,0,0,0},
- {"echo",echoCommand,2,"rF",0,NULL,0,0,0,0,0},
- {"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0},
- {"bgsave",bgsaveCommand,1,"ar",0,NULL,0,0,0,0,0},
- {"bgrewriteaof",bgrewriteaofCommand,1,"ar",0,NULL,0,0,0,0,0},
- {"shutdown",shutdownCommand,-1,"arlt",0,NULL,0,0,0,0,0},
- {"lastsave",lastsaveCommand,1,"rRF",0,NULL,0,0,0,0,0},
- {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"multi",multiCommand,1,"rsF",0,NULL,0,0,0,0,0},
- {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
- {"discard",discardCommand,1,"rsF",0,NULL,0,0,0,0,0},
- {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
- {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
- {"replconf",replconfCommand,-1,"arslt",0,NULL,0,0,0,0,0},
- {"flushdb",flushdbCommand,-1,"w",0,NULL,0,0,0,0,0},
- {"flushall",flushallCommand,-1,"w",0,NULL,0,0,0,0,0},
- {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
- {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0},
- {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
- {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0},
- {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
- {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
- {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
- {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
- {"config",configCommand,-2,"art",0,NULL,0,0,0,0,0},
- {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
- {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
- {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
- {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
- {"publish",publishCommand,3,"pltrF",0,NULL,0,0,0,0,0},
- {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
- {"watch",watchCommand,-2,"rsF",0,NULL,1,-1,1,0,0},
- {"unwatch",unwatchCommand,1,"rsF",0,NULL,0,0,0,0,0},
- {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
- {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
- {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
- {"migrate",migrateCommand,-6,"w",0,migrateGetKeys,0,0,0,0,0},
- {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
- {"readonly",readonlyCommand,1,"rF",0,NULL,0,0,0,0,0},
- {"readwrite",readwriteCommand,1,"rF",0,NULL,0,0,0,0,0},
- {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
- {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0},
- {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
- {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
- {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
- {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
- {"script",scriptCommand,-2,"rs",0,NULL,0,0,0,0,0},
- {"time",timeCommand,1,"rRF",0,NULL,0,0,0,0,0},
- {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
- {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
- {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
- {"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0},
- {"command",commandCommand,0,"rlt",0,NULL,0,0,0,0,0},
- {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
- {"georadius",georadiusCommand,-6,"r",0,NULL,1,1,1,0,0},
- {"georadiusbymember",georadiusByMemberCommand,-5,"r",0,NULL,1,1,1,0,0},
- {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
- {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
- {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
- {"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0},
- {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
- {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
- {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
- {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
- {"latency",latencyCommand,-2,"arslt",0,NULL,0,0,0,0,0}
- };
可以说, 核心功能都是在这里定义的 (相当于一本命令小手册). 如果想自己添加命令, 也是先在这里注册, 然后再去实现对应的处理函数. 其中, Lua 脚本则直接通过 eval / evalsha 命令调用即可.
四, 执行命令的模板方法
在调用真正的命令实现之前和之后, 都要经过各种判断和处理 (如 MONITOR 转发、慢日志、统计、AOF/复制传播等), 之后才能完成一次命令调用. 这也是数据库命令执行与简单函数调用的差别之一.
- // server.c: call(), the wrapper every command execution goes through. Around
- // the actual command proc it handles MONITOR feeding, timing, slowlog and
- // latency sampling, per-command stats and AOF / replication propagation.
- /* Call() is the core of Redis execution of a command.
- *
- * The following flags can be passed:
- * CMD_CALL_NONE No flags.
- * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed.
- * CMD_CALL_STATS Populate command stats.
- * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset
- * or if the client flags are forcing propagation.
- * CMD_CALL_PROPAGATE_REPL Send command to slaves if it modified the dataset
- * or if the client flags are forcing propagation.
- * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL.
- * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE.
- *
- * The exact propagation behavior depends on the client flags.
- * Specifically:
- *
- * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
- * and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
- * in the call flags, then the command is propagated even if the
- * dataset was not affected by the command.
- * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
- * are set, the propagation into AOF or to slaves is not performed even
- * if the command modified the dataset.
- *
- * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
- * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
- * slaves propagation will never occur.
- *
- * Client flags are modified by the implementation of a given command
- * using the following API:
- *
- * forceCommandPropagation(client *c, int flags);
- * preventCommandPropagation(client *c);
- * preventCommandAOF(client *c);
- * preventCommandReplication(client *c);
- *
- */
- void call(client *c, int flags) {
- long long dirty, start, duration;
- int client_old_flags = c->flags;
- /* Send the command to clients in MONITOR mode, only if the commands are
- * not generated from reading an AOF. */
- // Feed MONITOR clients before executing; skipped while loading and for
- // commands flagged CMD_SKIP_MONITOR or CMD_ADMIN.
- if (listLength(server.monitors) &&
- !server.loading &&
- !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
- {
- replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
- }
- /* Initialization: clear the flags that must be set by the command on
- * demand, and initialize the array for additional commands propagation. */
- c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
- // Reset server.also_propagate, where the command may queue extra ops via
- // the alsoPropagate() API; they are flushed at the end of this function.
- redisOpArrayInit(&server.also_propagate);
- /* Call the command. */
- // Run the command implementation. The command writes its own reply to the
- // client; call() only measures the elapsed time (in microseconds, as
- // accumulated into cmd->microseconds below) and the change in server.dirty.
- dirty = server.dirty;
- start = ustime();
- c->cmd->proc(c);
- duration = ustime()-start;
- dirty = server.dirty-dirty;
- // Clamp a negative delta (server.dirty may have been reset while the command ran).
- if (dirty <0) dirty = 0;
- /* When EVAL is called loading the AOF we don't want commands called
- * from Lua to go into the slowlog or to populate statistics. */
- if (server.loading && c->flags & CLIENT_LUA)
- flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
- /* If the caller is Lua, we want to force the EVAL caller to propagate
- * the script if the command flag or client flag are forcing the
- * propagation. */
- if (c->flags & CLIENT_LUA && server.lua_caller) {
- if (c->flags & CLIENT_FORCE_REPL)
- server.lua_caller->flags |= CLIENT_FORCE_REPL;
- if (c->flags & CLIENT_FORCE_AOF)
- server.lua_caller->flags |= CLIENT_FORCE_AOF;
- }
- /* Log the command into the Slow log if needed, and populate the
- * per-command statistics that we show in INFO commandstats. */
- // Slow log and latency sampling (EXEC itself is excluded). The latency
- // event name depends on CMD_FAST, and the sample is passed as
- // duration/1000, i.e. in milliseconds.
- if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
- char *latency_event = (c->cmd->flags & CMD_FAST) ?
- "fast-command" : "command";
- latencyAddSampleIfNeeded(latency_event,duration/1000);
- slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
- }
- if (flags & CMD_CALL_STATS) {
- c->cmd->microseconds += duration;
- c->cmd->calls++;
- }
- /* Propagate the command into the AOF and replication link */
- if (flags & CMD_CALL_PROPAGATE &&
- (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
- {
- int propagate_flags = PROPAGATE_NONE;
- /* Check if the command operated changes in the data set. If so
- * set for replication / AOF propagation. */
- if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
- /* If the client forced AOF / replication of the command, set
- * the flags regardless of the command effects on the data set. */
- if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
- if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
- /* However prevent AOF / replication propagation if the command
- * implementation called preventCommandPropagation() or similar,
- * or if we don't have the call() flags to do so. */
- if (c->flags & CLIENT_PREVENT_REPL_PROP ||
- !(flags & CMD_CALL_PROPAGATE_REPL))
- propagate_flags &= ~PROPAGATE_REPL;
- if (c->flags & CLIENT_PREVENT_AOF_PROP ||
- !(flags & CMD_CALL_PROPAGATE_AOF))
- propagate_flags &= ~PROPAGATE_AOF;
- /* Call propagate() only if at least one of AOF / replication
- * propagation is needed. */
- // Propagate only if at least one target (AOF and/or replication) remains.
- if (propagate_flags != PROPAGATE_NONE)
- propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
- }
- /* Restore the old replication flags, since call() can be executed
- * recursively. */
- c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
- c->flags |= client_old_flags &
- (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
- /* Handle the alsoPropagate() API to handle commands that want to propagate
- * multiple separated commands. Note that alsoPropagate() is not affected
- * by CLIENT_PREVENT_PROP flag. */
- if (server.also_propagate.numops) {
- int j;
- redisOp *rop;
- // Propagate the extra ops queued in server.also_propagate (not *into* it),
- // masking each op's target with call()'s own PROPAGATE_AOF/REPL flags.
- // The array is freed afterwards whether or not anything was propagated.
- if (flags & CMD_CALL_PROPAGATE) {
- for (j = 0; j <server.also_propagate.numops; j++) {
- rop = &server.also_propagate.ops[j];
- int target = rop->target;
- /* Whatever the command wish is, we honor the call() flags. */
- if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
- if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
- if (target)
- propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
- }
- }
- redisOpArrayFree(&server.also_propagate);
- }
- // Counted for every executed command, regardless of the flags above.
- server.stat_numcommands++;
- }
五, 响应客户端
响应客户端可以使用 addReply(), 当然还有其他简化版本, 道理一致.
// networking.c, 向 client c 中响应数据 obj /* ----------------------------------------------------------------------------- * Higher level functions to queue data on the client output buffer. * The following functions are the ones that commands implementations will call. * -------------------------------------------------------------------------- */ void addReply(client *c, robj *obj) { // 客户端连接不可写时, 直接返回本次写操作 if (prepareClientToWrite(c) != C_OK) return; /* This is an important place where we can avoid copy-on-write * when there is a saving child running, avoiding touching the * refcount field of the object if it's not needed. * * If the encoding is RAW and there is room in the static buffer * we'll be able to send the object to the client without * messing with its page. */ // 检测编码是否是 OBJ_ENCODING_RAW/OBJ_ENCODING_EMBSTR if (sdsEncodedObject(obj)) { // 将数据添加到 c->buf 中 // 添加失败则将 obj 直接添加到 c->reply 队列中 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) _addReplyObjectToList(c,obj); } else if (obj->encoding == OBJ_ENCODING_INT) { /* Optimization: if there is room in the static buffer for 32 bytes * (more than the max chars a 64 bit integer can take as string) we * avoid decoding the object and go for the lower level approach. */ if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos)>= 32) { char buf[32]; int len; len = ll2string(buf,sizeof(buf),(long)obj->ptr); if (_addReplyToBuffer(c,buf,len) == C_OK) return; /* else... continue with the normal code path, but should never * happen actually since we verified there is room. 
*/ } obj = getDecodedObject(obj); if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) _addReplyObjectToList(c,obj); decrRefCount(obj); } else { serverPanic("Wrong obj->encoding in addReply()"); } } // networking.c, int _addReplyToBuffer(client *c, const char *s, size_t len) { size_t available = sizeof(c->buf)-c->bufpos; if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK; /* If there already are entries in the reply list, we cannot * add anything more to the static buffer. */ if (listLength(c->reply)> 0) return C_ERR; /* Check that the buffer has enough space available for this string. */ if (len> available) return C_ERR; memcpy(c->buf+c->bufpos,s,len); c->bufpos+=len; return C_OK; }
将数据写入到 c->buf 后, 又是谁向客户端写出了结果呢?
要么是有一个后台线程一直在写, 要么是在下一次事件循环时再主动写, 你觉得呢? (请参考: server.clients_pending_write)
来源: https://www.cnblogs.com/yougewe/p/12219098.html