FileNotFoundError: [Errno 2] No such file or directory
当我尝试使用循环数据批次的 csv 写入器将 csv 文件写入存储桶时,出现此错误。围绕该错误对 Cloud Function 日志的完整洞察:
File "/workspace/main.py", line 299, in write_to_csv_file with open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such file or directory: 'gs://MY_BUCKET/MY_CSV.csv' Function execution took 52655 ms, finished with status: 'crash' OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k ```
而且,虽然这个 bucket_filepath 肯定存在:我可以上传一个空的虚拟文件并获取它的“gsutils URI”(右键单击文件右侧的三个点),并且 bucket_filepath 看起来相同:'gs://MY_BUCKET/MY_CSV.csv'
.
我检查了保存一个虚拟的熊猫数据框,而不是使用pd.to_csv
它,它使用相同的 bucket_filepath (!)。
因此,必须有另一个原因,可能是作者不被接受,或者with statement
打开文件。
引发错误的代码如下。它与在本地服务器上的正常 cron 作业中在 Google Cloud Function 之外工作的代码相同。我在抛出错误的行周围添加了两个调试打印,print("Right after opening the file ...")
不再显示。还显示了为每个批次调用的子函数query_execute_batch()
,write_to_csv_file()
但这里可能不是问题,因为在写入打开 csv 文件时,错误已经在一开始就发生了。
requirements.txt
(然后作为模块导入):
SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1
从main.py
:
def query_execute_batch(connection):
"""Function for reading data from the query result into batches
:yield: each result in a loop is a batch of the query result
"""
results = execute_select_batch(connection, SQL_QUERY)
print(f"len(results): {len(results)}")
for result in results:
yield result
def write_to_csv_file(connection, filepath):
"""Write the data in a loop over batches into a csv.
This is done in batches since the query from the database is huge.
:param connection: mysqldb connection to DB
:param filepath: path to csv file to write data
returns: metadata on rows and time
"""
countrows = 0
print("Right before opening the file ...")
with open(filepath, "w") as outcsv:
print("Right after opening the file ...")
writer = csv.DictWriter(
outcsv,
fieldnames=FIELDNAMES,
extrasaction="ignore",
delimiter="|",
lineterminator="\n",
)
# write header according to fieldnames
writer.writeheader()
for batch in query_execute_batch(connection):
writer.writerows(batch)
countrows += len(batch)
datetime_now_save = datetime.now()
return countrows, datetime_now_save
请注意,为了使上述脚本正常工作,我导入gcsfs
了这使得存储桶可读写。否则我可能需要一个谷歌云存储对象,例如:
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
然后使该存储桶中的文件具有更多功能,但这不是这里的目的。
在下面的pd.to_csv
代码中,它使用虚拟 SQL 查询的输出SELECT 1
作为数据帧的输入。这可以保存到同一个bucket_filepath,当然原因可能不仅仅是pd.to_csv()
这样,而且数据集是一个虚拟的,而不是来自一个巨大的SELECT query
. 或者还有其他原因,我只是猜测。
if records is not None:
df = pd.DataFrame(records.fetchall())
df.columns = records.keys()
df.to_csv(filepath,
index=False,
)
datetime_now_save = datetime.now()
countrows = df.shape[0]
我想使用 csv 编写器有机会使用 unicodecsv 模块编写 unicode 并有机会使用批处理。
我可能愿意更改为 pandas 中的批处理(loop + append
mode 或chunksize
),例如将大型 Pandas Dataframes to CSV file in chunks以摆脱此存储桶文件路径问题,但我宁愿使用现成的代码(切勿触摸正在运行的系统)。
如何使用 csv 编写器完成该 csv 的保存,以便它可以在write
模式 =的存储桶中打开一个新文件with open(filepath, "w") as outcsv:
?
给定的函数write_to_csv_file()
只是云函数的一小部分,它使用了广泛的函数和级联函数。我不能在这里展示整个可重现的案例,希望可以通过经验或更简单的例子来回答。
解决方案令人惊讶。如果要使用.
gcsfs
_open()
如果您使用
pd.to_csv()
,import gcsfs
则不需要,但在make workgcsfs
中仍然需要requirements.txt
pd.to_csv()
,因此,pandasto_csv()
似乎会自动使用它。抛开惊喜不谈,这里
pd.to_csv()
是回答问题的代码(经过测试):边注
不要像这样使用 csv 编写器。
这需要很长时间,而不是
pd.to_csv()
参数chunksize
为 5000 的 700k 行只需 62 秒即可加载并作为 csv 存储在存储桶中,具有批量写入器的 CF 需要超过 9 分钟,超过超时限制。因此,我被迫使用pd.to_csv()
并将我的数据转换为数据框。