Skip to content
Go back

CS144-2024-lab_3: the TCP sender

Published:  at  07:40

CS144 课程 Lab Assignment 中的 Checkpoint 3: the TCP sende

CS144-2024-lab_3: the TCP sender

This week, you’ll implement the “sender” part of TCP, responsible for reading from a ByteStream (created and written to by some sender-side application), and turning the stream into a sequence of outgoing TCP segments. On the remote side, a TCP receiver transforms those segments (those that arrive—they might not all make it) back into the original byte stream, and sends acknowledgments and window advertisements back to the sender.

本周,您将实现 TCP 的 sender 部分,负责从字节流(由某个 sender 应用程序创建和写入)中读取数据,并将该流转换为一系列传出的 TCP 段。在远程端,TCP receiver 将这些段(到达的段 — 它们可能不会全部到达)转换回原始字节流,并将 ack 和 window 发送回 sender。

它说了一长串要求,大致翻译过来是这样:

对于 push():

对于 receive():

对于 tick():

对于 make_empty_message():

我的实现

tcp_sender.hh 中,给 TCPSender 添加一些成员变量

  bool syn_send_ {false};
  bool fin_send_ {false};
  bool keep_rto_ {false};
  uint64_t re_try_count_ {0};
  uint64_t past_time_ {0};
  uint64_t count_c_ {0};
  std::optional<uint64_t> window_size_;
  std::list<struct msg_with_time> buffer_;

这里的 struct msg_with_time 是我自己定义的:

struct msg_with_time {
  TCPSenderMessage msg;
  bool keep_rto;
};

msg_with_time 结构体中的 keep_rto 是用来处理 window size 为 0 的特殊情况,它们的 RTO 不应该翻倍,所以多了个 keep_rto_keep_rto

下面则是具体的实现:

#include "tcp_sender.hh"
#include "tcp_config.hh"
#include "tcp_sender_message.hh"
#include "wrapping_integers.hh"
#include <algorithm>
#include <cstdint>
#include <deque>
#include <memory>
#include <optional>
#include <ranges>

using namespace std;

uint64_t TCPSender::sequence_numbers_in_flight() const
{
  return count_c_;
}

uint64_t TCPSender::consecutive_retransmissions() const
{
  return re_try_count_;
}

void TCPSender::push( const TransmitFunction& transmit )
{
  auto has_cap { false };
  if ( window_size_.has_value() ) {
    if ( window_size_.value() == 0 ) {
      window_size_ = 1;
      keep_rto_ = true;
    }
    if ( window_size_.value() > sequence_numbers_in_flight() ) {
      has_cap = true;
    }
  } else if ( sequence_numbers_in_flight() == 0 ) {
    has_cap = true;
  }

  if ( has_cap
       && ( ( input_.reader().bytes_buffered() > 0 )
            || ( input_.reader().bytes_buffered() == 0
                 && ( !syn_send_ || ( input_.writer().is_closed() && !fin_send_ )
                      || input_.reader().has_error() ) ) ) ) {

    auto limit = std::min( TCPConfig::MAX_PAYLOAD_SIZE, input_.reader().bytes_buffered() );
    if ( window_size_.has_value() ) {
      limit = std::min( limit, window_size_.value() - sequence_numbers_in_flight() );
    }

    const auto fill_enable
      = limit != 0 && window_size_.has_value() && window_size_.value() >= TCPConfig::MAX_PAYLOAD_SIZE;
    for ( auto i = fill_enable ? input_.reader().bytes_buffered() / limit : 0; ( fill_enable ? i > 0 : i == 0 );
          --i ) {
      uint64_t length { 0 };
      buffer_.push_back( { { isn_, false, {}, false, input_.reader().has_error() }, keep_rto_ } );

      if ( input_.reader().bytes_popped() == 0 && !syn_send_ ) {
        buffer_.back().msg.SYN = true;
        syn_send_ = true;
        ++length;
        ++count_c_;
      }

      while ( !input_.reader().has_error() && input_.reader().bytes_buffered() > 0 && length < limit ) {
        const auto& str_t = input_.reader().peek();
        buffer_.back().msg.payload += str_t;
        ++length;
        ++count_c_;
        input_.reader().pop( 1 );
      }

      if ( ( window_size_.has_value() ? window_size_.value() > sequence_numbers_in_flight() : limit == 0 )
           && input_.writer().is_closed() && !fin_send_ && input_.reader().bytes_buffered() == 0 ) {
        fin_send_ = true;
        buffer_.back().msg.FIN = true;
        ++length;
        ++count_c_;
      }

      isn_ = isn_ + length;
      transmit( buffer_.back().msg );
      if ( keep_rto_ ) {
        keep_rto_ = false;
      }
    }
  }
}

