Estou tentando configurar módulos FX, mas tenho dúvidas e não consigo encontrar uma maneira de avançar. Basicamente, tenho um módulo para o servidor:
func NewGRPCServer(
lc fx.Lifecycle, log *zap.Logger, tracer trace.Tracer,
srvsInterceptors []grpc.UnaryServerInterceptor, serverOpt []grpc.ServerOption,
) *grpc.Server {
defaultRecoveryHandler := func(ctx context.Context, r interface{}) (err error) {
logger.FromContext(ctx).Error("recovered from panic", zap.Any("panic", r), zap.Stack("stacktrace"))
return status.Error(codes.Internal, "unexpected error")
}
interceptors := append([]grpc.UnaryServerInterceptor{
LoggerToContextInterceptor(log),
TracerToContextInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandlerContext(defaultRecoveryHandler)),
}, srvsInterceptors...)
otelHandler := otelgrpc.NewServerHandler(
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
)
serverOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(interceptors...),
grpc.StatsHandler(otelHandler),
grpc.KeepaliveEnforcementPolicy(
keepalive.EnforcementPolicy{
MinTime: 60 * time.Second,
PermitWithoutStream: true,
}),
grpc.KeepaliveParams(
keepalive.ServerParameters{
Time: 60 * time.Second,
Timeout: 10 * time.Second,
},
),
}
serverOpts = append(serverOpts, serverOpt...)
server := grpc.NewServer(serverOpts...)
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
reflection.Register(server)
return server
}
// NewListener creates a new network listener for the gRPC server using the gRPC server address parsed from the config.
func NewListener(cfg Config) (net.Listener, error) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.GRPC))
if err != nil {
return nil, fmt.Errorf("dial connection: %w", err)
}
return lis, nil
}
func GRPCModule() fx.Option {
return fx.Module(
"grpc",
fx.Provide(
fx.Annotate(
NewGRPCServer,
fx.ParamTags(``, ``, ``, `optional:"true"`, `optional:"true"`),
),
),
fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
lis, err := NewListener(config)
if err != nil {
return err
}
go func(srv *grpc.Server, logger *zap.Logger) {
logger.Info("Starting gRPC server")
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
logger.Error("gRPC server failed", zap.Error(err))
}
}(server, log)
return nil
},
OnStop: func(ctx context.Context) error {
server.GracefulStop()
return nil
},
})
}),
)
}
E um módulo para UI:
func NewGRPCUIServer(
lc fx.Lifecycle,
logger *zap.Logger,
tracer trace.Tracer,
config Config,
) (*http.Server, error) {
logger.Info("enter on new grpc ui server.")
rpcGrpcHost := fmt.Sprintf("0.0.0.0:%d", config.GRPC)
keepAliveOpt := grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 60 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
})
cc, err := grpc.NewClient(
rpcGrpcHost,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
keepAliveOpt,
)
if err != nil {
logger.Error("Failed to connect to gRPC server for UI", zap.Error(err))
return nil, err
}
h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
if err != nil {
logger.Error("Failed to create UI handler", zap.Error(err))
return nil, err
}
mux := http.NewServeMux()
mux.Handle("/grpc-ui/", http.StripPrefix("/grpc-ui", h))
return httplib.NewHTTPServerFx(
lc,
httplib.Config{
ServerAddr: fmt.Sprintf(":%d", config.UI),
ServerReadTimeout: 15 * time.Second,
ServerWriteTimeout: 15 * time.Second,
},
logger,
tracer,
mux,
)
}
type GRPCUIParams struct {
fx.In
WebServer *http.Server `name:"grpc-ui-server"`
Logger *zap.Logger
}
func GRPCUIModule() fx.Option {
return fx.Module(
"x:ui",
fx.Provide(
NewGRPCUIServer,
fx.Annotate(
NewGRPCUIServer,
fx.ParamTags(``, ``, ``, ``, ``),
fx.ResultTags(`name:"grpc-ui-server"`),
),
),
fx.Invoke(func(params GRPCUIParams) {
params.Logger.Info("gRPC UI Server initialized:", zap.String("address", params.WebServer.Addr))
}),
)
}
Mas por algum motivo está falhando em handlerViaReflection:
h, err := standalone.HandlerViaReflection(context.Background(), cc, rpcGrpcHost)
if err != nil {
logger.Error("Failed to create UI handler", zap.Error(err))
return nil, err
}
E está dando erro porque o servidor gRPC ainda não iniciou. Coloquei um breakpoint aqui no módulo do servidor:
fx.Invoke(func(lc fx.Lifecycle, server *grpc.Server, config Config, log *zap.Logger) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
lis, err := NewListener(config)
if err != nil {
return err
}
go func(srv *grpc.Server, logger *zap.Logger) {
logger.Info("Starting gRPC server")
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
logger.Error("gRPC server failed", zap.Error(err))
}
}(server, log)
return nil
},
E não entra; ele sempre vai para o módulo UI primeiro. Ele cria o servidor: NewGRPCServer, mas nunca o inicializa. Alguém sabe como posso resolver isso?