All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
as_event.h
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2022 Aerospike, Inc.
3  *
4  * Portions may be licensed to Aerospike, Inc. under one or more contributor
5  * license agreements.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 #pragma once
18 
19 #include <aerospike/as_error.h>
20 #include <aerospike/as_queue.h>
21 #include <pthread.h>
22 
23 /**
24  * @defgroup async_events Event Framework Abstraction
25  *
26  * Generic asynchronous events abstraction. Designed to support multiple event libraries.
27  * Only one library is supported per build.
28  */
29 #if defined(AS_USE_LIBEV) || defined(AS_USE_LIBUV) || defined(AS_USE_LIBEVENT)
30 #define AS_EVENT_LIB_DEFINED 1
31 #endif
32 
33 #if defined(AS_USE_LIBEV)
34 #include <ev.h>
35 #elif defined(AS_USE_LIBUV)
36 #include <uv.h>
37 #elif defined(AS_USE_LIBEVENT)
38 #include <event2/event_struct.h>
39 #include <aerospike/as_vector.h>
40 #else
41 #endif
42 
43 #ifdef __cplusplus
44 extern "C" {
45 #endif
46 
47 /******************************************************************************
48  * TYPES
49  *****************************************************************************/
50 
51 /**
52  * Asynchronous event loop configuration.
53  *
54  * @ingroup async_events
55  */
56 typedef struct as_policy_event {
57  /**
58  * Maximum number of async commands that can be processed in each event loop at any point in
59  * time. Each executing non-pipeline async command requires a socket connection. Consuming too
60  * many sockets can negatively affect application reliability and performance. If the user does
61  * not limit async command count in their application, this field should be used to enforce a
62  * limit internally in the client.
63  *
64  * If this limit is reached, the next async command will be placed on the event loop's delay
65  * queue for later execution. If this limit is zero, all async commands will be executed
66  * immediately and the delay queue will not be used.
67  *
68  * If defined, a reasonable value is 40. The optimal value will depend on cpu count, cpu speed,
69  * network bandwidth and the number of event loops employed.
70  *
71  * Default: 0 (execute all async commands immediately)
72  */
74 
75  /**
76  * Maximum number of async commands that can be stored in each event loop's delay queue for
77  * later execution. Queued commands consume memory, but they do not consume sockets. This
78  * limit should be defined when it's possible that the application executes so many async
79  * commands that memory could be exhausted.
80  *
81  * If this limit is reached, the next async command will be rejected with error code
82  * AEROSPIKE_ERR_ASYNC_QUEUE_FULL. If this limit is zero, all async commands will be accepted
83  * into the delay queue.
84  *
85  * The optimal value will depend on your application's magnitude of command bursts and the
86  * amount of memory available to store commands.
87  *
88  * Default: 0 (no delay queue limit)
89  */
91 
92  /**
93  * Initial capacity of each event loop's delay queue. The delay queue can resize beyond this
94  * initial capacity.
95  *
96  * Default: 256 (if delay queue is used)
97  */
100 
101 /**
102  * Generic asynchronous event loop abstraction. There is one event loop per thread.
103  * Event loops can be created by the client, or can refer to externally created event loops.
104  *
105  * @ingroup async_events
106  */
107 typedef struct as_event_loop {
108 #if defined(AS_USE_LIBEV)
109  struct ev_loop* loop;
110  struct ev_async wakeup;
111 #elif defined(AS_USE_LIBUV)
112  uv_loop_t* loop;
113  uv_async_t* wakeup;
114 #elif defined(AS_USE_LIBEVENT)
115  struct event_base* loop;
116  struct event wakeup;
117  struct event trim;
118  as_vector clusters;
119 #else
120  void* loop;
121 #endif
122 
124  pthread_mutex_t lock;
128  pthread_t thread;
129  uint32_t index;
132  int pending;
133  // Count of consecutive errors occurring before event loop registration.
134  // Used to prevent deep recursion.
135  uint32_t errors;
138 } as_event_loop;
139 
140 /******************************************************************************
141  * GLOBAL VARIABLES
142  *****************************************************************************/
143 
146 AS_EXTERN extern uint32_t as_event_loop_size;
148 
149 /******************************************************************************
150  * PUBLIC FUNCTIONS
151  *****************************************************************************/
152 
153 /**
154  * Initialize event loop configuration variables.
155  *
156  * @ingroup async_events
157  */
158 static inline void
160 {
161  policy->max_commands_in_process = 0;
162  policy->max_commands_in_queue = 0;
163  policy->queue_initial_capacity = 256;
164 }
165 
166 /**
167  * Create new event loops with default event policy.
168  *
169  * This method should only be called when async client commands will be used and the calling program
170  * itself is not async. If this method is used, it must be called before aerospike_connect().
171  *
172  * @param capacity Number of event loops to create.
173  * @return Event loop array.
174  *
175  * @ingroup async_events
176  */
178 as_event_create_loops(uint32_t capacity);
179 
180 /**
181  * Create new event loops with specified event policy.
182  *
183  * This method should only be called when async client commands will be used and the calling program
184  * itself is not async. If this method is used, it must be called before aerospike_connect().
185  *
186  * @param err The as_error to be populated if an error occurs.
187  * @param policy Event loop configuration. Pass in NULL for default configuration.
188  * @param capacity Number of event loops to create.
189  * @param event_loops Created event loops. Pass in NULL if event loops do not need to be retrieved.
190  * @return AEROSPIKE_OK If successful. Otherwise an error.
191  *
192  * @ingroup async_events
193  */
195 as_create_event_loops(as_error* err, as_policy_event* policy, uint32_t capacity, as_event_loop** event_loops);
196 
197 /**
198  * Set the number of externally created event loops. This method should be called when the
199  * calling program wants to share event loops with the client. This reduces resource usage and
200  * can increase performance.
201  *
202  * This method is used in conjunction with as_event_set_external_loop() to fully define
203  * the external loop to the client and obtain a reference to the client's event loop abstraction.
204  *
205  * ~~~~~~~~~~{.c}
206  * struct {
207  * pthread_t thread;
208  * struct ev_loop* loop;
209  * as_event_loop* as_loop;
210  * } my_loop;
211  *
212  * static void* my_loop_worker_thread(void* udata)
213  * {
214  * struct my_loop* myloop = udata;
215  * myloop->loop = ev_loop_new(EVFLAG_AUTO);
216  * myloop->as_loop = as_event_set_external_loop(myloop->loop);
217  * ev_loop(myloop->loop, 0);
218  * ev_loop_destroy(myloop->loop);
219  * return NULL;
220  * }
221  *
222  * int capacity = 8;
223  * struct my_loop* loops = malloc(sizeof(struct my_loop) * capacity);
224  * as_event_set_external_loop_capacity(capacity);
225  *
226  * for (int i = 0; i < capacity; i++) {
227  * struct my_loop* myloop = &loops[i];
228  * return pthread_create(&myloop->thread, NULL, my_loop_worker_thread, myloop) == 0;
229  * }
230  * ~~~~~~~~~~
231  *
232  * @param capacity Number of externally created event loops.
233  * @return True if all external loops were initialized.
234  *
235  * @ingroup async_events
236  */
237 AS_EXTERN bool
238 as_event_set_external_loop_capacity(uint32_t capacity);
239 
240 /**
241  * Register an external event loop with the client with default event policy.
242  *
243  * This method should be called when the calling program wants to share event loops with the client.
244  * This reduces resource usage and can increase performance.
245  *
246  * This method must be called in the same thread as the event loop that is being registered.
247  *
248  * This method is used in conjunction with as_event_set_external_loop_capacity() to fully define
249  * the external loop to the client and obtain a reference to the client's event loop abstraction.
250  *
251  * ~~~~~~~~~~{.c}
252  * struct {
253  * pthread_t thread;
254  * struct ev_loop* loop;
255  * as_event_loop* as_loop;
256  * } my_loop;
257  *
258  * static void* my_loop_worker_thread(void* udata)
259  * {
260  * struct my_loop* myloop = udata;
261  * myloop->loop = ev_loop_new(EVFLAG_AUTO);
262  * myloop->as_loop = as_event_set_external_loop(myloop->loop);
263  * ev_loop(myloop->loop, 0);
264  * ev_loop_destroy(myloop->loop);
265  * return NULL;
266  * }
267  *
268  * int capacity = 8;
269  * struct my_loop* loops = malloc(sizeof(struct my_loop) * capacity);
270  * as_event_set_external_loop_capacity(capacity);
271  *
272  * for (int i = 0; i < capacity; i++) {
273  * struct my_loop* myloop = &loops[i];
274  * return pthread_create(&myloop->thread, NULL, my_loop_worker_thread, myloop) == 0;
275  * }
276  * ~~~~~~~~~~
277  *
278  * @param loop External event loop.
279  * @return Client's generic event loop abstraction that is used in client async commands.
280  * Returns NULL if external loop capacity would be exceeded.
281  *
282  * @ingroup async_events
283  */
286 
287 /**
288  * Register an external event loop with the client with specified event policy.
289  *
290  * This method should be called when the calling program wants to share event loops with the client.
291  * This reduces resource usage and can increase performance.
292  *
293  * This method must be called in the same thread as the event loop that is being registered.
294  *
295  * This method is used in conjunction with as_event_set_external_loop_capacity() to fully define
296  * the external loop to the client and obtain a reference to the client's event loop abstraction.
297  *
298  * ~~~~~~~~~~{.c}
299  * struct {
300  * pthread_t thread;
301  * struct ev_loop* loop;
302  * as_event_loop* as_loop;
303  * } my_loop;
304  *
305  * static void* my_loop_worker_thread(void* udata)
306  * {
307  * struct my_loop* myloop = udata;
308  * myloop->loop = ev_loop_new(EVFLAG_AUTO);
309  *
310  * as_policy_event policy;
311  * as_policy_event_init(&policy);
312  * policy.max_commands_in_process = 30;
313  *
314  * as_error err;
315  * if (as_set_external_event_loop(&err, &policy, myloop->loop, &myloop->as_loop) != AEROSPIKE_OK) {
316  * printf("Failed to set event loop: %d %s\n", err.code, err.message);
317  * return NULL;
318  * }
319  * // myloop->as_loop was already populated by as_set_external_event_loop() above; do not register the loop a second time.
320  * ev_loop(myloop->loop, 0);
321  * ev_loop_destroy(myloop->loop);
322  * return NULL;
323  * }
324  *
325  * int capacity = 8;
326  * struct my_loop* loops = malloc(sizeof(struct my_loop) * capacity);
327  * as_event_set_external_loop_capacity(capacity);
328  *
329  * for (int i = 0; i < capacity; i++) {
330  * struct my_loop* myloop = &loops[i];
331  * return pthread_create(&myloop->thread, NULL, my_loop_worker_thread, myloop) == 0;
332  * }
333  * ~~~~~~~~~~
334  *
335  * @param err The as_error to be populated if an error occurs.
336  * @param policy Event loop configuration. Pass in NULL for default configuration.
337  * @param loop External event loop.
338  * @param event_loop Created event loop.
339  * @return AEROSPIKE_OK If successful. Otherwise an error.
340  *
341  * @ingroup async_events
342  */
344 as_set_external_event_loop(as_error* err, as_policy_event* policy, void* loop, as_event_loop** event_loop);
345 
346 /**
347  * Find client's event loop abstraction given the external event loop.
348  *
349  * @param loop External event loop.
350  * @return Client's generic event loop abstraction that is used in client async commands.
351  * Returns NULL if loop not found.
352  *
353  * @ingroup async_events
354  */
356 as_event_loop_find(void* loop);
357 
358 /**
359  * Retrieve event loop by array index.
360  *
361  * @param index Event loop array index.
362  * @return Client's generic event loop abstraction that is used in client async commands.
363  *
364  * @ingroup async_events
365  */
366 static inline as_event_loop*
368 {
369  return index < as_event_loop_size ? &as_event_loops[index] : NULL;
370 }
371 
372 /**
373  * Retrieve the next event loop using round-robin distribution.
374  *
375  * @return Client's generic event loop abstraction that is used in client async commands.
376  *
377  * @ingroup async_events
378  */
379 static inline as_event_loop*
381 {
382  // The last event loop points to the first event loop to create a circular linked list.
383  // Not atomic because doesn't need to be exactly accurate.
384  as_event_loop* event_loop = as_event_loop_current;
385  as_event_loop_current = event_loop->next;
386  return event_loop;
387 }
388 
389 /**
390  * Return the approximate number of commands currently being processed on
391  * the event loop. The value is approximate because the call may be from a
392  * different thread than the event loop’s thread and there are no locks or
393  * atomics used.
394  *
395  * @ingroup async_events
396  */
397 static inline int
399 {
400  return event_loop->pending;
401 }
402 
403 /**
404  * Return the approximate number of commands stored on this event loop's
405  * delay queue that have not been started yet. The value is approximate
406  * because the call may be from a different thread than the event loop’s
407  * thread and there are no locks or atomics used.
408  *
409  * @ingroup async_events
410  */
411 static inline uint32_t
413 {
414  return as_queue_size(&event_loop->delay_queue);
415 }
416 
417 /**
418  * Close internal event loops and release watchers for internal and external event loops.
419  * The global event loop array will also be destroyed for internal event loops.
420  *
421  * This method should be called once on program shutdown if as_event_create_loops() or
422  * as_event_set_external_loop_capacity() was called.
423  *
424  * The shutdown sequence is slightly different for internal and external event loops.
425  *
426  * Internal:
427  * ~~~~~~~~~~{.c}
428  * as_event_close_loops();
429  * ~~~~~~~~~~
430  *
431  * External:
432  * ~~~~~~~~~~{.c}
433  * as_event_close_loops();
434  * Join on external loop threads.
435  * as_event_destroy_loops();
436  * ~~~~~~~~~~
437  *
438  * @return True if event loop close was successful. If false, as_event_destroy_loops() should
439  * not be called.
440  *
441  * @ingroup async_events
442  */
443 AS_EXTERN bool
445 
446 /**
447  * Close internal event loop and release internal/external event loop watchers.
448  * This optional method can be used instead of as_event_close_loops().
449  * If used, must be called from event loop's thread.
450  */
451 AS_EXTERN void
453 
454 /**
455  * Destroy global event loop array. This function only needs to be called for external
456  * event loops.
457  *
458  * @ingroup async_events
459  */
460 AS_EXTERN void
462 
463 /******************************************************************************
464  * LIBEVENT SINGLE THREAD MODE FUNCTIONS
465  *****************************************************************************/
466 
467 #if defined(AS_USE_LIBEVENT)
468 struct aerospike_s;
469 
470 /**
471  * Event loop close aerospike listener
472  *
473  * @ingroup async_events
474  */
475 typedef void (*as_event_close_listener) (void* udata);
476 
477 /**
478  * Set flag to signify that all async commands will be created in their associated event loop thread.
479  * If enabled, the client can remove locks associated with sending async commands to the event loop.
480  * This flag is only referenced when running the client with the libevent framework.
481  *
482  * By default, async single thread mode is false.
483  *
484  * @ingroup async_events
485  */
486 static inline void
487 as_event_set_single_thread(bool single_thread)
488 {
489  as_event_single_thread = single_thread; // plain store to the global flag; no lock or atomic is used here
490 }
491 
492 /**
493  * Register aerospike instance with event loop.
494  * Should only be called in libevent single-thread mode.
495  * The call must occur in the event loop's thread.
496  *
497  * @ingroup async_events
498  */
499 AS_EXTERN void
500 as_event_loop_register_aerospike(as_event_loop* event_loop, struct aerospike_s* as);
501 
502 /**
503  * Unregister and free aerospike instance resources associated with event loop.
504  * Should only be called in libevent single-thread mode.
505  * The call must occur in the event loop's thread.
506  *
507  * Listener is called when all aerospike instance async commands have completed
508  * on this event loop. Do not call aerospike_close() until listeners return on all
509  * event loops.
510  *
511  * @ingroup async_events
512  */
513 AS_EXTERN void
514 as_event_loop_close_aerospike(
515  as_event_loop* event_loop, struct aerospike_s* as, as_event_close_listener listener, void* udata
516  );
517 
518 #endif
519 
520 #ifdef __cplusplus
521 } // end extern "C"
522 #endif
uint32_t errors
Definition: as_event.h:135
AS_EXTERN bool as_event_single_thread
as_queue queue
Definition: as_event.h:125
as_queue delay_queue
Definition: as_event.h:126
AS_EXTERN bool as_event_set_external_loop_capacity(uint32_t capacity)
AS_EXTERN bool as_event_close_loops(void)
AS_EXTERN as_status as_set_external_event_loop(as_error *err, as_policy_event *policy, void *loop, as_event_loop **event_loop)
static uint32_t as_event_loop_get_queue_size(as_event_loop *event_loop)
Definition: as_event.h:412
AS_EXTERN uint32_t as_event_loop_size
as_status
Definition: as_status.h:30
AS_EXTERN void as_event_close_loop(as_event_loop *event_loop)
pthread_mutex_t lock
Definition: as_event.h:124
AS_EXTERN void as_event_destroy_loops(void)
struct as_event_loop * next
Definition: as_event.h:123
int max_commands_in_process
Definition: as_event.h:131
pthread_t thread
Definition: as_event.h:128
as_queue pipe_cb_queue
Definition: as_event.h:127
uint32_t max_commands_in_queue
Definition: as_event.h:90
uint32_t max_commands_in_queue
Definition: as_event.h:130
#define AS_EXTERN
Definition: as_std.h:25
bool pipe_cb_calling
Definition: as_event.h:137
uint32_t queue_initial_capacity
Definition: as_event.h:98
void * loop
Definition: as_event.h:120
bool using_delay_queue
Definition: as_event.h:136
AS_EXTERN as_status as_create_event_loops(as_error *err, as_policy_event *policy, uint32_t capacity, as_event_loop **event_loops)
static int as_event_loop_get_process_size(as_event_loop *event_loop)
Definition: as_event.h:398
AS_EXTERN as_event_loop * as_event_loops
static uint32_t as_queue_size(as_queue *queue)
Definition: as_queue.h:114
static void as_policy_event_init(as_policy_event *policy)
Definition: as_event.h:159
uint32_t index
Definition: as_event.h:129
AS_EXTERN as_event_loop * as_event_create_loops(uint32_t capacity)
static as_event_loop * as_event_loop_get()
Definition: as_event.h:380
AS_EXTERN as_event_loop * as_event_set_external_loop(void *loop)
AS_EXTERN as_event_loop * as_event_loop_current
static as_event_loop * as_event_loop_get_by_index(uint32_t index)
Definition: as_event.h:367
AS_EXTERN as_event_loop * as_event_loop_find(void *loop)
int max_commands_in_process
Definition: as_event.h:73