Lines Matching refs:con

119 static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)  in con_flag_clear()  argument
123 clear_bit(con_flag, &con->flags); in con_flag_clear()
126 static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) in con_flag_set() argument
130 set_bit(con_flag, &con->flags); in con_flag_set()
133 static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) in con_flag_test() argument
137 return test_bit(con_flag, &con->flags); in con_flag_test()
140 static bool con_flag_test_and_clear(struct ceph_connection *con, in con_flag_test_and_clear() argument
145 return test_and_clear_bit(con_flag, &con->flags); in con_flag_test_and_clear()
148 static bool con_flag_test_and_set(struct ceph_connection *con, in con_flag_test_and_set() argument
153 return test_and_set_bit(con_flag, &con->flags); in con_flag_test_and_set()
170 static void queue_con(struct ceph_connection *con);
171 static void cancel_con(struct ceph_connection *con);
173 static void con_fault(struct ceph_connection *con);
302 static void con_sock_state_init(struct ceph_connection *con) in con_sock_state_init() argument
306 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_init()
309 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_init()
313 static void con_sock_state_connecting(struct ceph_connection *con) in con_sock_state_connecting() argument
317 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); in con_sock_state_connecting()
320 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connecting()
324 static void con_sock_state_connected(struct ceph_connection *con) in con_sock_state_connected() argument
328 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); in con_sock_state_connected()
331 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connected()
335 static void con_sock_state_closing(struct ceph_connection *con) in con_sock_state_closing() argument
339 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); in con_sock_state_closing()
344 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closing()
348 static void con_sock_state_closed(struct ceph_connection *con) in con_sock_state_closed() argument
352 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_closed()
358 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closed()
369 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_data_ready() local
370 if (atomic_read(&con->msgr->stopping)) { in ceph_sock_data_ready()
376 con, con->state); in ceph_sock_data_ready()
377 queue_con(con); in ceph_sock_data_ready()
384 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_write_space() local
393 if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { in ceph_sock_write_space()
395 dout("%s %p queueing write work\n", __func__, con); in ceph_sock_write_space()
397 queue_con(con); in ceph_sock_write_space()
400 dout("%s %p nothing to write\n", __func__, con); in ceph_sock_write_space()
407 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_state_change() local
410 con, con->state, sk->sk_state); in ceph_sock_state_change()
418 con_sock_state_closing(con); in ceph_sock_state_change()
419 con_flag_set(con, CON_FLAG_SOCK_CLOSED); in ceph_sock_state_change()
420 queue_con(con); in ceph_sock_state_change()
424 con_sock_state_connected(con); in ceph_sock_state_change()
425 queue_con(con); in ceph_sock_state_change()
436 struct ceph_connection *con) in set_sock_callbacks() argument
439 sk->sk_user_data = con; in set_sock_callbacks()
453 static int ceph_tcp_connect(struct ceph_connection *con) in ceph_tcp_connect() argument
455 struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */ in ceph_tcp_connect()
460 BUG_ON(con->sock); in ceph_tcp_connect()
464 ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family, in ceph_tcp_connect()
475 set_sock_callbacks(sock, con); in ceph_tcp_connect()
477 dout("connect %s\n", ceph_pr_addr(&con->peer_addr)); in ceph_tcp_connect()
479 con_sock_state_connecting(con); in ceph_tcp_connect()
484 ceph_pr_addr(&con->peer_addr), in ceph_tcp_connect()
488 ceph_pr_addr(&con->peer_addr), ret); in ceph_tcp_connect()
493 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) in ceph_tcp_connect()
496 con->sock = sock; in ceph_tcp_connect()
593 static int con_close_socket(struct ceph_connection *con) in con_close_socket() argument
597 dout("con_close_socket on %p sock %p\n", con, con->sock); in con_close_socket()
598 if (con->sock) { in con_close_socket()
599 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); in con_close_socket()
600 sock_release(con->sock); in con_close_socket()
601 con->sock = NULL; in con_close_socket()
610 con_flag_clear(con, CON_FLAG_SOCK_CLOSED); in con_close_socket()
612 con_sock_state_closed(con); in con_close_socket()
635 static void reset_connection(struct ceph_connection *con) in reset_connection() argument
639 dout("reset_connection %p\n", con); in reset_connection()
640 ceph_msg_remove_list(&con->out_queue); in reset_connection()
641 ceph_msg_remove_list(&con->out_sent); in reset_connection()
643 if (con->in_msg) { in reset_connection()
644 BUG_ON(con->in_msg->con != con); in reset_connection()
645 ceph_msg_put(con->in_msg); in reset_connection()
646 con->in_msg = NULL; in reset_connection()
649 con->connect_seq = 0; in reset_connection()
650 con->out_seq = 0; in reset_connection()
651 if (con->out_msg) { in reset_connection()
652 BUG_ON(con->out_msg->con != con); in reset_connection()
653 ceph_msg_put(con->out_msg); in reset_connection()
654 con->out_msg = NULL; in reset_connection()
656 con->in_seq = 0; in reset_connection()
657 con->in_seq_acked = 0; in reset_connection()
659 con->out_skip = 0; in reset_connection()
665 void ceph_con_close(struct ceph_connection *con) in ceph_con_close() argument
667 mutex_lock(&con->mutex); in ceph_con_close()
668 dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr)); in ceph_con_close()
669 con->state = CON_STATE_CLOSED; in ceph_con_close()
671 con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ in ceph_con_close()
672 con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); in ceph_con_close()
673 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in ceph_con_close()
674 con_flag_clear(con, CON_FLAG_BACKOFF); in ceph_con_close()
676 reset_connection(con); in ceph_con_close()
677 con->peer_global_seq = 0; in ceph_con_close()
678 cancel_con(con); in ceph_con_close()
679 con_close_socket(con); in ceph_con_close()
680 mutex_unlock(&con->mutex); in ceph_con_close()
687 void ceph_con_open(struct ceph_connection *con, in ceph_con_open() argument
691 mutex_lock(&con->mutex); in ceph_con_open()
692 dout("con_open %p %s\n", con, ceph_pr_addr(addr)); in ceph_con_open()
694 WARN_ON(con->state != CON_STATE_CLOSED); in ceph_con_open()
695 con->state = CON_STATE_PREOPEN; in ceph_con_open()
697 con->peer_name.type = (__u8) entity_type; in ceph_con_open()
698 con->peer_name.num = cpu_to_le64(entity_num); in ceph_con_open()
700 memcpy(&con->peer_addr, addr, sizeof(*addr)); in ceph_con_open()
701 con->delay = 0; /* reset backoff memory */ in ceph_con_open()
702 mutex_unlock(&con->mutex); in ceph_con_open()
703 queue_con(con); in ceph_con_open()
710 bool ceph_con_opened(struct ceph_connection *con) in ceph_con_opened() argument
712 return con->connect_seq > 0; in ceph_con_opened()
718 void ceph_con_init(struct ceph_connection *con, void *private, in ceph_con_init() argument
722 dout("con_init %p\n", con); in ceph_con_init()
723 memset(con, 0, sizeof(*con)); in ceph_con_init()
724 con->private = private; in ceph_con_init()
725 con->ops = ops; in ceph_con_init()
726 con->msgr = msgr; in ceph_con_init()
728 con_sock_state_init(con); in ceph_con_init()
730 mutex_init(&con->mutex); in ceph_con_init()
731 INIT_LIST_HEAD(&con->out_queue); in ceph_con_init()
732 INIT_LIST_HEAD(&con->out_sent); in ceph_con_init()
733 INIT_DELAYED_WORK(&con->work, ceph_con_workfn); in ceph_con_init()
735 con->state = CON_STATE_CLOSED; in ceph_con_init()
756 static void con_out_kvec_reset(struct ceph_connection *con) in con_out_kvec_reset() argument
758 BUG_ON(con->out_skip); in con_out_kvec_reset()
760 con->out_kvec_left = 0; in con_out_kvec_reset()
761 con->out_kvec_bytes = 0; in con_out_kvec_reset()
762 con->out_kvec_cur = &con->out_kvec[0]; in con_out_kvec_reset()
765 static void con_out_kvec_add(struct ceph_connection *con, in con_out_kvec_add() argument
768 int index = con->out_kvec_left; in con_out_kvec_add()
770 BUG_ON(con->out_skip); in con_out_kvec_add()
771 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); in con_out_kvec_add()
773 con->out_kvec[index].iov_len = size; in con_out_kvec_add()
774 con->out_kvec[index].iov_base = data; in con_out_kvec_add()
775 con->out_kvec_left++; in con_out_kvec_add()
776 con->out_kvec_bytes += size; in con_out_kvec_add()
784 static int con_out_kvec_skip(struct ceph_connection *con) in con_out_kvec_skip() argument
786 int off = con->out_kvec_cur - con->out_kvec; in con_out_kvec_skip()
789 if (con->out_kvec_bytes > 0) { in con_out_kvec_skip()
790 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; in con_out_kvec_skip()
791 BUG_ON(con->out_kvec_bytes < skip); in con_out_kvec_skip()
792 BUG_ON(!con->out_kvec_left); in con_out_kvec_skip()
793 con->out_kvec_bytes -= skip; in con_out_kvec_skip()
794 con->out_kvec_left--; in con_out_kvec_skip()
1213 static size_t sizeof_footer(struct ceph_connection *con) in sizeof_footer() argument
1215 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? in sizeof_footer()
1231 static void prepare_write_message_footer(struct ceph_connection *con) in prepare_write_message_footer() argument
1233 struct ceph_msg *m = con->out_msg; in prepare_write_message_footer()
1237 dout("prepare_write_message_footer %p\n", con); in prepare_write_message_footer()
1238 con_out_kvec_add(con, sizeof_footer(con), &m->footer); in prepare_write_message_footer()
1239 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { in prepare_write_message_footer()
1240 if (con->ops->sign_message) in prepare_write_message_footer()
1241 con->ops->sign_message(m); in prepare_write_message_footer()
1247 con->out_more = m->more_to_follow; in prepare_write_message_footer()
1248 con->out_msg_done = true; in prepare_write_message_footer()
1254 static void prepare_write_message(struct ceph_connection *con) in prepare_write_message() argument
1259 con_out_kvec_reset(con); in prepare_write_message()
1260 con->out_msg_done = false; in prepare_write_message()
1264 if (con->in_seq > con->in_seq_acked) { in prepare_write_message()
1265 con->in_seq_acked = con->in_seq; in prepare_write_message()
1266 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_message()
1267 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_message()
1268 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_message()
1269 &con->out_temp_ack); in prepare_write_message()
1272 BUG_ON(list_empty(&con->out_queue)); in prepare_write_message()
1273 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); in prepare_write_message()
1274 con->out_msg = m; in prepare_write_message()
1275 BUG_ON(m->con != con); in prepare_write_message()
1279 list_move_tail(&m->list_head, &con->out_sent); in prepare_write_message()
1286 m->hdr.seq = cpu_to_le64(++con->out_seq); in prepare_write_message()
1289 if (con->ops->reencode_message) in prepare_write_message()
1290 con->ops->reencode_message(m); in prepare_write_message()
1294 m, con->out_seq, le16_to_cpu(m->hdr.type), in prepare_write_message()
1301 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); in prepare_write_message()
1302 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); in prepare_write_message()
1303 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); in prepare_write_message()
1306 con_out_kvec_add(con, m->middle->vec.iov_len, in prepare_write_message()
1311 con->out_msg->hdr.crc = cpu_to_le32(crc); in prepare_write_message()
1312 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); in prepare_write_message()
1316 con->out_msg->footer.front_crc = cpu_to_le32(crc); in prepare_write_message()
1320 con->out_msg->footer.middle_crc = cpu_to_le32(crc); in prepare_write_message()
1322 con->out_msg->footer.middle_crc = 0; in prepare_write_message()
1324 le32_to_cpu(con->out_msg->footer.front_crc), in prepare_write_message()
1325 le32_to_cpu(con->out_msg->footer.middle_crc)); in prepare_write_message()
1326 con->out_msg->footer.flags = 0; in prepare_write_message()
1329 con->out_msg->footer.data_crc = 0; in prepare_write_message()
1331 prepare_message_data(con->out_msg, m->data_length); in prepare_write_message()
1332 con->out_more = 1; /* data + footer will follow */ in prepare_write_message()
1335 prepare_write_message_footer(con); in prepare_write_message()
1338 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_message()
1344 static void prepare_write_ack(struct ceph_connection *con) in prepare_write_ack() argument
1346 dout("prepare_write_ack %p %llu -> %llu\n", con, in prepare_write_ack()
1347 con->in_seq_acked, con->in_seq); in prepare_write_ack()
1348 con->in_seq_acked = con->in_seq; in prepare_write_ack()
1350 con_out_kvec_reset(con); in prepare_write_ack()
1352 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_ack()
1354 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_ack()
1355 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_ack()
1356 &con->out_temp_ack); in prepare_write_ack()
1358 con->out_more = 1; /* more will follow.. eventually.. */ in prepare_write_ack()
1359 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_ack()
1365 static void prepare_write_seq(struct ceph_connection *con) in prepare_write_seq() argument
1367 dout("prepare_write_seq %p %llu -> %llu\n", con, in prepare_write_seq()
1368 con->in_seq_acked, con->in_seq); in prepare_write_seq()
1369 con->in_seq_acked = con->in_seq; in prepare_write_seq()
1371 con_out_kvec_reset(con); in prepare_write_seq()
1373 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_seq()
1374 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_seq()
1375 &con->out_temp_ack); in prepare_write_seq()
1377 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_seq()
1383 static void prepare_write_keepalive(struct ceph_connection *con) in prepare_write_keepalive() argument
1385 dout("prepare_write_keepalive %p\n", con); in prepare_write_keepalive()
1386 con_out_kvec_reset(con); in prepare_write_keepalive()
1387 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { in prepare_write_keepalive()
1391 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); in prepare_write_keepalive()
1392 ceph_encode_timespec64(&con->out_temp_keepalive2, &now); in prepare_write_keepalive()
1393 con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), in prepare_write_keepalive()
1394 &con->out_temp_keepalive2); in prepare_write_keepalive()
1396 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); in prepare_write_keepalive()
1398 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_keepalive()
1405 static int get_connect_authorizer(struct ceph_connection *con) in get_connect_authorizer() argument
1410 if (!con->ops->get_authorizer) { in get_connect_authorizer()
1411 con->auth = NULL; in get_connect_authorizer()
1412 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; in get_connect_authorizer()
1413 con->out_connect.authorizer_len = 0; in get_connect_authorizer()
1417 auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry); in get_connect_authorizer()
1421 con->auth = auth; in get_connect_authorizer()
1422 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); in get_connect_authorizer()
1423 con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len); in get_connect_authorizer()
1430 static void prepare_write_banner(struct ceph_connection *con) in prepare_write_banner() argument
1432 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); in prepare_write_banner()
1433 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), in prepare_write_banner()
1434 &con->msgr->my_enc_addr); in prepare_write_banner()
1436 con->out_more = 0; in prepare_write_banner()
1437 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_banner()
1440 static void __prepare_write_connect(struct ceph_connection *con) in __prepare_write_connect() argument
1442 con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect); in __prepare_write_connect()
1443 if (con->auth) in __prepare_write_connect()
1444 con_out_kvec_add(con, con->auth->authorizer_buf_len, in __prepare_write_connect()
1445 con->auth->authorizer_buf); in __prepare_write_connect()
1447 con->out_more = 0; in __prepare_write_connect()
1448 con_flag_set(con, CON_FLAG_WRITE_PENDING); in __prepare_write_connect()
1451 static int prepare_write_connect(struct ceph_connection *con) in prepare_write_connect() argument
1453 unsigned int global_seq = get_global_seq(con->msgr, 0); in prepare_write_connect()
1457 switch (con->peer_name.type) { in prepare_write_connect()
1471 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, in prepare_write_connect()
1472 con->connect_seq, global_seq, proto); in prepare_write_connect()
1474 con->out_connect.features = in prepare_write_connect()
1475 cpu_to_le64(from_msgr(con->msgr)->supported_features); in prepare_write_connect()
1476 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); in prepare_write_connect()
1477 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); in prepare_write_connect()
1478 con->out_connect.global_seq = cpu_to_le32(global_seq); in prepare_write_connect()
1479 con->out_connect.protocol_version = cpu_to_le32(proto); in prepare_write_connect()
1480 con->out_connect.flags = 0; in prepare_write_connect()
1482 ret = get_connect_authorizer(con); in prepare_write_connect()
1486 __prepare_write_connect(con); in prepare_write_connect()
1496 static int write_partial_kvec(struct ceph_connection *con) in write_partial_kvec() argument
1500 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); in write_partial_kvec()
1501 while (con->out_kvec_bytes > 0) { in write_partial_kvec()
1502 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, in write_partial_kvec()
1503 con->out_kvec_left, con->out_kvec_bytes, in write_partial_kvec()
1504 con->out_more); in write_partial_kvec()
1507 con->out_kvec_bytes -= ret; in write_partial_kvec()
1508 if (con->out_kvec_bytes == 0) in write_partial_kvec()
1512 while (ret >= con->out_kvec_cur->iov_len) { in write_partial_kvec()
1513 BUG_ON(!con->out_kvec_left); in write_partial_kvec()
1514 ret -= con->out_kvec_cur->iov_len; in write_partial_kvec()
1515 con->out_kvec_cur++; in write_partial_kvec()
1516 con->out_kvec_left--; in write_partial_kvec()
1520 con->out_kvec_cur->iov_len -= ret; in write_partial_kvec()
1521 con->out_kvec_cur->iov_base += ret; in write_partial_kvec()
1524 con->out_kvec_left = 0; in write_partial_kvec()
1527 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, in write_partial_kvec()
1528 con->out_kvec_bytes, con->out_kvec_left, ret); in write_partial_kvec()
1552 static int write_partial_message_data(struct ceph_connection *con) in write_partial_message_data() argument
1554 struct ceph_msg *msg = con->out_msg; in write_partial_message_data()
1556 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in write_partial_message_data()
1560 dout("%s %p msg %p\n", __func__, con, msg); in write_partial_message_data()
1588 ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, in write_partial_message_data()
1601 dout("%s %p msg %p done\n", __func__, con, msg); in write_partial_message_data()
1608 con_out_kvec_reset(con); in write_partial_message_data()
1609 prepare_write_message_footer(con); in write_partial_message_data()
1617 static int write_partial_skip(struct ceph_connection *con) in write_partial_skip() argument
1622 dout("%s %p %d left\n", __func__, con, con->out_skip); in write_partial_skip()
1623 while (con->out_skip > 0) { in write_partial_skip()
1624 size_t size = min(con->out_skip, (int) PAGE_SIZE); in write_partial_skip()
1626 if (size == con->out_skip) in write_partial_skip()
1628 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, more); in write_partial_skip()
1631 con->out_skip -= ret; in write_partial_skip()
1641 static void prepare_read_banner(struct ceph_connection *con) in prepare_read_banner() argument
1643 dout("prepare_read_banner %p\n", con); in prepare_read_banner()
1644 con->in_base_pos = 0; in prepare_read_banner()
1647 static void prepare_read_connect(struct ceph_connection *con) in prepare_read_connect() argument
1649 dout("prepare_read_connect %p\n", con); in prepare_read_connect()
1650 con->in_base_pos = 0; in prepare_read_connect()
1653 static void prepare_read_ack(struct ceph_connection *con) in prepare_read_ack() argument
1655 dout("prepare_read_ack %p\n", con); in prepare_read_ack()
1656 con->in_base_pos = 0; in prepare_read_ack()
1659 static void prepare_read_seq(struct ceph_connection *con) in prepare_read_seq() argument
1661 dout("prepare_read_seq %p\n", con); in prepare_read_seq()
1662 con->in_base_pos = 0; in prepare_read_seq()
1663 con->in_tag = CEPH_MSGR_TAG_SEQ; in prepare_read_seq()
1666 static void prepare_read_tag(struct ceph_connection *con) in prepare_read_tag() argument
1668 dout("prepare_read_tag %p\n", con); in prepare_read_tag()
1669 con->in_base_pos = 0; in prepare_read_tag()
1670 con->in_tag = CEPH_MSGR_TAG_READY; in prepare_read_tag()
1673 static void prepare_read_keepalive_ack(struct ceph_connection *con) in prepare_read_keepalive_ack() argument
1675 dout("prepare_read_keepalive_ack %p\n", con); in prepare_read_keepalive_ack()
1676 con->in_base_pos = 0; in prepare_read_keepalive_ack()
1682 static int prepare_read_message(struct ceph_connection *con) in prepare_read_message() argument
1684 dout("prepare_read_message %p\n", con); in prepare_read_message()
1685 BUG_ON(con->in_msg != NULL); in prepare_read_message()
1686 con->in_base_pos = 0; in prepare_read_message()
1687 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; in prepare_read_message()
1692 static int read_partial(struct ceph_connection *con, in read_partial() argument
1695 while (con->in_base_pos < end) { in read_partial()
1696 int left = end - con->in_base_pos; in read_partial()
1698 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); in read_partial()
1701 con->in_base_pos += ret; in read_partial()
1710 static int read_partial_banner(struct ceph_connection *con) in read_partial_banner() argument
1716 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); in read_partial_banner()
1721 ret = read_partial(con, end, size, con->in_banner); in read_partial_banner()
1725 size = sizeof (con->actual_peer_addr); in read_partial_banner()
1727 ret = read_partial(con, end, size, &con->actual_peer_addr); in read_partial_banner()
1730 ceph_decode_banner_addr(&con->actual_peer_addr); in read_partial_banner()
1732 size = sizeof (con->peer_addr_for_me); in read_partial_banner()
1734 ret = read_partial(con, end, size, &con->peer_addr_for_me); in read_partial_banner()
1737 ceph_decode_banner_addr(&con->peer_addr_for_me); in read_partial_banner()
1743 static int read_partial_connect(struct ceph_connection *con) in read_partial_connect() argument
1749 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); in read_partial_connect()
1751 size = sizeof (con->in_reply); in read_partial_connect()
1753 ret = read_partial(con, end, size, &con->in_reply); in read_partial_connect()
1757 if (con->auth) { in read_partial_connect()
1758 size = le32_to_cpu(con->in_reply.authorizer_len); in read_partial_connect()
1759 if (size > con->auth->authorizer_reply_buf_len) { in read_partial_connect()
1761 con->auth->authorizer_reply_buf_len); in read_partial_connect()
1767 ret = read_partial(con, end, size, in read_partial_connect()
1768 con->auth->authorizer_reply_buf); in read_partial_connect()
1774 con, (int)con->in_reply.tag, in read_partial_connect()
1775 le32_to_cpu(con->in_reply.connect_seq), in read_partial_connect()
1776 le32_to_cpu(con->in_reply.global_seq)); in read_partial_connect()
1784 static int verify_hello(struct ceph_connection *con) in verify_hello() argument
1786 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { in verify_hello()
1788 ceph_pr_addr(&con->peer_addr)); in verify_hello()
1789 con->error_msg = "protocol error, bad banner"; in verify_hello()
2003 static int process_banner(struct ceph_connection *con) in process_banner() argument
2005 dout("process_banner on %p\n", con); in process_banner()
2007 if (verify_hello(con) < 0) in process_banner()
2015 if (memcmp(&con->peer_addr, &con->actual_peer_addr, in process_banner()
2016 sizeof(con->peer_addr)) != 0 && in process_banner()
2017 !(addr_is_blank(&con->actual_peer_addr) && in process_banner()
2018 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { in process_banner()
2020 ceph_pr_addr(&con->peer_addr), in process_banner()
2021 le32_to_cpu(con->peer_addr.nonce), in process_banner()
2022 ceph_pr_addr(&con->actual_peer_addr), in process_banner()
2023 le32_to_cpu(con->actual_peer_addr.nonce)); in process_banner()
2024 con->error_msg = "wrong peer at address"; in process_banner()
2031 if (addr_is_blank(&con->msgr->inst.addr)) { in process_banner()
2032 int port = addr_port(&con->msgr->inst.addr); in process_banner()
2034 memcpy(&con->msgr->inst.addr.in_addr, in process_banner()
2035 &con->peer_addr_for_me.in_addr, in process_banner()
2036 sizeof(con->peer_addr_for_me.in_addr)); in process_banner()
2037 addr_set_port(&con->msgr->inst.addr, port); in process_banner()
2038 encode_my_addr(con->msgr); in process_banner()
2040 ceph_pr_addr(&con->msgr->inst.addr)); in process_banner()
2046 static int process_connect(struct ceph_connection *con) in process_connect() argument
2048 u64 sup_feat = from_msgr(con->msgr)->supported_features; in process_connect()
2049 u64 req_feat = from_msgr(con->msgr)->required_features; in process_connect()
2050 u64 server_feat = le64_to_cpu(con->in_reply.features); in process_connect()
2053 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); in process_connect()
2055 if (con->auth) { in process_connect()
2056 int len = le32_to_cpu(con->in_reply.authorizer_len); in process_connect()
2065 if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { in process_connect()
2066 ret = con->ops->add_authorizer_challenge( in process_connect()
2067 con, con->auth->authorizer_reply_buf, len); in process_connect()
2071 con_out_kvec_reset(con); in process_connect()
2072 __prepare_write_connect(con); in process_connect()
2073 prepare_read_connect(con); in process_connect()
2078 ret = con->ops->verify_authorizer_reply(con); in process_connect()
2080 con->error_msg = "bad authorize reply"; in process_connect()
2086 switch (con->in_reply.tag) { in process_connect()
2090 ENTITY_NAME(con->peer_name), in process_connect()
2091 ceph_pr_addr(&con->peer_addr), in process_connect()
2093 con->error_msg = "missing required protocol features"; in process_connect()
2094 reset_connection(con); in process_connect()
2100 ENTITY_NAME(con->peer_name), in process_connect()
2101 ceph_pr_addr(&con->peer_addr), in process_connect()
2102 le32_to_cpu(con->out_connect.protocol_version), in process_connect()
2103 le32_to_cpu(con->in_reply.protocol_version)); in process_connect()
2104 con->error_msg = "protocol version mismatch"; in process_connect()
2105 reset_connection(con); in process_connect()
2109 con->auth_retry++; in process_connect()
2110 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, in process_connect()
2111 con->auth_retry); in process_connect()
2112 if (con->auth_retry == 2) { in process_connect()
2113 con->error_msg = "connect authorization failure"; in process_connect()
2116 con_out_kvec_reset(con); in process_connect()
2117 ret = prepare_write_connect(con); in process_connect()
2120 prepare_read_connect(con); in process_connect()
2132 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2134 ENTITY_NAME(con->peer_name), in process_connect()
2135 ceph_pr_addr(&con->peer_addr)); in process_connect()
2136 reset_connection(con); in process_connect()
2137 con_out_kvec_reset(con); in process_connect()
2138 ret = prepare_write_connect(con); in process_connect()
2141 prepare_read_connect(con); in process_connect()
2144 mutex_unlock(&con->mutex); in process_connect()
2145 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); in process_connect()
2146 if (con->ops->peer_reset) in process_connect()
2147 con->ops->peer_reset(con); in process_connect()
2148 mutex_lock(&con->mutex); in process_connect()
2149 if (con->state != CON_STATE_NEGOTIATING) in process_connect()
2159 le32_to_cpu(con->out_connect.connect_seq), in process_connect()
2160 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2161 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); in process_connect()
2162 con_out_kvec_reset(con); in process_connect()
2163 ret = prepare_write_connect(con); in process_connect()
2166 prepare_read_connect(con); in process_connect()
2175 con->peer_global_seq, in process_connect()
2176 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2177 get_global_seq(con->msgr, in process_connect()
2178 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2179 con_out_kvec_reset(con); in process_connect()
2180 ret = prepare_write_connect(con); in process_connect()
2183 prepare_read_connect(con); in process_connect()
2191 ENTITY_NAME(con->peer_name), in process_connect()
2192 ceph_pr_addr(&con->peer_addr), in process_connect()
2194 con->error_msg = "missing required protocol features"; in process_connect()
2195 reset_connection(con); in process_connect()
2199 WARN_ON(con->state != CON_STATE_NEGOTIATING); in process_connect()
2200 con->state = CON_STATE_OPEN; in process_connect()
2201 con->auth_retry = 0; /* we authenticated; clear flag */ in process_connect()
2202 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); in process_connect()
2203 con->connect_seq++; in process_connect()
2204 con->peer_features = server_feat; in process_connect()
2206 con->peer_global_seq, in process_connect()
2207 le32_to_cpu(con->in_reply.connect_seq), in process_connect()
2208 con->connect_seq); in process_connect()
2209 WARN_ON(con->connect_seq != in process_connect()
2210 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2212 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) in process_connect()
2213 con_flag_set(con, CON_FLAG_LOSSYTX); in process_connect()
2215 con->delay = 0; /* reset backoff memory */ in process_connect()
2217 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { in process_connect()
2218 prepare_write_seq(con); in process_connect()
2219 prepare_read_seq(con); in process_connect()
2221 prepare_read_tag(con); in process_connect()
2232 con->error_msg = "protocol error, got WAIT as client"; in process_connect()
2236 con->error_msg = "protocol error, garbage tag during connect"; in process_connect()
2246 static int read_partial_ack(struct ceph_connection *con) in read_partial_ack() argument
2248 int size = sizeof (con->in_temp_ack); in read_partial_ack()
2251 return read_partial(con, end, size, &con->in_temp_ack); in read_partial_ack()
2257 static void process_ack(struct ceph_connection *con) in process_ack() argument
2260 u64 ack = le64_to_cpu(con->in_temp_ack); in process_ack()
2262 bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ); in process_ack()
2263 struct list_head *list = reconnect ? &con->out_queue : &con->out_sent; in process_ack()
2283 prepare_read_tag(con); in process_ack()
2287 static int read_partial_message_section(struct ceph_connection *con, in read_partial_message_section() argument
2298 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + in read_partial_message_section()
2310 static int read_partial_msg_data(struct ceph_connection *con) in read_partial_msg_data() argument
2312 struct ceph_msg *msg = con->in_msg; in read_partial_msg_data()
2314 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_msg_data()
2325 crc = con->in_data_crc; in read_partial_msg_data()
2333 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); in read_partial_msg_data()
2336 con->in_data_crc = crc; in read_partial_msg_data()
2346 con->in_data_crc = crc; in read_partial_msg_data()
2354 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2356 static int read_partial_message(struct ceph_connection *con) in read_partial_message() argument
2358 struct ceph_msg *m = con->in_msg; in read_partial_message()
2363 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_message()
2364 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); in read_partial_message()
2368 dout("read_partial_message con %p msg %p\n", con, m); in read_partial_message()
2371 size = sizeof (con->in_hdr); in read_partial_message()
2373 ret = read_partial(con, end, size, &con->in_hdr); in read_partial_message()
2377 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); in read_partial_message()
2378 if (cpu_to_le32(crc) != con->in_hdr.crc) { in read_partial_message()
2380 crc, con->in_hdr.crc); in read_partial_message()
2384 front_len = le32_to_cpu(con->in_hdr.front_len); in read_partial_message()
2387 middle_len = le32_to_cpu(con->in_hdr.middle_len); in read_partial_message()
2390 data_len = le32_to_cpu(con->in_hdr.data_len); in read_partial_message()
2395 seq = le64_to_cpu(con->in_hdr.seq); in read_partial_message()
2396 if ((s64)seq - (s64)con->in_seq < 1) { in read_partial_message()
2398 ENTITY_NAME(con->peer_name), in read_partial_message()
2399 ceph_pr_addr(&con->peer_addr), in read_partial_message()
2400 seq, con->in_seq + 1); in read_partial_message()
2401 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2402 sizeof_footer(con); in read_partial_message()
2403 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2405 } else if ((s64)seq - (s64)con->in_seq > 1) { in read_partial_message()
2407 seq, con->in_seq + 1); in read_partial_message()
2408 con->error_msg = "bad message sequence # for incoming message"; in read_partial_message()
2413 if (!con->in_msg) { in read_partial_message()
2416 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, in read_partial_message()
2418 ret = ceph_con_in_msg_alloc(con, &skip); in read_partial_message()
2422 BUG_ON(!con->in_msg ^ skip); in read_partial_message()
2426 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2427 sizeof_footer(con); in read_partial_message()
2428 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2429 con->in_seq++; in read_partial_message()
2433 BUG_ON(!con->in_msg); in read_partial_message()
2434 BUG_ON(con->in_msg->con != con); in read_partial_message()
2435 m = con->in_msg; in read_partial_message()
2443 prepare_message_data(con->in_msg, data_len); in read_partial_message()
2447 ret = read_partial_message_section(con, &m->front, front_len, in read_partial_message()
2448 &con->in_front_crc); in read_partial_message()
2454 ret = read_partial_message_section(con, &m->middle->vec, in read_partial_message()
2456 &con->in_middle_crc); in read_partial_message()
2463 ret = read_partial_msg_data(con); in read_partial_message()
2469 size = sizeof_footer(con); in read_partial_message()
2471 ret = read_partial(con, end, size, &m->footer); in read_partial_message()
2485 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { in read_partial_message()
2487 m, con->in_front_crc, m->footer.front_crc); in read_partial_message()
2490 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { in read_partial_message()
2492 m, con->in_middle_crc, m->footer.middle_crc); in read_partial_message()
2497 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { in read_partial_message()
2499 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); in read_partial_message()
2503 if (need_sign && con->ops->check_message_signature && in read_partial_message()
2504 con->ops->check_message_signature(m)) { in read_partial_message()
2517 static void process_message(struct ceph_connection *con) in process_message() argument
2519 struct ceph_msg *msg = con->in_msg; in process_message()
2521 BUG_ON(con->in_msg->con != con); in process_message()
2522 con->in_msg = NULL; in process_message()
2525 if (con->peer_name.type == 0) in process_message()
2526 con->peer_name = msg->hdr.src; in process_message()
2528 con->in_seq++; in process_message()
2529 mutex_unlock(&con->mutex); in process_message()
2538 con->in_front_crc, con->in_middle_crc, con->in_data_crc); in process_message()
2539 con->ops->dispatch(con, msg); in process_message()
2541 mutex_lock(&con->mutex); in process_message()
2544 static int read_keepalive_ack(struct ceph_connection *con) in read_keepalive_ack() argument
2548 int ret = read_partial(con, size, size, &ceph_ts); in read_keepalive_ack()
2551 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); in read_keepalive_ack()
2552 prepare_read_tag(con); in read_keepalive_ack()
2560 static int try_write(struct ceph_connection *con) in try_write() argument
2564 dout("try_write start %p state %lu\n", con, con->state); in try_write()
2565 if (con->state != CON_STATE_PREOPEN && in try_write()
2566 con->state != CON_STATE_CONNECTING && in try_write()
2567 con->state != CON_STATE_NEGOTIATING && in try_write()
2568 con->state != CON_STATE_OPEN) in try_write()
2572 if (con->state == CON_STATE_PREOPEN) { in try_write()
2573 BUG_ON(con->sock); in try_write()
2574 con->state = CON_STATE_CONNECTING; in try_write()
2576 con_out_kvec_reset(con); in try_write()
2577 prepare_write_banner(con); in try_write()
2578 prepare_read_banner(con); in try_write()
2580 BUG_ON(con->in_msg); in try_write()
2581 con->in_tag = CEPH_MSGR_TAG_READY; in try_write()
2583 con, con->state); in try_write()
2584 ret = ceph_tcp_connect(con); in try_write()
2586 con->error_msg = "connect error"; in try_write()
2592 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); in try_write()
2593 BUG_ON(!con->sock); in try_write()
2596 if (con->out_kvec_left) { in try_write()
2597 ret = write_partial_kvec(con); in try_write()
2601 if (con->out_skip) { in try_write()
2602 ret = write_partial_skip(con); in try_write()
2608 if (con->out_msg) { in try_write()
2609 if (con->out_msg_done) { in try_write()
2610 ceph_msg_put(con->out_msg); in try_write()
2611 con->out_msg = NULL; /* we're done with this one */ in try_write()
2615 ret = write_partial_message_data(con); in try_write()
2628 if (con->state == CON_STATE_OPEN) { in try_write()
2629 if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { in try_write()
2630 prepare_write_keepalive(con); in try_write()
2634 if (!list_empty(&con->out_queue)) { in try_write()
2635 prepare_write_message(con); in try_write()
2638 if (con->in_seq > con->in_seq_acked) { in try_write()
2639 prepare_write_ack(con); in try_write()
2645 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in try_write()
2649 dout("try_write done on %p ret %d\n", con, ret); in try_write()
2656 static int try_read(struct ceph_connection *con) in try_read() argument
2661 dout("try_read start on %p state %lu\n", con, con->state); in try_read()
2662 if (con->state != CON_STATE_CONNECTING && in try_read()
2663 con->state != CON_STATE_NEGOTIATING && in try_read()
2664 con->state != CON_STATE_OPEN) in try_read()
2667 BUG_ON(!con->sock); in try_read()
2669 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, in try_read()
2670 con->in_base_pos); in try_read()
2672 if (con->state == CON_STATE_CONNECTING) { in try_read()
2674 ret = read_partial_banner(con); in try_read()
2677 ret = process_banner(con); in try_read()
2681 con->state = CON_STATE_NEGOTIATING; in try_read()
2688 ret = prepare_write_connect(con); in try_read()
2691 prepare_read_connect(con); in try_read()
2697 if (con->state == CON_STATE_NEGOTIATING) { in try_read()
2699 ret = read_partial_connect(con); in try_read()
2702 ret = process_connect(con); in try_read()
2708 WARN_ON(con->state != CON_STATE_OPEN); in try_read()
2710 if (con->in_base_pos < 0) { in try_read()
2714 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); in try_read()
2717 dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); in try_read()
2718 con->in_base_pos += ret; in try_read()
2719 if (con->in_base_pos) in try_read()
2722 if (con->in_tag == CEPH_MSGR_TAG_READY) { in try_read()
2726 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); in try_read()
2729 dout("try_read got tag %d\n", (int)con->in_tag); in try_read()
2730 switch (con->in_tag) { in try_read()
2732 prepare_read_message(con); in try_read()
2735 prepare_read_ack(con); in try_read()
2738 prepare_read_keepalive_ack(con); in try_read()
2741 con_close_socket(con); in try_read()
2742 con->state = CON_STATE_CLOSED; in try_read()
2748 if (con->in_tag == CEPH_MSGR_TAG_MSG) { in try_read()
2749 ret = read_partial_message(con); in try_read()
2753 con->error_msg = "bad crc/signature"; in try_read()
2759 con->error_msg = "io error"; in try_read()
2764 if (con->in_tag == CEPH_MSGR_TAG_READY) in try_read()
2766 process_message(con); in try_read()
2767 if (con->state == CON_STATE_OPEN) in try_read()
2768 prepare_read_tag(con); in try_read()
2771 if (con->in_tag == CEPH_MSGR_TAG_ACK || in try_read()
2772 con->in_tag == CEPH_MSGR_TAG_SEQ) { in try_read()
2777 ret = read_partial_ack(con); in try_read()
2780 process_ack(con); in try_read()
2783 if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { in try_read()
2784 ret = read_keepalive_ack(con); in try_read()
2791 dout("try_read done on %p ret %d\n", con, ret); in try_read()
2795 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); in try_read()
2796 con->error_msg = "protocol error, garbage tag"; in try_read()
2807 static int queue_con_delay(struct ceph_connection *con, unsigned long delay) in queue_con_delay() argument
2809 if (!con->ops->get(con)) { in queue_con_delay()
2810 dout("%s %p ref count 0\n", __func__, con); in queue_con_delay()
2814 dout("%s %p %lu\n", __func__, con, delay); in queue_con_delay()
2815 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { in queue_con_delay()
2816 dout("%s %p - already queued\n", __func__, con); in queue_con_delay()
2817 con->ops->put(con); in queue_con_delay()
2824 static void queue_con(struct ceph_connection *con) in queue_con() argument
2826 (void) queue_con_delay(con, 0); in queue_con()
2829 static void cancel_con(struct ceph_connection *con) in cancel_con() argument
2831 if (cancel_delayed_work(&con->work)) { in cancel_con()
2832 dout("%s %p\n", __func__, con); in cancel_con()
2833 con->ops->put(con); in cancel_con()
2837 static bool con_sock_closed(struct ceph_connection *con) in con_sock_closed() argument
2839 if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) in con_sock_closed()
2844 con->error_msg = "socket closed (con state " #x ")"; \ in con_sock_closed()
2847 switch (con->state) { in con_sock_closed()
2856 __func__, con, con->state); in con_sock_closed()
2857 con->error_msg = "unrecognized con state"; in con_sock_closed()
2866 static bool con_backoff(struct ceph_connection *con) in con_backoff() argument
2870 if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) in con_backoff()
2873 ret = queue_con_delay(con, round_jiffies_relative(con->delay)); in con_backoff()
2876 con, con->delay); in con_backoff()
2878 con_flag_set(con, CON_FLAG_BACKOFF); in con_backoff()
2886 static void con_fault_finish(struct ceph_connection *con) in con_fault_finish() argument
2888 dout("%s %p\n", __func__, con); in con_fault_finish()
2894 if (con->auth_retry) { in con_fault_finish()
2895 dout("auth_retry %d, invalidating\n", con->auth_retry); in con_fault_finish()
2896 if (con->ops->invalidate_authorizer) in con_fault_finish()
2897 con->ops->invalidate_authorizer(con); in con_fault_finish()
2898 con->auth_retry = 0; in con_fault_finish()
2901 if (con->ops->fault) in con_fault_finish()
2902 con->ops->fault(con); in con_fault_finish()
2910 struct ceph_connection *con = container_of(work, struct ceph_connection, in ceph_con_workfn() local
2914 mutex_lock(&con->mutex); in ceph_con_workfn()
2918 if ((fault = con_sock_closed(con))) { in ceph_con_workfn()
2919 dout("%s: con %p SOCK_CLOSED\n", __func__, con); in ceph_con_workfn()
2922 if (con_backoff(con)) { in ceph_con_workfn()
2923 dout("%s: con %p BACKOFF\n", __func__, con); in ceph_con_workfn()
2926 if (con->state == CON_STATE_STANDBY) { in ceph_con_workfn()
2927 dout("%s: con %p STANDBY\n", __func__, con); in ceph_con_workfn()
2930 if (con->state == CON_STATE_CLOSED) { in ceph_con_workfn()
2931 dout("%s: con %p CLOSED\n", __func__, con); in ceph_con_workfn()
2932 BUG_ON(con->sock); in ceph_con_workfn()
2935 if (con->state == CON_STATE_PREOPEN) { in ceph_con_workfn()
2936 dout("%s: con %p PREOPEN\n", __func__, con); in ceph_con_workfn()
2937 BUG_ON(con->sock); in ceph_con_workfn()
2940 ret = try_read(con); in ceph_con_workfn()
2944 if (!con->error_msg) in ceph_con_workfn()
2945 con->error_msg = "socket error on read"; in ceph_con_workfn()
2950 ret = try_write(con); in ceph_con_workfn()
2954 if (!con->error_msg) in ceph_con_workfn()
2955 con->error_msg = "socket error on write"; in ceph_con_workfn()
2962 con_fault(con); in ceph_con_workfn()
2963 mutex_unlock(&con->mutex); in ceph_con_workfn()
2966 con_fault_finish(con); in ceph_con_workfn()
2968 con->ops->put(con); in ceph_con_workfn()
2975 static void con_fault(struct ceph_connection *con) in con_fault() argument
2978 con, con->state, ceph_pr_addr(&con->peer_addr)); in con_fault()
2980 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), in con_fault()
2981 ceph_pr_addr(&con->peer_addr), con->error_msg); in con_fault()
2982 con->error_msg = NULL; in con_fault()
2984 WARN_ON(con->state != CON_STATE_CONNECTING && in con_fault()
2985 con->state != CON_STATE_NEGOTIATING && in con_fault()
2986 con->state != CON_STATE_OPEN); in con_fault()
2988 con_close_socket(con); in con_fault()
2990 if (con_flag_test(con, CON_FLAG_LOSSYTX)) { in con_fault()
2992 con->state = CON_STATE_CLOSED; in con_fault()
2996 if (con->in_msg) { in con_fault()
2997 BUG_ON(con->in_msg->con != con); in con_fault()
2998 ceph_msg_put(con->in_msg); in con_fault()
2999 con->in_msg = NULL; in con_fault()
3001 if (con->out_msg) { in con_fault()
3002 BUG_ON(con->out_msg->con != con); in con_fault()
3003 ceph_msg_put(con->out_msg); in con_fault()
3004 con->out_msg = NULL; in con_fault()
3008 list_splice_init(&con->out_sent, &con->out_queue); in con_fault()
3012 if (list_empty(&con->out_queue) && in con_fault()
3013 !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { in con_fault()
3014 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); in con_fault()
3015 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in con_fault()
3016 con->state = CON_STATE_STANDBY; in con_fault()
3019 con->state = CON_STATE_PREOPEN; in con_fault()
3020 if (con->delay == 0) in con_fault()
3021 con->delay = BASE_DELAY_INTERVAL; in con_fault()
3022 else if (con->delay < MAX_DELAY_INTERVAL) in con_fault()
3023 con->delay *= 2; in con_fault()
3024 con_flag_set(con, CON_FLAG_BACKOFF); in con_fault()
3025 queue_con(con); in con_fault()
3066 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) in msg_con_set() argument
3068 if (msg->con) in msg_con_set()
3069 msg->con->ops->put(msg->con); in msg_con_set()
3071 msg->con = con ? con->ops->get(con) : NULL; in msg_con_set()
3072 BUG_ON(msg->con != con); in msg_con_set()
3075 static void clear_standby(struct ceph_connection *con) in clear_standby() argument
3078 if (con->state == CON_STATE_STANDBY) { in clear_standby()
3079 dout("clear_standby %p and ++connect_seq\n", con); in clear_standby()
3080 con->state = CON_STATE_PREOPEN; in clear_standby()
3081 con->connect_seq++; in clear_standby()
3082 WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); in clear_standby()
3083 WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); in clear_standby()
3090 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) in ceph_con_send() argument
3093 msg->hdr.src = con->msgr->inst.name; in ceph_con_send()
3097 mutex_lock(&con->mutex); in ceph_con_send()
3099 if (con->state == CON_STATE_CLOSED) { in ceph_con_send()
3100 dout("con_send %p closed, dropping %p\n", con, msg); in ceph_con_send()
3102 mutex_unlock(&con->mutex); in ceph_con_send()
3106 msg_con_set(msg, con); in ceph_con_send()
3109 list_add_tail(&msg->list_head, &con->out_queue); in ceph_con_send()
3111 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), in ceph_con_send()
3117 clear_standby(con); in ceph_con_send()
3118 mutex_unlock(&con->mutex); in ceph_con_send()
3122 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_send()
3123 queue_con(con); in ceph_con_send()
3132 struct ceph_connection *con = msg->con; in ceph_msg_revoke() local
3134 if (!con) { in ceph_msg_revoke()
3139 mutex_lock(&con->mutex); in ceph_msg_revoke()
3141 dout("%s %p msg %p - was on queue\n", __func__, con, msg); in ceph_msg_revoke()
3147 if (con->out_msg == msg) { in ceph_msg_revoke()
3148 BUG_ON(con->out_skip); in ceph_msg_revoke()
3150 if (con->out_msg_done) { in ceph_msg_revoke()
3151 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3154 con->out_skip += sizeof_footer(con); in ceph_msg_revoke()
3158 con->out_skip += msg->cursor.total_resid; in ceph_msg_revoke()
3160 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3161 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3164 __func__, con, msg, con->out_kvec_bytes, con->out_skip); in ceph_msg_revoke()
3166 con->out_msg = NULL; in ceph_msg_revoke()
3170 mutex_unlock(&con->mutex); in ceph_msg_revoke()
3178 struct ceph_connection *con = msg->con; in ceph_msg_revoke_incoming() local
3180 if (!con) { in ceph_msg_revoke_incoming()
3185 mutex_lock(&con->mutex); in ceph_msg_revoke_incoming()
3186 if (con->in_msg == msg) { in ceph_msg_revoke_incoming()
3187 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); in ceph_msg_revoke_incoming()
3188 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); in ceph_msg_revoke_incoming()
3189 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); in ceph_msg_revoke_incoming()
3192 dout("%s %p msg %p revoked\n", __func__, con, msg); in ceph_msg_revoke_incoming()
3193 con->in_base_pos = con->in_base_pos - in ceph_msg_revoke_incoming()
3199 ceph_msg_put(con->in_msg); in ceph_msg_revoke_incoming()
3200 con->in_msg = NULL; in ceph_msg_revoke_incoming()
3201 con->in_tag = CEPH_MSGR_TAG_READY; in ceph_msg_revoke_incoming()
3202 con->in_seq++; in ceph_msg_revoke_incoming()
3205 __func__, con, con->in_msg, msg); in ceph_msg_revoke_incoming()
3207 mutex_unlock(&con->mutex); in ceph_msg_revoke_incoming()
3213 void ceph_con_keepalive(struct ceph_connection *con) in ceph_con_keepalive() argument
3215 dout("con_keepalive %p\n", con); in ceph_con_keepalive()
3216 mutex_lock(&con->mutex); in ceph_con_keepalive()
3217 clear_standby(con); in ceph_con_keepalive()
3218 con_flag_set(con, CON_FLAG_KEEPALIVE_PENDING); in ceph_con_keepalive()
3219 mutex_unlock(&con->mutex); in ceph_con_keepalive()
3221 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_keepalive()
3222 queue_con(con); in ceph_con_keepalive()
3226 bool ceph_con_keepalive_expired(struct ceph_connection *con, in ceph_con_keepalive_expired() argument
3230 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { in ceph_con_keepalive_expired()
3235 ts = timespec64_add(con->last_keepalive_ack, ts); in ceph_con_keepalive_expired()
3396 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) in ceph_alloc_middle() argument
3427 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) in ceph_con_in_msg_alloc() argument
3429 struct ceph_msg_header *hdr = &con->in_hdr; in ceph_con_in_msg_alloc()
3434 BUG_ON(con->in_msg != NULL); in ceph_con_in_msg_alloc()
3435 BUG_ON(!con->ops->alloc_msg); in ceph_con_in_msg_alloc()
3437 mutex_unlock(&con->mutex); in ceph_con_in_msg_alloc()
3438 msg = con->ops->alloc_msg(con, hdr, skip); in ceph_con_in_msg_alloc()
3439 mutex_lock(&con->mutex); in ceph_con_in_msg_alloc()
3440 if (con->state != CON_STATE_OPEN) { in ceph_con_in_msg_alloc()
3447 msg_con_set(msg, con); in ceph_con_in_msg_alloc()
3448 con->in_msg = msg; in ceph_con_in_msg_alloc()
3458 con->error_msg = "error allocating memory for incoming message"; in ceph_con_in_msg_alloc()
3461 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); in ceph_con_in_msg_alloc()
3463 if (middle_len && !con->in_msg->middle) { in ceph_con_in_msg_alloc()
3464 ret = ceph_alloc_middle(con, con->in_msg); in ceph_con_in_msg_alloc()
3466 ceph_msg_put(con->in_msg); in ceph_con_in_msg_alloc()
3467 con->in_msg = NULL; in ceph_con_in_msg_alloc()