AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / dba / Perguntas / 91971
Accepted
Franck Dernoncourt
Franck Dernoncourt
Asked: 2015-02-15 14:46:07 +0800 CST2015-02-15 14:46:07 +0800 CST 2015-02-15 14:46:07 +0800 CST

Importar arquivo CSV ou JSON para o DynamoDB

  • 772

Eu tenho 1000 arquivos CSV. Cada arquivo CSV tem entre 1 e 500 MB e é formatado da mesma forma (ou seja, mesma ordem de coluna). Eu tenho um arquivo de cabeçalho para cabeçalhos de coluna, que correspondem aos nomes de coluna da minha tabela do DynamoDB. Preciso importar esses arquivos para uma tabela do DynamoDB. Qual a melhor forma/ferramenta para isso?

Posso concatenar esses arquivos CSV em um único arquivo gigante (prefiro evitar) ou convertê-los em JSON, se necessário. Estou ciente da existência de BatchWriteItem , então acho que uma boa solução envolveria a gravação em lote.


Exemplo:

  • A tabela do DynamoDB tem duas colunas: first_name, last_name
  • O arquivo de cabeçalho contém apenas: first_name,last_name
  • Um arquivo CSV se parece com

:

John,Doe
Bob,Smith
Alice,Lee
Foo,Bar
import csv
  • 4 4 respostas
  • 33053 Views

