Gunicorn 部署 Flask-Apscheduler 之踩坑记录

本贴最后更新于 1757 天前,其中的信息可能已经事过景迁

flask-apscheduler 是一款 flask 的定时任务框架,其本质上是和 apscheduler 一样的,具体的使用操作和其他的 flask 组件一样。在开发环境上定时任务跑起来很顺利,但是到了生产环境用 Gunicorn 部署的时候出现了各种问题。

踩坑一:TimeZone offset does not match system offset

部署生产环境时,一上去就给我丢了一个大大的异常:“TimeZone offset does not match system offset”,大致意思是说我的运行的时区和系统时区不匹配。

2121582866177.pichd.jpg

读了下 flask-apscheduler 的源码发现,他会读取 SCHEDULER_TIMEZONE 这个值,作为当前运行的时区。

timezone = self.app.config.get('SCHEDULER_TIMEZONE')
if timezone:
    options['timezone'] = timezone

心想这简单,我在配置文件里把这个时区配置上应该就完事了。

# config.py
SCHEDULER_TIMEZONE = "Asia/Shanghai"

配置了时区为“Asia/Shanghai”后,发现依然报这个错,既然运行环境的时区正确了,那会不会我生产环境的时区也有问题?

生产环境是 docker 容器,进入容器用 date 查看了他的当前时间,发现时间是和系统时间一致的,再查看 cat /etc/timezone 的时区,发现这里出了问题,显示的是 Etc/UTC,解决的思路是修改 Dockerfile,配置正确的时区,在 Dockerfile 中加入此行。

RUN echo "Asia/Shanghai" > /etc/timezone

修改之后重新运行,已经没有出现上述报错了,这个坑算是到这就排完了。

踩坑二:Flask-Apscheduler 多进程环境重复运行

排完第一个坑的时候项目可以启动运行了,这时当然要测试一下定时任务的分发是否正常,此时出现了下面的情况。

image.png

有三条任务是一模一样的,Flask-Apscheduler 的 add_job() 方法需要传一个 id 值,这个值代表了每一任务,且是不能重复的,不然会抛出
ConflictingIdError 异常,那为什么测试的时候会发出几条一模一样的任务?

其实这主要和生产环境使用 Gunicorn 部署有关,Gunicorn 可以指定一个 worker 参数,指的是开启的进程数,而每次开一个 worker,都会启动一个 scheduler,这就导致了这些定时任务是由不同的进程创建的。

这个问题网上的解决方法五花八门,不如利用阻塞某个 socket 端口,来实现进程的创建时只创建单一一个,或者给 Gunicorn 加 --preload 参数等等,这时还是看官方的 issue 保险,发现还真有,按其中的解决方式我尝试了几种。

  1. 配置环境变量以控制开启的 scheduler 数量

通过读取环境变量的方式,判断 SCHEDULER_LOCK 的值是否为 False,此时开启 scheduler,并且修改环境变量值。

if os.environ.get("SCHEDULER_LOCK") == "False":
    scheduler.start()
    os.environ["SCHEDULER_LOCK"] = "True"

我尝试了这种方法,显然并不行,通过环境变量共享多个进程的方式,是不太可取的。

  1. 通过全局锁,控制 scheduler 只运行一次

首次创建进程时,会创建一个 scheduler.lock 文件,并加上非阻塞互斥锁,此时 scheduler 可以成功开启,如果文件加锁失败抛出异常,则表示当前 scheduler 已经开启了,最后再注册一个退出事件,此时 flask 退出的话,就释放文件锁。

def register_scheduler():
    """
    注册定时任务
    """
    f = open("scheduler.lock", "wb")
    # noinspection PyBroadException
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        scheduler.start()
    except:
        pass

    def unlock():
        fcntl.flock(f, fcntl.LOCK_UN)
        f.close()

    atexit.register(unlock)

这个方法看似完美,其在运行开始时确实没有出现问题,但这还没完,且看下个坑。

踩坑三:集群环境下,Flask-Apscheduler 多进程环境重复运行

第二个问题确实可以通过文件锁的方式解决,但是我的生产环境是集群环境,也就是有多个应用实例,每个应用实例部署于 docker 容器之中,这样他们是互相隔离的,并通过 Nginx 做负载均衡。

此时再调用定时任务时,显然还会有问题,当定时任务调用接口打在同一个实例时,是没有问题的,当第二次调用接口时就可以抛出异常并捕获,但对于不同的实例,当第三次,接口请求打在了另外的实例上,此时按正常的业务逻辑,任务下发应该需要失败,但它依然成功了,也就是说上面通过文件全局锁的方式,并不适用于集群环境。

此时只能再次阅读其他 issue,确实也有人遇到了此类问题,但我看到作者的建议其实是拆分业务。

image.png

拆分业务确实给了我不少提示,但我的定时任务的功能不算是一个微服务,如果独立成一个 flask 实例也需要一点时间,此时我想到了这个问题的本质是由于多进程导致 Flask-Apscheduler 开启多个 scheduler 而造成的,那么我可以构建一个单进程的实例,并让特定的定时任务接口流量从这个集群的实例过就可以了。

修改 Gunicorn 的配置文件,通过获取 WORKERS_COUNT 这个环境变量来判断需要开启几个进程,并把进程数赋值给 workers。

if os.environ.get("WORKERS_COUNT"):
    workers = int(os.environ.get("WORKERS_COUNT"))
else:
    workers = multiprocessing.cpu_count() * 2 + 1

在 docker-compose.yml 配置文件中指定其中一个实例的进程数为单进程。

environment:
    - WORKERS_COUNT=1

此时再通过 Nginx 的正则规则,将定时任务的接口路由到该实例就可以了,Nginx 的匹配标识符可以参考下表。

标识符 描述
= **精确匹配:**用于标准 uri 前,要求请求字符串和 uri 严格匹配。如果匹配成功就停止匹配,立即执行该 location 里面的请求。
~ **正则匹配:**用于正则 uri 前,表示 uri 里面包含正则,并且区分大小写。
~* **正则匹配:**用于正则 uri 前,表示 uri 里面包含正则,不区分大小写。
^~ **非正则匹配;**用于标准 uri 前,nginx 服务器匹配到前缀最多的 uri 后就结束,该模式匹配成功后,不会使用正则匹配。
**普通匹配(最长字符匹配);**与 location 顺序无关,是按照匹配的长短来取匹配结果。若完全匹配,就停止匹配。

到这里算是终于解决了这个问题,过程虽然艰辛,但也算颇有收获。

  • Flask
    19 引用 • 9 回帖
  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    545 引用 • 672 回帖
  • Gunicorn
    3 引用 • 21 回帖
  • NGINX

    NGINX 是一个高性能的 HTTP 和反向代理服务器,也是一个 IMAP/POP3/SMTP 代理服务器。 NGINX 是由 Igor Sysoev 为俄罗斯访问量第二的 Rambler.ru 站点开发的,第一个公开版本 0.1.0 发布于 2004 年 10 月 4 日。

    313 引用 • 547 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • JackZhang

    不用那么麻烦吧,直接用 redis 加个锁就能解决啊

    1 回复
  • chen2020

    你好,我如果在代码中加上

    import atexit
    import fcntl
    from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED
    def scheduler_init(app):
    
        f = open("scheduler.lock", "wb")
        try:
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    
    
            scheduler.add_listener(job_listener, EVENT_JOB_ERROR | \
                               EVENT_JOB_MISSED | \
                               EVENT_JOB_EXECUTED)
            scheduler.init_app(app)
            scheduler.start()
        except:
            pass
        def unlock():
            fcntl.flock(f, fcntl.LOCK_UN)
            f.close()
        atexit.register(unlock)
    

    代码就会在服务端,导致无法动态修改定时任务这是怎么回事?

    1 回复
  • zhangheng

    请问这里的代码要写在什么地方呢,在哪里动态添加任务?

  • YYJeffrey
    作者

    确实,Redis 锁也是一种不错的解决方案。

  • AkiraHu

    redis 加锁,如何验证是否成功呢

    1 回复
  • YYJeffrey
    作者

    验证是否成功的话,可以重复提交任务,看看是否会生成多个,如果生成了多个,就说明加锁失败了

请输入回帖内容 ...