Descobri um rxjs
comportamento estranho que continua me incomodando e não consigo nem imaginar uma solução para esse problema.
Preciso criar um Observable
from Subject
que emita um erro se o valor consumido for null
, ou emita um valor. E então emita Subject
um null
valor from . Tudo funciona como esperado, mas o subscriber
of Observable
continua consumindo o erro que foi emitido após a primeira assinatura, mesmo com o first()
operador.
Como isso é possível?
import { Subject, throwError, of } from 'rxjs';
import { switchMap, tap, first } from 'rxjs/operators';
var subj$ = new Subject();
var obs$ = subj$.asObservable().pipe(
switchMap((v) => {
if (!v) {
return throwError(() => 'ERROR');
}
return of(v);
})
);
obs$
.pipe(
first(),
tap({
next(v) {
console.log('obs$', v);
},
error(err) {
console.error('obs$ error', err);
},
}),
tap(() => subj$.next(null))
)
.subscribe();
subj$.next('VALUE');
// obs$ VALUE
// obs$ error ERROR <- ???
// ERROR <- ???
https://stackblitz.com/edit/stackblitz-starters-sdx1oqzx?file=index.js
Exemplo do mundo real:
import { Observable, BehaviorSubject, zip, throwError, of } from 'rxjs';
import { first, switchMap, tap } from 'rxjs/operators';
type User {
id: number,
name: string
}
class AuthService {
private isAuthenticatedSource = new BehaviorSubject<boolean>(false);
private userSource = new BehaviorSubject<User | null>(null);
isAuthenticated$: Observable<boolean> = this.isAuthenticatedSource.asObservable();
user$: Observable<User> = zip(
this.isAuthenticatedSource.asObservable(),
this.userSource.asObservable(),
).pipe(
switchMap(([ isAuthenticated, user ]) => {
if (!isAuthenticated || !user) {
return throwError(() => 'Unauthenticated.');
}
return of(user);
})
);
constructor(private socketService: SocketService) {}
authenticate(user: User) {
this.isAuthenticatedSource.next(true);
this.userSource.next(user);
return this.socketService.emit('join', user);
}
// consumes the error from second emission
deauthenticate() {
return this.user$.pipe(
first(),
switchMap((user: User) => this.socketService.emit('leave', user)),
tap(() => {
this.isAuthenticatedSource.next(false);
this.userSource.next(null);
})
);
}
}
class SocketService {
socket = {
emit(event: string, data: any, ack: (res: any) => any) {
return { event, data, ack };
}
}
emit<T>(event: string, data: any): Observable<T> {
return new Observable<T>(subscriber => {
this.socket.emit(event, data, (res) => {
subscriber.next(res);
subscriber.complete();
})
});
}
}