AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / dba / 问题 / 91971
Accepted
Franck Dernoncourt
Franck Dernoncourt
Asked: 2015-02-15 14:46:07 +0800 CST2015-02-15 14:46:07 +0800 CST 2015-02-15 14:46:07 +0800 CST

将 CSV 或 JSON 文件导入 DynamoDB

  • 772

我有 1000 个 CSV 文件。每个 CSV 文件大小在 1 到 500 MB 之间,格式相同(即相同的列顺序)。我有一个列标题的头文件,它与我的 DynamoDB 表的列名匹配。我需要将这些文件导入到 DynamoDB 表中。这样做的最佳方式/工具是什么?

我可以将这些 CSV 文件连接成一个巨大的文件(我宁愿避免这样做),或者在需要时将它们转换为 JSON。我知道BatchWriteItem的存在,所以我想一个好的解决方案将涉及批量写入。


例子:

  • DynamoDB 表有两列:first_name、last_name
  • 头文件只包含: first_name,last_name
  • 一个 CSV 文件看起来像

:

John,Doe
Bob,Smith
Alice,Lee
Foo,Bar
import csv
  • 4 4 个回答
  • 33053 Views

4 个回答

  • Voted
  1. Best Answer
    Franck Dernoncourt
    2015-02-15T21:33:19+08:002015-02-15T21:33:19+08:00

    最后,我编写了一个 Python 函数import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types),将 CSV 导入到 DynamoDB 表中。必须指定列名和列。它使用boto ,并从这个要点中获得了很多灵感。下面是函数以及使用的演示 ( main()) 和 CSV 文件。在 Windows 7 x64 上使用 Python 2.7.5 进行了测试,但它应该适用于任何具有 boto 和 Python 的操作系统。

    import boto
    
    MY_ACCESS_KEY_ID = 'copy your access key ID here'
    MY_SECRET_ACCESS_KEY = 'copy your secrete access key here'
    
    
    def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn):
        '''
        From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31
        '''
        batch_list = dynamodb_conn.new_batch_write_list()
        batch_list.add_batch(dynamodb_table, puts=items)
        while True:
            response = dynamodb_conn.batch_write_item(batch_list)
            unprocessed = response.get('UnprocessedItems', None)
            if not unprocessed:
                break
            batch_list = dynamodb_conn.new_batch_write_list()
            unprocessed_list = unprocessed[table_name]
            items = []
            for u in unprocessed_list:
                item_attr = u['PutRequest']['Item']
                item = dynamodb_table.new_item(
                        attrs=item_attr
                )
                items.append(item)
            batch_list.add_batch(dynamodb_table, puts=items)
    
    
    def import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types):
        '''
        Import a CSV file to a DynamoDB table
        '''        
        dynamodb_conn = boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY)
        dynamodb_table = dynamodb_conn.get_table(table_name)     
        BATCH_COUNT = 2 # 25 is the maximum batch size for Amazon DynamoDB
    
        items = []
    
        count = 0
        csv_file = open(csv_file_name, 'r')
        for cur_line in csv_file:
            count += 1
            cur_line = cur_line.strip().split(',')
    
            row = {}
            for colunm_number, colunm_name in enumerate(colunm_names):
                row[colunm_name] = column_types[colunm_number](cur_line[colunm_number])
    
            item = dynamodb_table.new_item(
                        attrs=row
                )           
            items.append(item)
    
            if count % BATCH_COUNT == 0:
                print 'batch write start ... ', 
                do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
                items = []
                print 'batch done! (row number: ' + str(count) + ')'
    
        # flush remaining items, if any
        if len(items) > 0: 
            do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
    
    
        csv_file.close() 
    
    
    def main():
        '''
        Demonstration of the use of import_csv_to_dynamodb()
        We assume the existence of a table named `test_persons`, with
        - Last_name as primary hash key (type: string)
        - First_name as primary range key (type: string)
        '''
        colunm_names = 'Last_name First_name'.split()
        table_name = 'test_persons'
        csv_file_name = 'test.csv'
        column_types = [str, str]
        import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types)
    
    
    if __name__ == "__main__":
        main()
        #cProfile.run('main()') # if you want to do some profiling
    

    test.csv的内容(必须与 Python 脚本位于同一文件夹中):

    John,Doe
    Bob,Smith
    Alice,Lee
    Foo,Bar
    a,b
    c,d
    e,f
    g,h
    i,j
    j,l
    
    • 12
  2. John Tubert
    2017-01-20T10:38:20+08:002017-01-20T10:38:20+08:00

    稍微更改了先前的答案以使用 CSV 模块,因此您的 CSV 文件可以支持带引号的字符串。

    import boto
    from csv import reader
    
    MY_ACCESS_KEY_ID = 'copy your access key ID here'
    MY_SECRET_ACCESS_KEY = 'copy your secrete access key here'
    
    
    def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn):
        '''
        From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31
        '''
        batch_list = dynamodb_conn.new_batch_write_list()
        batch_list.add_batch(dynamodb_table, puts=items)
        while True:
            response = dynamodb_conn.batch_write_item(batch_list)
            unprocessed = response.get('UnprocessedItems', None)
            if not unprocessed:
                break
            batch_list = dynamodb_conn.new_batch_write_list()
            unprocessed_list = unprocessed[table_name]
            items = []
            for u in unprocessed_list:
                item_attr = u['PutRequest']['Item']
                item = dynamodb_table.new_item(
                        attrs=item_attr
                )
                items.append(item)
            batch_list.add_batch(dynamodb_table, puts=items)
    
    
    def import_csv_to_dynamodb(table_name, csv_file_name, colunm_names,     column_types):
        '''
        Import a CSV file to a DynamoDB table
        '''        
        dynamodb_conn =     boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY)
        dynamodb_table = dynamodb_conn.get_table(table_name)     
        BATCH_COUNT = 2 # 25 is the maximum batch size for Amazon DynamoDB
    
        items = []
    
        count = 0
        csv_file = open(csv_file_name, 'r')
        for cur_line in reader(csv_file):
            count += 1
    
            row = {}
            for colunm_number, colunm_name in enumerate(colunm_names):
                row[colunm_name] = column_types[colunm_number]    (cur_line[colunm_number])
    
            item = dynamodb_table.new_item(
                        attrs=row
                )           
            items.append(item)
    
            if count % BATCH_COUNT == 0:
                print 'batch write start ... ', 
                do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
                items = []
                print 'batch done! (row number: ' + str(count) + ')'
    
        # flush remaining items, if any
        if len(items) > 0: 
            do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
    
    
        csv_file.close() 
    
    
    def main():
        '''
        Demonstration of the use of import_csv_to_dynamodb()
        We assume the existence of a table named `test_persons`, with
        - Last_name as primary hash key (type: string)
        - First_name as primary range key (type: string)
        '''
        colunm_names = 'facebookID age ethnicity gender hometown name party sfw url'.split()
        table_name = 'OneAmericaDB'
        csv_file_name = 'test_data.csv'
        column_types = [str, str, str, str, str, str, str, str, str]
        import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types)
    
    
    if __name__ == "__main__":
        main()
        #cProfile.run('main()') # if you want to do some profiling
    
    • 0
  3. tinker
    2018-04-26T19:54:45+08:002018-04-26T19:54:45+08:00

    此 NPM 包将任意 json 转换为 DynamoDB 的 PUT 请求。 https://www.npmjs.com/package/json-dynamo-putrequest

    绝对值得一试。

    • 0
  4. Juan Manuel Ruiz Fernández
    2019-02-07T14:52:58+08:002019-02-07T14:52:58+08:00

    我建议您使用 AWS Database Migration Service (DMS)。

    如本文所述:https ://aws.amazon.com/es/blogs/database/migrate-delimited-files-from-amazon-s3-to-an-amazon-dynamodb-nosql-table-using-aws- database-migration-service-and-aws-cloudformation/您可以使用 S3 作为源,使用 DynamoDB 作为目标来导入包含大量元组的 csv 文件。

    我已经成功地实现了从 S3 到 DynamoDB 的完整导入过程,并且是最简单、最快的方法。

    本质上,您必须:

    • 有一个存储桶来放置您的 csv 文件,至少有两个文件夹级别(第一个是“模式”,第二个是“表名”)。
    • 拥有一个至少与 csv 文件具有相同哈希键的 DynamoDB 表。
    • 在 DMS 中创建一个指向 S3 的源元素并映射 csv 结构。
    • 在 DMS 中创建一个指向 DynamoDB 表并从映射源映射的目标元素。
    • 在 DMS 中创建一个复制实例(注意免费层)。
    • 在 DMS 中创建使用源和目标创建元素的复制任务。
    • 执行任务。

    将 DynamoDB 表吞吐量修改为 25 个读取容量单位和 150 个写入容量单位,包括准备任务在内,我已经能够在不到 7 分钟的时间内插入超过 124k 的元组。

    AWS 对此任务的主要建议是使用数据管道服务,但我使用过它并且它更昂贵,并且底层 EMR 集群初始化是一个非常缓慢的过程,所以如果您不想重复此导入任务,请经常使用 DMS反而。

    • 0

