plptools
Loading...
Searching...
No Matches
link.cc
Go to the documentation of this file.
1/*
2 * This file is part of plptools.
3 *
4 * Copyright (C) 1999 Philip Proudman <philip.proudman@btinternet.com>
5 * Copyright (C) 1999-2002 Fritz Elfert <felfert@to.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * along with this program; if not, see <https://www.gnu.org/licenses/>.
19 *
20 */
21#include "Enum.h"
22#include "config.h"
23
24#include <cerrno>
25#include <iostream>
26
27#include <stdint.h>
28#include <stdlib.h>
29#include <unistd.h>
30#include <stdio.h>
31#include <sys/time.h>
32
33#include "bufferstore.h"
34#include "bufferarray.h"
35
36#include "link.h"
37#include "ncp_log.h"
38#include "ncp.h"
39#include "datalink.h"
40
41extern "C" {
42
43 static void *expire_check(void *arg) {
44 Link *link = (Link *)arg;
45 while (1) {
46 fd_set r_set;
47 FD_ZERO(&r_set);
48 FD_SET(link->cancellationFd_, &r_set);
49
50 useconds_t duration = link->retransTimeout() * 500;
51 struct timeval tv = {
52 static_cast<time_t>(duration / 1000000),
53 static_cast<suseconds_t>(duration % 1000000)
54 };
55
56 // Wait for the timeout or cancellation.
57 // This select call will return in the if the thread is interrupted but we ignore that and simply allow
58 // the retransmit check to occur a little early, rather than adding significant complexity to match the
59 // already-estimated timeout.
60 int res = select(link->cancellationFd_ + 1, &r_set, NULL, NULL, &tv);
61
62 // Exit early with non-recoverable errors.
63 if (res == -1 && errno != EINTR) {
64 return nullptr;
65 }
66
67 // Exit if we've been cancelled.
68 if (FD_ISSET(link->cancellationFd_, &r_set)) {
69 return nullptr;
70 }
71
72 // Retransmit any packets needing retransmission.
73 link->retransmit();
74 }
75 }
76
77};
78
79using namespace std;
80
82 stringRep.add(Link::LINK_TYPE_UNKNOWN, N_("Unknown"));
83 stringRep.add(Link::LINK_TYPE_SIBO, N_("SIBO"));
84 stringRep.add(Link::LINK_TYPE_EPOC, N_("EPOC"));
85ENUM_DEFINITION_END(Link::link_type)
86
87Link::Link(const char *fname, int baud, NCP *ncp, bool noDSRCheck, unsigned short verbose, const int cancellationFd)
88: ncp_(ncp)
89, verbose_(verbose)
90, cancellationFd_(cancellationFd) {
91 failed_ = false;
92 linkType_ = LINK_TYPE_UNKNOWN;
93 for (int i = 0; i < 256; i++)
94 xoff_[i] = false;
95 // generate magic number for sendReqCon()
96 srandom(time(NULL));
97 conMagic_ = random();
98
99 dataLink_ = new DataLink(fname, baud, this, noDSRCheck, verbose, cancellationFd);
100
101 pthread_mutex_init(&queueMutex_, NULL);
102 pthread_create(&checkThreadId_, NULL, expire_check, this);
103
104 // submit a link request
105 sendReqReq();
106}
107
109 pthread_join(checkThreadId_, NULL);
110 pthread_mutex_destroy(&queueMutex_);
111 delete dataLink_;
112}
113
114unsigned long Link::retransTimeout() {
115 return ((unsigned long)getSpeed() * 1000 / 13200) + 200;
116}
117
119 txSequence_ = 1;
120 rxSequence_ = -1;
121 failed_ = false;
122 seqMask_ = 7;
123 maxOutstanding_ = 1;
126 for (int i = 0; i < 256; i++)
127 xoff_[i] = false;
128 dataLink_->reset();
129 // submit a link request
130 sendReqReq();
131}
132
133void Link::send(const BufferStore & buff) {
134 if (buff.getLen() > 300) {
135 failed_ = true;
136 } else {
137 transmit(buff);
138 }
139}
142 pthread_mutex_lock(&queueMutex_);
143 ackWaitQueue.clear();
144 holdQueue_.clear();
145 pthread_mutex_unlock(&queueMutex_);
146}
147
148void Link::purgeQueue(int channel) {
149 pthread_mutex_lock(&queueMutex_);
150 vector<AckWaitQueueElement>::iterator i;
151 for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
152 if (i->data.getByte(0) == channel) {
153 ackWaitQueue.erase(i);
154 i--;
155 }
156 vector<BufferStore>::iterator j;
157 for (j = holdQueue_.begin(); j != holdQueue_.end(); j++)
158 if (j->getByte(0) == channel) {
159 holdQueue_.erase(j);
160 j--;
161 }
162 pthread_mutex_unlock(&queueMutex_);
163}
164
165void Link::sendAck(int seq, Enum<link_type> linkType) {
166 if (hasFailed())
167 return;
168 BufferStore tmp;
170 lout << "Link: >> ack seq=" << seq << endl;
171 if (seq > 7) {
172 int hseq = seq >> 3;
173 int lseq = (seq & 7) | 8;
174 seq = (hseq << 8) + lseq;
175 tmp.prependWord(seq);
176 } else {
177 tmp.prependByte(seq);
178 }
179 dataLink_->send(tmp, linkType == LINK_TYPE_EPOC);
180}
181
183 if (hasFailed())
184 return;
185 BufferStore tmp;
187 lout << "Link: >> con seq=4" << endl;
188 tmp.addByte(0x24);
189 tmp.addDWord(conMagic_);
191 e.seq = 0; // expected ACK is 0, _NOT_ 4!
192 gettimeofday(&e.stamp, NULL);
193 e.data = tmp;
194 e.txcount = 4;
195 pthread_mutex_lock(&queueMutex_);
196 ackWaitQueue.push_back(e);
197 bool isEPOC = linkType_ == LINK_TYPE_EPOC;
198 pthread_mutex_unlock(&queueMutex_);
199 dataLink_->send(tmp, isEPOC);
200}
201
203 if (hasFailed())
204 return;
205 BufferStore tmp;
207 lout << "Link: >> con seq=1" << endl;
208 tmp.addByte(0x21);
210 e.seq = 0; // expected response is Ack with seq=0 or ReqCon
211 gettimeofday(&e.stamp, NULL);
212 e.data = tmp;
213 e.txcount = 4;
214 pthread_mutex_lock(&queueMutex_);
215 ackWaitQueue.push_back(e);
216 bool isEPOC = linkType_ == LINK_TYPE_EPOC;
217 pthread_mutex_unlock(&queueMutex_);
218 dataLink_->send(tmp, isEPOC);
219}
220
222 if (hasFailed())
223 return;
224 BufferStore tmp;
226 lout << "Link: >> con seq=1" << endl;
227 tmp.addByte(0x20);
228 // No Ack expected for this, so no new entry in ackWaitQueue
229 dataLink_->send(tmp, linkType == LINK_TYPE_EPOC);
230}
231
232// This is called on the write queue's thread.
234 if (!dataLink_) {
235 return;
236 }
237
238 vector<AckWaitQueueElement>::iterator i;
239 bool ackFound;
240 bool conFound;
241 int type = buff.getByte(0);
242 int seq = type & 0x0f;
243
244 type &= 0xf0;
245 // Support for incoming extended sequence numbers
246 if (seq & 8) {
247 int tseq = buff.getByte(1);
248 buff.discardFirstBytes(2);
249 seq = (tseq << 3) + (seq & 0x07);
250 } else
251 buff.discardFirstBytes(1);
252
253 switch (type) {
254 case 0x30:
255 // Normal data
256 if (verbose_ & LNK_DEBUG_LOG) {
257 lout << "Link: << dat seq=" << seq ;
259 lout << " " << buff << endl;
260 else
261 lout << " len=" << buff.getLen() << endl;
262 }
263
264 if (((rxSequence_ + 1) & seqMask_) == seq) {
265 rxSequence_++;
267
269 // Must check for XOFF/XON ncp frames HERE!
270 if ((buff.getLen() == 3) && (buff.getByte(0) == 0)) {
271 switch (buff.getByte(2)) {
272 case 1:
273 // XOFF
274 xoff_[buff.getByte(1)] = true;
276 lout << "Link: got XOFF for channel "
277 << buff.getByte(1) << endl;
278 break;
279 case 2:
280 // XON
281 xoff_[buff.getByte(1)] = false;
283 lout << "Link: got XON for channel "
284 << buff.getByte(1) << endl;
285 // Transmit packets on hold queue
286 transmitHoldQueue(buff.getByte(1));
287 break;
288 default:
289 ncp_->receive(buff);
290 }
291 } else {
292 ncp_->receive(buff);
293 }
294 } else {
297 lout << "Link: DUP\n";
298 }
299 break;
300
301 case 0x00:
302 // Incoming ack
303 // Find corresponding packet in ackWaitQueue
304 ackFound = false;
305 struct timeval refstamp;
306 pthread_mutex_lock(&queueMutex_);
307 for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
308 if (i->seq == seq) {
309 ackFound = true;
310 refstamp = i->stamp;
311 ackWaitQueue.erase(i);
312 if (verbose_ & LNK_DEBUG_LOG) {
313 lout << "Link: << ack seq=" << seq ;
315 lout << " " << buff;
316 lout << endl;
317 }
318 break;
319 }
320 pthread_mutex_unlock(&queueMutex_);
321 if (ackFound) {
322 if ((linkType_ == LINK_TYPE_UNKNOWN) && (seq == 0)) {
323 // If the remote device runs SIBO protocol, this ACK
324 // should be 0 (the Ack on our ReqReq request, which is
325 // treated as a normal Req by the SIBO machine.
326 failed_ = false;
328 seqMask_ = 7;
329 maxOutstanding_ = 1;
330 rxSequence_ = 0;
331 txSequence_ = 1;
334 lout << "Link: 1-linkType set to " << linkType_ << endl;
335 }
336 // Older packets implicitely ack'ed
337 multiAck(refstamp);
338 // Transmit waiting packets
340 } else {
341 // If packet with seq+1 is in ackWaitQueue, resend it immediately
342 // (Receiving an ack for a packet not on our wait queue is a
343 // hint by the Psion about which was the last packet it
344 // received successfully.)
345 vector<BufferStore> pendingData;
346 pthread_mutex_lock(&queueMutex_);
347 struct timeval now;
348 gettimeofday(&now, NULL);
349 bool nextFound = false;
350 for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++) {
351 if (i->seq == seq+1) {
352 nextFound = true;
353 if (i->txcount-- == 0) {
354 // timeout, remove packet
356 lout << "Link: >> TRANSMIT timeout seq=" <<
357 i->seq << endl;
358 ackWaitQueue.erase(i);
359 i--;
360 } else {
361 // retransmit it
362 i->stamp = now;
364 lout << "Link: >> RETRANSMIT seq=" << i->seq
365 << endl;
366 pendingData.push_back(i->data);
367 }
368 break;
369 }
370 }
371 bool isEPOC = linkType_ == LINK_TYPE_EPOC;
372 pthread_mutex_unlock(&queueMutex_);
373
374 // Retransmit the data.
375 for (vector<BufferStore>::iterator i = pendingData.begin(); i != pendingData.end(); i++) {
376 dataLink_->send(*i, isEPOC);
377 }
378
379 if ((verbose_ & LNK_DEBUG_LOG) && (!nextFound)) {
380 lout << "Link: << UNMATCHED ack seq=" << seq;
382 lout << " " << buff;
383 lout << endl;
384 }
385 }
386 break;
387
388 case 0x20:
389 // New link
390 conFound = false;
391 if (seq > 3) {
392 // May be a link confirm packet (EPOC)
393 pthread_mutex_lock(&queueMutex_);
394 for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
395 if ((i->seq == 0) && (i->data.getByte(0) == 0x21)) {
396 ackWaitQueue.erase(i);
399 lout << "Link: 2-linkType set to " << linkType_ << endl;
400 conFound = true;
401 failed_ = false;
402 // EPOC can handle extended sequence numbers
403 seqMask_ = 0x7ff;
404 // EPOC can handle up to 8 unacknowledged packets
405 maxOutstanding_ = 8;
406 if (verbose_ & LNK_DEBUG_LOG) {
407 lout << "Link: << con seq=" << seq ;
409 lout << " " << buff;
410 lout << endl;
411 }
412 break;
413 }
414 pthread_mutex_unlock(&queueMutex_);
415 }
416 if (conFound) {
417 rxSequence_ = 0;
418 txSequence_ = 1;
420 } else {
421 if (verbose_ & LNK_DEBUG_LOG) {
422 lout << "Link: << req seq=" << seq;
424 lout << " " << buff;
425 lout << endl;
426 }
428 if (seq > 0) {
431 lout << "Link: 3-linkType set to " << linkType_ << endl;
432 // EPOC can handle extended sequence numbers
433 seqMask_ = 0x7ff;
434 // EPOC can handle up to 8 unacknowledged packets
435 maxOutstanding_ = 8;
436 failed_ = false;
437 sendReqCon();
438 } else {
439 // SIBO
441 failed_ = false;
442 seqMask_ = 7;
443 maxOutstanding_ = 1;
445 lout << "Link: 4-linkType set to " << linkType_ << endl;
446 rxSequence_ = 0;
447 txSequence_ = 1; // Our ReqReq was seq 0
450 }
451 }
452 break;
453
454 case 0x10:
455 // Disconnect
457 lout << "Link: << DISC" << endl;
458 failed_ = true;
459 break;
460
461 default:
462 lerr << "Link: FATAL: Unknown packet type " << type << endl;
463 }
464}
465
466void Link::transmitHoldQueue(int channel) {
467 vector<BufferStore> tmpQueue;
468 vector<BufferStore>::iterator i;
469
470 // First, move desired packets to a temporary queue
471 pthread_mutex_lock(&queueMutex_);
472 for (i = holdQueue_.begin(); i != holdQueue_.end(); i++)
473 if (i->getByte(0) == channel) {
474 tmpQueue.push_back(*i);
475 holdQueue_.erase(i);
476 i--;
477 }
478 pthread_mutex_unlock(&queueMutex_);
479
480 // ... then transmit the moved packets
481 for (i = tmpQueue.begin(); i != tmpQueue.end(); i++)
482 transmit(*i);
483}
484
486 vector<BufferStore> tmpQueue;
487 vector<BufferStore>::iterator i;
488
489 // First, move desired packets to a temporary queue
490 pthread_mutex_lock(&queueMutex_);
491 for (i = waitQueue_.begin(); i != waitQueue_.end(); i++)
492 tmpQueue.push_back(*i);
493 waitQueue_.clear();
494 pthread_mutex_unlock(&queueMutex_);
495
496 // transmit the moved packets. If the backlock gets
497 // full, they are put into waitQueue again.
498 for (i = tmpQueue.begin(); i != tmpQueue.end(); i++) {
499 transmit(*i);
500 }
501}
502
504 if (hasFailed())
505 return;
506
507 int remoteChan = buf.getByte(0);
508 if (xoff_[remoteChan]) {
509 pthread_mutex_lock(&queueMutex_);
510 holdQueue_.push_back(buf);
511 pthread_mutex_unlock(&queueMutex_);
512 } else {
513
514 // If backlock is full, put on waitQueue
515 int ql;
516 pthread_mutex_lock(&queueMutex_);
517 ql = ackWaitQueue.size();
518 if (ql >= maxOutstanding_) {
519 waitQueue_.push_back(buf);
520 pthread_mutex_unlock(&queueMutex_);
521 return;
522 }
523 pthread_mutex_unlock(&queueMutex_);
524
526 e.seq = txSequence_++;
528 gettimeofday(&e.stamp, NULL);
529 // An empty buffer is considered a new link request
530 if (buf.empty()) {
531 // Request for new link
532 e.txcount = 4;
534 lout << "Link: >> req seq=" << e.seq << endl;
535 buf.prependByte(0x20 + e.seq);
536 } else {
537 e.txcount = 8;
538 if (verbose_ & LNK_DEBUG_LOG) {
539 lout << "Link: >> dat seq=" << e.seq;
541 lout << " " << buf;
542 lout << endl;
543 }
544 if (e.seq > 7) {
545 int hseq = e.seq >> 3;
546 int lseq = 0x30 + ((e.seq & 7) | 8);
547 int seq = (hseq << 8) + lseq;
548 buf.prependWord(seq);
549 } else {
550 buf.prependByte(0x30 + e.seq);
551 }
552 }
553 e.data = buf;
554 pthread_mutex_lock(&queueMutex_);
555 ackWaitQueue.push_back(e);
556 bool isEPOC = linkType_ == LINK_TYPE_EPOC;
557 pthread_mutex_unlock(&queueMutex_);
558 dataLink_->send(buf, isEPOC);
559 }
560}
561
562static void timesub(struct timeval *tv, unsigned long millisecs) {
563 uint64_t micros = tv->tv_sec;
564 uint64_t sub = millisecs;
565
566 micros <<= 32;
567 micros += tv->tv_usec;
568 micros -= (sub * 1000);
569 tv->tv_usec = micros & 0xffffffff;
570 tv->tv_sec = (micros >>= 32) & 0xffffffff;
571}
572
573static bool olderthan(struct timeval t1, struct timeval t2) {
574 uint64_t m1 = t1.tv_sec;
575 uint64_t m2 = t2.tv_sec;
576 m1 <<= 32;
577 m2 <<= 32;
578 m1 += t1.tv_usec;
579 m2 += t2.tv_usec;
580 return (m1 < m2);
581}
582
583void Link::multiAck(struct timeval refstamp) {
584 vector<AckWaitQueueElement>::iterator i;
585 pthread_mutex_lock(&queueMutex_);
586 for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
587 if (olderthan(i->stamp, refstamp)) {
588 ackWaitQueue.erase(i);
589 i--;
590 }
591 pthread_mutex_unlock(&queueMutex_);
592}
593
594// Called from the retransmit thread.
596
597 if (hasFailed()) {
599 return;
600 }
601
602 // Iterate over the messages awaiting an ack, remove ones that have timed out, and enqueue the data of
603 // messages to retransmit. We do not send them in the loop to ensure we can't deadlock if the
604 // @ref DataLink::send call blocks, leaving us holding a lot while threads need to process data to make space
605 // for sending.
606 vector<BufferStore> pendingData;
607 pthread_mutex_lock(&queueMutex_);
608 vector<AckWaitQueueElement>::iterator i;
609 struct timeval now;
610 gettimeofday(&now, NULL);
611 struct timeval expired = now;
612 timesub(&expired, retransTimeout());
613 for (i = ackWaitQueue.begin(); i != ackWaitQueue.end(); i++)
614 if (olderthan(i->stamp, expired)) {
615 if (i->txcount-- == 0) {
616 // Remove timed out packets.
618 lout << "Link: >> TRANSMIT timeout seq=" << i->seq << endl;
619 ackWaitQueue.erase(i);
620 failed_ = true;
621 i--;
622 } else {
623 // Enqueue the buffer for retransmission.
624 i->stamp = now;
626 lout << "Link: >> RETRANSMIT seq=" << i->seq << endl;
627 pendingData.push_back(i->data);
628 }
629 }
630 bool isEPOC = linkType_ == LINK_TYPE_EPOC;
631 pthread_mutex_unlock(&queueMutex_);
632
633 // Retransmit the data.
634 for (vector<BufferStore>::iterator i = pendingData.begin(); i != pendingData.end(); i++) {
635 dataLink_->send(*i, isEPOC);
636 }
637
638}
639
641 bool result;
642 pthread_mutex_lock(&queueMutex_);
643 result = ((!failed_) && (!ackWaitQueue.empty()));
644 pthread_mutex_unlock(&queueMutex_);
645 return result;
646}
647
649 bool lfailed = dataLink_->linkFailed();
650 if (failed_ || lfailed) {
652 lout << "Link: hasFailed: " << failed_ << ", " << lfailed << endl;
653 }
654 failed_ |= lfailed;
655 return failed_;
656}
657
659 return linkType_;
660}
661
663 return dataLink_->getSpeed();
664}
#define ENUM_DEFINITION_END(EnumName)
Definition: Enum.h:303
#define ENUM_DEFINITION_BEGIN(EnumName, initWith)
Helper macro to construct an enumeration wrapper Enum<E> for a specific enum type.
Definition: Enum.h:298
A generic container for an array of bytes.
Definition: bufferstore.h:36
void prependByte(unsigned char c)
Prepends a byte to the content of this instance.
Definition: bufferstore.cc:211
void prependWord(int w)
Prepends a word to the content of this instance.
Definition: bufferstore.cc:217
void addByte(unsigned char c)
Appends a byte to the content of this instance.
Definition: bufferstore.cc:157
void addDWord(long dw)
Appends a dword to the content of this instance.
Definition: bufferstore.cc:197
unsigned long getLen() const
Retrieves the length of a BufferStore.
Definition: bufferstore.cc:92
bool empty() const
Tests if the BufferStore is empty.
Definition: bufferstore.h:257
unsigned char getByte(long pos=0) const
Retrieves the byte at index pos.
Definition: bufferstore.cc:96
void discardFirstBytes(int len=0)
Removes bytes from the start of the buffer.
Definition: bufferstore.cc:141
Wrapper class featuring range-checking and string representation of enumerated values.
Definition: Enum.h:135
Definition: ncp.h:55
void receive(BufferStore s)
Definition: ncp.cc:108
Definition: doctest.h:522
std::ostream lerr
#define LNK_DEBUG_LOG
Definition: ncp_log.h:30
std::ostream lout
#define LNK_DEBUG_DUMP
Definition: ncp_log.h:31
#define N_(String)
Definition: plpintl.h:35
int verbose
Definition: plpprintd.cc:58
Describes a transmitted packet which has not yet been acknowledged by the peer.
Definition: link.h:39
BufferStore data
Packet content.
Definition: link.h:55
int txcount
Number of remaining transmit retries.
Definition: link.h:47
int seq
Original sequence number.
Definition: link.h:43
struct timeval stamp
Time of last transmit.
Definition: link.h:51