1 /*
2
3 silcpacket.c
4
5 Author: Pekka Riikonen <priikone@silcnet.org>
6
7 Copyright (C) 1997 - 2007 Pekka Riikonen
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 */
19 /*
20 * Created: Fri Jul 25 18:52:14 1997
21 */
22 /* $Id: silcpacket.c,v 1.111 2007/12/30 12:46:01 priikone Exp $ */
23
24 #include "silc.h"
25
26 /************************** Types and definitions ***************************/
27
28 /* Per scheduler (which usually means per thread) data. We put per scheduler
29 data here for accessing without locking. SILC Schedule dictates that
30 tasks are dispatched in one thread, hence the per scheduler context. */
31 typedef struct {
32 SilcSchedule schedule; /* The scheduler */
33 SilcPacketEngine engine; /* Packet engine */
34 SilcDList inbufs; /* Data inbut buffer list */
35 SilcUInt32 stream_count; /* Number of streams using this */
36 } *SilcPacketEngineContext;
37
38 /* Packet engine */
39 struct SilcPacketEngineStruct {
40 SilcMutex lock; /* Engine lock */
41 SilcRng rng; /* RNG for engine */
42 SilcHashTable contexts; /* Per scheduler contexts */
43 SilcPacketCallbacks *callbacks; /* Packet callbacks */
44 void *callback_context; /* Context for callbacks */
45 SilcList streams; /* All streams in engine */
46 SilcList packet_pool; /* Free list for received packets */
47 SilcHashTable udp_remote; /* UDP remote streams, or NULL */
48 unsigned int local_is_router : 1;
49 };
50
51 /* Packet processor context */
52 typedef struct SilcPacketProcessStruct {
53 SilcPacketType *types; /* Packets to process */
54 SilcPacketCallbacks *callbacks; /* Callbacks or NULL */
55 void *callback_context;
56 SilcInt32 priority; /* Priority */
57 } *SilcPacketProcess;
58
59 /* UDP remote stream tuple */
60 typedef struct {
61 char *remote_ip; /* Remote IP address */
62 SilcUInt16 remote_port; /* Remote port */
63 } *SilcPacketRemoteUDP;
64
65 /* Packet stream */
66 struct SilcPacketStreamStruct {
67 struct SilcPacketStreamStruct *next;
68 SilcPacketEngineContext sc; /* Per scheduler context */
69 SilcStream stream; /* Underlaying stream */
70 SilcMutex lock; /* Packet stream lock */
71 SilcDList process; /* Packet processors, or NULL */
72 SilcPacketRemoteUDP remote_udp; /* UDP remote stream tuple, or NULL */
73 void *stream_context; /* Stream context */
74 SilcBufferStruct outbuf; /* Out buffer */
75 SilcBuffer inbuf; /* Inbuf from inbuf list or NULL */
76 SilcCipher send_key[2]; /* Sending key */
77 SilcHmac send_hmac[2]; /* Sending HMAC */
78 SilcCipher receive_key[2]; /* Receiving key */
79 SilcHmac receive_hmac[2]; /* Receiving HMAC */
80 unsigned char *src_id; /* Source ID */
81 unsigned char *dst_id; /* Destination ID */
82 SilcUInt32 send_psn; /* Sending sequence */
83 SilcUInt32 receive_psn; /* Receiving sequence */
84 SilcAtomic8 refcnt; /* Reference counter */
85 SilcUInt8 sid; /* Security ID, set if IV included */
86 unsigned int src_id_len : 6;
87 unsigned int src_id_type : 2;
88 unsigned int dst_id_len : 6;
89 unsigned int dst_id_type : 2;
90 unsigned int is_router : 1; /* Set if router stream */
91 unsigned int destroyed : 1; /* Set if destroyed */
92 unsigned int iv_included : 1; /* Set if IV included */
93 unsigned int udp : 1; /* UDP remote stream */
94 };
95
96 /* Initial size of stream buffers */
97 #define SILC_PACKET_DEFAULT_SIZE 1024
98
99 /* Header length without source and destination ID's. */
100 #define SILC_PACKET_HEADER_LEN 10
101
102 /* Minimum length of SILC Packet Header. */
103 #define SILC_PACKET_MIN_HEADER_LEN 16
104 #define SILC_PACKET_MIN_HEADER_LEN_IV 32 + 1
105
106 /* Maximum padding length */
107 #define SILC_PACKET_MAX_PADLEN 128
108
109 /* Default padding length */
110 #define SILC_PACKET_DEFAULT_PADLEN 16
111
112 /* Minimum packet length */
113 #define SILC_PACKET_MIN_LEN (SILC_PACKET_HEADER_LEN + 1)
114
115 /* Returns true length of the packet. */
116 #define SILC_PACKET_LENGTH(__packetdata, __ret_truelen, __ret_paddedlen) \
117 do { \
118 SILC_GET16_MSB((__ret_truelen), (__packetdata)); \
119 (__ret_paddedlen) = (__ret_truelen) + (SilcUInt8)(__packetdata)[4]; \
120 } while(0)
121
122 /* Calculates the data length with given header length. This macro
123 can be used to check whether the data_len with header_len exceeds
124 SILC_PACKET_MAX_LEN. If it does, this returns the new data_len
125 so that the SILC_PACKET_MAX_LEN is not exceeded. If the data_len
126 plus header_len fits SILC_PACKET_MAX_LEN the returned data length
127 is the data_len given as argument. */
128 #define SILC_PACKET_DATALEN(data_len, header_len) \
129 ((data_len + header_len) > SILC_PACKET_MAX_LEN ? \
130 data_len - ((data_len + header_len) - SILC_PACKET_MAX_LEN) : data_len)
131
132 /* Calculates the length of the padding in the packet. */
133 #define SILC_PACKET_PADLEN(__packetlen, __blocklen, __padlen) \
134 do { \
135 __padlen = (SILC_PACKET_DEFAULT_PADLEN - (__packetlen) % \
136 ((__blocklen) ? (__blocklen) : SILC_PACKET_DEFAULT_PADLEN)); \
137 if (__padlen < 8) \
138 __padlen += ((__blocklen) ? (__blocklen) : SILC_PACKET_DEFAULT_PADLEN); \
139 } while(0)
140
141 /* Returns the length of the padding up to the maximum length, which
142 is 128 bytes.*/
143 #define SILC_PACKET_PADLEN_MAX(__packetlen, __blocklen, __padlen) \
144 do { \
145 __padlen = (SILC_PACKET_MAX_PADLEN - (__packetlen) % \
146 ((__blocklen) ? (__blocklen) : SILC_PACKET_DEFAULT_PADLEN)); \
147 } while(0)
148
149 /* EOS callback */
150 #define SILC_PACKET_CALLBACK_EOS(s) \
151 do { \
152 (s)->sc->engine->callbacks->eos((s)->sc->engine, s, \
153 (s)->sc->engine->callback_context, \
154 (s)->stream_context); \
155 } while(0)
156
157 /* Error callback */
158 #define SILC_PACKET_CALLBACK_ERROR(s, err) \
159 do { \
160 (s)->sc->engine->callbacks->error((s)->sc->engine, s, err, \
161 (s)->sc->engine->callback_context, \
162 (s)->stream_context); \
163 } while(0)
164
165 static SilcBool silc_packet_dispatch(SilcPacket packet);
166 static void silc_packet_read_process(SilcPacketStream stream);
167 static inline SilcBool silc_packet_send_raw(SilcPacketStream stream,
168 SilcPacketType type,
169 SilcPacketFlags flags,
170 SilcIdType src_id_type,
171 unsigned char *src_id,
172 SilcUInt32 src_id_len,
173 SilcIdType dst_id_type,
174 unsigned char *dst_id,
175 SilcUInt32 dst_id_len,
176 const unsigned char *data,
177 SilcUInt32 data_len,
178 SilcCipher cipher,
179 SilcHmac hmac);
180
181 /************************ Static utility functions **************************/
182
183 /* Injects packet to new stream created with silc_packet_stream_add_remote. */
184
185 SILC_TASK_CALLBACK(silc_packet_stream_inject_packet)
186 {
187 SilcPacket packet = context;
188 SilcPacketStream stream = packet->stream;
189
190 SILC_LOG_DEBUG(("Injecting packet %p to stream %p", packet, packet->stream));
191
192 silc_mutex_lock(stream->lock);
193 if (!stream->destroyed)
194 silc_packet_dispatch(packet);
195 silc_mutex_unlock(stream->lock);
196 silc_packet_stream_unref(stream);
197 }
198
199 /* Write data to the stream. Must be called with ps->lock locked. Unlocks
200 the lock inside this function, unless no_unlock is TRUE. Unlocks always
201 in case it returns FALSE. */
202
203 static inline SilcBool silc_packet_stream_write(SilcPacketStream ps,
204 SilcBool no_unlock)
205 {
206 SilcStream stream;
207 SilcBool connected;
208 int i;
209
210 if (ps->udp)
211 stream = ((SilcPacketStream)ps->stream)->stream;
212 else
213 stream = ps->stream;
214
215 if (ps->udp && silc_socket_stream_is_udp(stream, &connected)) {
216 if (!connected) {
217 /* Connectionless UDP stream */
218 while (silc_buffer_len(&ps->outbuf) > 0) {
219 i = silc_net_udp_send(stream, ps->remote_udp->remote_ip,
220 ps->remote_udp->remote_port,
221 ps->outbuf.data, silc_buffer_len(&ps->outbuf));
222 if (silc_unlikely(i == -2)) {
223 /* Error */
224 silc_buffer_reset(&ps->outbuf);
225 SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_WRITE);
226 return FALSE;
227 }
228
229 if (silc_unlikely(i == -1)) {
230 /* Cannot write now, write later. */
231 if (!no_unlock)
232 silc_mutex_unlock(ps->lock);
233 return TRUE;
234 }
235
236 /* Wrote data */
237 silc_buffer_pull(&ps->outbuf, i);
238 }
239
240 silc_buffer_reset(&ps->outbuf);
241 if (!no_unlock)
242 silc_mutex_unlock(ps->lock);
243
244 return TRUE;
245 }
246 }
247
248 /* Write the data to the stream */
249 while (silc_buffer_len(&ps->outbuf) > 0) {
250 i = silc_stream_write(stream, ps->outbuf.data,
251 silc_buffer_len(&ps->outbuf));
252 if (silc_unlikely(i == 0)) {
253 /* EOS */
254 silc_buffer_reset(&ps->outbuf);
255 silc_mutex_unlock(ps->lock);
256 SILC_PACKET_CALLBACK_EOS(ps);
257 return FALSE;
258 }
259
260 if (silc_unlikely(i == -2)) {
261 /* Error */
262 silc_buffer_reset(&ps->outbuf);
263 silc_mutex_unlock(ps->lock);
264 SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_WRITE);
265 return FALSE;
266 }
267
268 if (silc_unlikely(i == -1)) {
269 /* Cannot write now, write later. */
270 if (!no_unlock)
271 silc_mutex_unlock(ps->lock);
272 return TRUE;
273 }
274
275 /* Wrote data */
276 silc_buffer_pull(&ps->outbuf, i);
277 }
278
279 silc_buffer_reset(&ps->outbuf);
280 if (!no_unlock)
281 silc_mutex_unlock(ps->lock);
282
283 return TRUE;
284 }
285
286 /* Reads data from stream. Must be called with ps->lock locked. If this
287 returns FALSE the lock has been unlocked. If this returns packet stream
288 to `ret_ps' its lock has been acquired and `ps' lock has been unlocked.
289 It is returned if the stream is UDP and remote UDP stream exists for
290 the sender of the packet. */
291
292 static inline SilcBool silc_packet_stream_read(SilcPacketStream ps,
293 SilcPacketStream *ret_ps)
294 {
295 SilcStream stream = ps->stream;
296 SilcBuffer inbuf;
297 SilcBool connected;
298 int ret;
299
300 /* Get inbuf. If there is already some data for this stream in the buffer
301 we already have it. Otherwise get the current one from list, it will
302 include the data. */
303 inbuf = ps->inbuf;
304 if (!inbuf) {
305 silc_dlist_start(ps->sc->inbufs);
306 inbuf = silc_dlist_get(ps->sc->inbufs);
307 if (!inbuf) {
308 /* Allocate new data input buffer */
309 inbuf = silc_buffer_alloc(SILC_PACKET_DEFAULT_SIZE * 65);
310 if (!inbuf) {
311 silc_mutex_unlock(ps->lock);
312 return FALSE;
313 }
314 silc_buffer_reset(inbuf);
315 silc_dlist_add(ps->sc->inbufs, inbuf);
316 }
317 }
318
319 /* Make sure there is enough room to read */
320 if (SILC_PACKET_DEFAULT_SIZE * 2 > silc_buffer_taillen(inbuf))
321 silc_buffer_realloc(inbuf, silc_buffer_truelen(inbuf) +
322 (SILC_PACKET_DEFAULT_SIZE * 2));
323
324 if (silc_socket_stream_is_udp(stream, &connected)) {
325 if (!connected) {
326 /* Connectionless UDP stream, read one UDP packet */
327 char remote_ip[64], tuple[64];
328 int remote_port;
329 SilcPacketStream remote;
330
331 ret = silc_net_udp_receive(stream, remote_ip, sizeof(remote_ip),
332 &remote_port, inbuf->tail,
333 silc_buffer_taillen(inbuf));
334
335 if (silc_unlikely(ret < 0)) {
336 silc_mutex_unlock(ps->lock);
337 if (ret == -1) {
338 /* Cannot read now, do it later. */
339 silc_buffer_pull(inbuf, silc_buffer_len(inbuf));
340 return FALSE;
341 }
342
343 /* Error */
344 silc_buffer_reset(inbuf);
345 SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_READ);
346 return FALSE;
347 }
348
349 /* See if remote packet stream exist for this sender */
350 silc_snprintf(tuple, sizeof(tuple), "%d%s", remote_port, remote_ip);
351 silc_mutex_lock(ps->sc->engine->lock);
352 if (silc_hash_table_find(ps->sc->engine->udp_remote, tuple, NULL,
353 (void *)&remote)) {
354 silc_mutex_unlock(ps->sc->engine->lock);
355 SILC_LOG_DEBUG(("UDP packet from %s:%d for stream %p", remote_ip,
356 remote_port, remote));
357 silc_mutex_unlock(ps->lock);
358 silc_mutex_lock(remote->lock);
359 *ret_ps = remote;
360 return TRUE;
361 }
362 silc_mutex_unlock(ps->sc->engine->lock);
363
364 /* Unknown sender */
365 if (!ps->remote_udp) {
366 ps->remote_udp = silc_calloc(1, sizeof(*ps->remote_udp));
367 if (silc_unlikely(!ps->remote_udp)) {
368 silc_mutex_unlock(ps->lock);
369 SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_NO_MEMORY);
370 return FALSE;
371 }
372 }
373
374 /* Save sender IP and port */
375 silc_free(ps->remote_udp->remote_ip);
376 ps->remote_udp->remote_ip = strdup(remote_ip);
377 ps->remote_udp->remote_port = remote_port;
378
379 silc_buffer_pull_tail(inbuf, ret);
380 return TRUE;
381 }
382 }
383
384 /* Read data from the stream */
385 ret = silc_stream_read(stream, inbuf->tail, silc_buffer_taillen(inbuf));
386 if (silc_unlikely(ret <= 0)) {
387 silc_mutex_unlock(ps->lock);
388 if (ret == 0) {
389 /* EOS */
390 silc_buffer_reset(inbuf);
391 SILC_PACKET_CALLBACK_EOS(ps);
392 return FALSE;
393 }
394
395 if (ret == -1) {
396 /* Cannot read now, do it later. */
397 silc_buffer_pull(inbuf, silc_buffer_len(inbuf));
398 return FALSE;
399 }
400
401 /* Error */
402 silc_buffer_reset(inbuf);
403 SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_READ);
404 return FALSE;
405 }
406
407 silc_buffer_pull_tail(inbuf, ret);
408 return TRUE;
409 }
410
411 /* Our stream IO notifier callback. */
412
413 static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
414 void *context)
415 {
416 SilcPacketStream remote = NULL, ps = context;
417
418 silc_mutex_lock(ps->lock);
419
420 if (silc_unlikely(ps->destroyed)) {
421 silc_mutex_unlock(ps->lock);
422 return;
423 }
424
425 switch (status) {
426 case SILC_STREAM_CAN_READ:
427 /* Reading is locked also with stream->lock because we may be reading
428 at the same time other thread is writing to same underlaying stream. */
429 SILC_LOG_DEBUG(("Reading data from stream %p, ps %p", ps->stream, ps));
430
431 /* Read data from stream */
432 if (!silc_packet_stream_read(ps, &remote))
433 return;
434
435 /* Now process the data */
436 silc_packet_stream_ref(ps);
437 if (!remote) {
438 silc_packet_read_process(ps);
439 silc_mutex_unlock(ps->lock);
440 } else {
441 silc_packet_read_process(remote);
442 silc_mutex_unlock(remote->lock);
443 }
444 silc_packet_stream_unref(ps);
445 break;
446
447 case SILC_STREAM_CAN_WRITE:
448 SILC_LOG_DEBUG(("Writing pending data to stream %p, ps %p",
449 ps->stream, ps));
450
451 if (silc_unlikely(!silc_buffer_headlen(&ps->outbuf))) {
452 silc_mutex_unlock(ps->lock);
453 return;
454 }
455
456 /* Write pending data to stream */
457 silc_packet_stream_write(ps, FALSE);
458 break;
459
460 default:
461 silc_mutex_unlock(ps->lock);
462 break;
463 }
464 }
465
466 /* Allocate packet */
467
468 static SilcPacket silc_packet_alloc(SilcPacketEngine engine)
469 {
470 SilcPacket packet;
471
472 SILC_LOG_DEBUG(("Packet pool count %d",
473 silc_list_count(engine->packet_pool)));
474
475 silc_mutex_lock(engine->lock);
476
477 /* Get packet from freelist or allocate new one. */
478 packet = silc_list_get(engine->packet_pool);
479 if (!packet) {
480 void *tmp;
481
482 silc_mutex_unlock(engine->lock);
483
484 packet = silc_calloc(1, sizeof(*packet));
485 if (silc_unlikely(!packet))
486 return NULL;
487
488 SILC_LOG_DEBUG(("Allocating new packet %p", packet));
489
490 tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE);
491 if (silc_unlikely(!tmp)) {
492 silc_free(packet);
493 return NULL;
494 }
495 silc_buffer_set(&packet->buffer, tmp, SILC_PACKET_DEFAULT_SIZE);
496 silc_buffer_reset(&packet->buffer);
497
498 return packet;
499 }
500
501 SILC_LOG_DEBUG(("Get packet %p", packet));
502
503 /* Delete from freelist */
504 silc_list_del(engine->packet_pool, packet);
505
506 silc_mutex_unlock(engine->lock);
507
508 return packet;
509 }
510
511 /* UDP remote stream hash table destructor */
512
513 static void silc_packet_engine_hash_destr(void *key, void *context,
514 void *user_context)
515 {
516 silc_free(key);
517 }
518
519 /* Per scheduler context hash table destructor */
520
521 static void silc_packet_engine_context_destr(void *key, void *context,
522 void *user_context)
523 {
524 SilcPacketEngineContext sc = context;
525 SilcBuffer buffer;
526
527 silc_dlist_start(sc->inbufs);
528 while ((buffer = silc_dlist_get(sc->inbufs))) {
529 silc_buffer_clear(buffer);
530 silc_buffer_free(buffer);
531 silc_dlist_del(sc->inbufs, buffer);
532 }
533
534 silc_dlist_uninit(sc->inbufs);
535 silc_free(sc);
536 }
537
538
539 /******************************** Packet API ********************************/
540
541 /* Allocate new packet engine */
542
543 SilcPacketEngine
544 silc_packet_engine_start(SilcRng rng, SilcBool router,
545 SilcPacketCallbacks *callbacks,
546 void *callback_context)
547 {
548 SilcPacketEngine engine;
549 SilcPacket packet;
550 int i;
551 void *tmp;
552
553 SILC_LOG_DEBUG(("Starting new packet engine"));
554
555 if (!callbacks)
556 return NULL;
557 if (!callbacks->packet_receive || !callbacks->eos || !callbacks->error)
558 return NULL;
559
560 engine = silc_calloc(1, sizeof(*engine));
561 if (!engine)
562 return NULL;
563
564 engine->contexts = silc_hash_table_alloc(NULL, 0, silc_hash_ptr,
565 NULL, NULL, NULL,
566 silc_packet_engine_context_destr,
567 engine, TRUE);
568 if (!engine->contexts) {
569 silc_free(engine);
570 return NULL;
571 }
572
573 engine->rng = rng;
574 engine->local_is_router = router;
575 engine->callbacks = callbacks;
576 engine->callback_context = callback_context;
577 silc_list_init(engine->streams, struct SilcPacketStreamStruct, next);
578 silc_mutex_alloc(&engine->lock);
579
580 /* Allocate packet free list */
581 silc_list_init(engine->packet_pool, struct SilcPacketStruct, next);
582 for (i = 0; i < 5; i++) {
583 packet = silc_calloc(1, sizeof(*packet));
584 if (!packet) {
585 silc_packet_engine_stop(engine);
586 return NULL;
587 }
588
589 tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE);
590 if (!tmp) {
591 silc_packet_engine_stop(engine);
592 return NULL;
593 }
594 silc_buffer_set(&packet->buffer, tmp, SILC_PACKET_DEFAULT_SIZE);
595 silc_buffer_reset(&packet->buffer);
596
597 silc_list_add(engine->packet_pool, packet);
598 }
599 silc_list_start(engine->packet_pool);
600
601 return engine;
602 }
603
604 /* Stop packet engine */
605
606 void silc_packet_engine_stop(SilcPacketEngine engine)
607 {
608 SilcPacket packet;
609
610 SILC_LOG_DEBUG(("Stopping packet engine"));
611
612 if (!engine)
613 return;
614
615 /* Free packet free list */
616 silc_list_start(engine->packet_pool);
617 while ((packet = silc_list_get(engine->packet_pool))) {
618 silc_buffer_purge(&packet->buffer);
619 silc_free(packet);
620 }
621
622 silc_hash_table_free(engine->contexts);
623 silc_mutex_free(engine->lock);
624 silc_free(engine);
625 }
626
627 static const char *packet_error[] = {
628 "Cannot read from stream",
629 "Cannot write to stream",
630 "Packet MAC failed",
631 "Packet decryption failed",
632 "Unknown SID",
633 "Packet is malformed",
634 "System out of memory",
635 };
636
637 /* Return packet error string */
638
639 const char *silc_packet_error_string(SilcPacketError error)
640 {
641 if (error < SILC_PACKET_ERR_READ || error > SILC_PACKET_ERR_NO_MEMORY)
642 return "<invalid error code>";
643 return packet_error[error];
644 }
645
646 /* Return list of packet streams in the engine */
647
648 SilcDList silc_packet_engine_get_streams(SilcPacketEngine engine)
649 {
650 SilcDList list;
651 SilcPacketStream ps;
652
653 list = silc_dlist_init();
654 if (!list)
655 return NULL;
656
657 silc_mutex_lock(engine->lock);
658 silc_list_start(engine->streams);
659 while ((ps = silc_list_get(engine->streams))) {
660 silc_packet_stream_ref(ps);
661 silc_dlist_add(list, ps);
662 }
663 silc_mutex_unlock(engine->lock);
664
665 return list;
666 }
667
668 /* Free list returned by silc_packet_engine_get_streams */
669
670 void silc_packet_engine_free_streams_list(SilcDList streams)
671 {
672 SilcPacketStream ps;
673
674 silc_dlist_start(streams);
675 while ((ps = silc_dlist_get(streams)))
676 silc_packet_stream_unref(ps);
677
678 silc_dlist_uninit(streams);
679 }
680
681 /* Create new packet stream */
682
683 SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine,
684 SilcSchedule schedule,
685 SilcStream stream)
686 {
687 SilcPacketStream ps;
688 SilcBuffer inbuf;
689 void *tmp;
690
691 SILC_LOG_DEBUG(("Creating new packet stream"));
692
693 if (!engine || !stream)
694 return NULL;
695
696 ps = silc_calloc(1, sizeof(*ps));
697 if (!ps)
698 return NULL;
699
700 ps->stream = stream;
701 silc_atomic_init8(&ps->refcnt, 1);
702 silc_mutex_alloc(&ps->lock);
703
704 /* Allocate out buffer */
705 tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE);
706 if (!tmp) {
707 silc_packet_stream_destroy(ps);
708 return NULL;
709 }
710 silc_buffer_set(&ps->outbuf, tmp, SILC_PACKET_DEFAULT_SIZE);
711 silc_buffer_reset(&ps->outbuf);
712
713 /* Initialize packet procesors list */
714 ps->process = silc_dlist_init();
715 if (!ps->process) {
716 silc_packet_stream_destroy(ps);
717 return NULL;
718 }
719
720 silc_mutex_lock(engine->lock);
721
722 /* Add per scheduler context */
723 if (!silc_hash_table_find(engine->contexts, schedule, NULL,
724 (void *)&ps->sc)) {
725 ps->sc = silc_calloc(1, sizeof(*ps->sc));
726 if (!ps->sc) {
727 silc_packet_stream_destroy(ps);
728 silc_mutex_unlock(engine->lock);
729 return NULL;
730 }
731 ps->sc->engine = engine;
732 ps->sc->schedule = schedule;
733
734 /* Allocate data input buffer */
735 inbuf = silc_buffer_alloc(SILC_PACKET_DEFAULT_SIZE * 65);
736 if (!inbuf) {
737 silc_free(ps->sc);
738 ps->sc = NULL;
739 silc_packet_stream_destroy(ps);
740 silc_mutex_unlock(engine->lock);
741 return NULL;
742 }
743 silc_buffer_reset(inbuf);
744
745 ps->sc->inbufs = silc_dlist_init();
746 if (!ps->sc->inbufs) {
747 silc_buffer_free(inbuf);
748 silc_free(ps->sc);
749 ps->sc = NULL;
750 silc_packet_stream_destroy(ps);
751 silc_mutex_unlock(engine->lock);
752 return NULL;
753 }
754 silc_dlist_add(ps->sc->inbufs, inbuf);
755
756 /* Add to per scheduler context hash table */
757 if (!silc_hash_table_add(engine->contexts, schedule, ps->sc)) {
758 silc_buffer_free(inbuf);
759 silc_dlist_del(ps->sc->inbufs, inbuf);
760 silc_free(ps->sc);
761 ps->sc = NULL;
762 silc_packet_stream_destroy(ps);
763 silc_mutex_unlock(engine->lock);
764 return NULL;
765 }
766 }
767 ps->sc->stream_count++;
768
769 /* Add the packet stream to engine */
770 silc_list_add(engine->streams, ps);
771
772 /* If this is UDP stream, allocate UDP remote stream hash table */
773 if (!engine->udp_remote && silc_socket_stream_is_udp(stream, NULL))
774 engine->udp_remote =
775 silc_hash_table_alloc(NULL, 0, silc_hash_string_case, NULL,
776 silc_hash_string_case_compare, NULL,
777 silc_packet_engine_hash_destr, NULL, TRUE);
778
779 silc_mutex_unlock(engine->lock);
780
781 /* Set IO notifier callback. This schedules this stream for I/O. */
782 if (!silc_stream_set_notifier(ps->stream, schedule,
783 silc_packet_stream_io, ps)) {
784 SILC_LOG_DEBUG(("Cannot set stream notifier for packet stream"));
785 silc_packet_stream_destroy(ps);
786 return NULL;
787 }
788
789 SILC_LOG_DEBUG(("Created packet stream %p", ps));
790
791 return ps;
792 }
793
794 /* Add new remote packet stream for UDP packet streams */
795
796 SilcPacketStream silc_packet_stream_add_remote(SilcPacketStream stream,
797 const char *remote_ip,
798 SilcUInt16 remote_port,
799 SilcPacket packet)
800 {
801 SilcPacketEngine engine = stream->sc->engine;
802 SilcPacketStream ps;
803 char *tuple;
804 void *tmp;
805
806 SILC_LOG_DEBUG(("Adding UDP remote %s:%d to packet stream %p",
807 remote_ip, remote_port, stream));
808
809 if (!stream || !remote_ip || !remote_port)
810 return NULL;
811
812 if (!silc_socket_stream_is_udp(stream->stream, NULL)) {
813 SILC_LOG_ERROR(("Stream is not UDP stream, cannot add remote IP"));
814 return NULL;
815 }
816
817 ps = silc_calloc(1, sizeof(*ps));
818 if (!ps)
819 return NULL;
820 ps->sc = stream->sc;
821
822 silc_atomic_init8(&ps->refcnt, 1);
823 silc_mutex_alloc(&ps->lock);
824
825 /* Set the UDP packet stream as underlaying stream */
826 silc_packet_stream_ref(stream);
827 ps->stream = (SilcStream)stream;
828 ps->udp = TRUE;
829
830 /* Allocate out buffer */
831 tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE);
832 if (!tmp) {
833 silc_packet_stream_destroy(ps);
834 return NULL;
835 }
836 silc_buffer_set(&ps->outbuf, tmp, SILC_PACKET_DEFAULT_SIZE);
837 silc_buffer_reset(&ps->outbuf);
838
839 /* Initialize packet procesors list */
840 ps->process = silc_dlist_init();
841 if (!ps->process) {
842 silc_packet_stream_destroy(ps);
843 return NULL;
844 }
845
846 /* Add to engine with this IP and port pair */
847 tuple = silc_format("%d%s", remote_port, remote_ip);
848 silc_mutex_lock(engine->lock);
849 if (!tuple || !silc_hash_table_add(engine->udp_remote, tuple, ps)) {
850 silc_mutex_unlock(engine->lock);
851 silc_packet_stream_destroy(ps);
852 return NULL;
853 }
854 silc_mutex_unlock(engine->lock);
855
856 /* Save remote IP and port pair */
857 ps->remote_udp = silc_calloc(1, sizeof(*ps->remote_udp));
858 if (!ps->remote_udp) {
859 silc_packet_stream_destroy(ps);
860 return NULL;
861 }
862 ps->remote_udp->remote_port = remote_port;
863 ps->remote_udp->remote_ip = strdup(remote_ip);
864 if (!ps->remote_udp->remote_ip) {
865 silc_packet_stream_destroy(ps);
866 return NULL;
867 }
868
869 if (packet) {
870 /* Inject packet to the new stream */
871 packet->stream = ps;
872 silc_packet_stream_ref(ps);
873 silc_schedule_task_add_timeout(silc_stream_get_schedule(stream->stream),
874 silc_packet_stream_inject_packet, packet,
875 0, 0);
876 }
877
878 return ps;
879 }
880
881 /* Destroy packet stream */
882
883 void silc_packet_stream_destroy(SilcPacketStream stream)
884 {
885 SilcPacketEngine engine;
886
887 if (!stream)
888 return;
889
890 if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0) {
891 stream->destroyed = TRUE;
892
893 SILC_LOG_DEBUG(("Marking packet stream %p destroyed", stream));
894
895 /* Close the underlaying stream */
896 if (!stream->udp && stream->stream)
897 silc_stream_close(stream->stream);
898 return;
899 }
900
901 SILC_LOG_DEBUG(("Destroying packet stream %p", stream));
902
903 if (!stream->udp) {
904 /* Delete from engine */
905 engine = stream->sc->engine;
906 silc_mutex_lock(engine->lock);
907 silc_list_del(engine->streams, stream);
908
909 /* Remove per scheduler context, if it is not used anymore */
910 if (stream->sc) {
911 stream->sc->stream_count--;
912 if (!stream->sc->stream_count)
913 silc_hash_table_del(engine->contexts, stream->sc->schedule);
914 }
915 silc_mutex_unlock(engine->lock);
916
917 /* Destroy the underlaying stream */
918 if (stream->stream)
919 silc_stream_destroy(stream->stream);
920 } else {
921 /* Delete from UDP remote hash table */
922 char tuple[64];
923 engine = stream->sc->engine;
924 silc_snprintf(tuple, sizeof(tuple), "%d%s",
925 stream->remote_udp->remote_port,
926 stream->remote_udp->remote_ip);
927 silc_mutex_lock(engine->lock);
928 silc_hash_table_del(engine->udp_remote, tuple);
929 silc_mutex_unlock(engine->lock);
930
931 silc_free(stream->remote_udp->remote_ip);
932 silc_free(stream->remote_udp);
933
934 /* Unreference the underlaying packet stream */
935 silc_packet_stream_unref((SilcPacketStream)stream->stream);
936 }
937
938 /* Clear and free buffers */
939 silc_buffer_clear(&stream->outbuf);
940 silc_buffer_purge(&stream->outbuf);
941
942 if (stream->process) {
943 SilcPacketProcess p;
944 silc_dlist_start(stream->process);
945 while ((p = silc_dlist_get(stream->process))) {
946 silc_free(p->types);
947 silc_free(p);
948 silc_dlist_del(stream->process, p);
949 }
950 silc_dlist_uninit(stream->process);
951 }
952
953 /* Destroy ciphers and HMACs */
954 if (stream->send_key[0])
955 silc_cipher_free(stream->send_key[0]);
956 if (stream->receive_key[0])
957 silc_cipher_free(stream->receive_key[0]);
958 if (stream->send_hmac[0])
959 silc_hmac_free(stream->send_hmac[0]);
960 if (stream->receive_hmac[0])
961 silc_hmac_free(stream->receive_hmac[0]);
962 if (stream->send_key[1])
963 silc_cipher_free(stream->send_key[1]);
964 if (stream->receive_key[1])
965 silc_cipher_free(stream->receive_key[1]);
966 if (stream->send_hmac[1])
967 silc_hmac_free(stream->send_hmac[1]);
968 if (stream->receive_hmac[1])
969 silc_hmac_free(stream->receive_hmac[1]);
970
971 /* Free IDs */
972 silc_free(stream->src_id);
973 silc_free(stream->dst_id);
974
975 silc_atomic_uninit8(&stream->refcnt);
976 silc_mutex_free(stream->lock);
977 silc_free(stream);
978 }
979
980 /* Return TRUE if the stream is valid */
981
982 SilcBool silc_packet_stream_is_valid(SilcPacketStream stream)
983 {
984 return stream->destroyed == FALSE;
985 }
986
987 /* Marks as router stream */
988
989 void silc_packet_stream_set_router(SilcPacketStream stream)
990 {
991 stream->is_router = TRUE;
992 }
993
994 /* Mark to include IV in ciphertext */
995
996 void silc_packet_stream_set_iv_included(SilcPacketStream stream)
997 {
998 stream->iv_included = TRUE;
999 }
1000
1001 /* Links `callbacks' to `stream' for specified packet types */
1002
1003 static SilcBool silc_packet_stream_link_va(SilcPacketStream stream,
1004 SilcPacketCallbacks *callbacks,
1005 void *callback_context,
1006 int priority, va_list ap)
1007 {
1008 SilcPacketProcess p, e;
1009 SilcInt32 packet_type;
1010 int i;
1011
1012 SILC_LOG_DEBUG(("Linking callbacks %p to stream %p", callbacks, stream));
1013
1014 if (!callbacks)
1015 return FALSE;
1016 if (!callbacks->packet_receive)
1017 return FALSE;
1018
1019 p = silc_calloc(1, sizeof(*p));
1020 if (!p)
1021 return FALSE;
1022
1023 p->priority = priority;
1024 p->callbacks = callbacks;
1025 p->callback_context = callback_context;
1026
1027 silc_mutex_lock(stream->lock);
1028
1029 if (!stream->process) {
1030 stream->process = silc_dlist_init();
1031 if (!stream->process) {
1032 silc_mutex_unlock(stream->lock);
1033 return FALSE;
1034 }
1035 }
1036
1037 /* According to priority set the procesor to correct position. First
1038 entry has the highest priority */
1039 silc_dlist_start(stream->process);
1040 while ((e = silc_dlist_get(stream->process)) != SILC_LIST_END) {
1041 if (p->priority > e->priority) {
1042 silc_dlist_insert(stream->process, p);
1043 break;
1044 }
1045 }
1046 if (!e)
1047 silc_dlist_add(stream->process, p);
1048
1049 /* Ge