TCPSenderMessage TCPSender::make_empty_message() const
{
  return { isn_, false, {}, false, input_.reader().has_error() };
}

void TCPSender::receive( const TCPReceiverMessage& msg )
{
  if ( msg.RST ) {
    input_.writer().set_error();
    return;
  }

  window_size_ = msg.window_size;

  if ( msg.ackno.has_value() ) {
    uint64_t length { 0 };
    std::deque<std::shared_ptr<Wrap32>> buf_col;
    for ( auto& val : std::ranges::reverse_view( buffer_ ) ) {
      if ( ( val.msg.seqno + length + val.msg.sequence_length() ) == msg.ackno.value() ) {
        buf_col.push_back( std::make_shared<Wrap32>( val.msg.seqno ) );
        length += val.msg.sequence_length();
      }
    }
    if (!buf_col.empty()) {
      past_time_ = 0;
    }
    for (const auto& val : buf_col) {
      buffer_.erase(std::find_if(buffer_.begin(), buffer_.end(), [&val](const struct msg_with_time& arg){
        return arg.msg.seqno == *val;
      }));
    }
    count_c_ -= length;
  }

  if ( re_try_count_ != 0 ) {
    initial_RTO_ms_ /= ( 2 * re_try_count_ );
  }
  re_try_count_ = 0;
}

void TCPSender::tick( uint64_t ms_since_last_tick, const TransmitFunction& transmit )
{
  past_time_ += ms_since_last_tick;
  for ( auto& val : buffer_ ) {
    if ( past_time_ >= initial_RTO_ms_ ) {
      past_time_ = 0;
      if ( !val.keep_rto ) {
        initial_RTO_ms_ *= 2;
        ++re_try_count_;
      }
      transmit( val.msg );
      break;
    }
  }
}

我承认我写的代码还是很难绷的,等有时间我再优化一下看看。

$ cmake --build build -j11 --target check3
[0/1] cd /home/zuos/codpjt/cpp/cs144_minnow_lab/build && /usr/bin/ctest...ure --timeout 12 -R '^byte_stream_|^reassembler_|^wrapping|^recv|^send'             ByteStream throughput: 0.65 Gbit/s
             Reassembler throughput: 0.33 Gbit/s
[1/1] cd /home/zuos/codpjt/cpp/cs144_minnow_lab/build && /usr/bin/ctest...ure --timeout 12 -R '^byte_stream_|^reassembler_|^wrapping|^recv|^send'
Test project /home/zuos/codpjt/cpp/cs144_minnow_lab/build
      Start  1: compile with bug-checkers
 1/36 Test  #1: compile with bug-checkers ........   Passed    2.60 sec
      Start  3: byte_stream_basics
 2/36 Test  #3: byte_stream_basics ...............   Passed    0.01 sec
      Start  4: byte_stream_capacity
 3/36 Test  #4: byte_stream_capacity .............   Passed    0.01 sec
      Start  5: byte_stream_one_write
 4/36 Test  #5: byte_stream_one_write ............   Passed    0.01 sec
      Start  6: byte_stream_two_writes
 5/36 Test  #6: byte_stream_two_writes ...........   Passed    0.01 sec
      Start  7: byte_stream_many_writes
 6/36 Test  #7: byte_stream_many_writes ..........   Passed    0.04 sec
      Start  8: byte_stream_stress_test
 7/36 Test  #8: byte_stream_stress_test ..........   Passed    0.20 sec
      Start  9: reassembler_single
 8/36 Test  #9: reassembler_single ...............   Passed    0.01 sec
      Start 10: reassembler_cap
 9/36 Test #10: reassembler_cap ..................   Passed    0.01 sec
      Start 11: reassembler_seq
