Tenho a seguinte definição de JDBC
fonte em Apache Flink
.
val jdbcSource = JdbcSource.builder<LoggedInEvent>()
.setDBUrl("jdbc:postgresql://db:5432/postgres")
.setSql("SELECT player_id, past_logins FROM user_initial_data")
.setUsername("postgres")
.setPassword("example")
.setTypeInformation(TypeInformation.of(PlayerLoggedInEvent::class.java))
.setResultExtractor { LoggedInEvent(it.getInt(1).toString(), it.getInt(2), Instant.now().toEpochMilli()) }
.build()
val snapshotsStream = env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "LoggedInSnapshots")
Atualmente estou enfrentando dois problemas com esta solução:
- Não posso programar isso para ser executado a cada N segundos. Existe alguma maneira simples de fazer isso com as ferramentas existentes?
- Isso está relacionado ao #1, mas isso é executado apenas uma vez e o trabalho termina. Quero que isso seja agendado e executado continuamente dentro do mesmo trabalho.
O Flink não fornece esse tipo de agendamento ou pesquisa.
Por outro lado, o Kafka Connect oferece suporte a isso: https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html