我正在尝试设置 FX 模块,但我有疑虑,找不到前进的方向。基本上,我有一个用于服务器的模块:
func NewGRPCServer(
lc fx.Lifecycle, log *zap.Logger, tracer trace.Tracer,
srvsInterceptors []grpc.UnaryServerInterceptor, serverOpt []grpc.ServerOption,
) *grpc.Server {
defaultRecoveryHandler := func(ctx context.Context, r interface{}) (err error) {
logger.FromContext(ctx).Error("recovered from panic", zap.Any("panic", r), zap.Stack("stacktrace"))
return status.Error(codes.Internal, "unexpected error")
}
interceptors := append([]grpc.UnaryServerInterceptor{
LoggerToContextInterceptor(log),
TracerToContextInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandlerContext(defaultRecoveryHandler)),
}, srvsInterceptors...)
otelHandler := otelgrpc.NewServerHandler(
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
)
serverOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(interceptors...),
grpc.StatsHandler(otelHandler),
grpc.KeepaliveEnforcementPolicy(
keepalive.EnforcementPolicy{
MinTime: 60 * time.Second,
PermitWithoutStream: true,
}),
grpc.KeepaliveParams(
keepalive.ServerParameters{
Time: 60 * time.Second,
Timeout: 10 * time.Second,
},
),
}
serverOpts = append(serverOpts, serverOpt...)
server := grpc.NewServer(serverOpts...)
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
reflection.Register(server)
return server
}
// NewListener creates a new network listener for the gRPC server using the gRPC server address parsed from the config.
func NewListener(cfg Config) (net.Listener, error) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPC))
if err != nil {
return nil, fmt.Errorf("dial connection: %w", err)
}
return lis, nil
}
func GRPCModule() fx.Option {
return fx.Module(
"grpc",
fx.Provide(
fx.Annotate(
NewGRPCServer,
fx.ParamTags(``, ``, ``, `optional:"true"`, `optional:"true"`),
),
),
fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
lis, err := NewListener(config)
if err != nil {
return err
}
go func(srv *grpc.Server, logger *zap.Logger) {
logger.Info("Starting gRPC server")
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
logger.Error("gRPC server failed", zap.Error(err))
}
}(server, log)
return nil
},
OnStop: func(ctx context.Context) error {
server.GracefulStop()
return nil
},
})
}),
)
}
以及一个用于 UI 的模块:
func NewGRPCUIServer(
lc fx.Lifecycle,
logger *zap.Logger,
tracer trace.Tracer,
config Config,
) (*http.Server, error) {
logger.Info("enter on new grpc ui server.")
rpcGrpcHost := fmt.Sprintf("0.0.0.0:%d", config.GRPC)
keepAliveOpt := grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 60 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
})
cc, err := grpc.NewClient(
rpcGrpcHost,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
keepAliveOpt,
)
if err != nil {
logger.Error("Failed to connect to gRPC server for UI", zap.Error(err))
return nil, err
}
h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
if err != nil {
logger.Error("Failed to create UI handler", zap.Error(err))
return nil, err
}
mux := http.NewServeMux()
mux.Handle("/grpc-ui/", http.StripPrefix("/grpc-ui", h))
return httplib.NewHTTPServerFx(
lc,
httplib.Config{
ServerAddr: fmt.Sprintf(":%d", config.UI),
ServerReadTimeout: 15 * time.Second,
ServerWriteTimeout: 15 * time.Second,
},
logger,
tracer,
mux,
)
}
type GRPCUIParams struct {
fx.In
WebServer *http.Server `name:"grpc-ui-server"`
Logger *zap.Logger
}
func GRPCUIModule() fx.Option {
return fx.Module(
"x:ui",
fx.Provide(
NewGRPCUIServer,
fx.Annotate(
NewGRPCUIServer,
fx.ParamTags(``, ``, ``, ``, ``),
fx.ResultTags(`name:"grpc-ui-server"`),
),
),
fx.Invoke(func(params GRPCUIParams) {
params.Logger.Info("gRPC UI Server initialized:", zap.String("address", params.WebServer.Addr))
}),
)
}
但由于某种原因,它在handlerViaReflection处失败了:
h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
if err != nil {
logger.Error("Failed to create UI handler", zap.Error(err))
return nil, err
}
由于 gRPC 服务器尚未启动,因此出现错误。我在服务器模块中放置了一个断点:
fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
lis, err := NewListener(config)
if err != nil {
return err
}
go func(srv *grpc.Server, logger *zap.Logger) {
logger.Info("Starting gRPC server")
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
logger.Error("gRPC server failed", zap.Error(err))
}
}(server, log)
return nil
},
而且它不会进入;它总是先进入 UI 模块。它创建服务器:NewGRPCServer,但从不初始化它。有人知道我该如何解决这个问题吗?