10/36 Test #11: reassembler_seq ..................   Passed    0.02 sec
      Start 12: reassembler_dup
11/36 Test #12: reassembler_dup ..................   Passed    0.02 sec
      Start 13: reassembler_holes
12/36 Test #13: reassembler_holes ................   Passed    0.01 sec
      Start 14: reassembler_overlapping
13/36 Test #14: reassembler_overlapping ..........   Passed    0.01 sec
      Start 15: reassembler_win
14/36 Test #15: reassembler_win ..................   Passed    4.22 sec
      Start 16: wrapping_integers_cmp
15/36 Test #16: wrapping_integers_cmp ............   Passed    0.01 sec
      Start 17: wrapping_integers_wrap
16/36 Test #17: wrapping_integers_wrap ...........   Passed    0.01 sec
      Start 18: wrapping_integers_unwrap
17/36 Test #18: wrapping_integers_unwrap .........   Passed    0.01 sec
      Start 19: wrapping_integers_roundtrip
18/36 Test #19: wrapping_integers_roundtrip ......   Passed    0.46 sec
      Start 20: wrapping_integers_extra
19/36 Test #20: wrapping_integers_extra ..........   Passed    0.07 sec
      Start 21: recv_connect
20/36 Test #21: recv_connect .....................   Passed    0.01 sec
      Start 22: recv_transmit
21/36 Test #22: recv_transmit ....................   Passed    0.19 sec
      Start 23: recv_window
22/36 Test #23: recv_window ......................   Passed    0.01 sec
      Start 24: recv_reorder
23/36 Test #24: recv_reorder .....................   Passed    0.01 sec
      Start 25: recv_reorder_more
24/36 Test #25: recv_reorder_more ................   Passed    8.67 sec
      Start 26: recv_close
25/36 Test #26: recv_close .......................   Passed    0.01 sec
      Start 27: recv_special
26/36 Test #27: recv_special .....................   Passed    0.01 sec
      Start 28: send_connect
27/36 Test #28: send_connect .....................   Passed    0.01 sec
      Start 29: send_transmit
28/36 Test #29: send_transmit ....................   Passed    0.27 sec
      Start 30: send_retx
29/36 Test #30: send_retx ........................   Passed    0.01 sec
      Start 31: send_window
30/36 Test #31: send_window ......................   Passed    0.19 sec
      Start 32: send_ack
31/36 Test #32: send_ack .........................   Passed    0.01 sec
      Start 33: send_close
32/36 Test #33: send_close .......................   Passed    0.01 sec
      Start 34: send_extra
33/36 Test #34: send_extra .......................   Passed    0.09 sec
      Start 37: compile with optimization
34/36 Test #37: compile with optimization ........   Passed    0.80 sec
      Start 38: byte_stream_speed_test
35/36 Test #38: byte_stream_speed_test ...........   Passed    0.17 sec
      Start 39: reassembler_speed_test
36/36 Test #39: reassembler_speed_test ...........   Passed    0.45 sec

100% tests passed, 0 tests failed out of 36

Total Test time (real) =  18.68 sec

Suggest Changes

Previous Post
TCP 的错误检测 & 流量控制 & 状态转换
Next Post
Linux kernel 代码规范