我想在数据框上运行 groupby 和聚合,其中聚合将具有相同的字符串连接起来id
。 df 如下所示:
In [1]: df = pd.DataFrame.from_dict({'id':[1,1,2,2,2,3], 'name':['a','b','c','d','e','f']})
In [2]: df
Out[2]:
id name
0 1 a
1 1 b
2 2 c
3 2 d
4 2 e
5 3 f
我在 Pandas 中实现了这个功能:
def list_aggregator(x):
return '|'.join(x)
df2 = pd.DataFrame.from_dict('id':[], 'name':[])
df2['id'] = df['id'].drop_duplicates()
df2['name'] = df['name'].groupby(df['id']).agg(list_aggregator).values
生成:
In [26]: df2
Out[26]:
id name
0 1 a|b
2 2 c|d|e
5 3 f
对于 Dask,我的理解(来自文档)是你必须告诉 Dask 如何在块内进行聚合,然后如何处理这些聚合块。在这两种情况下,我都想做等同于的事情'|'.join()
。所以:
ddf = dd.from_pandas(df, 2)
ddf2 = dd.from_pandas(pd.DataFrame.from_dict({'id':[],'name':[]}))
ddf2['id'] = ddf['id'].drop_duplicates()
dd_list_aggregation = dd.Aggregation(
'list_aggregation',
list_aggregator, # chunks are aggregated into strings with 1 string per chunk
list_aggregator, # per-chunk strings are aggregated into a single string per id
)
ddf2['name'] = ddf['name'].groupby(ddf['id']).agg(dd_list_aggregation).values
预期结果如上(或者,实际上没有任何结果,因为我ddf2.compute()
还没有调用),但我收到此错误:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_core.py:446, in Expr.__getattr__(self, key)
445 try:
--> 446 return object.__getattribute__(self, key)
447 except AttributeError as err:
File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:206, in GroupByApplyConcatApply._meta_chunk(self)
205 meta = meta_nonempty(self.frame._meta)
--> 206 return self.chunk(meta, *self._by_meta, **self.chunk_kwargs)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask/dataframe/groupby.py:1200, in _groupby_apply_funcs(df, *by, **kwargs)
1199 for result_column, func, func_kwargs in funcs:
-> 1200 r = func(grouped, **func_kwargs)
1202 if isinstance(r, tuple):
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask/dataframe/groupby.py:1276, in _apply_func_to_column(df_like, column, func)
1275 if column is None:
-> 1276 return func(df_like)
1278 return func(df_like[column])
Cell In[88], line 2
1 def dd_list_aggregator(x):
----> 2 return '|'.join(x[1])
File ~/miniconda3/envs/test/lib/python3.10/site-packages/pandas/core/base.py:245, in SelectionMixin.__getitem__(self, key)
244 raise KeyError(f"Column not found: {key}")
--> 245 ndim = self.obj[key].ndim
246 return self._gotitem(key, ndim=ndim)
AttributeError: 'str' object has no attribute 'ndim'
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
Cell In[96], line 1
----> 1 ddf2['name'] = ddf['name'].groupby(ddf['id']).agg(dd_list_aggregation).values
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:1907, in GroupBy.agg(self, *args, **kwargs)
1906 def agg(self, *args, **kwargs):
-> 1907 return self.aggregate(*args, **kwargs)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:1891, in GroupBy.aggregate(self, arg, split_every, split_out, shuffle_method, **kwargs)
1888 if arg == "size":
1889 return self.size()
-> 1891 return new_collection(
1892 GroupbyAggregation(
1893 self.obj.expr,
1894 arg,
1895 self.observed,
1896 self.dropna,
1897 split_every,
1898 split_out,
1899 self.sort,
1900 shuffle_method,
1901 self._slice,
1902 *self.by,
1903 )
1904 )
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_collection.py:4440, in new_collection(expr)
4438 def new_collection(expr):
4439 """Create new collection from an expr"""
-> 4440 meta = expr._meta
4441 expr._name # Ensure backend is imported
4442 return get_collection_type(meta)(expr)
File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
979 val = cache.get(self.attrname, _NOT_FOUND)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
983 cache[self.attrname] = val
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:432, in GroupbyAggregation._meta(self)
430 @functools.cached_property
431 def _meta(self):
--> 432 return self._lower()._meta
File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
979 val = cache.get(self.attrname, _NOT_FOUND)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
983 cache[self.attrname] = val
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_reductions.py:425, in ApplyConcatApply._meta(self)
423 @functools.cached_property
424 def _meta(self):
--> 425 meta = self._meta_chunk
426 aggregate = self.aggregate or (lambda x: x)
427 if self.combine:
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_core.py:451, in Expr.__getattr__(self, key)
447 except AttributeError as err:
448 if key.startswith("_meta"):
449 # Avoid a recursive loop if/when `self._meta*`
450 # produces an `AttributeError`
--> 451 raise RuntimeError(
452 f"Failed to generate metadata for {self}. "
453 "This operation may not be supported by the current backend."
454 )
456 # Allow operands to be accessed as attributes
457 # as long as the keys are not already reserved
458 # by existing methods/properties
459 _parameters = type(self)._parameters
RuntimeError: Failed to generate metadata for DecomposableGroupbyAggregation(frame=df['name'], arg=<dask.dataframe.groupby.Aggregation object at 0x7f052960b850>, observed=False, split_out=1). This operation may not be supported by the current backend.
我的想法是期望数值对象,但后端是 pandas,所以应该可以进行字符串操作,对吗?