Skip to content

Commit 59b7757

Browse files
authored
Merge pull request #254 from weaviate/#253/fix-timeout-ux
Improve UX of using timeouts
2 parents b5791d3 + afef475 commit 59b7757

File tree

7 files changed

+206
-36
lines changed

7 files changed

+206
-36
lines changed

src/connection/helpers.ts

+39-23
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ export function connectToWeaviateCloud(
8686
grpcHost = `grpc-${url.hostname}`;
8787
}
8888

89+
const { authCredentials: auth, headers, ...rest } = options || {};
90+
8991
return clientMaker({
9092
connectionParams: {
9193
http: {
@@ -99,8 +101,9 @@ export function connectToWeaviateCloud(
99101
port: 443,
100102
},
101103
},
102-
auth: options?.authCredentials,
103-
headers: addWeaviateEmbeddingServiceHeaders(clusterURL, options),
104+
auth,
105+
headers: addWeaviateEmbeddingServiceHeaders(clusterURL, auth, headers),
106+
...rest,
104107
}).catch((e) => {
105108
throw new WeaviateStartUpError(`Weaviate failed to startup with message: ${e.message}`);
106109
});
@@ -110,21 +113,22 @@ export function connectToLocal(
110113
clientMaker: (params: ClientParams) => Promise<WeaviateClient>,
111114
options?: ConnectToLocalOptions
112115
): Promise<WeaviateClient> {
116+
const { host, port, grpcPort, authCredentials: auth, ...rest } = options || {};
113117
return clientMaker({
114118
connectionParams: {
115119
http: {
116120
secure: false,
117-
host: options?.host || 'localhost',
118-
port: options?.port || 8080,
121+
host: host || 'localhost',
122+
port: port || 8080,
119123
},
120124
grpc: {
121125
secure: false,
122-
host: options?.host || 'localhost',
123-
port: options?.grpcPort || 50051,
126+
host: host || 'localhost',
127+
port: grpcPort || 50051,
124128
},
125129
},
126-
auth: options?.authCredentials,
127-
headers: options?.headers,
130+
auth,
131+
...rest,
128132
}).catch((e) => {
129133
throw new WeaviateStartUpError(`Weaviate failed to startup with message: ${e.message}`);
130134
});
@@ -134,37 +138,49 @@ export function connectToCustom(
134138
clientMaker: (params: ClientParams) => Promise<WeaviateClient>,
135139
options?: ConnectToCustomOptions
136140
): Promise<WeaviateClient> {
141+
const {
142+
httpHost,
143+
httpPath,
144+
httpPort,
145+
httpSecure,
146+
grpcHost,
147+
grpcPort,
148+
grpcSecure,
149+
authCredentials: auth,
150+
...rest
151+
} = options || {};
137152
return clientMaker({
138153
connectionParams: {
139154
http: {
140-
secure: options?.httpSecure || false,
141-
host: options?.httpHost || 'localhost',
142-
path: options?.httpPath || '',
143-
port: options?.httpPort || 8080,
155+
secure: httpSecure || false,
156+
host: httpHost || 'localhost',
157+
path: httpPath || '',
158+
port: httpPort || 8080,
144159
},
145160
grpc: {
146-
secure: options?.grpcSecure || false,
147-
host: options?.grpcHost || 'localhost',
148-
port: options?.grpcPort || 50051,
161+
secure: grpcSecure || false,
162+
host: grpcHost || 'localhost',
163+
port: grpcPort || 50051,
149164
},
150165
},
151-
auth: options?.authCredentials,
152-
headers: options?.headers,
153-
proxies: options?.proxies,
166+
auth,
167+
...rest,
154168
}).catch((e) => {
155169
throw new WeaviateStartUpError(`Weaviate failed to startup with message: ${e.message}`);
156170
});
157171
}
158172

159-
function addWeaviateEmbeddingServiceHeaders(clusterURL: string, options?: ConnectToWeaviateCloudOptions) {
160-
const creds = options?.authCredentials;
161-
173+
function addWeaviateEmbeddingServiceHeaders(
174+
clusterURL: string,
175+
creds?: AuthCredentials,
176+
headers?: Record<string, string>
177+
) {
162178
if (!isApiKey(creds)) {
163-
return options?.headers;
179+
return headers;
164180
}
165181

166182
return {
167-
...options?.headers,
183+
...headers,
168184
'X-Weaviate-Api-Key': mapApiKey(creds).apiKey,
169185
'X-Weaviate-Cluster-Url': clusterURL,
170186
};

src/connection/unit.test.ts

+130
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import express from 'express';
2+
import { Server as HttpServer } from 'http';
3+
14
import { testServer } from '../../test/server.js';
25
import {
36
ApiKey,
@@ -7,6 +10,23 @@ import {
710
} from './auth.js';
811
import Connection from './index.js';
912

13+
import { createServer, Server as GrpcServer } from 'nice-grpc';
14+
import {
15+
HealthCheckRequest,
16+
HealthCheckResponse,
17+
HealthCheckResponse_ServingStatus,
18+
HealthDefinition,
19+
HealthServiceImplementation,
20+
} from '../proto/google/health/v1/health';
21+
import { TenantsGetReply } from '../proto/v1/tenants';
22+
import { WeaviateDefinition, WeaviateServiceImplementation } from '../proto/v1/weaviate';
23+
24+
import { WeaviateRequestTimeoutError } from '../errors.js';
25+
import weaviate, { Collection, WeaviateClient } from '../index';
26+
import { BatchObjectsReply } from '../proto/v1/batch.js';
27+
import { BatchDeleteReply } from '../proto/v1/batch_delete.js';
28+
import { SearchReply } from '../proto/v1/search_get.js';
29+
1030
describe('mock server auth tests', () => {
1131
const server = testServer();
1232
describe('OIDC auth flows', () => {
@@ -197,3 +217,113 @@ describe('mock server auth tests', () => {
197217
return server.close();
198218
});
199219
});
220+
221+
const COLLECTION_NAME = 'TestCollectionTimeouts';
222+
223+
const makeRestApp = (version: string) => {
224+
const httpApp = express();
225+
httpApp.get(`/v1/schema/${COLLECTION_NAME}`, (req, res) =>
226+
new Promise((r) => setTimeout(r, 2000)).then(() => res.send({ class: COLLECTION_NAME }))
227+
);
228+
httpApp.get('/v1/meta', (req, res) => res.send({ version }));
229+
return httpApp;
230+
};
231+
232+
const makeGrpcApp = () => {
233+
const weaviateMockImpl: WeaviateServiceImplementation = {
234+
tenantsGet: (): Promise<TenantsGetReply> =>
235+
new Promise((r) => {
236+
setTimeout(r, 2000);
237+
}).then(() => {
238+
return {
239+
took: 5000,
240+
tenants: [],
241+
};
242+
}),
243+
search: (): Promise<SearchReply> =>
244+
new Promise((r) => {
245+
setTimeout(r, 2000);
246+
}).then(() => {
247+
return {
248+
results: [],
249+
took: 5000,
250+
groupByResults: [],
251+
};
252+
}),
253+
batchDelete: (): Promise<BatchDeleteReply> =>
254+
new Promise((r) => {
255+
setTimeout(r, 2000);
256+
}).then(() => {
257+
return {
258+
took: 5000,
259+
status: 'SUCCESS',
260+
failed: 0,
261+
matches: 0,
262+
successful: 0,
263+
objects: [],
264+
};
265+
}),
266+
batchObjects: (): Promise<BatchObjectsReply> =>
267+
new Promise((r) => {
268+
setTimeout(r, 2000);
269+
}).then(() => {
270+
return {
271+
took: 5000,
272+
errors: [],
273+
};
274+
}),
275+
};
276+
const healthMockImpl: HealthServiceImplementation = {
277+
check: (request: HealthCheckRequest): Promise<HealthCheckResponse> =>
278+
Promise.resolve(HealthCheckResponse.create({ status: HealthCheckResponse_ServingStatus.SERVING })),
279+
watch: jest.fn(),
280+
};
281+
282+
const grpcApp = createServer();
283+
grpcApp.add(WeaviateDefinition, weaviateMockImpl);
284+
grpcApp.add(HealthDefinition, healthMockImpl);
285+
286+
return grpcApp;
287+
};
288+
289+
const makeMockServers = async (weaviateVersion: string, httpPort: number, grpcAddress: string) => {
290+
const rest = makeRestApp(weaviateVersion);
291+
const grpc = makeGrpcApp();
292+
const server = await rest.listen(httpPort);
293+
await grpc.listen(grpcAddress);
294+
return { rest: server, grpc, express };
295+
};
296+
297+
describe('Mock testing of timeout behaviour', () => {
298+
let servers: {
299+
rest: HttpServer;
300+
grpc: GrpcServer;
301+
};
302+
let client: WeaviateClient;
303+
let collection: Collection;
304+
305+
beforeAll(async () => {
306+
servers = await makeMockServers('1.28.2', 8954, 'localhost:8955');
307+
client = await weaviate.connectToLocal({ port: 8954, grpcPort: 8955, timeout: { query: 1, insert: 1 } });
308+
collection = client.collections.get(COLLECTION_NAME);
309+
});
310+
311+
it('should timeout when calling REST GET v1/schema', () =>
312+
expect(collection.config.get()).rejects.toThrow(WeaviateRequestTimeoutError));
313+
314+
it('should timeout when calling gRPC TenantsGet', () =>
315+
expect(collection.tenants.get()).rejects.toThrow(WeaviateRequestTimeoutError));
316+
317+
it('should timeout when calling gRPC Search', () =>
318+
expect(collection.query.fetchObjects()).rejects.toThrow(WeaviateRequestTimeoutError));
319+
320+
it('should timeout when calling gRPC BatchObjects', () =>
321+
expect(collection.data.insertMany([{ thing: 'what' }])).rejects.toThrow(WeaviateRequestTimeoutError));
322+
323+
it('should timeout when calling gRPC BatchDelete', () =>
324+
expect(collection.data.deleteMany(collection.filter.byId().equal('123' as any))).rejects.toThrow(
325+
WeaviateRequestTimeoutError
326+
));
327+
328+
afterAll(() => Promise.all([servers.rest.close(), servers.grpc.shutdown()]));
329+
});

src/errors.ts

+9
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ export class WeaviateDeleteManyError extends WeaviateError {
3737
}
3838
}
3939

40+
/**
41+
* Is thrown if a gRPC tenants get to Weaviate fails in any way.
42+
*/
43+
export class WeaviateTenantsGetError extends WeaviateError {
44+
constructor(message: string) {
45+
super(`Tenants get failed with message: ${message}`);
46+
}
47+
}
48+
4049
/**
4150
* Is thrown if a gRPC batch query to Weaviate fails in any way.
4251
*/

src/grpc/base.ts

+1-10
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
import { isAbortError } from 'abort-controller-x';
21
import { ConsistencyLevel } from '../data/index.js';
32

43
import { Metadata } from 'nice-grpc';
54
import { RetryOptions } from 'nice-grpc-client-middleware-retry';
6-
import { WeaviateRequestTimeoutError } from '../errors.js';
75
import { ConsistencyLevel as ConsistencyLevelGRPC } from '../proto/v1/base.js';
86
import { WeaviateClient } from '../proto/v1/weaviate.js';
97

@@ -47,13 +45,6 @@ export default class Base {
4745
protected sendWithTimeout = <T>(send: (signal: AbortSignal) => Promise<T>): Promise<T> => {
4846
const controller = new AbortController();
4947
const timeoutId = setTimeout(() => controller.abort(), this.timeout * 1000);
50-
return send(controller.signal)
51-
.catch((error) => {
52-
if (isAbortError(error)) {
53-
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
54-
}
55-
throw error;
56-
})
57-
.finally(() => clearTimeout(timeoutId));
48+
return send(controller.signal).finally(() => clearTimeout(timeoutId));
5849
};
5950
}

src/grpc/batcher.ts

+8
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import {
1010
WeaviateBatchError,
1111
WeaviateDeleteManyError,
1212
WeaviateInsufficientPermissionsError,
13+
WeaviateRequestTimeoutError,
1314
} from '../errors.js';
1415
import { Filters } from '../proto/v1/base.js';
1516
import { BatchDeleteReply, BatchDeleteRequest } from '../proto/v1/batch_delete.js';
1617
import Base from './base.js';
1718

19+
import { isAbortError } from 'abort-controller-x';
1820
import { retryOptions } from './retry.js';
1921

2022
export interface Batch {
@@ -65,6 +67,9 @@ export default class Batcher extends Base implements Batch {
6567
if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) {
6668
throw new WeaviateInsufficientPermissionsError(7, err.message);
6769
}
70+
if (isAbortError(err)) {
71+
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
72+
}
6873
throw new WeaviateDeleteManyError(err.message);
6974
});
7075
}
@@ -87,6 +92,9 @@ export default class Batcher extends Base implements Batch {
8792
if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) {
8893
throw new WeaviateInsufficientPermissionsError(7, err.message);
8994
}
95+
if (isAbortError(err)) {
96+
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
97+
}
9098
throw new WeaviateBatchError(err.message);
9199
})
92100
);

src/grpc/searcher.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,13 @@ import {
2424
} from '../proto/v1/search_get.js';
2525
import { WeaviateClient } from '../proto/v1/weaviate.js';
2626

27+
import { isAbortError } from 'abort-controller-x';
2728
import { RetryOptions } from 'nice-grpc-client-middleware-retry';
28-
import { WeaviateInsufficientPermissionsError, WeaviateQueryError } from '../errors.js';
29+
import {
30+
WeaviateInsufficientPermissionsError,
31+
WeaviateQueryError,
32+
WeaviateRequestTimeoutError,
33+
} from '../errors.js';
2934
import { GenerativeSearch } from '../proto/v1/generative.js';
3035
import Base from './base.js';
3136
import { retryOptions } from './retry.js';
@@ -160,6 +165,9 @@ export default class Searcher extends Base implements Search {
160165
if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) {
161166
throw new WeaviateInsufficientPermissionsError(7, err.message);
162167
}
168+
if (isAbortError(err)) {
169+
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
170+
}
163171
throw new WeaviateQueryError(err.message, 'gRPC');
164172
})
165173
);

src/grpc/tenantsManager.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
import { isAbortError } from 'abort-controller-x';
12
import { Metadata, ServerError, Status } from 'nice-grpc';
23
import { RetryOptions } from 'nice-grpc-client-middleware-retry';
3-
import { WeaviateDeleteManyError, WeaviateInsufficientPermissionsError } from '../errors.js';
4+
import {
5+
WeaviateInsufficientPermissionsError,
6+
WeaviateRequestTimeoutError,
7+
WeaviateTenantsGetError,
8+
} from '../errors.js';
49
import { TenantsGetReply, TenantsGetRequest } from '../proto/v1/tenants.js';
510
import { WeaviateClient } from '../proto/v1/weaviate.js';
611
import Base from './base.js';
@@ -45,7 +50,10 @@ export default class TenantsManager extends Base implements Tenants {
4550
if (err instanceof ServerError && err.code === Status.PERMISSION_DENIED) {
4651
throw new WeaviateInsufficientPermissionsError(7, err.message);
4752
}
48-
throw new WeaviateDeleteManyError(err.message);
53+
if (isAbortError(err)) {
54+
throw new WeaviateRequestTimeoutError(`timed out after ${this.timeout}ms`);
55+
}
56+
throw new WeaviateTenantsGetError(err.message);
4957
})
5058
);
5159
}

0 commit comments

Comments
 (0)