Tenho dois dataframes: a (~600M linhas) e b (~2M linhas) . Qual é a melhor abordagem para unir b em a, ao usar 1 condição de igualdade e 2 condições de desigualdade nas respectivas colunas?
- a_1 = b_1
- a_2 >= b_2
- a_3 >= b_3
Até agora explorei os seguintes caminhos:
- Polares :
- join_asof(): permite apenas 1 condição de desigualdade
- join_where() com filter(): mesmo com uma pequena janela de tolerância, a instalação padrão do Polars fica sem linhas (limite de 4,3 bilhões de linhas) durante a junção, e a instalação do polars-u64-idx fica sem memória (512 GB)
- DuckDB : ASOF LEFT JOIN: também permite apenas 1 condição de desigualdade
- Numba : Como o acima não funcionou, tentei criar minha própria função join_asof() - veja o código abaixo. Ela funciona bem, mas com o aumento do comprimento de a, ela se torna proibitivamente lenta. Tentei várias configurações diferentes de loops for/while e filtragem, todas com resultados semelhantes.
Agora estou ficando sem ideias... Qual seria uma maneira mais eficiente de implementar isso?
Obrigado
import numba as nb
import numpy as np
import polars as pl
import time
@nb.njit(nb.int32[:](nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:]), parallel=True)
def join_multi_ineq(a_1, a_2, a_3, b_1, b_2, b_3, b_4):
output = np.zeros(len(a_1), dtype=np.int32)
for i in nb.prange(len(a_1)):
for j in range(len(b_1) - 1, -1, -1):
if a_1[i] == b_1[j]:
if a_2[i] >= b_2[j]:
if a_3[i] >= b_3[j]:
output[i] = b_4[j]
break
return output
length_a = 5_000_000
length_b = 2_000_000
start_time = time.time()
output = join_multi_ineq(a_1=np.random.randint(1, 1_000, length_a, dtype=np.int32),
a_2=np.random.randint(1, 1_000, length_a, dtype=np.int32),
a_3=np.random.randint(1, 1_000, length_a, dtype=np.int32),
b_1=np.random.randint(1, 1_000, length_b, dtype=np.int32),
b_2=np.random.randint(1, 1_000, length_b, dtype=np.int32),
b_3=np.random.randint(1, 1_000, length_b, dtype=np.int32),
b_4=np.random.randint(1, 1_000, length_b, dtype=np.int32))
print(f"Duration: {(time.time() - start_time):.2f} seconds")
Você pode usar
distinct on
a cláusula DuckDB (Postgresql):Você também pode tentar usar
pl.DataFrame.join_where()
mas no modo lazy. Estou assumindo que seu dataframe 'a' tem uma chave única, neste caso de exemplo -a1,a2,a3
.pl.DataFrame.lazy()
para tratar DataFrame como LazyFrame.pl.LazyFrame.join_where()
para unir quadros preguiçosos.pl.LazyFrame.sort()
para classificar os resultados.pl.LazyFrame.drop()
para removerb2,b3
colunas.pl.LazyFrame.unique()
para deixar apenas uma linha pora1,a2,a3
.pl.LazyFrame.collect()
.Se nada disso funcionar, você pode tentar dividir um dos quadros em N pedaços com
pl.DataFrame.partition_by()
, processar os pedaços separadamente e então usarpl.concat()
para concatená-los novamente.Usar Numba aqui é uma boa ideia, já que a operação é particularmente cara. Dito isso, a complexidade do algoritmo é,
O(n²)
embora não seja fácil fazer muito melhor (sem tornar o código muito mais complexo). Além disso, o arrayb_1
, que pode não caber no cache L3, é lido completamente 5_000_000 vezes, tornando o código bastante limitado à memória.Podemos acelerar bastante o código construindo um índice para não percorrer todo o array
b_1
, mas apenas os valores wherea_1[i] == b_1[j]
. Isso não é suficiente para melhorar a complexidade, pois muitosj
valores atendem a essa condição. Podemos melhorar a complexidade (média) construindo um tipo de árvore para todos os nós do índice, mas, na prática, isso torna o código muito mais complexo e o tempo para construir a árvore seria tão grande que, na verdade, não vale a pena fazer isso na prática. De fato, um índice básico é suficiente para reduzir bastante o tempo de execução no conjunto de dados aleatório fornecido (com números uniformemente distribuídos). Aqui está o código resultante:Note que, em média, apenas 32
k
valores são testados (o que é razoável o suficiente para não construir uma estrutura de dados mais eficiente/complicada). Observe também que o resultado é estritamente idêntico ao fornecido pela implementação ingênua.Referência
Aqui estão os resultados na minha CPU i5-9600KF (6 núcleos):
Portanto, essa implementação é cerca de 30 vezes mais rápida que o código inicial.