12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
20 #include <sys/utsname.h>
22 #include <qb/qbipcc.h>
23 #include <qb/qbutil.h>
25 #include <corosync/corodefs.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/hdb.h>
28 #include <corosync/cpg.h>
36 static bool cpg_evicted = FALSE;
39 #define cs_repeat(counter, max, code) do { \
41 if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
43 crm_debug("Retrying operation after %ds", counter); \
48 } while(counter < max)
54 if (cluster->cpg_handle) {
56 cpg_leave(cluster->cpg_handle, &cluster->group);
57 cpg_finalize(cluster->cpg_handle);
58 cluster->cpg_handle = 0;
67 cs_error_t
rc = CS_OK;
69 static uint32_t local_nodeid = 0;
70 cpg_handle_t local_handle = handle;
71 cpg_callbacks_t cb = { };
78 if(local_nodeid != 0) {
84 cs_repeat(retries, 5,
rc = cpg_initialize(&local_handle, &cb));
86 crm_err(
"Could not connect to the CPG API: %s (%d)",
91 rc = cpg_fd_get(local_handle, &fd);
93 crm_err(
"Could not obtain the CPG API connection: %s (%d)",
100 &found_uid, &found_gid))) {
101 crm_err(
"CPG provider is not authentic:"
102 " process %lld (uid: %lld, gid: %lld)",
104 (
long long) found_uid, (
long long) found_gid);
107 crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
116 cs_repeat(retries, 5,
rc = cpg_local_get(local_handle, &local_nodeid));
120 crm_err(
"Could not get local node id from the CPG API: %s (%d)", ais_error2text(
rc),
rc);
126 cpg_finalize(local_handle);
128 crm_debug(
"Local nodeid is %u", local_nodeid);
136 static ssize_t crm_cs_flush(gpointer
data);
139 crm_cs_flush_cb(gpointer
data)
146 #define CS_SEND_MAX 200
148 crm_cs_flush(gpointer
data)
153 static unsigned int last_sent = 0;
154 cpg_handle_t *handle = (cpg_handle_t *)
data;
162 if ((queue_len % 1000) == 0 && queue_len > 1) {
163 crm_err(
"CPG queue has grown to %d", queue_len);
166 crm_warn(
"CPG queue has grown to %d", queue_len);
179 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
188 (
unsigned long long) iov->iov_len);
197 crm_info(
"Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
198 sent, queue_len, last_sent, ais_error2text(
rc),
201 crm_trace(
"Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
202 sent, queue_len, last_sent, ais_error2text(
rc),
207 uint32_t delay_ms = 100;
210 delay_ms = QB_MIN(1000,
CS_SEND_MAX + (10 * queue_len));
221 static unsigned int queued = 0;
224 crm_trace(
"Queueing CPG message %u (%llu bytes)",
225 queued, (
unsigned long long) iov->iov_len);
232 pcmk_cpg_dispatch(gpointer user_data)
237 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
239 crm_err(
"Connection to the CPG API failed: %s (%d)", ais_error2text(
rc),
rc);
240 cpg_finalize(cluster->cpg_handle);
241 cluster->cpg_handle = 0;
244 }
else if(cpg_evicted) {
245 crm_err(
"Evicted from CPG membership");
253 uint32_t *kind,
const char **from)
264 crm_err(
"Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid,
pid, msg->
sender.
id);
267 }
else if (msg->
host.
id != 0 && (local_nodeid != msg->
host.
id)) {
282 crm_err(
"Peer with nodeid=%u is unknown", nodeid);
284 }
else if (peer->
uname == NULL) {
285 crm_err(
"No uname for peer with nodeid=%u", nodeid);
288 crm_notice(
"Fixing uname for peer with nodeid=%u", nodeid);
296 crm_trace(
"Got new%s message (size=%d, %d, %d)",
301 *kind = msg->header.
id;
309 char *uncompressed = NULL;
310 unsigned int new_size = msg->
size + 1;
317 uncompressed = calloc(1, new_size);
336 char *pid_s = pcmk__getpid_s();
353 crm_err(
"Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
354 " min=%d, total=%d, size=%d, bz2_size=%d",
364 static int cmp_member_list_nodeid(
const void *first,
367 const struct cpg_address *
const a = *((
const struct cpg_address **) first),
368 *
const b = *((
const struct cpg_address **) second);
369 if (a->nodeid < b->nodeid) {
371 }
else if (a->nodeid > b->nodeid) {
379 cpgreason2str(cpg_reason_t reason)
382 case CPG_REASON_JOIN:
return " via cpg_join";
383 case CPG_REASON_LEAVE:
return " via cpg_leave";
384 case CPG_REASON_NODEDOWN:
return " via cluster exit";
385 case CPG_REASON_NODEUP:
return " via cluster join";
386 case CPG_REASON_PROCDOWN:
return " for unknown reason";
392 static inline const char *
396 return "unknown node";
397 }
else if (peer->
uname == NULL) {
406 const struct cpg_name *groupName,
407 const struct cpg_address *member_list,
size_t member_list_entries,
408 const struct cpg_address *left_list,
size_t left_list_entries,
409 const struct cpg_address *joined_list,
size_t joined_list_entries)
412 gboolean found = FALSE;
413 static int counter = 0;
415 const struct cpg_address *key, **sorted;
417 sorted = malloc(member_list_entries *
sizeof(
const struct cpg_address *));
420 for (
size_t iter = 0; iter < member_list_entries; iter++) {
421 sorted[iter] = member_list + iter;
424 qsort(sorted, member_list_entries,
sizeof(
const struct cpg_address *),
425 cmp_member_list_nodeid);
427 for (i = 0; i < left_list_entries; i++) {
429 const struct cpg_address **rival = NULL;
448 rival = bsearch(&key, sorted, member_list_entries,
449 sizeof(
const struct cpg_address *),
450 cmp_member_list_nodeid);
454 crm_info(
"Group %s event %d: %s (node %u pid %u) left%s",
455 groupName->value, counter, peer_name(peer),
456 left_list[i].nodeid, left_list[i].
pid,
457 cpgreason2str(left_list[i].reason));
462 }
else if (left_list[i].nodeid == local_nodeid) {
463 crm_warn(
"Group %s event %d: duplicate local pid %u left%s",
464 groupName->value, counter,
465 left_list[i].pid, cpgreason2str(left_list[i].reason));
468 "%s (node %u) duplicate pid %u left%s (%u remains)",
469 groupName->value, counter, peer_name(peer),
470 left_list[i].nodeid, left_list[i].
pid,
471 cpgreason2str(left_list[i].reason), (*rival)->pid);
477 for (i = 0; i < joined_list_entries; i++) {
478 crm_info(
"Group %s event %d: node %u pid %u joined%s",
479 groupName->value, counter, joined_list[i].nodeid,
480 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
483 for (i = 0; i < member_list_entries; i++) {
486 if (member_list[i].nodeid == local_nodeid
487 && member_list[i].
pid != getpid()) {
489 crm_warn(
"Group %s event %d: detected duplicate local pid %u",
490 groupName->value, counter, member_list[i].pid);
493 crm_info(
"Group %s event %d: %s (node %u pid %u) is member",
494 groupName->value, counter, peer_name(peer),
495 member_list[i].nodeid, member_list[i].
pid);
510 time_t now = time(NULL);
516 }
else if (now > (peer->
when_lost + 60)) {
518 crm_warn(
"Node %u is member of group %s but was believed offline",
519 member_list[i].nodeid, groupName->value);
524 if (local_nodeid == member_list[i].nodeid) {
530 crm_err(
"Local node was evicted from group %s", groupName->value);
545 cpg_handle_t handle = 0;
557 cpg_callbacks_t cpg_callbacks = {
558 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
559 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
565 cluster->group.length = 0;
566 cluster->group.value[0] = 0;
569 strncpy(cluster->group.value, message_name, 127);
570 cluster->group.value[127] = 0;
571 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
573 cs_repeat(retries, 30,
rc = cpg_initialize(&handle, &cpg_callbacks));
575 crm_err(
"Could not connect to the CPG API: %s (%d)",
576 cs_strerror(
rc),
rc);
580 rc = cpg_fd_get(handle, &fd);
582 crm_err(
"Could not obtain the CPG API connection: %s (%d)",
583 cs_strerror(
rc),
rc);
589 &found_uid, &found_gid))) {
590 crm_err(
"CPG provider is not authentic:"
591 " process %lld (uid: %lld, gid: %lld)",
593 (
long long) found_uid, (
long long) found_gid);
597 crm_err(
"Could not verify authenticity of CPG provider: %s (%d)",
605 crm_err(
"Could not get local node id from the CPG API");
612 cs_repeat(retries, 30,
rc = cpg_join(handle, &cluster->group));
614 crm_err(
"Could not join the CPG group '%s': %d", message_name,
rc);
619 cluster->cpg_handle = handle;
624 cpg_finalize(handle);
649 static int msg_id = 0;
650 static int local_pid = 0;
651 static int local_name_len = 0;
652 static const char *local_name = NULL;
663 crm_err(
"Invalid message class: %d", msg_class);
669 if(local_name == NULL) {
672 if(local_name_len == 0 && local_name) {
673 local_name_len = strlen(local_name);
680 if (local_pid == 0) {
681 local_pid = getpid();
692 msg->header.
id = msg_class;
693 msg->header.error = CS_OK;
725 msg = realloc_safe(msg, msg->header.
size);
729 char *compressed = NULL;
730 unsigned int new_size = 0;
731 char *uncompressed = strdup(
data);
737 msg = realloc_safe(msg, msg->header.
size);
738 memcpy(msg->
data, compressed, new_size);
746 msg = realloc_safe(msg, msg->header.
size);
754 iov = calloc(1,
sizeof(
struct iovec));
756 iov->iov_len = msg->header.
size;
759 crm_trace(
"Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
760 msg->
id,
target, (
unsigned long long) iov->iov_len,
763 crm_trace(
"Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
764 msg->
id,
target, (
unsigned long long) iov->iov_len,
805 int scan_rc = sscanf(text,
"%d", &
type);
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
#define ais_data_len(msg)
enum crm_ais_msg_types type
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
struct crm_ais_msg_s AIS_Message
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
const char * get_local_node_name(void)
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
const char * pcmk_message_name(const char *name)
Get name to be used as identifier for cluster messages.
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
gboolean safe_str_neq(const char *a, const char *b)
#define safe_str_eq(a, b)
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
void cluster_disconnect_cpg(crm_cluster_t *cluster)
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
gboolean send_cpg_iov(struct iovec *iov)
cpg_handle_t pcmk_cpg_handle
uint32_t get_local_nodeid(cpg_handle_t handle)
GListPtr cs_message_queue
#define cs_repeat(counter, max, code)
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
enum crm_ais_msg_types text2msg_type(const char *text)
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
#define CRM_SYSTEM_STONITHD
#define CRM_SYSTEM_TENGINE
#define CRM_SYSTEM_PENGINE
Wrappers for and extensions to libqb IPC.
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process.
#define PCMK__SPECIAL_PID_AS_0(p)
#define crm_info(fmt, args...)
#define crm_warn(fmt, args...)
#define crm_notice(fmt, args...)
#define CRM_CHECK(expr, failure_action)
#define crm_debug(fmt, args...)
#define crm_err(fmt, args...)
#define crm_trace(fmt, args...)
Wrappers for and extensions to glib mainloop.
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
#define G_PRIORITY_MEDIUM
char * strerror(int errnum)
const char * bz2_strerror(int rc)
enum crm_ais_msg_types type
void(* destroy)(gpointer)
int(* dispatch)(gpointer userdata)
char * dump_xml_unformatted(xmlNode *msg)
#define CRM_BZ2_THRESHOLD