Skip to content

Commit b7732ed

Browse files
committed
fix: Unsubscribe from WebSocket stream when iterator ended
1 parent f0796fc commit b7732ed

File tree

4 files changed

+38
-27
lines changed

4 files changed

+38
-27
lines changed

src/adapters/action/dispatcher-ws.spec.ts

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
import { MastoUnexpectedError } from "../errors";
1+
import { MastoUnexpectedError, MastoWebSocketError } from "../errors";
22
import { createLogger } from "../logger";
33
import { SerializerNativeImpl } from "../serializers";
44
import {
@@ -29,7 +29,7 @@ describe("DispatcherWs", () => {
2929
}).toThrow(MastoUnexpectedError);
3030
});
3131

32-
it("can be disposed", () => {
32+
it("can be disposed", async () => {
3333
const connector = new WebSocketConnectorImpl({
3434
constructorParameters: ["wss://example.com"],
3535
});
@@ -41,6 +41,8 @@ describe("DispatcherWs", () => {
4141
);
4242

4343
dispatcher[Symbol.dispose]();
44-
expect(connector.canAcquire()).toBe(false);
44+
await expect(() => connector.acquire()).rejects.toThrow(
45+
MastoWebSocketError,
46+
);
4547
});
4648
});

src/adapters/ws/web-socket-connector.ts

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -33,11 +33,11 @@ export class WebSocketConnectorImpl implements WebSocketConnector {
3333
});
3434
}
3535

36-
canAcquire(): boolean {
37-
return !this.closed;
38-
}
39-
4036
async acquire(): Promise<WebSocket> {
37+
if (this.closed) {
38+
throw new MastoWebSocketError("WebSocket closed");
39+
}
40+
4141
this.init();
4242

4343
if (this.ws != undefined) {
@@ -49,6 +49,12 @@ export class WebSocketConnectorImpl implements WebSocketConnector {
4949
return await promiseWithResolvers.promise;
5050
}
5151

52+
async *[Symbol.asyncIterator](): AsyncIterableIterator<WebSocket> {
53+
while (!this.closed) {
54+
yield await this.acquire();
55+
}
56+
}
57+
5258
close(): void {
5359
this.closed = true;
5460
this.ws?.close();

src/adapters/ws/web-socket-subscription.ts

Lines changed: 22 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -25,31 +25,35 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription {
2525
async *values(): AsyncIterableIterator<mastodon.streaming.Event> {
2626
this.logger?.log("info", "Subscribing to stream", this.stream);
2727

28-
while (this.connector.canAcquire()) {
29-
this.connection = await this.connector.acquire();
28+
try {
29+
for await (const connection of this.connector) {
30+
this.connection = connection;
3031

31-
const data = this.serializer.serialize("json", {
32-
type: "subscribe",
33-
stream: this.stream,
34-
...this.params,
35-
});
32+
const data = this.serializer.serialize("json", {
33+
type: "subscribe",
34+
stream: this.stream,
35+
...this.params,
36+
});
3637

37-
this.logger?.log("debug", "↑ WEBSOCKET", data);
38-
this.connection.send(data);
39-
this.counter.increment(this.stream, this.params);
38+
this.logger?.log("debug", "↑ WEBSOCKET", data);
39+
this.connection.send(data);
40+
this.counter.increment(this.stream, this.params);
4041

41-
const messages = toAsyncIterable(this.connection);
42+
const messages = toAsyncIterable(this.connection);
4243

43-
for await (const message of messages) {
44-
const event = await this.parseMessage(message.data as string);
44+
for await (const message of messages) {
45+
const event = await this.parseMessage(message.data as string);
4546

46-
if (!this.test(event)) {
47-
continue;
48-
}
47+
if (!this.test(event)) {
48+
continue;
49+
}
4950

50-
this.logger?.log("debug", "↓ WEBSOCKET", event);
51-
yield event;
51+
this.logger?.log("debug", "↓ WEBSOCKET", event);
52+
yield event;
53+
}
5254
}
55+
} finally {
56+
this.unsubscribe();
5357
}
5458
}
5559

src/interfaces/ws.ts

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,8 @@
11
import { type WebSocket } from "isomorphic-ws";
22

3-
export interface WebSocketConnector {
3+
export interface WebSocketConnector extends AsyncIterable<WebSocket> {
44
acquire(): Promise<WebSocket>;
55
close(): void;
6-
canAcquire(): boolean;
76
}
87

98
export interface WebSocketSubscriptionCounter {

0 commit comments

Comments
 (0)