AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / server / 问题 / 1091789
Accepted
questionto42standswithUkraine
questionto42standswithUkraine
Asked: 2022-02-01 09:32:39 +0800 CST2022-02-01 09:32:39 +0800 CST 2022-02-01 09:32:39 +0800 CST

将 csv 从 CF 写入存储桶时:'with open(filepath, "w") as MY_CSV:' 导致 "FileNotFoundError: [Errno 2] No such file or directory:"

  • 772

FileNotFoundError: [Errno 2] No such file or directory当我尝试使用循环数据批次的 csv 写入器将 csv 文件写入存储桶时,出现此错误。围绕该错误对 Cloud Function 日志的完整洞察:


File "/workspace/main.py", line 299, in write_to_csv_file with
open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such
file or directory: 'gs://MY_BUCKET/MY_CSV.csv'

Function execution took 52655 ms, finished with status: 'crash' 

OpenBLAS WARNING - could not determine the L2 cache size on this
system, assuming 256k  ```

而且,虽然这个 bucket_filepath 肯定存在:我可以上传一个空的虚拟文件并获取它的“gsutils URI”(右键单击文件右侧的三个点),并且 bucket_filepath 看起来相同:'gs://MY_BUCKET/MY_CSV.csv'.

我检查了保存一个虚拟的熊猫数据框,而不是使用pd.to_csv它,它使用相同的 bucket_filepath (!)。

因此,必须有另一个原因,可能是作者不被接受,或者with statement打开文件。

引发错误的代码如下。它与在本地服务器上的正常 cron 作业中在 Google Cloud Function 之外工作的代码相同。我在抛出错误的行周围添加了两个调试打印,print("Right after opening the file ...")不再显示。还显示了为每个批次调用的子函数query_execute_batch(),write_to_csv_file()但这里可能不是问题,因为在写入打开 csv 文件时,错误已经在一开始就发生了。

requirements.txt(然后作为模块导入):

SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1

从main.py:

def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result

def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    returns: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")    
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")        
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

请注意,为了使上述脚本正常工作,我导入gcsfs了这使得存储桶可读写。否则我可能需要一个谷歌云存储对象,例如:

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

然后使该存储桶中的文件具有更多功能,但这不是这里的目的。

在下面的pd.to_csv代码中,它使用虚拟 SQL 查询的输出SELECT 1作为数据帧的输入。这可以保存到同一个bucket_filepath,当然原因可能不仅仅是pd.to_csv()这样,而且数据集是一个虚拟的,而不是来自一个巨大的SELECT query. 或者还有其他原因,我只是猜测。

if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath,
        index=False,
    )
    datetime_now_save = datetime.now()
    countrows = df.shape[0]

我想使用 csv 编写器有机会使用 unicodecsv 模块编写 unicode 并有机会使用批处理。

我可能愿意更改为 pandas 中的批处理(loop + appendmode 或chunksize),例如将大型 Pandas Dataframes to CSV file in chunks以摆脱此存储桶文件路径问题,但我宁愿使用现成的代码(切勿触摸正在运行的系统)。

如何使用 csv 编写器完成该 csv 的保存,以便它可以在write模式 =的存储桶中打开一个新文件with open(filepath, "w") as outcsv:?

给定的函数write_to_csv_file()只是云函数的一小部分,它使用了广泛的函数和级联函数。我不能在这里展示整个可重现的案例,希望可以通过经验或更简单的例子来回答。

filesystems google-cloud-platform google-cloud-functions
  • 1 1 个回答
  • 910 Views

1 个回答

  • Voted
  1. Best Answer
    questionto42standswithUkraine
    2022-02-01T13:05:37+08:002022-02-01T13:05:37+08:00

    解决方案令人惊讶。如果要使用. gcsfs_open()

    如果您使用pd.to_csv(),import gcsfs则不需要,但在make workgcsfs中仍然需要requirements.txtpd.to_csv(),因此,pandasto_csv()似乎会自动使用它。

    抛开惊喜不谈,这里pd.to_csv()是回答问题的代码(经过测试):

    def write_to_csv_file(connection, filepath):
        """Write the QUERY result in a loop over batches into a csv.
        This is done in batches since the query from the database is huge.
        :param connection: mysqldb connection to DB
        :param filepath: path to csv file to write data
        return: metadata on rows and time
        """
        countrows = 0
        print("Right before opening the file ...")
       
    
        # A gcsfs object is needed to open a file.
        # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
        # https://gcsfs.readthedocs.io/en/latest/index.html#examples
        # Side-note (Exception):
        # pd.to_csv() needs neither the gcsfs object, nor its import.
        # It is not used here, but it has been tested with examples.
        fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
        fs.ls(BUCKET_NAME)
    
    
        # wb needed, else "builtins.TypeError: must be str, not bytes"
        # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
        with fs.open(filepath, 'wb') as outcsv:
            print("Right after opening the file ...")
    
            writer = csv.DictWriter(
                outcsv,
                fieldnames=FIELDNAMES,
                extrasaction="ignore",
                delimiter="|",
                lineterminator="\n",
            )
            # write header according to fieldnames
            print("before writer.writeheader()")
            writer.writeheader()
            print("after writer.writeheader()")
    
            for batch in query_execute_batch(connection):
                writer.writerows(batch)
                countrows += len(batch)
            datetime_now_save = datetime.now()
        return countrows, datetime_now_save
    

    边注

    不要像这样使用 csv 编写器。

    这需要很长时间,而不是pd.to_csv()参数chunksize为 5000 的 700k 行只需 62 秒即可加载并作为 csv 存储在存储桶中,具有批量写入器的 CF 需要超过 9 分钟,超过超时限制。因此,我被迫使用pd.to_csv()并将我的数据转换为数据框。

    • 1

相关问题

  • 我如何知道 AIX 中磁盘上的文件系统是什么?

  • 控制 UNIX 目录内容用户组所有权

  • 在 Ubuntu 上将 Windows 磁盘分区与 Linux 分区合并

  • 在 VMPlayer 上 Windows 和 Linux 之间共享目录的最佳方式

  • md5sum 重复为同一台机器上的同一文件提供不同的校验和

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    新安装后 postgres 的默认超级用户用户名/密码是什么?

    • 5 个回答
  • Marko Smith

    SFTP 使用什么端口?

    • 6 个回答
  • Marko Smith

    命令行列出 Windows Active Directory 组中的用户?

    • 9 个回答
  • Marko Smith

    什么是 Pem 文件,它与其他 OpenSSL 生成的密钥文件格式有何不同?

    • 3 个回答
  • Marko Smith

    如何确定bash变量是否为空?

    • 15 个回答
  • Martin Hope
    Tom Feiner 如何按大小对 du -h 输出进行排序 2009-02-26 05:42:42 +0800 CST
  • Martin Hope
    Noah Goodrich 什么是 Pem 文件,它与其他 OpenSSL 生成的密钥文件格式有何不同? 2009-05-19 18:24:42 +0800 CST
  • Martin Hope
    Brent 如何确定bash变量是否为空? 2009-05-13 09:54:48 +0800 CST
  • Martin Hope
    cletus 您如何找到在 Windows 中打开文件的进程? 2009-05-01 16:47:16 +0800 CST

热门标签

linux nginx windows networking ubuntu domain-name-system amazon-web-services active-directory apache-2.4 ssh

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve