Skip to content

Commit dbfd4c8

Browse files
committed
fix: Fix duplicated WebSocket connection created when acquiring a connection before one is established
1 parent e309a2e commit dbfd4c8

File tree

8 files changed

+37
-62
lines changed

8 files changed

+37
-62
lines changed

src/adapters/action/dispatcher-ws.spec.ts

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ describe("DispatcherWs", () => {
2929
}).toThrow(MastoUnexpectedError);
3030
});
3131

32-
it("can be disposed", async () => {
32+
it("can be disposed", () => {
3333
const connector = new WebSocketConnectorImpl({
3434
constructorParameters: ["wss://example.com"],
3535
});
@@ -41,8 +41,6 @@ describe("DispatcherWs", () => {
4141
);
4242

4343
dispatcher[Symbol.dispose]();
44-
await expect(() => connector.acquire()).rejects.toThrow(
45-
MastoWebSocketError,
46-
);
44+
expect(() => connector.acquire()).toThrow(MastoWebSocketError);
4745
});
4846
});

src/adapters/action/dispatcher-ws.ts

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ export class WebSocketActionDispatcher
2424

2525
dispatch<T>(action: WebSocketAction): T {
2626
if (action.type === "close") {
27-
this.connector.close();
27+
this.connector.kill();
2828
return {} as T;
2929
}
3030

@@ -50,6 +50,6 @@ export class WebSocketActionDispatcher
5050
}
5151

5252
[Symbol.dispose](): void {
53-
this.connector.close();
53+
this.connector.kill();
5454
}
5555
}

src/adapters/ws/web-socket-connector.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,15 +18,15 @@ describe("WebSocketConnector", () => {
1818
expect(ws1).toBe(ws2);
1919

2020
server.close();
21-
connector.close();
21+
connector.kill();
2222
});
2323

2424
it("rejects if WebSocket closes", async () => {
2525
const connector = new WebSocketConnectorImpl({
2626
constructorParameters: [`ws://localhost:0`],
2727
});
2828
const promise = connector.acquire();
29-
connector.close();
29+
connector.kill();
3030

3131
await expect(promise).rejects.toBeInstanceOf(MastoWebSocketError);
3232
});

src/adapters/ws/web-socket-connector.ts

