35
36:- module(thread_pool,
37 [ thread_pool_create/3, 38 thread_pool_destroy/1, 39 thread_create_in_pool/4, 40
41 current_thread_pool/1, 42 thread_pool_property/2 43 ]). 44:- use_module(library(error)). 45:- use_module(library(lists)). 46:- use_module(library(option)). 47:- use_module(library(rbtrees)). 48:- use_module(library(debug)). 49
50
84
85:- meta_predicate
86 thread_create_in_pool(+, 0, -, :). 87:- predicate_options(thread_create_in_pool/4, 4,
88 [ wait(boolean),
89 pass_to(system:thread_create/3, 3)
90 ]). 91
92:- multifile
93 create_pool/1. 94
115
116thread_pool_create(Name, Size, Options) :-
117 must_be(list, Options),
118 pool_manager(Manager),
119 thread_self(Me),
120 thread_send_message(Manager, create_pool(Name, Size, Options, Me)),
121 wait_reply.
122
128
129thread_pool_destroy(Name) :-
130 pool_manager(Manager),
131 thread_self(Me),
132 thread_send_message(Manager, destroy_pool(Name, Me)),
133 wait_reply.
134
135
139
140current_thread_pool(Name) :-
141 pool_manager(Manager),
142 thread_self(Me),
143 thread_send_message(Manager, current_pools(Me)),
144 wait_reply(Pools),
145 ( atom(Name)
146 -> memberchk(Name, Pools)
147 ; member(Name, Pools)
148 ).
149
167
168thread_pool_property(Name, Property) :-
169 current_thread_pool(Name),
170 pool_manager(Manager),
171 thread_self(Me),
172 thread_send_message(Manager, pool_properties(Me, Name, Property)),
173 wait_reply(Props),
174 ( nonvar(Property)
175 -> memberchk(Property, Props)
176 ; member(Property, Props)
177 ).
178
179
195
196thread_create_in_pool(Pool, Goal, Id, QOptions) :-
197 meta_options(is_meta, QOptions, Options),
198 catch(thread_create_in_pool_(Pool, Goal, Id, Options),
199 Error, true),
200 ( var(Error)
201 -> true
202 ; Error = error(existence_error(thread_pool, Pool), _),
203 create_pool_lazily(Pool)
204 -> thread_create_in_pool_(Pool, Goal, Id, Options)
205 ; throw(Error)
206 ).
207
208thread_create_in_pool_(Pool, Goal, Id, Options) :-
209 select_option(wait(Wait), Options, ThreadOptions, true),
210 pool_manager(Manager),
211 thread_self(Me),
212 thread_send_message(Manager,
213 create(Pool, Goal, Me, Wait, Id, ThreadOptions)),
214 wait_reply(Id).
215
216is_meta(at_exit).
217
218
222
223create_pool_lazily(Pool) :-
224 with_mutex(Pool,
225 ( mutex_destroy(Pool),
226 create_pool_sync(Pool)
227 )).
228
229create_pool_sync(Pool) :-
230 current_thread_pool(Pool),
231 !.
232create_pool_sync(Pool) :-
233 create_pool(Pool).
234
235
236 239
244
245pool_manager(TID) :-
246 TID = '__thread_pool_manager',
247 ( thread_running(TID)
248 -> true
249 ; with_mutex('__thread_pool', create_pool_manager(TID))
250 ).
251
252thread_running(Thread) :-
253 catch(thread_property(Thread, status(Status)),
254 E, true),
255 ( var(E)
256 -> ( Status == running
257 -> true
258 ; thread_join(Thread, _),
259 print_message(warning, thread_pool(manager_died(Status))),
260 fail
261 )
262 ; E = error(existence_error(thread, Thread), _)
263 -> fail
264 ; throw(E)
265 ).
266
267create_pool_manager(Thread) :-
268 thread_running(Thread),
269 !.
270create_pool_manager(Thread) :-
271 thread_create(pool_manager_main, _,
272 [ alias(Thread),
273 inherit_from(main)
274 ]).
275
276
277pool_manager_main :-
278 rb_new(State0),
279 manage_thread_pool(State0).
280
281
282 285
287
288manage_thread_pool(State0) :-
289 thread_get_message(Message),
290 ( update_thread_pool(Message, State0, State)
291 -> debug(thread_pool(state), 'Message ~p --> ~p', [Message, State]),
292 manage_thread_pool(State)
293 ; format(user_error, 'Update failed: ~p~n', [Message])
294 ).
295
296
297update_thread_pool(create_pool(Name, Size, Options, For), State0, State) :-
298 !,
299 ( rb_insert_new(State0,
300 Name, tpool(Options, Size, Size, WP, WP, []),
301 State)
302 -> thread_send_message(For, thread_pool(true))
303 ; reply_error(For, permission_error(create, thread_pool, Name)),
304 State = State0
305 ).
306update_thread_pool(destroy_pool(Name, For), State0, State) :-
307 !,
308 ( rb_delete(State0, Name, State)
309 -> thread_send_message(For, thread_pool(true))
310 ; reply_error(For, existence_error(thread_pool, Name)),
311 State = State0
312 ).
313update_thread_pool(current_pools(For), State, State) :-
314 !,
315 rb_keys(State, Keys),
316 debug(thread_pool(current), 'Reply to ~w: ~p', [For, Keys]),
317 reply(For, Keys).
318update_thread_pool(pool_properties(For, Name, P), State, State) :-
319 !,
320 ( rb_lookup(Name, Pool, State)
321 -> findall(P, pool_property(P, Pool), List),
322 reply(For, List)
323 ; reply_error(For, existence_error(thread_pool, Name))
324 ).
325update_thread_pool(Message, State0, State) :-
326 arg(1, Message, Name),
327 ( rb_lookup(Name, Pool0, State0)
328 -> update_pool(Message, Pool0, Pool),
329 rb_update(State0, Name, Pool, State)
330 ; State = State0,
331 ( Message = create(Name, _, For, _, _, _)
332 -> reply_error(For, existence_error(thread_pool, Name))
333 ; true
334 )
335 ).
336
337pool_property(options(Options),
338 tpool(Options, _Free, _Size, _WP, _WPT, _Members)).
339pool_property(backlog(Size),
340 tpool(_, _Free, _Size, WP, WPT, _Members)) :-
341 diff_list_length(WP, WPT, Size).
342pool_property(free(Free),
343 tpool(_, Free, _Size, _, _, _)).
344pool_property(size(Size),
345 tpool(_, _Free, Size, _, _, _)).
346pool_property(running(Count),
347 tpool(_, Free, Size, _, _, _)) :-
348 Count is Size - Free.
349pool_property(members(IDList),
350 tpool(_, _, _, _, _, IDList)).
351
352diff_list_length(List, Tail, Size) :-
353 '$skip_list'(Length, List, Rest),
354 ( Rest == Tail
355 -> Size = Length
356 ; type_error(difference_list, List/Tail)
357 ).
358
359
373
374update_pool(create(Name, Goal, For, _, Id, MyOptions),
375 tpool(Options, Free0, Size, WP, WPT, Members0),
376 tpool(Options, Free, Size, WP, WPT, Members)) :-
377 succ(Free, Free0),
378 !,
379 merge_options(MyOptions, Options, ThreadOptions),
380 select_option(at_exit(AtExit), ThreadOptions, ThreadOptions1, true),
381 catch(thread_create(Goal, Id,
382 [ at_exit(worker_exitted(Name, Id, AtExit))
383 | ThreadOptions1
384 ]),
385 E, true),
386 ( var(E)
387 -> Members = [Id|Members0],
388 reply(For, Id)
389 ; reply_error(For, E),
390 Members = Members0
391 ).
392update_pool(Create,
393 tpool(Options, 0, Size, WP, WPT0, Members),
394 tpool(Options, 0, Size, WP, WPT, Members)) :-
395 Create = create(Name, _Goal, For, Wait, _, _Options),
396 !,
397 option(backlog(BackLog), Options, infinite),
398 ( can_delay(Wait, BackLog, WP, WPT0)
399 -> WPT0 = [Create|WPT],
400 debug(thread_pool, 'Delaying ~p', [Create])
401 ; WPT = WPT0,
402 reply_error(For, resource_error(threads_in_pool(Name)))
403 ).
404update_pool(exitted(_Name, Id),
405 tpool(Options, Free0, Size, WP0, WPT, Members0),
406 Pool) :-
407 succ(Free0, Free),
408 delete(Members0, Id, Members1),
409 Pool1 = tpool(Options, Free, Size, WP, WPT, Members1),
410 ( WP0 == WPT
411 -> WP = WP0,
412 Pool = Pool1
413 ; WP0 = [Waiting|WP],
414 debug(thread_pool, 'Start delayed ~p', [Waiting]),
415 update_pool(Waiting, Pool1, Pool)
416 ).
417
418
419can_delay(true, infinite, _, _) :- !.
420can_delay(true, BackLog, WP, WPT) :-
421 diff_list_length(WP, WPT, Size),
422 BackLog > Size.
423
431
432worker_exitted(Name, Id, AtExit) :-
433 catch(thread_send_message('__thread_pool_manager', exitted(Name, Id)),
434 _, true),
435 call(AtExit).
436
437
438 441
442reply(To, Term) :-
443 thread_send_message(To, thread_pool(true(Term))).
444
445reply_error(To, Error) :-
446 thread_send_message(To, thread_pool(error(Error, _))).
447
448wait_reply :-
449 thread_get_message(thread_pool(Result)),
450 ( Result == true
451 -> true
452 ; Result == fail
453 -> fail
454 ; throw(Result)
455 ).
456
457wait_reply(Value) :-
458 thread_get_message(thread_pool(Reply)),
459 ( Reply = true(Value0)
460 -> Value = Value0
461 ; Reply == fail
462 -> fail
463 ; throw(Reply)
464 ).
465
466
467 470
486
487 490:- multifile
491 prolog:message/3. 492
493prolog:message(thread_pool(Message)) -->
494 message(Message).
495
496message(manager_died(Status)) -->
497 [ 'Thread-pool: manager died on status ~p; restarting'-[Status] ]