I would like to run a groupby and aggregation over a dataframe, where the aggregation joins together strings that share the same id. The df looks like this:
In [1]: df = pd.DataFrame.from_dict({'id':[1,1,2,2,2,3], 'name':['a','b','c','d','e','f']})
In [2]: df
Out[2]:
   id name
0   1    a
1   1    b
2   2    c
3   2    d
4   2    e
5   3    f
I have this working in pandas like this:
def list_aggregator(x):
    return '|'.join(x)

df2 = pd.DataFrame.from_dict({'id': [], 'name': []})
df2['id'] = df['id'].drop_duplicates()
df2['name'] = df['name'].groupby(df['id']).agg(list_aggregator).values
This produces:
In [26]: df2
Out[26]:
   id   name
0   1    a|b
2   2  c|d|e
5   3      f
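(As an aside, I assume the pandas part could also be written as a one-liner; this gives the same strings, though with a fresh 0..2 index rather than 0/2/5:)
df2 = df.groupby('id', as_index=False)['name'].agg('|'.join)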
For Dask, my understanding (from the docs) is that you have to tell Dask what to do to aggregate within chunks, and then what to do with those aggregated chunks. In both cases, I want to do the equivalent of '|'.join(). So:
import dask.dataframe as dd

ddf = dd.from_pandas(df, npartitions=2)
ddf2 = dd.from_pandas(pd.DataFrame.from_dict({'id': [], 'name': []}), npartitions=1)
ddf2['id'] = ddf['id'].drop_duplicates()

dd_list_aggregation = dd.Aggregation(
    'list_aggregation',
    list_aggregator,  # chunks are aggregated into strings with 1 string per chunk
    list_aggregator,  # per-chunk strings are aggregated into a single string per id
)

ddf2['name'] = ddf['name'].groupby(ddf['id']).agg(dd_list_aggregation).values
The expected result is the above (or, really, nothing yet, since I haven't called ddf2.compute()), but instead I get this error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_core.py:446, in Expr.__getattr__(self, key)
445 try:
--> 446 return object.__getattribute__(self, key)
447 except AttributeError as err:
File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:206, in GroupByApplyConcatApply._meta_chunk(self)
205 meta = meta_nonempty(self.frame._meta)
--> 206 return self.chunk(meta, *self._by_meta, **self.chunk_kwargs)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask/dataframe/groupby.py:1200, in _groupby_apply_funcs(df, *by, **kwargs)
1199 for result_column, func, func_kwargs in funcs:
-> 1200 r = func(grouped, **func_kwargs)
1202 if isinstance(r, tuple):
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask/dataframe/groupby.py:1276, in _apply_func_to_column(df_like, column, func)
1275 if column is None:
-> 1276 return func(df_like)
1278 return func(df_like[column])
Cell In[88], line 2
1 def dd_list_aggregator(x):
----> 2 return '|'.join(x[1])
File ~/miniconda3/envs/test/lib/python3.10/site-packages/pandas/core/base.py:245, in SelectionMixin.__getitem__(self, key)
244 raise KeyError(f"Column not found: {key}")
--> 245 ndim = self.obj[key].ndim
246 return self._gotitem(key, ndim=ndim)
AttributeError: 'str' object has no attribute 'ndim'
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
Cell In[96], line 1
----> 1 ddf2['name'] = ddf['name'].groupby(ddf['id']).agg(dd_list_aggregation).values
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:1907, in GroupBy.agg(self, *args, **kwargs)
1906 def agg(self, *args, **kwargs):
-> 1907 return self.aggregate(*args, **kwargs)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:1891, in GroupBy.aggregate(self, arg, split_every, split_out, shuffle_method, **kwargs)
1888 if arg == "size":
1889 return self.size()
-> 1891 return new_collection(
1892 GroupbyAggregation(
1893 self.obj.expr,
1894 arg,
1895 self.observed,
1896 self.dropna,
1897 split_every,
1898 split_out,
1899 self.sort,
1900 shuffle_method,
1901 self._slice,
1902 *self.by,
1903 )
1904 )
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_collection.py:4440, in new_collection(expr)
4438 def new_collection(expr):
4439 """Create new collection from an expr"""
-> 4440 meta = expr._meta
4441 expr._name # Ensure backend is imported
4442 return get_collection_type(meta)(expr)
File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
979 val = cache.get(self.attrname, _NOT_FOUND)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
983 cache[self.attrname] = val
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:432, in GroupbyAggregation._meta(self)
430 @functools.cached_property
431 def _meta(self):
--> 432 return self._lower()._meta
File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
979 val = cache.get(self.attrname, _NOT_FOUND)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
983 cache[self.attrname] = val
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_reductions.py:425, in ApplyConcatApply._meta(self)
423 @functools.cached_property
424 def _meta(self):
--> 425 meta = self._meta_chunk
426 aggregate = self.aggregate or (lambda x: x)
427 if self.combine:
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_core.py:451, in Expr.__getattr__(self, key)
447 except AttributeError as err:
448 if key.startswith("_meta"):
449 # Avoid a recursive loop if/when `self._meta*`
450 # produces an `AttributeError`
--> 451 raise RuntimeError(
452 f"Failed to generate metadata for {self}. "
453 "This operation may not be supported by the current backend."
454 )
456 # Allow operands to be accessed as attributes
457 # as long as the keys are not already reserved
458 # by existing methods/properties
459 _parameters = type(self)._parameters
RuntimeError: Failed to generate metadata for DecomposableGroupbyAggregation(frame=df['name'], arg=<dask.dataframe.groupby.Aggregation object at 0x7f052960b850>, observed=False, split_out=1). This operation may not be supported by the current backend.
My thinking is that numeric objects are expected here, but the backend is pandas, so string manipulations should be possible, right?
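Re-reading the dd.Aggregation docs, the chunk function is said to be called with the grouped column of each partition, which I take to mean the callables receive a pandas SeriesGroupBy rather than a plain Series. If that is the case, I would guess the join has to be applied per group, along these lines (untested sketch):
dd_list_aggregation = dd.Aggregation(
    'list_aggregation',
    lambda s: s.apply('|'.join),  # s is a SeriesGroupBy: one joined string per id per partition
    lambda s: s.apply('|'.join),  # per-partition strings, grouped again by id and joined
)
Is that the right reading, or is something else going on?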