4 respostas

  • Voted
  1. Best Answer
    Franck Dernoncourt
    2015-02-15T21:33:19+08:002015-02-15T21:33:19+08:00

    No final, codifiquei uma função Python import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types)que importa um CSV para uma tabela do DynamoDB. Os nomes das colunas e as colunas devem ser especificados. Ele usa boto e se inspira muito nessa essência . Abaixo está a função, bem como uma demonstração ( main()) e o arquivo CSV usado. Testado no Windows 7 x64 com Python 2.7.5, mas deve funcionar em qualquer sistema operacional que tenha boto e Python.

    import boto
    
    MY_ACCESS_KEY_ID = 'copy your access key ID here'
    MY_SECRET_ACCESS_KEY = 'copy your secrete access key here'
    
    
    def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn):
        '''
        From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31
        '''
        batch_list = dynamodb_conn.new_batch_write_list()
        batch_list.add_batch(dynamodb_table, puts=items)
        while True:
            response = dynamodb_conn.batch_write_item(batch_list)
            unprocessed = response.get('UnprocessedItems', None)
            if not unprocessed:
                break
            batch_list = dynamodb_conn.new_batch_write_list()
            unprocessed_list = unprocessed[table_name]
            items = []
            for u in unprocessed_list:
                item_attr = u['PutRequest']['Item']
                item = dynamodb_table.new_item(
                        attrs=item_attr
                )
                items.append(item)
            batch_list.add_batch(dynamodb_table, puts=items)
    
    
    def import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types):
        '''
        Import a CSV file to a DynamoDB table
        '''        
        dynamodb_conn = boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY)
        dynamodb_table = dynamodb_conn.get_table(table_name)     
        BATCH_COUNT = 2 # 25 is the maximum batch size for Amazon DynamoDB
    
        items = []
    
        count = 0
        csv_file = open(csv_file_name, 'r')
        for cur_line in csv_file:
            count += 1
            cur_line = cur_line.strip().split(',')
    
            row = {}
            for colunm_number, colunm_name in enumerate(colunm_names):
                row[colunm_name] = column_types[colunm_number](cur_line[colunm_number])
    
            item = dynamodb_table.new_item(
                        attrs=row
                )           
            items.append(item)
    
            if count % BATCH_COUNT == 0:
                print 'batch write start ... ', 
                do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
                items = []
                print 'batch done! (row number: ' + str(count) + ')'
    
        # flush remaining items, if any
        if len(items) > 0: 
            do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
    
    
        csv_file.close() 
    
    
    def main():
        '''
        Demonstration of the use of import_csv_to_dynamodb()
        We assume the existence of a table named `test_persons`, with
        - Last_name as primary hash key (type: string)
        - First_name as primary range key (type: string)
        '''
        colunm_names = 'Last_name First_name'.split()
        table_name = 'test_persons'
        csv_file_name = 'test.csv'
        column_types = [str, str]
        import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types)
    
    
    if __name__ == "__main__":
        main()
        #cProfile.run('main()') # if you want to do some profiling
    

    test.csv's content (deve estar localizado na mesma pasta que o script Python):

    John,Doe
    Bob,Smith
    Alice,Lee
    Foo,Bar
    a,b
    c,d
    e,f
    g,h
    i,j
    j,l
    
    • 12
  2. John Tubert
    2017-01-20T10:38:20+08:002017-01-20T10:38:20+08:00

    Alterou um pouco a resposta anterior para usar o módulo CSV para que seu arquivo CSV possa suportar strings com aspas.

    import boto
    from csv import reader
    
    MY_ACCESS_KEY_ID = 'copy your access key ID here'
    MY_SECRET_ACCESS_KEY = 'copy your secrete access key here'
    
    
    def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn):
        '''
        From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31
        '''
        batch_list = dynamodb_conn.new_batch_write_list()
        batch_list.add_batch(dynamodb_table, puts=items)
        while True:
            response = dynamodb_conn.batch_write_item(batch_list)
            unprocessed = response.get('UnprocessedItems', None)
            if not unprocessed:
                break
            batch_list = dynamodb_conn.new_batch_write_list()
            unprocessed_list = unprocessed[table_name]
            items = []
            for u in unprocessed_list:
                item_attr = u['PutRequest']['Item']
                item = dynamodb_table.new_item(
                        attrs=item_attr
                )
                items.append(item)
            batch_list.add_batch(dynamodb_table, puts=items)
    
    
    def import_csv_to_dynamodb(table_name, csv_file_name, colunm_names,     column_types):
        '''
        Import a CSV file to a DynamoDB table
        '''        
        dynamodb_conn =     boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY)
        dynamodb_table = dynamodb_conn.get_table(table_name)     
        BATCH_COUNT = 2 # 25 is the maximum batch size for Amazon DynamoDB
    
        items = []
    
        count = 0
        csv_file = open(csv_file_name, 'r')
        for cur_line in reader(csv_file):
            count += 1
    
            row = {}
            for colunm_number, colunm_name in enumerate(colunm_names):
                row[colunm_name] = column_types[colunm_number]    (cur_line[colunm_number])
    
            item = dynamodb_table.new_item(
                        attrs=row
                )           
            items.append(item)
    
            if count % BATCH_COUNT == 0:
                print 'batch write start ... ', 
                do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
                items = []
                print 'batch done! (row number: ' + str(count) + ')'
    
        # flush remaining items, if any
        if len(items) > 0: 
            do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
    
    
        csv_file.close() 
    
    
    def main():
        '''
        Demonstration of the use of import_csv_to_dynamodb()
        We assume the existence of a table named `test_persons`, with
        - Last_name as primary hash key (type: string)
        - First_name as primary range key (type: string)
        '''
        colunm_names = 'facebookID age ethnicity gender hometown name party sfw url'.split()
        table_name = 'OneAmericaDB'
        csv_file_name = 'test_data.csv'
        column_types = [str, str, str, str, str, str, str, str, str]
        import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types)
    
    
    if __name__ == "__main__":
        main()
        #cProfile.run('main()') # if you want to do some profiling
    
    • 0
  3. tinker
    2018-04-26T19:54:45+08:002018-04-26T19:54:45+08:00

    Este pacote NPM converte um json arbitrário em uma solicitação PUT para o DynamoDB. https://www.npmjs.com/package/json-dynamo-putrequest

    Definitivamente vale a pena tentar.

    • 0
  4. Juan Manuel Ruiz Fernández
    2019-02-07T14:52:58+08:002019-02-07T14:52:58+08:00

    Sugiro que você use o AWS Database Migration Service (DMS).

    Conforme descrito neste artigo: https://aws.amazon.com/es/blogs/database/migrate-delimited-files-from-amazon-s3-to-an-amazon-dynamodb-nosql-table-using-aws- database-migration-service-and-aws-cloudformation/ você pode usar o S3 como origem e o DynamoDB como destino para importar arquivos csv com muitas tuplas.

    Implementei com sucesso um processo de importação completo do S3 para o DynamoDB e é a maneira mais simples e rápida de fazer isso.

    Essencialmente, você deve:

    • Tenha um bucket para colocar seus arquivos csv, com pelo menos dois níveis de pastas (o primeiro se refere a "esquema" e o segundo é "nome da tabela").
    • Tenha uma tabela do DynamoDB com pelo menos a mesma chave de hash dos arquivos csv.
    • Crie um elemento de origem no DMS apontando para S3 e mapeando a estrutura csv.
    • Crie um elemento de destino no DMS apontando para a tabela do DynamoDB e o mapeamento da origem mapeada.
    • Crie uma instância de replicação (preste atenção ao nível gratuito) no DMS.
    • Crie uma tarefa de replicação no DMS que use elementos criados de origem e destino.
    • Executar tarefa.

    Modificando o rendimento da tabela do DynamoDB para 25 unidades de capacidade de leitura e 150 unidades de capacidade de gravação, consegui inserir mais de 124k tuplas em menos de 7 minutos, incluindo as tarefas de preparação.

    A recomendação principal da AWS para esta tarefa é usar o serviço de pipeline de dados, mas eu já o usei e é mais caro e a inicialização do culster EMR subjacente é um processo muito lento, portanto, se você não quiser repetir essa tarefa de importação recorrentemente, use o DMS em vez de.

    • 0

