我有一个自定义的 AggregateFunction,其签名如下:
public class CustomAggregateFunction
implements AggregateFunction<CustomInput, AggregationAccumulator, CustomOutput> { code...}
MyAggregationAccumulator
是包含一些地图的简单对象,带有 Lombok @Data 注释
@Data
public static class AggregationAccumulator {
private Map<String, Long> customMap = new HashMap<>();
}
然而,Flink 表示13:18:50,091 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field AggregationAccumulator#customMap will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
我如何提供类型信息以便它不会使用 Kryo?
@TypeInfo
您可以在地图上添加注释,并提供适当的TypeInfoFactory
实现。如下所示:为了验证,请使用: