corosync  2.4.2
lib/cpg.c
Go to the documentation of this file.
1 /*
2  * vi: set autoindent tabstop=4 shiftwidth=4 :
3  *
4  * Copyright (c) 2006-2015 Red Hat, Inc.
5  *
6  * All rights reserved.
7  *
8  * Author: Christine Caulfield (ccaulfi@redhat.com)
9  * Author: Jan Friesse (jfriesse@redhat.com)
10  *
11  * This software licensed under BSD license, the text of which follows:
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions are met:
15  *
16  * - Redistributions of source code must retain the above copyright notice,
17  * this list of conditions and the following disclaimer.
18  * - Redistributions in binary form must reproduce the above copyright notice,
19  * this list of conditions and the following disclaimer in the documentation
20  * and/or other materials provided with the distribution.
21  * - Neither the name of the MontaVista Software, Inc. nor the names of its
22  * contributors may be used to endorse or promote products derived from this
23  * software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35  * THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 /*
38  * Provides a closed process group API using the coroipcc executive
39  */
40 
41 #include <config.h>
42 
43 #include <stdlib.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <unistd.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #include <sys/mman.h>
50 #include <sys/uio.h>
51 #include <sys/stat.h>
52 #include <errno.h>
53 #include <limits.h>
54 
55 #include <qb/qbdefs.h>
56 #include <qb/qbipcc.h>
57 #include <qb/qblog.h>
58 
59 #include <corosync/hdb.h>
60 #include <corosync/list.h>
61 #include <corosync/corotypes.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/cpg.h>
64 #include <corosync/ipc_cpg.h>
65 
66 #include "util.h"
67 
68 #ifndef MAP_ANONYMOUS
69 #define MAP_ANONYMOUS MAP_ANON
70 #endif
71 
72 /*
73  * Maximum number of times to retry a send when transmitting
74  * a large message fragment
75  */
76 #define MAX_RETRIES 100
77 
78 /*
79  * ZCB files have following umask (umask is same as used in libqb)
80  */
81 #define CPG_MEMORY_MAP_UMASK 077
82 
83 struct cpg_inst {
84  qb_ipcc_connection_t *c;
85  int finalize;
86  void *context;
87  union {
90  };
92  uint32_t max_msg_size;
93  char *assembly_buf;
94  uint32_t assembly_buf_ptr;
95  int assembling; /* Flag that says we have started assembling a message.
96  * It's here to catch the situation where a node joins
97  * the cluster/group in the middle of a CPG message send
98  * so we don't pass on a partial message to the client.
99  */
100 };
101 static void cpg_inst_free (void *inst);
102 
103 DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);
104 
107  qb_ipcc_connection_t *conn;
109  struct list_head list;
110 };
111 
112 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
113 
114 
115 /*
116  * Internal (not visible by API) functions
117  */
118 
119 static cs_error_t
120 coroipcc_msg_send_reply_receive (
121  qb_ipcc_connection_t *c,
122  const struct iovec *iov,
123  unsigned int iov_len,
124  void *res_msg,
125  size_t res_len)
126 {
127  return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
129 }
130 
131 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
132 {
133  list_del (&cpg_iteration_instance->list);
134  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
135 }
136 
137 static void cpg_inst_free (void *inst)
138 {
139  struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
140  qb_ipcc_disconnect(cpg_inst->c);
141 }
142 
143 static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
144 {
145  struct list_head *iter, *iter_next;
147 
148  /*
149  * Traverse thru iteration instances and delete them
150  */
151  for (iter = cpg_inst->iteration_list_head.next; iter != &cpg_inst->iteration_list_head;iter = iter_next) {
152  iter_next = iter->next;
153 
154  cpg_iteration_instance = list_entry (iter, struct cpg_iteration_instance_t, list);
155 
156  cpg_iteration_instance_finalize (cpg_iteration_instance);
157  }
158  hdb_handle_destroy (&cpg_handle_t_db, handle);
159 }
160 
169  cpg_handle_t *handle,
170  cpg_callbacks_t *callbacks)
171 {
173 
174  memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
175 
176  if (callbacks) {
177  model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
178  model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
179  }
180 
181  return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
182 }
183 
185  cpg_handle_t *handle,
186  cpg_model_t model,
188  void *context)
189 {
190  cs_error_t error;
191  struct cpg_inst *cpg_inst;
192 
193  if (model != CPG_MODEL_V1) {
194  error = CS_ERR_INVALID_PARAM;
195  goto error_no_destroy;
196  }
197 
198  error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
199  if (error != CS_OK) {
200  goto error_no_destroy;
201  }
202 
203  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
204  if (error != CS_OK) {
205  goto error_destroy;
206  }
207 
208  cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
209  if (cpg_inst->c == NULL) {
210  error = qb_to_cs_error(-errno);
211  goto error_put_destroy;
212  }
213 
214  if (model_data != NULL) {
215  switch (model) {
216  case CPG_MODEL_V1:
217  memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
218  if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
219  error = CS_ERR_INVALID_PARAM;
220 
221  goto error_destroy;
222  }
223  break;
224  }
225  }
226 
227  /* Allow space for corosync internal headers */
228  cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
229  cpg_inst->model_data.model = model;
230  cpg_inst->context = context;
231 
232  list_init(&cpg_inst->iteration_list_head);
233 
234  hdb_handle_put (&cpg_handle_t_db, *handle);
235 
236  return (CS_OK);
237 
238 error_put_destroy:
239  hdb_handle_put (&cpg_handle_t_db, *handle);
240 error_destroy:
241  hdb_handle_destroy (&cpg_handle_t_db, *handle);
242 error_no_destroy:
243  return (error);
244 }
245 
247  cpg_handle_t handle)
248 {
249  struct cpg_inst *cpg_inst;
250  struct iovec iov;
251  struct req_lib_cpg_finalize req_lib_cpg_finalize;
252  struct res_lib_cpg_finalize res_lib_cpg_finalize;
253  cs_error_t error;
254 
255  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
256  if (error != CS_OK) {
257  return (error);
258  }
259 
260  /*
261  * Another thread has already started finalizing
262  */
263  if (cpg_inst->finalize) {
264  hdb_handle_put (&cpg_handle_t_db, handle);
265  return (CS_ERR_BAD_HANDLE);
266  }
267 
268  cpg_inst->finalize = 1;
269 
270  /*
271  * Send service request
272  */
273  req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
274  req_lib_cpg_finalize.header.id = MESSAGE_REQ_CPG_FINALIZE;
275 
276  iov.iov_base = (void *)&req_lib_cpg_finalize;
277  iov.iov_len = sizeof (struct req_lib_cpg_finalize);
278 
279  error = coroipcc_msg_send_reply_receive (cpg_inst->c,
280  &iov,
281  1,
282  &res_lib_cpg_finalize,
283  sizeof (struct res_lib_cpg_finalize));
284 
285  cpg_inst_finalize (cpg_inst, handle);
286  hdb_handle_put (&cpg_handle_t_db, handle);
287 
288  return (error);
289 }
290 
292  cpg_handle_t handle,
293  int *fd)
294 {
295  cs_error_t error;
296  struct cpg_inst *cpg_inst;
297 
298  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
299  if (error != CS_OK) {
300  return (error);
301  }
302 
303  error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
304 
305  hdb_handle_put (&cpg_handle_t_db, handle);
306 
307  return (error);
308 }
309 
311  cpg_handle_t handle,
312  uint32_t *size)
313 {
314  cs_error_t error;
315  struct cpg_inst *cpg_inst;
316 
317  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
318  if (error != CS_OK) {
319  return (error);
320  }
321 
322  *size = cpg_inst->max_msg_size;
323 
324  hdb_handle_put (&cpg_handle_t_db, handle);
325 
326  return (error);
327 }
328 
330  cpg_handle_t handle,
331  void **context)
332 {
333  cs_error_t error;
334  struct cpg_inst *cpg_inst;
335 
336  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
337  if (error != CS_OK) {
338  return (error);
339  }
340 
341  *context = cpg_inst->context;
342 
343  hdb_handle_put (&cpg_handle_t_db, handle);
344 
345  return (CS_OK);
346 }
347 
349  cpg_handle_t handle,
350  void *context)
351 {
352  cs_error_t error;
353  struct cpg_inst *cpg_inst;
354 
355  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
356  if (error != CS_OK) {
357  return (error);
358  }
359 
360  cpg_inst->context = context;
361 
362  hdb_handle_put (&cpg_handle_t_db, handle);
363 
364  return (CS_OK);
365 }
366 
368  cpg_handle_t handle,
369  cs_dispatch_flags_t dispatch_types)
370 {
371  int timeout = -1;
372  cs_error_t error;
373  int cont = 1; /* always continue do loop except when set to 0 */
374  struct cpg_inst *cpg_inst;
375  struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
376  struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
377  struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
378  struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
379  struct cpg_inst cpg_inst_copy;
380  struct qb_ipc_response_header *dispatch_data;
381  struct cpg_address member_list[CPG_MEMBERS_MAX];
382  struct cpg_address left_list[CPG_MEMBERS_MAX];
383  struct cpg_address joined_list[CPG_MEMBERS_MAX];
384  struct cpg_name group_name;
385  mar_cpg_address_t *left_list_start;
386  mar_cpg_address_t *joined_list_start;
387  unsigned int i;
388  struct cpg_ring_id ring_id;
389  uint32_t totem_member_list[CPG_MEMBERS_MAX];
390  int32_t errno_res;
391  char dispatch_buf[IPC_DISPATCH_SIZE];
392 
393  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
394  if (error != CS_OK) {
395  return (error);
396  }
397 
398  /*
399  * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
400  * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
401  */
402  if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
403  timeout = 0;
404  }
405 
406  dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
407  do {
408  errno_res = qb_ipcc_event_recv (
409  cpg_inst->c,
410  dispatch_buf,
412  timeout);
413  error = qb_to_cs_error (errno_res);
414  if (error == CS_ERR_BAD_HANDLE) {
415  error = CS_OK;
416  goto error_put;
417  }
418  if (error == CS_ERR_TRY_AGAIN) {
419  if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
420  /*
421  * Don't mask error
422  */
423  goto error_put;
424  }
425  error = CS_OK;
426  if (dispatch_types == CS_DISPATCH_ALL) {
427  break; /* exit do while cont is 1 loop */
428  } else {
429  continue; /* next poll */
430  }
431  }
432  if (error != CS_OK) {
433  goto error_put;
434  }
435 
436  /*
437  * Make copy of callbacks, message data, unlock instance, and call callback
438  * A risk of this dispatch method is that the callback routines may
439  * operate at the same time that cpgFinalize has been called.
440  */
441  memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
442  switch (cpg_inst_copy.model_data.model) {
443  case CPG_MODEL_V1:
444  /*
445  * Dispatch incoming message
446  */
447  switch (dispatch_data->id) {
449  if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
450  break;
451  }
452 
453  res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
454 
455  marshall_from_mar_cpg_name_t (
456  &group_name,
457  &res_cpg_deliver_callback->group_name);
458 
459  cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
460  &group_name,
461  res_cpg_deliver_callback->nodeid,
462  res_cpg_deliver_callback->pid,
463  &res_cpg_deliver_callback->message,
464  res_cpg_deliver_callback->msglen);
465  break;
466 
468  res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
469 
470  marshall_from_mar_cpg_name_t (
471  &group_name,
472  &res_cpg_partial_deliver_callback->group_name);
473 
474  if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
475  /*
476  * Allocate a buffer to contain a full message.
477  */
478  cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
479  if (!cpg_inst->assembly_buf) {
480  error = CS_ERR_NO_MEMORY;
481  goto error_put;
482  }
483  cpg_inst->assembling = 1;
484  cpg_inst->assembly_buf_ptr = 0;
485  }
486  if (cpg_inst->assembling) {
487  memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
488  res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
489  cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
490 
491  if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
492  cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
493  &group_name,
494  res_cpg_partial_deliver_callback->nodeid,
495  res_cpg_partial_deliver_callback->pid,
496  cpg_inst->assembly_buf,
497  res_cpg_partial_deliver_callback->msglen);
498  free(cpg_inst->assembly_buf);
499  cpg_inst->assembling = 0;
500  }
501  }
502  break;
503 
505  if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
506  break;
507  }
508 
509  res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
510 
511  for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
512  marshall_from_mar_cpg_address_t (&member_list[i],
513  &res_cpg_confchg_callback->member_list[i]);
514  }
515  left_list_start = res_cpg_confchg_callback->member_list +
516  res_cpg_confchg_callback->member_list_entries;
517  for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
518  marshall_from_mar_cpg_address_t (&left_list[i],
519  &left_list_start[i]);
520  }
521  joined_list_start = res_cpg_confchg_callback->member_list +
522  res_cpg_confchg_callback->member_list_entries +
523  res_cpg_confchg_callback->left_list_entries;
524  for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
525  marshall_from_mar_cpg_address_t (&joined_list[i],
526  &joined_list_start[i]);
527  }
528  marshall_from_mar_cpg_name_t (
529  &group_name,
530  &res_cpg_confchg_callback->group_name);
531 
532  cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
533  &group_name,
534  member_list,
535  res_cpg_confchg_callback->member_list_entries,
536  left_list,
537  res_cpg_confchg_callback->left_list_entries,
538  joined_list,
539  res_cpg_confchg_callback->joined_list_entries);
540 
541  break;
543  if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
544  break;
545  }
546 
547  res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
548 
549  marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
550  for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
551  totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
552  }
553 
554  cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
555  ring_id,
556  res_cpg_totem_confchg_callback->member_list_entries,
557  totem_member_list);
558  break;
559  default:
560  error = CS_ERR_LIBRARY;
561  goto error_put;
562  break;
563  } /* - switch (dispatch_data->id) */
564  break; /* case CPG_MODEL_V1 */
565  } /* - switch (cpg_inst_copy.model_data.model) */
566 
567  if (cpg_inst_copy.finalize || cpg_inst->finalize) {
568  /*
569  * If the finalize has been called then get out of the dispatch.
570  */
571  cpg_inst->finalize = 1;
572  error = CS_ERR_BAD_HANDLE;
573  goto error_put;
574  }
575 
576  /*
577  * Determine if more messages should be processed
578  */
579  if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
580  cont = 0;
581  }
582  } while (cont);
583 
584 error_put:
585  hdb_handle_put (&cpg_handle_t_db, handle);
586  return (error);
587 }
588 
590  cpg_handle_t handle,
591  const struct cpg_name *group)
592 {
593  cs_error_t error;
594  struct cpg_inst *cpg_inst;
595  struct iovec iov[2];
596  struct req_lib_cpg_join req_lib_cpg_join;
597  struct res_lib_cpg_join response;
598 
599  if (group->length > CPG_MAX_NAME_LENGTH) {
600  return (CS_ERR_NAME_TOO_LONG);
601  }
602 
603  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
604  if (error != CS_OK) {
605  return (error);
606  }
607 
608  /* Now join */
609  req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
610  req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
611  req_lib_cpg_join.pid = getpid();
612  req_lib_cpg_join.flags = 0;
613 
614  switch (cpg_inst->model_data.model) {
615  case CPG_MODEL_V1:
616  req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
617  break;
618  }
619 
620  marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
621  group);
622 
623  iov[0].iov_base = (void *)&req_lib_cpg_join;
624  iov[0].iov_len = sizeof (struct req_lib_cpg_join);
625 
626  do {
627  error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
628  &response, sizeof (struct res_lib_cpg_join));
629 
630  if (error != CS_OK) {
631  goto error_exit;
632  }
633  } while (response.header.error == CS_ERR_BUSY);
634 
635  error = response.header.error;
636 
637 error_exit:
638  hdb_handle_put (&cpg_handle_t_db, handle);
639 
640  return (error);
641 }
642 
644  cpg_handle_t handle,
645  const struct cpg_name *group)
646 {
647  cs_error_t error;
648  struct cpg_inst *cpg_inst;
649  struct iovec iov[2];
650  struct req_lib_cpg_leave req_lib_cpg_leave;
651  struct res_lib_cpg_leave res_lib_cpg_leave;
652 
653  if (group->length > CPG_MAX_NAME_LENGTH) {
654  return (CS_ERR_NAME_TOO_LONG);
655  }
656 
657  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
658  if (error != CS_OK) {
659  return (error);
660  }
661 
662  req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
663  req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE;
664  req_lib_cpg_leave.pid = getpid();
665  marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
666  group);
667 
668  iov[0].iov_base = (void *)&req_lib_cpg_leave;
669  iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
670 
671  do {
672  error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
673  &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
674 
675  if (error != CS_OK) {
676  goto error_exit;
677  }
678  } while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
679 
680  error = res_lib_cpg_leave.header.error;
681 
682 error_exit:
683  hdb_handle_put (&cpg_handle_t_db, handle);
684 
685  return (error);
686 }
687 
689  cpg_handle_t handle,
690  struct cpg_name *group_name,
691  struct cpg_address *member_list,
692  int *member_list_entries)
693 {
694  cs_error_t error;
695  struct cpg_inst *cpg_inst;
696  struct iovec iov;
697  struct req_lib_cpg_membership_get req_lib_cpg_membership_get;
698  struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
699  unsigned int i;
700 
701  if (group_name->length > CPG_MAX_NAME_LENGTH) {
702  return (CS_ERR_NAME_TOO_LONG);
703  }
704  if (member_list == NULL) {
705  return (CS_ERR_INVALID_PARAM);
706  }
707  if (member_list_entries == NULL) {
708  return (CS_ERR_INVALID_PARAM);
709  }
710 
711  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
712  if (error != CS_OK) {
713  return (error);
714  }
715 
716  req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
717  req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP;
718 
719  marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
720  group_name);
721 
722  iov.iov_base = (void *)&req_lib_cpg_membership_get;
723  iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
724 
725  error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
726  &res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
727 
728  if (error != CS_OK) {
729  goto error_exit;
730  }
731 
732  error = res_lib_cpg_membership_get.header.error;
733 
734  /*
735  * Copy results to caller
736  */
737  *member_list_entries = res_lib_cpg_membership_get.member_count;
738  if (member_list) {
739  for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
740  marshall_from_mar_cpg_address_t (&member_list[i],
741  &res_lib_cpg_membership_get.member_list[i]);
742  }
743  }
744 
745 error_exit:
746  hdb_handle_put (&cpg_handle_t_db, handle);
747 
748  return (error);
749 }
750 
752  cpg_handle_t handle,
753  unsigned int *local_nodeid)
754 {
755  cs_error_t error;
756  struct cpg_inst *cpg_inst;
757  struct iovec iov;
758  struct req_lib_cpg_local_get req_lib_cpg_local_get;
759  struct res_lib_cpg_local_get res_lib_cpg_local_get;
760 
761  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
762  if (error != CS_OK) {
763  return (error);
764  }
765 
766  req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
767  req_lib_cpg_local_get.header.id = MESSAGE_REQ_CPG_LOCAL_GET;
768 
769  iov.iov_base = (void *)&req_lib_cpg_local_get;
770  iov.iov_len = sizeof (struct req_lib_cpg_local_get);
771 
772  error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
773  &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
774 
775  if (error != CS_OK) {
776  goto error_exit;
777  }
778 
779  error = res_lib_cpg_local_get.header.error;
780 
781  *local_nodeid = res_lib_cpg_local_get.local_nodeid;
782 
783 error_exit:
784  hdb_handle_put (&cpg_handle_t_db, handle);
785 
786  return (error);
787 }
788 
790  cpg_handle_t handle,
791  cpg_flow_control_state_t *flow_control_state)
792 {
793  cs_error_t error;
794  struct cpg_inst *cpg_inst;
795 
796  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
797  if (error != CS_OK) {
798  return (error);
799  }
800  *flow_control_state = CPG_FLOW_CONTROL_DISABLED;
801  error = CS_OK;
802 
803  hdb_handle_put (&cpg_handle_t_db, handle);
804 
805  return (error);
806 }
807 
808 static int
809 memory_map (char *path, const char *file, void **buf, size_t bytes)
810 {
811  int32_t fd;
812  void *addr;
813  int32_t res;
814  char *buffer;
815  int32_t i;
816  size_t written;
817  size_t page_size;
818  long int sysconf_page_size;
819  mode_t old_umask;
820 
821  snprintf (path, PATH_MAX, "/dev/shm/%s", file);
822 
823  old_umask = umask(CPG_MEMORY_MAP_UMASK);
824  fd = mkstemp (path);
825  (void)umask(old_umask);
826  if (fd == -1) {
827  snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
828  old_umask = umask(CPG_MEMORY_MAP_UMASK);
829  fd = mkstemp (path);
830  (void)umask(old_umask);
831  if (fd == -1) {
832  return (-1);
833  }
834  }
835 
836  res = ftruncate (fd, bytes);
837  if (res == -1) {
838  goto error_close_unlink;
839  }
840  sysconf_page_size = sysconf(_SC_PAGESIZE);
841  if (sysconf_page_size <= 0) {
842  goto error_close_unlink;
843  }
844  page_size = sysconf_page_size;
845  buffer = malloc (page_size);
846  if (buffer == NULL) {
847  goto error_close_unlink;
848  }
849  memset (buffer, 0, page_size);
850  for (i = 0; i < (bytes / page_size); i++) {
851 retry_write:
852  written = write (fd, buffer, page_size);
853  if (written == -1 && errno == EINTR) {
854  goto retry_write;
855  }
856  if (written != page_size) {
857  free (buffer);
858  goto error_close_unlink;
859  }
860  }
861  free (buffer);
862 
863  addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
864  MAP_SHARED, fd, 0);
865 
866  if (addr == MAP_FAILED) {
867  goto error_close_unlink;
868  }
869 #ifdef MADV_NOSYNC
870  madvise(addr, bytes, MADV_NOSYNC);
871 #endif
872 
873  res = close (fd);
874  if (res) {
875  munmap(addr, bytes);
876 
877  return (-1);
878  }
879  *buf = addr;
880 
881  return 0;
882 
883 error_close_unlink:
884  close (fd);
885  unlink(path);
886  return -1;
887 }
888 
890  cpg_handle_t handle,
891  size_t size,
892  void **buffer)
893 {
894  void *buf = NULL;
895  char path[PATH_MAX];
896  mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
897  struct qb_ipc_response_header res_coroipcs_zc_alloc;
898  size_t map_size;
899  struct iovec iovec;
900  struct coroipcs_zc_header *hdr;
901  cs_error_t error;
902  struct cpg_inst *cpg_inst;
903 
904  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
905  if (error != CS_OK) {
906  return (error);
907  }
908 
909  map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
910  assert(memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
911 
912  if (strlen(path) >= CPG_ZC_PATH_LEN) {
913  unlink(path);
914  munmap (buf, map_size);
915  return (CS_ERR_NAME_TOO_LONG);
916  }
917 
918  req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
919  req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
920  req_coroipcc_zc_alloc.map_size = map_size;
921  strcpy (req_coroipcc_zc_alloc.path_to_file, path);
922 
923  iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
924  iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
925 
926  error = coroipcc_msg_send_reply_receive (
927  cpg_inst->c,
928  &iovec,
929  1,
930  &res_coroipcs_zc_alloc,
931  sizeof (struct qb_ipc_response_header));
932 
933  if (error != CS_OK) {
934  goto error_exit;
935  }
936 
937  hdr = (struct coroipcs_zc_header *)buf;
938  hdr->map_size = map_size;
939  *buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
940 
941 error_exit:
942  hdb_handle_put (&cpg_handle_t_db, handle);
943  return (error);
944 }
945 
947  cpg_handle_t handle,
948  void *buffer)
949 {
950  cs_error_t error;
951  unsigned int res;
952  struct cpg_inst *cpg_inst;
953  mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
954  struct qb_ipc_response_header res_coroipcs_zc_free;
955  struct iovec iovec;
956  struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
957 
958  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
959  if (error != CS_OK) {
960  return (error);
961  }
962 
963  req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
964  req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
965  req_coroipcc_zc_free.map_size = header->map_size;
966  req_coroipcc_zc_free.server_address = header->server_address;
967 
968  iovec.iov_base = (void *)&req_coroipcc_zc_free;
969  iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
970 
971  error = coroipcc_msg_send_reply_receive (
972  cpg_inst->c,
973  &iovec,
974  1,
975  &res_coroipcs_zc_free,
976  sizeof (struct qb_ipc_response_header));
977 
978  if (error != CS_OK) {
979  goto error_exit;
980  }
981 
982  res = munmap ((void *)header, header->map_size);
983  if (res == -1) {
984  error = qb_to_cs_error(-errno);
985 
986  goto error_exit;
987  }
988 
989 error_exit:
990  hdb_handle_put (&cpg_handle_t_db, handle);
991 
992  return (error);
993 }
994 
996  cpg_handle_t handle,
998  void *msg,
999  size_t msg_len)
1000 {
1001  cs_error_t error;
1002  struct cpg_inst *cpg_inst;
1004  struct res_lib_cpg_mcast res_lib_cpg_mcast;
1005  mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
1006  struct coroipcs_zc_header *hdr;
1007  struct iovec iovec;
1008 
1009  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1010  if (error != CS_OK) {
1011  return (error);
1012  }
1013 
1014  if (msg_len > IPC_REQUEST_SIZE) {
1015  error = CS_ERR_TOO_BIG;
1016  goto error_exit;
1017  }
1018 
1019  req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
1020  req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
1021  msg_len;
1022 
1023  req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST;
1024  req_lib_cpg_mcast->guarantee = guarantee;
1025  req_lib_cpg_mcast->msglen = msg_len;
1026 
1027  hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
1028 
1029  req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
1030  req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
1031  req_coroipcc_zc_execute.server_address = hdr->server_address;
1032 
1033  iovec.iov_base = (void *)&req_coroipcc_zc_execute;
1034  iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
1035 
1036  error = coroipcc_msg_send_reply_receive (
1037  cpg_inst->c,
1038  &iovec,
1039  1,
1040  &res_lib_cpg_mcast,
1041  sizeof(res_lib_cpg_mcast));
1042 
1043  if (error != CS_OK) {
1044  goto error_exit;
1045  }
1046 
1047  error = res_lib_cpg_mcast.header.error;
1048 
1049 error_exit:
1050  hdb_handle_put (&cpg_handle_t_db, handle);
1051 
1052  return (error);
1053 }
1054 
1055 static cs_error_t send_fragments (
1056  struct cpg_inst *cpg_inst,
1058  size_t msg_len,
1059  const struct iovec *iovec,
1060  unsigned int iov_len)
1061 {
1062  int i;
1063  cs_error_t error = CS_OK;
1064  struct iovec iov[2];
1065  struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
1066  struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
1067  size_t sent = 0;
1068  size_t iov_sent = 0;
1069  int retry_count;
1070 
1071  req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
1072  req_lib_cpg_mcast.guarantee = guarantee;
1073  req_lib_cpg_mcast.msglen = msg_len;
1074 
1075  iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1076  iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
1077 
1078  i=0;
1079  iov_sent = 0 ;
1080  qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1081 
1082  while (error == CS_OK && sent < msg_len) {
1083 
1084  retry_count = 0;
1085  if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
1086  iov[1].iov_len = cpg_inst->max_msg_size;
1087  }
1088  else {
1089  iov[1].iov_len = iovec[i].iov_len - iov_sent;
1090  }
1091 
1092  if (sent == 0) {
1093  req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
1094  }
1095  else if ((sent + iov[1].iov_len) == msg_len) {
1096  req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
1097  }
1098  else {
1099  req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
1100  }
1101 
1102  req_lib_cpg_mcast.fraglen = iov[1].iov_len;
1103  req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
1104  iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
1105 
1106  resend:
1107  error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
1108  &res_lib_cpg_partial_send,
1109  sizeof (res_lib_cpg_partial_send));
1110 
1111  if (error == CS_ERR_TRY_AGAIN) {
1112  fprintf(stderr, "sleep. counter=%d\n", retry_count);
1113  if (++retry_count > MAX_RETRIES) {
1114  goto error_exit;
1115  }
1116  usleep(10000);
1117  goto resend;
1118  }
1119 
1120  iov_sent += iov[1].iov_len;
1121  sent += iov[1].iov_len;
1122 
1123  /* Next iovec */
1124  if (iov_sent >= iovec[i].iov_len) {
1125  i++;
1126  iov_sent = 0;
1127  }
1128  error = res_lib_cpg_partial_send.header.error;
1129  }
1130 error_exit:
1131  qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1132 
1133  return error;
1134 }
1135 
1136 
1138  cpg_handle_t handle,
1139  cpg_guarantee_t guarantee,
1140  const struct iovec *iovec,
1141  unsigned int iov_len)
1142 {
1143  int i;
1144  cs_error_t error;
1145  struct cpg_inst *cpg_inst;
1146  struct iovec iov[64];
1147  struct req_lib_cpg_mcast req_lib_cpg_mcast;
1148  size_t msg_len = 0;
1149 
1150  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1151  if (error != CS_OK) {
1152  return (error);
1153  }
1154 
1155  for (i = 0; i < iov_len; i++ ) {
1156  msg_len += iovec[i].iov_len;
1157  }
1158 
1159  if (msg_len > cpg_inst->max_msg_size) {
1160  error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1161  goto error_exit;
1162  }
1163 
1164  req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
1165  msg_len;
1166 
1167  req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
1168  req_lib_cpg_mcast.guarantee = guarantee;
1169  req_lib_cpg_mcast.msglen = msg_len;
1170 
1171  iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1172  iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
1173  memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
1174 
1175  qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1176  error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
1177  qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1178 
1179 error_exit:
1180  hdb_handle_put (&cpg_handle_t_db, handle);
1181 
1182  return (error);
1183 }
1184 
1186  cpg_handle_t handle,
1187  cpg_iteration_type_t iteration_type,
1188  const struct cpg_name *group,
1189  cpg_iteration_handle_t *cpg_iteration_handle)
1190 {
1191  cs_error_t error;
1192  struct iovec iov;
1193  struct cpg_inst *cpg_inst;
1194  struct cpg_iteration_instance_t *cpg_iteration_instance;
1195  struct req_lib_cpg_iterationinitialize req_lib_cpg_iterationinitialize;
1196  struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
1197 
1198  if (group && group->length > CPG_MAX_NAME_LENGTH) {
1199  return (CS_ERR_NAME_TOO_LONG);
1200  }
1201  if (cpg_iteration_handle == NULL) {
1202  return (CS_ERR_INVALID_PARAM);
1203  }
1204 
1205  if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
1206  (iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
1207  return (CS_ERR_INVALID_PARAM);
1208  }
1209 
1210  if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
1211  iteration_type != CPG_ITERATION_ALL) {
1212 
1213  return (CS_ERR_INVALID_PARAM);
1214  }
1215 
1216  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1217  if (error != CS_OK) {
1218  return (error);
1219  }
1220 
1221  error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1222  sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
1223  if (error != CS_OK) {
1224  goto error_put_cpg_db;
1225  }
1226 
1227  error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1228  (void *)&cpg_iteration_instance));
1229  if (error != CS_OK) {
1230  goto error_destroy;
1231  }
1232 
1233  cpg_iteration_instance->conn = cpg_inst->c;
1234 
1235  list_init (&cpg_iteration_instance->list);
1236 
1237  req_lib_cpg_iterationinitialize.header.size = sizeof (struct req_lib_cpg_iterationinitialize);
1238  req_lib_cpg_iterationinitialize.header.id = MESSAGE_REQ_CPG_ITERATIONINITIALIZE;
1239  req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1240  if (group) {
1241  marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1242  }
1243 
1244  iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
1245  iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
1246 
1247  error = coroipcc_msg_send_reply_receive (cpg_inst->c,
1248  &iov,
1249  1,
1250  &res_lib_cpg_iterationinitialize,
1251  sizeof (struct res_lib_cpg_iterationinitialize));
1252 
1253  if (error != CS_OK) {
1254  goto error_put_destroy;
1255  }
1256 
1257  cpg_iteration_instance->executive_iteration_handle =
1258  res_lib_cpg_iterationinitialize.iteration_handle;
1259  cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
1260 
1261  list_add (&cpg_iteration_instance->list, &cpg_inst->iteration_list_head);
1262 
1263  hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1264  hdb_handle_put (&cpg_handle_t_db, handle);
1265 
1266  return (res_lib_cpg_iterationinitialize.header.error);
1267 
1268 error_put_destroy:
1269  hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1270 error_destroy:
1271  hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1272 error_put_cpg_db:
1273  hdb_handle_put (&cpg_handle_t_db, handle);
1274 
1275  return (error);
1276 }
1277 
1279  cpg_iteration_handle_t handle,
1280  struct cpg_iteration_description_t *description)
1281 {
1282  cs_error_t error;
1283  struct cpg_iteration_instance_t *cpg_iteration_instance;
1284  struct req_lib_cpg_iterationnext req_lib_cpg_iterationnext;
1285  struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
1286 
1287  if (description == NULL) {
1288  return CS_ERR_INVALID_PARAM;
1289  }
1290 
1291  error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1292  (void *)&cpg_iteration_instance));
1293  if (error != CS_OK) {
1294  goto error_exit;
1295  }
1296 
1297  req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
1298  req_lib_cpg_iterationnext.header.id = MESSAGE_REQ_CPG_ITERATIONNEXT;
1299  req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1300 
1301  error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
1302  &req_lib_cpg_iterationnext,
1303  req_lib_cpg_iterationnext.header.size));
1304  if (error != CS_OK) {
1305  goto error_put;
1306  }
1307 
1308  error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
1309  &res_lib_cpg_iterationnext,
1310  sizeof(struct res_lib_cpg_iterationnext), -1));
1311  if (error != CS_OK) {
1312  goto error_put;
1313  }
1314 
1315  marshall_from_mar_cpg_iteration_description_t(
1316  description,
1317  &res_lib_cpg_iterationnext.description);
1318 
1319  error = res_lib_cpg_iterationnext.header.error;
1320 
1321 error_put:
1322  hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1323 
1324 error_exit:
1325  return (error);
1326 }
1327 
1329  cpg_iteration_handle_t handle)
1330 {
1331  cs_error_t error;
1332  struct iovec iov;
1333  struct cpg_iteration_instance_t *cpg_iteration_instance;
1334  struct req_lib_cpg_iterationfinalize req_lib_cpg_iterationfinalize;
1335  struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
1336 
1337  error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1338  (void *)&cpg_iteration_instance));
1339  if (error != CS_OK) {
1340  goto error_exit;
1341  }
1342 
1343  req_lib_cpg_iterationfinalize.header.size = sizeof (struct req_lib_cpg_iterationfinalize);
1344  req_lib_cpg_iterationfinalize.header.id = MESSAGE_REQ_CPG_ITERATIONFINALIZE;
1345  req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1346 
1347  iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
1348  iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
1349 
1350  error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
1351  &iov,
1352  1,
1353  &res_lib_cpg_iterationfinalize,
1354  sizeof (struct req_lib_cpg_iterationfinalize));
1355 
1356  if (error != CS_OK) {
1357  goto error_put;
1358  }
1359 
1360  cpg_iteration_instance_finalize (cpg_iteration_instance);
1361  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
1362 
1363  return (res_lib_cpg_iterationfinalize.header.error);
1364 
1365 error_put:
1366  hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1367 error_exit:
1368  return (error);
1369 }
1370 
The cpg_ring_id struct.
Definition: cpg.h:138
The cpg_callbacks_t struct.
Definition: cpg.h:180
cs_error_t cpg_iteration_next(cpg_iteration_handle_t handle, struct cpg_iteration_description_t *description)
cpg_iteration_next
Definition: lib/cpg.c:1278
cs_error_t cpg_flow_control_state_get(cpg_handle_t handle, cpg_flow_control_state_t *flow_control_state)
cpg_flow_control_state_get
Definition: lib/cpg.c:789
mar_cpg_address_t member_list[]
Definition: ipc_cpg.h:390
mar_req_coroipcc_zc_free_t struct
Definition: ipc_cpg.h:481
#define CPG_MAX_NAME_LENGTH
Definition: cpg.h:115
cpg_deliver_fn_t cpg_deliver_fn
Definition: cpg.h:199
cs_error_t hdb_error_to_cs(int res)
mar_cpg_address_t struct
Definition: ipc_cpg.h:155
cpg_flow_control_state_t
The cpg_flow_control_state_t enum.
Definition: cpg.h:73
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
Definition: cpg.h:192
cs_error_t cpg_context_set(cpg_handle_t handle, void *context)
Set contexts for a CPG handle.
Definition: lib/cpg.c:348
cpg_confchg_fn_t cpg_confchg_fn
Definition: cpg.h:182
The req_lib_cpg_join struct.
Definition: ipc_cpg.h:251
char * assembly_buf
Definition: lib/cpg.c:93
mar_req_coroipcc_zc_alloc_t struct
Definition: ipc_cpg.h:472
The cpg_name struct.
Definition: cpg.h:119
struct list_head * next
Definition: list.h:47
cs_error_t cpg_local_get(cpg_handle_t handle, unsigned int *local_nodeid)
cpg_local_get
Definition: lib/cpg.c:751
cs_error_t cpg_membership_get(cpg_handle_t handle, struct cpg_name *group_name, struct cpg_address *member_list, int *member_list_entries)
Get membership information from cpg.
Definition: lib/cpg.c:688
The cpg_address struct.
Definition: cpg.h:109
struct list_head list
Definition: lib/cpg.c:109
The res_lib_cpg_partial_deliver_callback struct.
Definition: ipc_cpg.h:345
cs_error_t cpg_iteration_finalize(cpg_iteration_handle_t handle)
cpg_iteration_finalize
Definition: lib/cpg.c:1328
The req_lib_cpg_mcast struct.
Definition: ipc_cpg.h:304
cpg_iteration_handle_t cpg_iteration_handle
Definition: lib/cpg.c:106
The res_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:375
#define LOCALSTATEDIR
Definition: config.h:352
struct message_header header
Definition: totemsrp.c:60
The res_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:449
int guarantee
Definition: totemsrp.c:66
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:77
The cpg_iteration_description_t struct.
Definition: cpg.h:129
#define CPG_MEMBERS_MAX
Definition: cpg.h:124
The res_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:433
The req_lib_cpg_local_get struct.
Definition: ipc_cpg.h:282
cpg_guarantee_t
The cpg_guarantee_t enum.
Definition: cpg.h:63
cpg_confchg_fn_t cpg_confchg_fn
Definition: cpg.h:200
coroipcs_zc_header struct
Definition: ipc_cpg.h:498
qb_ipcc_connection_t * c
Definition: lib/cpg.c:84
Definition: list.h:46
hdb_handle_t executive_iteration_handle
Definition: lib/cpg.c:108
The res_lib_cpg_partial_send struct.
Definition: ipc_cpg.h:297
cs_error_t cpg_fd_get(cpg_handle_t handle, int *fd)
Get a file descriptor on which to poll.
Definition: lib/cpg.c:291
uint64_t server_address
Definition: ipc_cpg.h:500
cpg_model_v1_data_t model_v1_data
Definition: lib/cpg.c:89
#define IPC_DISPATCH_SIZE
Definition: lib/util.h:51
cs_error_t cpg_zcb_alloc(cpg_handle_t handle, size_t size, void **buffer)
cpg_zcb_alloc
Definition: lib/cpg.c:889
The req_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:424
cpg_totem_confchg_fn_t cpg_totem_confchg_fn
Definition: cpg.h:201
#define MAX_RETRIES
Definition: lib/cpg.c:76
cs_error_t cpg_zcb_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, void *msg, size_t msg_len)
cpg_zcb_mcast_joined
Definition: lib/cpg.c:995
The res_lib_cpg_join struct.
Definition: ipc_cpg.h:261
uint64_t cpg_handle_t
cpg_handle_t
Definition: cpg.h:53
struct list_head iteration_list_head
Definition: lib/cpg.c:91
cs_error_t cpg_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, const struct iovec *iovec, unsigned int iov_len)
Multicast to groups joined with cpg_join.
Definition: lib/cpg.c:1137
uint32_t max_msg_size
Definition: lib/cpg.c:92
mar_req_coroipcc_zc_execute_t struct
Definition: ipc_cpg.h:490
The res_lib_cpg_mcast struct.
Definition: ipc_cpg.h:326
Linked list API.
#define IPC_REQUEST_SIZE
Definition: lib/util.h:49
cs_error_t
The cs_error_t enum.
Definition: corotypes.h:94
cs_error_t cpg_dispatch(cpg_handle_t handle, cs_dispatch_flags_t dispatch_types)
Dispatch messages and configuration changes.
Definition: lib/cpg.c:367
The req_lib_cpg_leave struct.
Definition: ipc_cpg.h:408
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
Definition: ipc_cpg.h:378
void * context
Definition: lib/cpg.c:86
The req_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:457
cs_dispatch_flags_t
The cs_dispatch_flags_t enum.
Definition: corotypes.h:80
cs_error_t cpg_join(cpg_handle_t handle, const struct cpg_name *group)
Join one or more groups.
Definition: lib/cpg.c:589
The res_lib_cpg_finalize struct.
Definition: ipc_cpg.h:275
#define CPG_ZC_PATH_LEN
Definition: ipc_cpg.h:43
cpg_model_data_t model_data
Definition: lib/cpg.c:88
int assembling
Definition: lib/cpg.c:95
cpg_model_t model
Definition: cpg.h:189
cpg_iteration_type_t
The cpg_iteration_type_t enum.
Definition: cpg.h:93
The res_lib_cpg_local_get struct.
Definition: ipc_cpg.h:289
The req_lib_cpg_finalize struct.
Definition: ipc_cpg.h:268
flow control is disabled - new messages may be sent
Definition: cpg.h:74
qb_handle_t hdb_handle_t
Definition: hdb.h:52
qb_ipcc_connection_t * conn
Definition: lib/cpg.c:107
cs_error_t cpg_finalize(cpg_handle_t handle)
Close the cpg handle.
Definition: lib/cpg.c:246
The res_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:465
The req_lib_cpg_partial_mcast struct.
Definition: ipc_cpg.h:314
cs_error_t cpg_leave(cpg_handle_t handle, const struct cpg_name *group)
Leave one or more groups.
Definition: lib/cpg.c:643
The req_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:441
The res_lib_cpg_confchg_callback struct.
Definition: ipc_cpg.h:384
#define list_entry(ptr, type, member)
Definition: list.h:84
cs_error_t cpg_model_initialize(cpg_handle_t *handle, cpg_model_t model, cpg_model_data_t *model_data, void *context)
Create a new cpg connection, initialize with model.
Definition: lib/cpg.c:184
cs_error_t cpg_context_get(cpg_handle_t handle, void **context)
Get contexts for a CPG handle.
Definition: lib/cpg.c:329
uint64_t cpg_iteration_handle_t
cpg_iteration_handle_t
Definition: cpg.h:58
The req_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:367
uint32_t length
Definition: cpg.h:120
cs_error_t cpg_initialize(cpg_handle_t *handle, cpg_callbacks_t *callbacks)
Create a new cpg connection.
Definition: lib/cpg.c:168
int finalize
Definition: lib/cpg.c:85
The cpg_model_v1_data_t struct.
Definition: cpg.h:197
The res_lib_cpg_leave struct.
Definition: ipc_cpg.h:417
cpg_model_t
The cpg_model_t enum.
Definition: cpg.h:102
The cpg_model_data_t struct.
Definition: cpg.h:188
cs_error_t cpg_iteration_initialize(cpg_handle_t handle, cpg_iteration_type_t iteration_type, const struct cpg_name *group, cpg_iteration_handle_t *cpg_iteration_handle)
cpg_iteration_initialize
Definition: lib/cpg.c:1185
#define CS_IPC_TIMEOUT_MS
Definition: corotypes.h:127
cs_error_t qb_to_cs_error(int result)
qb_to_cs_error
#define CPG_MEMORY_MAP_UMASK
Definition: lib/cpg.c:81
uint32_t assembly_buf_ptr
Definition: lib/cpg.c:94
The res_lib_cpg_totem_confchg_callback struct.
Definition: ipc_cpg.h:398
cpg_deliver_fn_t cpg_deliver_fn
Definition: cpg.h:181
cs_error_t cpg_zcb_free(cpg_handle_t handle, void *buffer)
cpg_zcb_free
Definition: lib/cpg.c:946
cs_error_t cpg_max_atomic_msgsize_get(cpg_handle_t handle, uint32_t *size)
Get maximum size of a message that will not be fragmented.
Definition: lib/cpg.c:310
Message from another node.
Definition: ipc_cpg.h:333
DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free)