Lines changed: 12 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -18,45 +18,42 @@ interface WebSocketConnectorImplProps {
1818
export class WebSocketConnectorImpl implements WebSocketConnector {
1919
private ws?: WebSocket;
2020

21+
private killed = false;
2122
private queue: PromiseWithResolvers<WebSocket>[] = [];
2223
private backoff: ExponentialBackoff;
2324

24-
private closed = false;
25-
private initialized = false;
26-
2725
constructor(
2826
private readonly props: WebSocketConnectorImplProps,
2927
private readonly logger?: Logger,
3028
) {
3129
this.backoff = new ExponentialBackoff({
3230
maxAttempts: this.props.maxAttempts,
3331
});
32+
this.spawn();
3433
}
3534

36-
async acquire(): Promise<WebSocket> {
37-
if (this.closed) {
35+
acquire(): Promise<WebSocket> {
36+
if (this.killed) {
3837
throw new MastoWebSocketError("WebSocket closed");
3938
}
4039

41-
this.init();
42-
4340
if (this.ws != undefined) {
44-
return this.ws;
41+
return Promise.resolve(this.ws);
4542
}
4643

4744
const promiseWithResolvers = createPromiseWithResolvers<WebSocket>();
4845
this.queue.push(promiseWithResolvers);
49-
return await promiseWithResolvers.promise;
46+
return promiseWithResolvers.promise;
5047
}
5148

5249
async *[Symbol.asyncIterator](): AsyncIterableIterator<WebSocket> {
53-
while (!this.closed) {
50+
while (!this.killed) {
5451
yield await this.acquire();
5552
}
5653
}
5754

58-
close(): void {
59-
this.closed = true;
55+
kill(): void {
56+
this.killed = true;
6057
this.ws?.close();
6158
this.backoff.clear();
6259

@@ -67,14 +64,8 @@ export class WebSocketConnectorImpl implements WebSocketConnector {
6764
this.queue = [];
6865
}
6966

70-
private async init() {
71-
if (this.initialized) {
72-
return;
73-
}
74-
75-
this.initialized = true;
76-
77-
while (!this.closed) {
67+
private async spawn() {
68+
while (!this.killed) {
7869
this.ws?.close();
7970

8071
try {
@@ -114,6 +105,7 @@ export class WebSocketConnectorImpl implements WebSocketConnector {
114105
),
115106
);
116107
}
108+
117109
this.queue = [];
118110
}
119111
}

src/adapters/ws/web-socket-subscription.spec.ts

Lines changed: 3 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -8,24 +8,6 @@ import { WebSocketSubscription } from "./web-socket-subscription";
88
import { WebSocketSubscriptionCounterImpl } from "./web-socket-subscription-counter";
99

1010
describe("WebSocketSubscription", () => {
11-
it("doesn't do anything if no connection was established", async () => {
12-
const logger = createLogger();
13-
14-
const subscription = new WebSocketSubscription(
15-
new WebSocketConnectorImpl(
16-
{ constructorParameters: ["ws://localhost:0"] },
17-
logger,
18-
),
19-
new WebSocketSubscriptionCounterImpl(),
20-
new SerializerNativeImpl(),
21-
"public",
22-
logger,
23-
);
24-
25-
const res = subscription.unsubscribe();
26-
expect(res).toBeUndefined();
27-
});
28-
2911
it("implements async iterator", async () => {
3012
const logger = createLogger();
3113
const port = await getPort();
@@ -43,12 +25,12 @@ describe("WebSocketSubscription", () => {
4325
});
4426
});
4527

46-
const connection = new WebSocketConnectorImpl(
28+
const connector = new WebSocketConnectorImpl(
4729
{ constructorParameters: [`ws://localhost:${port}`] },
4830
logger,
4931
);
5032
const subscription = new WebSocketSubscription(
51-
connection,
33+
connector,
5234
new WebSocketSubscriptionCounterImpl(),
5335
new SerializerNativeImpl(),
5436
"public",
@@ -63,7 +45,7 @@ describe("WebSocketSubscription", () => {
6345

6446
expect(value).toBe("123");
6547

66-
connection.close();
48+
connector.kill();
6749
server.close();
6850
});
6951
});

src/interfaces/ws.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@ import { type WebSocket } from "isomorphic-ws";
22

33
export interface WebSocketConnector extends AsyncIterable<WebSocket> {
44
acquire(): Promise<WebSocket>;
5-
close(): void;
5+
kill(): void;
66
}
77

88
export interface WebSocketSubscriptionCounter {

src/mastodon/streaming/client.ts

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,6 @@ export interface SubscribeHashtagParams {
1010

1111
export interface Subscription extends AsyncIterable<Event>, Disposable {
1212
values(): AsyncIterableIterator<Event>;
13-
1413
unsubscribe(): void;
1514
}
1615

tests/streaming/connections.spec.ts

Lines changed: 15 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,25 +1,27 @@
11
import assert from "node:assert";
2+
import crypto from "node:crypto";
23

34
it("maintains connections for the event even if other handlers closed it", async () => {
5+
const tag = `tag_${crypto.randomBytes(4).toString("hex")}`;
46
await using alice = await sessions.acquire({ waitForWs: true });
57

6-
using subscription1 = alice.ws.hashtag.subscribe({ tag: "test" });
7-
using subscription2 = alice.ws.hashtag.subscribe({ tag: "test" });
8+
using subscription1 = alice.ws.hashtag.subscribe({ tag });
9+
using subscription2 = alice.ws.hashtag.subscribe({ tag });
810

911
const promise1 = subscription1.values().take(1).toArray();
1012
const promise2 = subscription2.values().take(2).toArray();
1113

1214
// Dispatch event for subscription1 to establish connection
1315
const status1 = await alice.rest.v1.statuses.create({
14-
status: "#test",
16+
status: `#${tag}`,
1517
visibility: "public",
1618
});
1719
await promise1;
1820
subscription1.unsubscribe();
1921

2022
// subscription1 is now closed, so status2 will only be dispatched to subscription2
2123
const status2 = await alice.rest.v1.statuses.create({
22-
status: "#test",
24+
status: `#${tag}`,
2325
visibility: "public",
2426
});
2527

@@ -37,16 +39,17 @@ it("maintains connections for the event even if other handlers closed it", async
3739
});
3840

3941
it("maintains connections for the event if unsubscribe called twice", async () => {
42+
const tag = `tag_${crypto.randomBytes(4).toString("hex")}`;
4043
await using alice = await sessions.acquire({ waitForWs: true });
4144

42-
using subscription1 = alice.ws.hashtag.subscribe({ tag: "test" });
43-
using subscription2 = alice.ws.hashtag.subscribe({ tag: "test" });
45+
using subscription1 = alice.ws.hashtag.subscribe({ tag });
46+
using subscription2 = alice.ws.hashtag.subscribe({ tag });
4447

4548
const promise1 = subscription1.values().take(1).toArray();
4649
const promise2 = subscription2.values().take(2).toArray();
4750

4851
const status1 = await alice.rest.v1.statuses.create({
49-
status: "#test",
52+
status: `#${tag}`,
5053
visibility: "public",
5154
});
5255
await promise1;
@@ -56,7 +59,7 @@ it("maintains connections for the event if unsubscribe called twice", async () =
5659
subscription1.unsubscribe();
5760

5861
const status2 = await alice.rest.v1.statuses.create({
59-
status: "#test",
62+
status: `#${tag}`,
6063
visibility: "public",
6164
});
6265

@@ -74,17 +77,18 @@ it("maintains connections for the event if unsubscribe called twice", async () =
7477
});
7578

7679
it("maintains connections for the event if another handler called unsubscribe before connection established", async () => {
80+
const tag = `tag_${crypto.randomBytes(4).toString("hex")}`;
7781
await using alice = await sessions.acquire({ waitForWs: true });
7882

79-
using subscription1 = alice.ws.hashtag.subscribe({ tag: "test" });
80-
using subscription2 = alice.ws.hashtag.subscribe({ tag: "test" });
83+
using subscription1 = alice.ws.hashtag.subscribe({ tag });
84+
using subscription2 = alice.ws.hashtag.subscribe({ tag });
8185

8286
subscription1.unsubscribe();
8387

8488
const promise2 = subscription2.values().take(1).toArray();
8589

8690
const status1 = await alice.rest.v1.statuses.create({
87-
status: "#test",
91+
status: `#${tag}`,
8892
visibility: "public",
8993
});
9094

0 commit comments

Comments
 (0)