我是这个库的新手rxjs
,仍在学习它,我正在尝试使用 node.js 实现 TCP 服务器Observable
,并使用我在公司看到的模式,该模式包含具有作为Observable
输入和返回的函数另一个输出Observable
将发送回客户端,所以我想出了这个代码
import net from "net";
import * as rx from "rxjs";
type MaybePromise<T> = T | PromiseLike<T>;
type ConnectionHandler = (address: net.AddressInfo, input$: rx.Observable<Buffer>) => MaybePromise<rx.Observable<Buffer>>;
const createServer = (port: number, handler: ConnectionHandler, opts?: net.ServerOpts) => {
const server = net.createServer(opts);
new rx.Observable<net.Socket>(subscriber => {
server
.on("connection", socket => subscriber.next(socket))
.on("error", err => subscriber.error(err));
}).subscribe(async socket =>
(await handler(
socket.address() as net.SocketAddress,
new rx.Observable<Buffer>(subscriber => {
socket
.on("end", () => socket.destroySoon())
.on("error", err => subscriber.error(err))
.on("data", request => subscriber.next(request))
.on("close", () => subscriber.complete());
}),
)).subscribe({
next: response => {
console.log(response.toString());
socket.emit("data", response);
},
error: () => socket.destroySoon(),
})
);
server.listen(port);
};
createServer(3000, async (address, input$) => {
console.log(`New client: ${address.address}`);
const login = await rx.firstValueFrom(input$);
if (login.toString() != "login\n")
return rx.throwError(() => new Error("login failure"));
return input$.pipe(
rx.map(data => {
return Buffer.from(`ACK: ${data.toString()}`, "utf-8");
})
);
});
所以基本上在这个例子中,我想等待login\n
客户端的请求,然后对于每个请求,我想将其回显给客户端,并ACK:
在其前面添加,但是在创建我的输出可观察时,它会添加ACK
数千次相同的按摩,所以看起来输出每次都会返回到函数作为输入,但我不知道为什么,我没有更改输入Observable
,我只是返回一个将被处理的新输入发给客户,你能帮我指出错误出在哪里吗?
你们都有:
和:
我猜这就是导致无限循环的原因。
您需要替换
socket.emit("data", response)
为socket.write(response)
.查看的文档
Socket#write
。