相关问题

  • MySQL import csv 只得到一半的行

  • 如何找出我刚刚导入 Oracle 的数据?

  • 如何将 Oracle 数据库的内容导入 Visio 以创建实体关系图?

  • 在 SQLite 上写入一行需要多少次磁盘寻道?

  • 不同字符集导致的Oracle导入问题

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    连接到 PostgreSQL 服务器:致命:主机没有 pg_hba.conf 条目

    • 12 个回答
  • Marko Smith

    如何让sqlplus的输出出现在一行中?

    • 3 个回答
  • Marko Smith

    选择具有最大日期或最晚日期的日期

    • 3 个回答
  • Marko Smith

    如何列出 PostgreSQL 中的所有模式?

    • 4 个回答
  • Marko Smith

    列出指定表的所有列

    • 5 个回答
  • Marko Smith

    如何在不修改我自己的 tnsnames.ora 的情况下使用 sqlplus 连接到位于另一台主机上的 Oracle 数据库

    • 4 个回答
  • Marko Smith

    你如何mysqldump特定的表?

    • 4 个回答
  • Marko Smith

    使用 psql 列出数据库权限

    • 10 个回答
  • Marko Smith

    如何从 PostgreSQL 中的选择查询中将值插入表中?

    • 4 个回答
  • Marko Smith

    如何使用 psql 列出所有数据库和表?

    • 7 个回答
  • Martin Hope
    Jin 连接到 PostgreSQL 服务器:致命:主机没有 pg_hba.conf 条目 2014-12-02 02:54:58 +0800 CST
  • Martin Hope
    Stéphane 如何列出 PostgreSQL 中的所有模式? 2013-04-16 11:19:16 +0800 CST
  • Martin Hope
    Mike Walsh 为什么事务日志不断增长或空间不足? 2012-12-05 18:11:22 +0800 CST
  • Martin Hope
    Stephane Rolland 列出指定表的所有列 2012-08-14 04:44:44 +0800 CST
  • Martin Hope
    haxney MySQL 能否合理地对数十亿行执行查询? 2012-07-03 11:36:13 +0800 CST
  • Martin Hope
    qazwsx 如何监控大型 .sql 文件的导入进度? 2012-05-03 08:54:41 +0800 CST
  • Martin Hope
    markdorison 你如何mysqldump特定的表? 2011-12-17 12:39:37 +0800 CST
  • Martin Hope
    Jonas 如何使用 psql 对 SQL 查询进行计时? 2011-06-04 02:22:54 +0800 CST
  • Martin Hope
    Jonas 如何从 PostgreSQL 中的选择查询中将值插入表中? 2011-05-28 00:33:05 +0800 CST
  • Martin Hope
    Jonas 如何使用 psql 列出所有数据库和表? 2011-02-18 00:45:49 +0800 CST

热门标签

sql-server mysql postgresql sql-server-2014 sql-server-2016 oracle sql-server-2008 database-design query-performance sql-server-2017

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve