Sou novato na rxjs
biblioteca, ainda estou aprendendo, e estou tentando implementar um servidor TCP em node.js usando Observable
, e usando um padrão que vi na minha empresa que consiste em ter funções que recebem um Observable
como entrada e retornam outra saída Observable
que será enviada de volta ao cliente, então criei este código
import net from "net";
import * as rx from "rxjs";
type MaybePromise<T> = T | PromiseLike<T>;
type ConnectionHandler = (address: net.AddressInfo, input$: rx.Observable<Buffer>) => MaybePromise<rx.Observable<Buffer>>;
const createServer = (port: number, handler: ConnectionHandler, opts?: net.ServerOpts) => {
const server = net.createServer(opts);
new rx.Observable<net.Socket>(subscriber => {
server
.on("connection", socket => subscriber.next(socket))
.on("error", err => subscriber.error(err));
}).subscribe(async socket =>
(await handler(
socket.address() as net.SocketAddress,
new rx.Observable<Buffer>(subscriber => {
socket
.on("end", () => socket.destroySoon())
.on("error", err => subscriber.error(err))
.on("data", request => subscriber.next(request))
.on("close", () => subscriber.complete());
}),
)).subscribe({
next: response => {
console.log(response.toString());
socket.emit("data", response);
},
error: () => socket.destroySoon(),
})
);
server.listen(port);
};
createServer(3000, async (address, input$) => {
console.log(`New client: ${address.address}`);
const login = await rx.firstValueFrom(input$);
if (login.toString() != "login\n")
return rx.throwError(() => new Error("login failure"));
return input$.pipe(
rx.map(data => {
return Buffer.from(`ACK: ${data.toString()}`, "utf-8");
})
);
});
Então basicamente neste exemplo eu quero esperar por uma login\n
requisição para o cliente, e então para cada requisição, quero ecoá-la de volta para o cliente, adicionando ACK:
na frente dela, mas ao criar minha saída observável ele adiciona ACK
milhares de vezes em a mesma massagem, então parece que a saída está voltando para a função como entrada a cada vez, mas não sei por que, não estou alterando a entrada Observable
, estou apenas retornando uma nova que será processada para ser enviado ao cliente, você pode me ajudar a apontar de onde vem o erro?
Vocês dois têm:
E:
Acho que é isso que causa o loop infinito.
Você vai querer substituir
socket.emit("data", response)
porsocket.write(response)
.Dê uma olhada na documentação do
Socket#write
.