程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长

+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

暂无数据

如何在 Django 应用程序中安全地使用多处理?

发布于2024-11-26 22:09     阅读(480)     评论(0)     点赞(4)     收藏(5)


我读过一些文档,其中指出多处理可能会在 Django 应用程序或 Windows 中导致意想不到的副作用,尤其是那些连接到多个数据库的应用程序。具体来说,我使用函数load_to_table从 DataFrame 创建多个 CSV 文件,然后使用多处理将数据加载到 PostgreSQL 表中。该函数与我的 Django 应用程序深度集成,不是独立脚本。

如果在生产中使用此代码,我担心可能会产生长期影响。此外,if __name__ == '__main__':似乎在 Django 的深层文件/函数中不起作用。这是因为 Django 的管理命令是在不同的上下文中执行的,其中__name__未设置为"__main__",这会阻止此块按预期执行。此外,多处理指南建议使用if __name__ == '__main__':来安全地初始化多处理任务,因为它可以确保代码不会意外执行多次,尤其是在 Windows 等平台上,模块级代码会在子进程中重新导入。

以下是我使用的代码:

import os
import glob
from multiprocessing import Pool, cpu_count
from functools import partial
from portal.db_postgresql.connection import Connection

def copy_to_table(file_name: str, table_name: str, columns: list):
    # custom connection class
    connection_obj = Connection(get_current_db_name(), 1, 1)
    connection = connection_obj.connection()
    cursor = connection.cursor()
    with open(file_name, "r") as f:
        cursor.copy_from(f, table_name, sep=",", columns=columns, null="")
    connection.commit()
    connection.close()
    return file_name

# df_ops is a custom PySpark dataframe class
def load_to_table(df_ops: PySparkOperations, table_name: str) -> dict:
    filepath = os.path.join("uploaded_files", table_name)
    df_ops.df.repartition(10).write.mode("overwrite").format("csv").option("header", "false").save(filepath)
    file_path_list = sorted(glob.glob(f"{filepath}/*.csv"))
    with Pool(cpu_count()) as p:
        p.map(partial(copy_to_table, table_name=table_name, columns=df_ops.df.columns), file_path_list)
    return df_ops.count

上述函数无法在 VS Code 调试器中使用,很可能是因为debugpy,这会干扰 Django 的多处理。但是,它可以在 中使用runserver。当我使用 VS Code 调试器运行 Django 应用程序时,执行该函数时遇到以下错误。它似乎在循环运行。

File "/usr/lib/python3.11/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/methods.py", line 225, in load_to_table
    with Pool(cpu_count()) as p:
         ^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/context.py", line 281, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/load_data.py", line 71, in start
    load_to_table(df_ops, self.source_tmp_details)
  File "/usr/lib/python3.11/multiprocessing/context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/rhythmflow/.vscode/extensions/ms-python.debugpy-2024.10.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/pydevd.py", line 838, in wait_for_ready_to_run
    self._py_db_command_thread_event.wait(0.1)
  File "/usr/lib/python3.11/threading.py", line 629, in wait
    signaled = self._cond.wait(timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/threading.py", line 331, in wait
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/.venv/lib/python3.11/site-packages/django/utils/autoreload.py", line 664, in <lambda>
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
SystemExit: 0
[22/Aug/2024 15:04:30] "POST /start-process/ HTTP/1.1" 500 59
[22/Aug/2024 15:04:35,063] - Broken pipe from ('127.0.0.1', 51102)

什么原因可能导致了这个问题?如何在使用 VS Code 调试器时解决它?


解决方案


在 Django 中使用纯多处理将导致其内部服务器在分叉/生成的进程中启动,并尝试将自身与实际工作程序用于监听 HTTP 传入请求的相同套接字资源绑定。

听起来够乱的了。所以,除非有 django-docs 官方页面告诉你如何在整个过程中安全地使用 Django 多处理,否则我不会走那条路。

但是,还有其他途径可以使紧密耦合的进程使用与面向 Web 的工作者相同的代码库并发运行 - 只需更改入口点,这样 django 就不会尝试提供 HTTP 服务 - 我相信django 管理脚本- 一种在代码库中编写通常从 CLI 调用的函数的方法,是一种能够连接到数据库和其他资源、使用模型类等的好方法,并且在另一个进程中使用 Django 本身创建的其他数据库连接执行此操作,同时知道它们在其他进程中。

您应该使用它们,而不是使用multiprocessing,这将隐式地重新加载或克隆您的 Python 应用程序(包括监听 HTTP),来调用subprocess.Popen您想要作为管理脚本执行的任何进程外操作。调用它应该很简单,就像从 CLI 调用它一样,传递它应该处理的模型实例的任何 ID.s 作为命令行参数。通信稍微复杂一些(尽管您可以在 DB 上创建一个特殊模型,并使用此模型的实例在面向 Web 的进程和工作进程之间交换消息)

否则,我过去曾使用Celery让进程外的工作程序使用相同的 django 代码库。设置好后,使用起来可能会更容易 - 工作程序必须作为 celery 配置的一部分单独启动 - 但除此之外,在这些工作程序中调用远程函数变得轻而易举。查找有关如何使用 celery 以及如何将 django 与 Celery 一起使用的文档

(哦,Django 现在在 Celery 中明确支持开箱即用:https ://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#django-first-steps )



所属网站分类: 技术文章 > 问答

作者:黑洞官方问答小能手

链接:https://www.pythonheidong.com/blog/article/2046113/6279013ffac0158d834c/

来源:python黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

4 0
收藏该文
已收藏

评论内容:(最多支持255个字符)