relate perguntas

  • MySQL import csv obtém apenas metade das linhas

  • Como descobrir quais dados acabei de importar para o Oracle?

  • Como posso importar o conteúdo de um banco de dados Oracle para o Visio para criar um diagrama de relacionamento de entidade?

  • Quantas buscas de disco são necessárias para gravar uma linha no SQLite?

  • Problema do Oracle Import causado por diferentes conjuntos de caracteres

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    conectar ao servidor PostgreSQL: FATAL: nenhuma entrada pg_hba.conf para o host

    • 12 respostas
  • Marko Smith

    Como fazer a saída do sqlplus aparecer em uma linha?

    • 3 respostas
  • Marko Smith

    Selecione qual tem data máxima ou data mais recente

    • 3 respostas
  • Marko Smith

    Como faço para listar todos os esquemas no PostgreSQL?

    • 4 respostas
  • Marko Smith

    Listar todas as colunas de uma tabela especificada

    • 5 respostas
  • Marko Smith

    Como usar o sqlplus para se conectar a um banco de dados Oracle localizado em outro host sem modificar meu próprio tnsnames.ora

    • 4 respostas
  • Marko Smith

    Como você mysqldump tabela (s) específica (s)?

    • 4 respostas
  • Marko Smith

    Listar os privilégios do banco de dados usando o psql

    • 10 respostas
  • Marko Smith

    Como inserir valores em uma tabela de uma consulta de seleção no PostgreSQL?

    • 4 respostas
  • Marko Smith

    Como faço para listar todos os bancos de dados e tabelas usando o psql?

    • 7 respostas
  • Martin Hope
    Jin conectar ao servidor PostgreSQL: FATAL: nenhuma entrada pg_hba.conf para o host 2014-12-02 02:54:58 +0800 CST
  • Martin Hope
    Stéphane Como faço para listar todos os esquemas no PostgreSQL? 2013-04-16 11:19:16 +0800 CST
  • Martin Hope
    Mike Walsh Por que o log de transações continua crescendo ou fica sem espaço? 2012-12-05 18:11:22 +0800 CST
  • Martin Hope
    Stephane Rolland Listar todas as colunas de uma tabela especificada 2012-08-14 04:44:44 +0800 CST
  • Martin Hope
    haxney O MySQL pode realizar consultas razoavelmente em bilhões de linhas? 2012-07-03 11:36:13 +0800 CST
  • Martin Hope
    qazwsx Como posso monitorar o andamento de uma importação de um arquivo .sql grande? 2012-05-03 08:54:41 +0800 CST
  • Martin Hope
    markdorison Como você mysqldump tabela (s) específica (s)? 2011-12-17 12:39:37 +0800 CST
  • Martin Hope
    Jonas Como posso cronometrar consultas SQL usando psql? 2011-06-04 02:22:54 +0800 CST
  • Martin Hope
    Jonas Como inserir valores em uma tabela de uma consulta de seleção no PostgreSQL? 2011-05-28 00:33:05 +0800 CST
  • Martin Hope
    Jonas Como faço para listar todos os bancos de dados e tabelas usando o psql? 2011-02-18 00:45:49 +0800 CST

Hot tag

sql-server mysql postgresql sql-server-2014 sql-server-2016 oracle sql-server-2008 database-design query-performance sql-server-2017

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve