python rq源码简单剖析

    最近在看rpc的一些原理,看到生产者/消费者模型的时候,有点不解,找到了rq,看了下其实现,简单记录下。   

    rq是一个轻量级的python生产者/消费者库,相比celery要轻量多了,所以很多人也倾向于简单任务就用它。

    对于rq,他包括两个端,一个是消费端,一个是生产端。在这里我只关心两个点,第一个是rq如何在redis里组织他的数据结构?第二是rq如何调度进程/线程进行消费的?其他不和这两点相关的暂且忽略…

    对于生产端,需要用enqueue或者enqueue_call压入redis中,所以我们来看这两个函数,其他他们都在queue.py文件里。先看下enqueue

def enqueue(self, f, *args, **kwargs):
    ...

    return self.enqueue_call(func=f, args=args, kwargs=kwargs,
                             timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                             description=description, depends_on=depends_on,
                             job_id=job_id, at_front=at_front, meta=meta)

这里省略了一些无关代码,可以看到,他还是调用了enqueue_call,所以我们接着分析他。

def enqueue_job(self, job, pipeline=None, at_front=False):
    """Enqueues a job for delayed execution.

    If Queue is instantiated with is_async=False, job is executed immediately.
    """
    pipe = pipeline if pipeline is not None else self.connection._pipeline()

    # Add Queue key set

    # 1) 增加到set中
    pipe.sadd(self.redis_queues_keys, self.key)

    # 2) 设置状态
    job.set_status(JobStatus.QUEUED, pipeline=pipe)

    # 3) 保存job的一些信息,包括用户meta信息
    job.save(pipeline=pipe)

    # 4) 把job压入队列
    self.push_job_id(job.id, pipeline=pipe, at_front=at_front)

    if pipeline is None:
        pipe.execute()

    return job

def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                 result_ttl=None, ttl=None, description=None,
                 depends_on=None, job_id=None, at_front=False, meta=None):
    """Creates a job to represent the delayed function call and enqueues
    it.

    It is much like `.enqueue()`, except that it takes the function's args
    and kwargs as explicit arguments.  Any kwargs passed to this function
    contain options for RQ itself.
    """

    # 1) 创建一个job,job是元数据
    job = self.job_class.create(
        func, args=args, kwargs=kwargs, connection=self.connection,
        result_ttl=result_ttl, ttl=ttl, status=JobStatus.QUEUED,
        description=description, depends_on=depends_on,
        timeout=timeout, id=job_id, origin=self.name, meta=meta)

    # 2) 把job压入队列中
    job = self.enqueue_job(job, at_front=at_front)

    return job

同样我也去掉了一些无关代码,可以看到在enqueue_call里也只是创建了一个job后就调用了enqueue_job方法。

先看job是什么东西?

lass Job(object):
    """A Job is just a convenient datastructure to pass around job (meta) data.
    """
    redis_job_namespace_prefix = 'rq:job:'
    ...

可以看到job其实就是一个存储任务的抽象结构,他比较重要的一些方法比如delete,cancel,fetch,save等。

主要看两个方法,第一个是to_dict,这个是从job需要存储到redis里前做的序列化工作。

def to_dict(self, include_meta=True):
    """
    Returns a serialization of the current job instance

    You can exclude serializing the `meta` dictionary by setting
    `include_meta=False`.
    """
    obj = {}
    obj['created_at'] = utcformat(self.created_at or utcnow())
    obj['data'] = zlib.compress(self.data)
    ...

    if self.meta and include_meta:
        obj['meta'] = dumps(self.meta)
    if self.ttl:
        obj['ttl'] = self.ttl

    return obj

可以看到他对数据进行了压缩处理,而且也对用户数据meta进行了存储。另外一个比较重要的方法是perform,这个方法等到下面再讲,简单说就是job的执行。

下面看下set_status做了什么?

def set_status(self, status, pipeline=None):
    self._status = status
    self.connection._hset(self.key, 'status', self._status, pipeline)

单独用redis set去维护job的状态,然后看job的save方法做了什么?

def save(self, pipeline=None, include_meta=True):
    """
    Dumps the current job instance to its corresponding Redis key.

    Exclude saving the `meta` dictionary by setting
    `include_meta=False`. This is useful to prevent clobbering
    user metadata without an expensive `refresh()` call first.

    Redis key persistence may be altered by `cleanup()` method.
    """
    key = self.key
    connection = pipeline if pipeline is not None else self.connection

    connection.hmset(key, self.to_dict(include_meta=include_meta))

可以看到这里才是保存job的真正数据,用到上面说的to_dict方法去处理。

最后一个方法看名字也知道他就是压入任务到一个队列里了。

def push_job_id(self, job_id, pipeline=None, at_front=False):
    """Pushes a job ID on the corresponding Redis queue.
    'at_front' allows you to push the job onto the front instead of the back of the queue"""
    connection = pipeline if pipeline is not None else self.connection
    if at_front:
        connection.lpush(self.key, job_id)
    else:
        connection.rpush(self.key, job_id)

worker的调度入口是work方法,调度的中心是一个while True,我们看下这部分代码

try:
    while True:
        try:
            # 1) 检测是否挂起
            self.check_for_suspension(burst)

            if self.should_run_maintenance_tasks:
                self.clean_registries()

            if self._stop_requested:
                self.log.info('Stopping on request')
                break

            timeout = None if burst else max(1, self.default_worker_ttl - 15)

            result = self.dequeue_job_and_maintain_ttl(timeout)
            if result is None:
                if burst:
                    self.log.info("RQ worker {0!r} done, quitting".format(self.key))
                break

            job, queue = result

            # 2) 执行job
            self.execute_job(job, queue)

            # 3) 上报job的心跳
            self.heartbeat()

            did_perform_work = True

        except StopRequested:
            break

现在我们最关心得还是execute_job方法,这个方法主要是真正的执行部分

def execute_job(self, job, queue):
    """Spawns a work horse to perform the actual work and passes it a job.
    The worker will wait for the work horse and make sure it executes
    within the given timeout bounds, or will end the work horse with
    SIGALRM.
    """
    self.set_state(WorkerStatus.BUSY)

    # 1) fork出进程,执行job
    self.fork_work_horse(job, queue)

    # 2) 监控上一步fork的进程执行
    self.monitor_work_horse(job)
    self.set_state(WorkerStatus.IDLE)

这里fork部分是我们最关心得.

def fork_work_horse(self, job, queue):
    """Spawns a work horse to perform the actual work and passes it a job.
    """
    child_pid = os.fork()
    os.environ['RQ_WORKER_ID'] = self.name
    os.environ['RQ_JOB_ID'] = job.id
    if child_pid == 0:
        # 子进程调度
        self.main_work_horse(job, queue)
    else:
        self._horse_pid = child_pid
        self.procline('Forked {0} at {1}'.format(child_pid, time.time()))

可以看到的是在这个方法里,父子进程分离了,而main_work_horse就开始真正执行刚才说的job.perform方法

# 1) 执行前预处理
self.prepare_job_execution(job)

# 2) 压入链接
push_connection(self.connection)

started_job_registry = StartedJobRegistry(job.origin,
                                          self.connection,
                                          job_class=self.job_class)

try:
    job.started_at = utcnow()
    timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
    with self.death_penalty_class(timeout, JobTimeoutException):
        # 3) job执行
        rv = job.perform()

    job.ended_at = utcnow()

    # Pickle the result in the same try-except block since we need
    # to use the same exc handling when pickling fails
    job._result = rv

    # 4) 处理job成功状态
    self.handle_job_success(job=job,
                            queue=queue,
                            started_job_registry=started_job_registry)
except:
    ...

再看monitor_work_horse方法,这个方法主要作用是监视上面fork出来的子进程用的

def monitor_work_horse(self, job):
    while True:
        try:
            with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
                retpid, ret_val = os.waitpid(self._horse_pid, 0)
            break
        except HorseMonitorTimeoutException:
            self.heartbeat(self.job_monitoring_interval + 5)
        except OSError as e:
            if e.errno != errno.EINTR:
                raise
            # Send a heartbeat to keep the worker alive.
            self.heartbeat()

    if ret_val == os.EX_OK:  # The process exited normally.
        return
    ...

可以看到他一直在等待子进程返回状态,根据返回状态进行相应的设置。你可能会问一个worker不能并发执行吗?嗯,确实是这样的,在一个worker中只能有一个子进程再执行,官方文档的建议是多开几个worker就可以做到多进程执行了。不过这样维护起来有点麻烦,不过网上找到了一个解决方案就是用multiprocess来多开就可以了。

http://xiaorui.cc/2015/12/15/python-rq%E7%9A%84%E5%AE%9A%E6%97%B6%E5%8F%8A%E8%AE%A1%E5%88%92%E4%BB%BB%E5%8A%A1delay%E5%AE%9E%E7%8E%B0/

暂无评论

发表评论

电子邮件地址不会被公开。 必填项已用*标注

备案号:浙ICP备15006402号-2 备注:博客君在0.049里共执行41个查询, 总共占用内存 5.67MB