1 /*  RTMP Proxy Server
2  *  Copyright (C) 2009 Andrej Stepanchuk
3  *  Copyright (C) 2009 Howard Chu
4  *
5  *  This Program is free software; you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation; either version 2, or (at your option)
8  *  any later version.
9  *
10  *  This Program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with RTMPDump; see the file COPYING.  If not, write to
17  *  the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  *  Boston, MA  02110-1301, USA.
19  *  http://www.gnu.org/copyleft/gpl.html
20  *
21  */
22 
23 /* This is a Proxy Server that displays the connection parameters from a
24  * client and then saves any data streamed to the client.
25  */
26 
27 #include <stdlib.h>
28 #include <string.h>
29 #include <math.h>
30 #include <limits.h>
31 
32 #include <signal.h>
33 #include <getopt.h>
34 
35 #include <assert.h>
36 
37 #include "librtmp/rtmp_sys.h"
38 #include "librtmp/log.h"
39 
40 #include "thread.h"
41 
42 #ifdef linux
43 #include <linux/netfilter_ipv4.h>
44 #endif
45 
/* Result codes. */
#define RD_SUCCESS		0
#define RD_FAILED		1
#define RD_INCOMPLETE		2

/* 1 MiB.  Parenthesized so the macro behaves as a single value inside
 * larger expressions such as `x / PACKET_SIZE`. */
#define PACKET_SIZE (1024*1024)
51 
/* Socket library start-up / tear-down.  On WIN32 these wrap WSAStartup()
 * and WSACleanup(); elsewhere they expand to nothing.  InitSockets() is
 * wrapped in do { } while (0) so that it acts as one statement and can be
 * used safely in an unbraced if/else. */
#ifdef WIN32
#define InitSockets()	do {			\
        WORD version;				\
        WSADATA wsaData;			\
						\
        version = MAKEWORD(1,1);		\
        WSAStartup(version, &wsaData);		\
    } while (0)

#define	CleanupSockets()	WSACleanup()
#else
#define InitSockets()
#define	CleanupSockets()
#endif
65 
/* Values stored in STREAMING_SERVER.state. */
enum
{
  STREAMING_ACCEPTING = 0,	/* set by serverThread(); restored by doServe() when a session ends */
  STREAMING_IN_PROGRESS = 1,	/* set by doServe() while a session is being proxied */
  STREAMING_STOPPING = 2,
  STREAMING_STOPPED = 3
};
73 
74 typedef struct Flist
75 {
76   struct Flist *f_next;
77   FILE *f_file;
78   AVal f_path;
79 } Flist;
80 
81 typedef struct Plist
82 {
83   struct Plist *p_next;
84   RTMPPacket p_pkt;
85 } Plist;
86 
87 typedef struct
88 {
89   int socket;
90   int state;
91   uint32_t stamp;
92   RTMP rs;
93   RTMP rc;
94   Plist *rs_pkt[2];	/* head, tail */
95   Plist *rc_pkt[2];	/* head, tail */
96   Flist *f_head, *f_tail;
97   Flist *f_cur;
98 
99 } STREAMING_SERVER;
100 
101 STREAMING_SERVER *rtmpServer = 0;	// server structure pointer
102 
103 STREAMING_SERVER *startStreaming(const char *address, int port);
104 void stopStreaming(STREAMING_SERVER * server);
105 
/* Point AVal `av` at the NUL-terminated string `str` and record its length.
 * Wrapped in do { } while (0) so both assignments stay together when the
 * macro is the body of an unbraced if/else. */
#define STR2AVAL(av,str)	do { (av).av_val = (str); (av).av_len = strlen((av).av_val); } while (0)
107 
#ifdef _DEBUG
/* Globals that exist only in _DEBUG builds.  Nothing in the visible part
 * of this file reads them -- NOTE(review): presumably consumed by librtmp's
 * debug code; confirm before removing. */
uint32_t debugTS = 0;
int pnum = 0;
FILE *netstackdump = NULL;
FILE *netstackdump_read = NULL;
#endif
116 
117 #define BUFFERTIME	(4*60*60*1000)	/* 4 hours */
118 
119 #define SAVC(x) static const AVal av_##x = AVC(#x)
120 
121 SAVC(app);
122 SAVC(connect);
123 SAVC(flashVer);
124 SAVC(swfUrl);
125 SAVC(pageUrl);
126 SAVC(tcUrl);
127 SAVC(fpad);
128 SAVC(capabilities);
129 SAVC(audioCodecs);
130 SAVC(videoCodecs);
131 SAVC(videoFunction);
132 SAVC(objectEncoding);
133 SAVC(_result);
134 SAVC(createStream);
135 SAVC(play);
136 SAVC(closeStream);
137 SAVC(fmsVer);
138 SAVC(mode);
139 SAVC(level);
140 SAVC(code);
141 SAVC(secureToken);
142 SAVC(onStatus);
143 SAVC(close);
144 static const AVal av_NetStream_Failed = AVC("NetStream.Failed");
145 static const AVal av_NetStream_Play_Failed = AVC("NetStream.Play.Failed");
146 static const AVal av_NetStream_Play_StreamNotFound =
147 AVC("NetStream.Play.StreamNotFound");
148 static const AVal av_NetConnection_Connect_InvalidApp =
149 AVC("NetConnection.Connect.InvalidApp");
150 static const AVal av_NetStream_Play_Start = AVC("NetStream.Play.Start");
151 static const AVal av_NetStream_Play_Complete = AVC("NetStream.Play.Complete");
152 static const AVal av_NetStream_Play_Stop = AVC("NetStream.Play.Stop");
153 
154 static const char *cst[] = { "client", "server" };
155 
156 // Returns 0 for OK/Failed/error, 1 for 'Stop or Complete'
157 int
ServeInvoke(STREAMING_SERVER * server,int which,RTMPPacket * pack,const char * body)158 ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *body)
159 {
160   int ret = 0, nRes;
161   int nBodySize = pack->m_nBodySize;
162 
163   if (body > pack->m_body)
164     nBodySize--;
165 
166   if (body[0] != 0x02)		// make sure it is a string method name we start with
167     {
168       RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
169 	  __FUNCTION__);
170       return 0;
171     }
172 
173   AMFObject obj;
174   nRes = AMF_Decode(&obj, body, nBodySize, FALSE);
175   if (nRes < 0)
176     {
177       RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
178       return 0;
179     }
180 
181   AMF_Dump(&obj);
182   AVal method;
183   AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
184   RTMP_Log(RTMP_LOGDEBUG, "%s, %s invoking <%s>", __FUNCTION__, cst[which], method.av_val);
185 
186   if (AVMATCH(&method, &av_connect))
187     {
188       AMFObject cobj;
189       AVal pname, pval;
190       int i;
191       AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &cobj);
192       RTMP_LogPrintf("Processing connect\n");
193       for (i=0; i<cobj.o_num; i++)
194         {
195           pname = cobj.o_props[i].p_name;
196           pval.av_val = NULL;
197           pval.av_len = 0;
198           if (cobj.o_props[i].p_type == AMF_STRING)
199             {
200               pval = cobj.o_props[i].p_vu.p_aval;
201               RTMP_LogPrintf("%.*s: %.*s\n", pname.av_len, pname.av_val, pval.av_len, pval.av_val);
202             }
203           if (AVMATCH(&pname, &av_app))
204             {
205               server->rc.Link.app = pval;
206               pval.av_val = NULL;
207             }
208           else if (AVMATCH(&pname, &av_flashVer))
209             {
210               server->rc.Link.flashVer = pval;
211               pval.av_val = NULL;
212             }
213           else if (AVMATCH(&pname, &av_swfUrl))
214             {
215 #ifdef CRYPTO
216               if (pval.av_val)
217 	        RTMP_HashSWF(pval.av_val, &server->rc.Link.SWFSize,
218 		  (unsigned char *)server->rc.Link.SWFHash, 30);
219 #endif
220               server->rc.Link.swfUrl = pval;
221               pval.av_val = NULL;
222             }
223           else if (AVMATCH(&pname, &av_tcUrl))
224             {
225               char *r1 = NULL, *r2;
226               int len;
227 
228               server->rc.Link.tcUrl = pval;
229               if ((pval.av_val[0] | 0x40) == 'r' &&
230                   (pval.av_val[1] | 0x40) == 't' &&
231                   (pval.av_val[2] | 0x40) == 'm' &&
232                   (pval.av_val[3] | 0x40) == 'p')
233                 {
234                   if (pval.av_val[4] == ':')
235                     {
236                       server->rc.Link.protocol = RTMP_PROTOCOL_RTMP;
237                       r1 = pval.av_val+7;
238                     }
239                   else if ((pval.av_val[4] | 0x40) == 'e' && pval.av_val[5] == ':')
240                     {
241                       server->rc.Link.protocol = RTMP_PROTOCOL_RTMPE;
242                       r1 = pval.av_val+8;
243                     }
244                   r2 = strchr(r1, '/');
245 		  if (r2)
246                     len = r2 - r1;
247 		  else
248 		    len = pval.av_len - (r1 - pval.av_val);
249                   r2 = malloc(len+1);
250                   memcpy(r2, r1, len);
251                   r2[len] = '\0';
252                   server->rc.Link.hostname.av_val = r2;
253                   r1 = strrchr(r2, ':');
254                   if (r1)
255                     {
256 		      server->rc.Link.hostname.av_len = r1 - r2;
257                       *r1++ = '\0';
258                       server->rc.Link.port = atoi(r1);
259                     }
260                   else
261                     {
262 		      server->rc.Link.hostname.av_len = len;
263                       server->rc.Link.port = 1935;
264                     }
265                 }
266               pval.av_val = NULL;
267             }
268           else if (AVMATCH(&pname, &av_pageUrl))
269             {
270               server->rc.Link.pageUrl = pval;
271               pval.av_val = NULL;
272             }
273           else if (AVMATCH(&pname, &av_audioCodecs))
274             {
275               server->rc.m_fAudioCodecs = cobj.o_props[i].p_vu.p_number;
276             }
277           else if (AVMATCH(&pname, &av_videoCodecs))
278             {
279               server->rc.m_fVideoCodecs = cobj.o_props[i].p_vu.p_number;
280             }
281           else if (AVMATCH(&pname, &av_objectEncoding))
282             {
283               server->rc.m_fEncoding = cobj.o_props[i].p_vu.p_number;
284               server->rc.m_bSendEncoding = TRUE;
285             }
286           /* Dup'd a string we didn't recognize? */
287           if (pval.av_val)
288             free(pval.av_val);
289         }
290       if (obj.o_num > 3)
291         {
292           if (AMFProp_GetBoolean(&obj.o_props[3]))
293             server->rc.Link.lFlags |= RTMP_LF_AUTH;
294           if (obj.o_num > 4)
295           {
296             AMFProp_GetString(&obj.o_props[4], &server->rc.Link.auth);
297           }
298         }
299 
300       if (!RTMP_Connect(&server->rc, pack))
301         {
302           /* failed */
303           return 1;
304         }
305       server->rc.m_bSendCounter = FALSE;
306     }
307   else if (AVMATCH(&method, &av_play))
308     {
309       Flist *fl;
310       AVal av;
311       FILE *out;
312       char *file, *p, *q;
313       char flvHeader[] = { 'F', 'L', 'V', 0x01,
314          0x05,                       // video + audio, we finalize later if the value is different
315          0x00, 0x00, 0x00, 0x09,
316          0x00, 0x00, 0x00, 0x00      // first prevTagSize=0
317        };
318       int count = 0, flen;
319 
320       server->rc.m_stream_id = pack->m_nInfoField2;
321       AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &av);
322       server->rc.Link.playpath = av;
323       if (!av.av_val)
324         goto out;
325 
326       /* check for duplicates */
327       for (fl = server->f_head; fl; fl=fl->f_next)
328         {
329           if (AVMATCH(&av, &fl->f_path))
330             count++;
331         }
332       /* strip trailing URL parameters */
333       q = memchr(av.av_val, '?', av.av_len);
334       if (q)
335         {
336 	  if (q == av.av_val)
337 	    {
338 	      av.av_val++;
339 	      av.av_len--;
340 	    }
341 	  else
342 	    {
343               av.av_len = q - av.av_val;
344 	    }
345 	}
346       /* strip leading slash components */
347       for (p=av.av_val+av.av_len-1; p>=av.av_val; p--)
348         if (*p == '/')
349           {
350             p++;
351             av.av_len -= p - av.av_val;
352             av.av_val = p;
353             break;
354           }
355       /* skip leading dot */
356       if (av.av_val[0] == '.')
357         {
358           av.av_val++;
359           av.av_len--;
360         }
361       flen = av.av_len;
362       /* hope there aren't more than 255 dups */
363       if (count)
364         flen += 2;
365       file = malloc(flen+1);
366 
367       memcpy(file, av.av_val, av.av_len);
368       if (count)
369         sprintf(file+av.av_len, "%02x", count);
370       else
371         file[av.av_len] = '\0';
372       for (p=file; *p; p++)
373         if (*p == ':')
374           *p = '_';
375       RTMP_LogPrintf("Playpath: %.*s\nSaving as: %s\n",
376         server->rc.Link.playpath.av_len, server->rc.Link.playpath.av_val,
377         file);
378       out = fopen(file, "wb");
379       free(file);
380       if (!out)
381         ret = 1;
382       else
383         {
384           fwrite(flvHeader, 1, sizeof(flvHeader), out);
385           av = server->rc.Link.playpath;
386           fl = malloc(sizeof(Flist)+av.av_len+1);
387           fl->f_file = out;
388           fl->f_path.av_len = av.av_len;
389           fl->f_path.av_val = (char *)(fl+1);
390           memcpy(fl->f_path.av_val, av.av_val, av.av_len);
391           fl->f_path.av_val[av.av_len] = '\0';
392           fl->f_next = NULL;
393           if (server->f_tail)
394             server->f_tail->f_next = fl;
395           else
396             server->f_head = fl;
397           server->f_tail = fl;
398         }
399     }
400   else if (AVMATCH(&method, &av_onStatus))
401     {
402       AMFObject obj2;
403       AVal code, level;
404       AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
405       AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
406       AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);
407 
408       RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);
409       if (AVMATCH(&code, &av_NetStream_Failed)
410 	  || AVMATCH(&code, &av_NetStream_Play_Failed)
411 	  || AVMATCH(&code, &av_NetStream_Play_StreamNotFound)
412 	  || AVMATCH(&code, &av_NetConnection_Connect_InvalidApp))
413 	{
414 	  ret = 1;
415 	}
416 
417       if (AVMATCH(&code, &av_NetStream_Play_Start))
418 	{
419           /* set up the next stream */
420           if (server->f_cur)
421 		    {
422 		      if (server->f_cur->f_next)
423                 server->f_cur = server->f_cur->f_next;
424 			}
425           else
426             {
427               for (server->f_cur = server->f_head; server->f_cur &&
428                     !server->f_cur->f_file; server->f_cur = server->f_cur->f_next) ;
429             }
430 	  server->rc.m_bPlaying = TRUE;
431 	}
432 
433       // Return 1 if this is a Play.Complete or Play.Stop
434       if (AVMATCH(&code, &av_NetStream_Play_Complete)
435 	  || AVMATCH(&code, &av_NetStream_Play_Stop))
436 	{
437 	  ret = 1;
438 	}
439     }
440   else if (AVMATCH(&method, &av_closeStream))
441     {
442       ret = 1;
443     }
444   else if (AVMATCH(&method, &av_close))
445     {
446       RTMP_Close(&server->rc);
447       ret = 1;
448     }
449 out:
450   AMF_Reset(&obj);
451   return ret;
452 }
453 
454 int
ServePacket(STREAMING_SERVER * server,int which,RTMPPacket * packet)455 ServePacket(STREAMING_SERVER *server, int which, RTMPPacket *packet)
456 {
457   int ret = 0;
458 
459   RTMP_Log(RTMP_LOGDEBUG, "%s, %s sent packet type %02X, size %u bytes", __FUNCTION__,
460     cst[which], packet->m_packetType, packet->m_nBodySize);
461 
462   switch (packet->m_packetType)
463     {
464     case RTMP_PACKET_TYPE_CHUNK_SIZE:
465       // chunk size
466 //      HandleChangeChunkSize(r, packet);
467       break;
468 
469     case RTMP_PACKET_TYPE_BYTES_READ_REPORT:
470       // bytes read report
471       break;
472 
473     case RTMP_PACKET_TYPE_CONTROL:
474       // ctrl
475 //      HandleCtrl(r, packet);
476       break;
477 
478     case RTMP_PACKET_TYPE_SERVER_BW:
479       // server bw
480 //      HandleServerBW(r, packet);
481       break;
482 
483     case RTMP_PACKET_TYPE_CLIENT_BW:
484       // client bw
485  //     HandleClientBW(r, packet);
486       break;
487 
488     case RTMP_PACKET_TYPE_AUDIO:
489       // audio data
490       //RTMP_Log(RTMP_LOGDEBUG, "%s, received: audio %lu bytes", __FUNCTION__, packet.m_nBodySize);
491       break;
492 
493     case RTMP_PACKET_TYPE_VIDEO:
494       // video data
495       //RTMP_Log(RTMP_LOGDEBUG, "%s, received: video %lu bytes", __FUNCTION__, packet.m_nBodySize);
496       break;
497 
498     case RTMP_PACKET_TYPE_FLEX_STREAM_SEND:
499       // flex stream send
500       break;
501 
502     case RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT:
503       // flex shared object
504       break;
505 
506     case RTMP_PACKET_TYPE_FLEX_MESSAGE:
507       // flex message
508       {
509 	ret = ServeInvoke(server, which, packet, packet->m_body + 1);
510 	break;
511       }
512     case RTMP_PACKET_TYPE_INFO:
513       // metadata (notify)
514       break;
515 
516     case RTMP_PACKET_TYPE_SHARED_OBJECT:
517       /* shared object */
518       break;
519 
520     case RTMP_PACKET_TYPE_INVOKE:
521       // invoke
522       ret = ServeInvoke(server, which, packet, packet->m_body);
523       break;
524 
525     case RTMP_PACKET_TYPE_FLASH_VIDEO:
526       /* flv */
527 	break;
528 
529     default:
530       RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,
531 	  packet->m_packetType);
532 #ifdef _DEBUG
533       RTMP_LogHex(RTMP_LOGDEBUG, packet->m_body, packet->m_nBodySize);
534 #endif
535     }
536   return ret;
537 }
538 
539 int
WriteStream(char ** buf,unsigned int * plen,uint32_t * nTimeStamp,RTMPPacket * packet)540 WriteStream(char **buf,	// target pointer, maybe preallocated
541 	    unsigned int *plen,	// length of buffer if preallocated
542             uint32_t *nTimeStamp,
543             RTMPPacket *packet)
544 {
545   uint32_t prevTagSize = 0;
546   int ret = -1, len = *plen;
547 
548   while (1)
549     {
550       char *packetBody = packet->m_body;
551       unsigned int nPacketLen = packet->m_nBodySize;
552 
553       // skip video info/command packets
554       if (packet->m_packetType == RTMP_PACKET_TYPE_VIDEO &&
555 	  nPacketLen == 2 && ((*packetBody & 0xf0) == 0x50))
556 	{
557 	  ret = 0;
558 	  break;
559 	}
560 
561       if (packet->m_packetType == RTMP_PACKET_TYPE_VIDEO && nPacketLen <= 5)
562 	{
563 	  RTMP_Log(RTMP_LOGWARNING, "ignoring too small video packet: size: %d",
564 	      nPacketLen);
565 	  ret = 0;
566 	  break;
567 	}
568       if (packet->m_packetType == RTMP_PACKET_TYPE_AUDIO && nPacketLen <= 1)
569 	{
570 	  RTMP_Log(RTMP_LOGWARNING, "ignoring too small audio packet: size: %d",
571 	      nPacketLen);
572 	  ret = 0;
573 	  break;
574 	}
575 #ifdef _DEBUG
576       RTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, TS: %d ms", packet->m_packetType,
577 	  nPacketLen, packet->m_nTimeStamp);
578       if (packet->m_packetType == RTMP_PACKET_TYPE_VIDEO)
579 	RTMP_Log(RTMP_LOGDEBUG, "frametype: %02X", (*packetBody & 0xf0));
580 #endif
581 
582       // calculate packet size and reallocate buffer if necessary
583       unsigned int size = nPacketLen
584 	+
585 	((packet->m_packetType == RTMP_PACKET_TYPE_AUDIO
586           || packet->m_packetType == RTMP_PACKET_TYPE_VIDEO
587 	  || packet->m_packetType == RTMP_PACKET_TYPE_INFO) ? 11 : 0)
588         + (packet->m_packetType != 0x16 ? 4 : 0);
589 
590       if (size + 4 > len)
591 	{
592           /* The extra 4 is for the case of an FLV stream without a last
593            * prevTagSize (we need extra 4 bytes to append it).  */
594 	  *buf = (char *) realloc(*buf, size + 4);
595 	  if (*buf == 0)
596 	    {
597 	      RTMP_Log(RTMP_LOGERROR, "Couldn't reallocate memory!");
598 	      ret = -1;		// fatal error
599 	      break;
600 	    }
601 	}
602       char *ptr = *buf, *pend = ptr + size+4;
603 
604       /* audio (RTMP_PACKET_TYPE_AUDIO), video (RTMP_PACKET_TYPE_VIDEO)
605        * or metadata (RTMP_PACKET_TYPE_INFO) packets: construct 11 byte
606        * header then add rtmp packet's data.  */
607       if (packet->m_packetType == RTMP_PACKET_TYPE_AUDIO
608           || packet->m_packetType == RTMP_PACKET_TYPE_VIDEO
609 	  || packet->m_packetType == RTMP_PACKET_TYPE_INFO)
610 	{
611 	  // set data type
612 	  //*dataType |= (((packet->m_packetType == RTMP_PACKET_TYPE_AUDIO)<<2)|(packet->m_packetType == RTMP_PACKET_TYPE_VIDEO));
613 
614 	  (*nTimeStamp) = packet->m_nTimeStamp;
615 	  prevTagSize = 11 + nPacketLen;
616 
617 	  *ptr++ = packet->m_packetType;
618 	  ptr = AMF_EncodeInt24(ptr, pend, nPacketLen);
619 	  ptr = AMF_EncodeInt24(ptr, pend, *nTimeStamp);
620 	  *ptr = (char) (((*nTimeStamp) & 0xFF000000) >> 24);
621 	  ptr++;
622 
623 	  // stream id
624 	  ptr = AMF_EncodeInt24(ptr, pend, 0);
625 	}
626 
627       memcpy(ptr, packetBody, nPacketLen);
628       unsigned int len = nPacketLen;
629 
630       // correct tagSize and obtain timestamp if we have an FLV stream
631       if (packet->m_packetType == RTMP_PACKET_TYPE_FLASH_VIDEO)
632 	{
633 	  unsigned int pos = 0;
634 
635 	  while (pos + 11 < nPacketLen)
636 	    {
637 	      uint32_t dataSize = AMF_DecodeInt24(packetBody + pos + 1);	// size without header (11) and without prevTagSize (4)
638 	      *nTimeStamp = AMF_DecodeInt24(packetBody + pos + 4);
639 	      *nTimeStamp |= (packetBody[pos + 7] << 24);
640 
641 #if 0
642 	      /* set data type */
643 	      *dataType |= (((*(packetBody+pos) == RTMP_PACKET_TYPE_AUDIO) << 2)
644                             | (*(packetBody+pos) == RTMP_PACKET_TYPE_VIDEO));
645 #endif
646 
647 	      if (pos + 11 + dataSize + 4 > nPacketLen)
648 		{
649 		  if (pos + 11 + dataSize > nPacketLen)
650 		    {
651 		      RTMP_Log(RTMP_LOGERROR,
652 			  "Wrong data size (%u), stream corrupted, aborting!",
653 			  dataSize);
654 		      ret = -2;
655 		      break;
656 		    }
657 		  RTMP_Log(RTMP_LOGWARNING, "No tagSize found, appending!");
658 
659 		  // we have to append a last tagSize!
660 		  prevTagSize = dataSize + 11;
661 		  AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend, prevTagSize);
662 		  size += 4;
663 		  len += 4;
664 		}
665 	      else
666 		{
667 		  prevTagSize =
668 		    AMF_DecodeInt32(packetBody + pos + 11 + dataSize);
669 
670 #ifdef _DEBUG
671 		  RTMP_Log(RTMP_LOGDEBUG,
672 		      "FLV Packet: type %02X, dataSize: %lu, tagSize: %lu, timeStamp: %lu ms",
673 		      (unsigned char) packetBody[pos], dataSize, prevTagSize,
674 		      *nTimeStamp);
675 #endif
676 
677 		  if (prevTagSize != (dataSize + 11))
678 		    {
679 #ifdef _DEBUG
680 		      RTMP_Log(RTMP_LOGWARNING,
681 			  "Tag and data size are not consitent, writing tag size according to dataSize+11: %d",
682 			  dataSize + 11);
683 #endif
684 
685 		      prevTagSize = dataSize + 11;
686 		      AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend, prevTagSize);
687 		    }
688 		}
689 
690 	      pos += prevTagSize + 4;	//(11+dataSize+4);
691 	    }
692 	}
693       ptr += len;
694 
695       if (packet->m_packetType != RTMP_PACKET_TYPE_FLASH_VIDEO)
696 	{			// FLV tag packets contain their own prevTagSize
697 	  AMF_EncodeInt32(ptr, pend, prevTagSize);
698 	  //ptr += 4;
699 	}
700 
701       ret = size;
702       break;
703     }
704 
705   if (len > *plen)
706     *plen = len;
707 
708   return ret;			// no more media packets
709 }
710 
711 TFTYPE
controlServerThread(void * unused)712 controlServerThread(void *unused)
713 {
714   char ich;
715   while (1)
716     {
717       ich = getchar();
718       switch (ich)
719 	{
720 	case 'q':
721 	  RTMP_LogPrintf("Exiting\n");
722 	  stopStreaming(rtmpServer);
723           free(rtmpServer);
724 	  exit(0);
725 	  break;
726 	default:
727 	  RTMP_LogPrintf("Unknown command \'%c\', ignoring\n", ich);
728 	}
729     }
730   TFRET();
731 }
732 
doServe(void * arg)733 TFTYPE doServe(void *arg)	// server socket and state (our listening socket)
734 {
735   STREAMING_SERVER *server = arg;
736   RTMPPacket pc = { 0 }, ps = { 0 };
737   RTMPChunk rk = { 0 };
738   char *buf = NULL;
739   unsigned int buflen = 131072;
740   int paused = FALSE;
741   int sockfd = server->socket;
742 
743   // timeout for http requests
744   fd_set rfds;
745   struct timeval tv;
746 
747   server->state = STREAMING_IN_PROGRESS;
748 
749   memset(&tv, 0, sizeof(struct timeval));
750   tv.tv_sec = 5;
751 
752   FD_ZERO(&rfds);
753   FD_SET(sockfd, &rfds);
754 
755   if (select(sockfd + 1, &rfds, NULL, NULL, &tv) <= 0)
756     {
757       RTMP_Log(RTMP_LOGERROR, "Request timeout/select failed, ignoring request");
758       goto quit;
759     }
760   else
761     {
762       RTMP_Init(&server->rs);
763       RTMP_Init(&server->rc);
764       server->rs.m_sb.sb_socket = sockfd;
765       if (!RTMP_Serve(&server->rs))
766         {
767           RTMP_Log(RTMP_LOGERROR, "Handshake failed");
768           goto cleanup;
769         }
770     }
771 
772   buf = malloc(buflen);
773 
774   /* Just process the Connect request */
775   while (RTMP_IsConnected(&server->rs) && RTMP_ReadPacket(&server->rs, &ps))
776     {
777       if (!RTMPPacket_IsReady(&ps))
778         continue;
779       ServePacket(server, 0, &ps);
780       RTMPPacket_Free(&ps);
781       if (RTMP_IsConnected(&server->rc))
782         break;
783     }
784 
785   pc.m_chunk = &rk;
786 
787   /* We have our own timeout in select() */
788   server->rc.Link.timeout = 10;
789   server->rs.Link.timeout = 10;
790   while (RTMP_IsConnected(&server->rs) || RTMP_IsConnected(&server->rc))
791     {
792       int n;
793       int sr, cr;
794 
795       cr = server->rc.m_sb.sb_size;
796       sr = server->rs.m_sb.sb_size;
797 
798       if (cr || sr)
799         {
800         }
801       else
802         {
803           n = server->rs.m_sb.sb_socket;
804 	  if (server->rc.m_sb.sb_socket > n)
805 	    n = server->rc.m_sb.sb_socket;
806 	  FD_ZERO(&rfds);
807 	  if (RTMP_IsConnected(&server->rs))
808 	    FD_SET(sockfd, &rfds);
809 	  if (RTMP_IsConnected(&server->rc))
810 	    FD_SET(server->rc.m_sb.sb_socket, &rfds);
811 
812           /* give more time to start up if we're not playing yet */
813 	  tv.tv_sec = server->f_cur ? 30 : 60;
814 	  tv.tv_usec = 0;
815 
816 	  if (select(n + 1, &rfds, NULL, NULL, &tv) <= 0)
817 	    {
818               if (server->f_cur && server->rc.m_mediaChannel && !paused)
819                 {
820                   server->rc.m_pauseStamp = server->rc.m_channelTimestamp[server->rc.m_mediaChannel];
821                   if (RTMP_ToggleStream(&server->rc))
822                     {
823                       paused = TRUE;
824                       continue;
825                     }
826                 }
827 	      RTMP_Log(RTMP_LOGERROR, "Request timeout/select failed, ignoring request");
828 	      goto cleanup;
829 	    }
830           if (server->rs.m_sb.sb_socket > 0 &&
831 	    FD_ISSET(server->rs.m_sb.sb_socket, &rfds))
832             sr = 1;
833           if (server->rc.m_sb.sb_socket > 0 &&
834 	    FD_ISSET(server->rc.m_sb.sb_socket, &rfds))
835             cr = 1;
836         }
837       if (sr)
838         {
839           while (RTMP_ReadPacket(&server->rs, &ps))
840             if (RTMPPacket_IsReady(&ps))
841               {
842                 /* change chunk size */
843                 if (ps.m_packetType == RTMP_PACKET_TYPE_CHUNK_SIZE)
844                   {
845                     if (ps.m_nBodySize >= 4)
846                       {
847                         server->rs.m_inChunkSize = AMF_DecodeInt32(ps.m_body);
848                         RTMP_Log(RTMP_LOGDEBUG, "%s, client: chunk size change to %d", __FUNCTION__,
849                             server->rs.m_inChunkSize);
850                         server->rc.m_outChunkSize = server->rs.m_inChunkSize;
851                       }
852                   }
853                 /* bytes received */
854                 else if (ps.m_packetType == RTMP_PACKET_TYPE_BYTES_READ_REPORT)
855                   {
856                     if (ps.m_nBodySize >= 4)
857                       {
858                         int count = AMF_DecodeInt32(ps.m_body);
859                         RTMP_Log(RTMP_LOGDEBUG, "%s, client: bytes received = %d", __FUNCTION__,
860                             count);
861                       }
862                   }
863                 /* ctrl */
864                 else if (ps.m_packetType == RTMP_PACKET_TYPE_CONTROL)
865                   {
866                     short nType = AMF_DecodeInt16(ps.m_body);
867                     /* UpdateBufferMS */
868                     if (nType == 0x03)
869                       {
870                         char *ptr = ps.m_body+2;
871                         int id;
872                         int len;
873                         id = AMF_DecodeInt32(ptr);
874                         /* Assume the interesting media is on a non-zero stream */
875                         if (id)
876                           {
877                             len = AMF_DecodeInt32(ptr+4);
878 #if 1
879                             /* request a big buffer */
880                             if (len < BUFFERTIME)
881                               {
882                                 AMF_EncodeInt32(ptr+4, ptr+8, BUFFERTIME);
883                               }
884 #endif
885                             RTMP_Log(RTMP_LOGDEBUG, "%s, client: BufferTime change in stream %d to %d", __FUNCTION__,
886                                 id, len);
887                           }
888                       }
889                   }
890                 else if (ps.m_packetType == RTMP_PACKET_TYPE_FLEX_MESSAGE
891                          || ps.m_packetType == RTMP_PACKET_TYPE_INVOKE)
892                   {
893                     if (ServePacket(server, 0, &ps) && server->f_cur)
894                       {
895                         fclose(server->f_cur->f_file);
896                         server->f_cur->f_file = NULL;
897                         server->f_cur = NULL;
898                       }
899                   }
900                 RTMP_SendPacket(&server->rc, &ps, FALSE);
901                 RTMPPacket_Free(&ps);
902                 break;
903               }
904         }
905       if (cr)
906         {
907           while (RTMP_ReadPacket(&server->rc, &pc))
908             {
909               int sendit = 1;
910               if (RTMPPacket_IsReady(&pc))
911                 {
912                   if (paused)
913                     {
914                       if (pc.m_nTimeStamp <= server->rc.m_mediaStamp)
915                         continue;
916                       paused = 0;
917                       server->rc.m_pausing = 0;
918                     }
919                   /* change chunk size */
920                   if (pc.m_packetType == RTMP_PACKET_TYPE_CHUNK_SIZE)
921                     {
922                       if (pc.m_nBodySize >= 4)
923                         {
924                           server->rc.m_inChunkSize = AMF_DecodeInt32(pc.m_body);
925                           RTMP_Log(RTMP_LOGDEBUG, "%s, server: chunk size change to %d", __FUNCTION__,
926                               server->rc.m_inChunkSize);
927                           server->rs.m_outChunkSize = server->rc.m_inChunkSize;
928                         }
929                     }
930                   else if (pc.m_packetType == RTMP_PACKET_TYPE_CONTROL)
931                     {
932                       short nType = AMF_DecodeInt16(pc.m_body);
933                       /* SWFverification */
934                       if (nType == 0x1a)
935 #ifdef CRYPTO
936                         if (server->rc.Link.SWFSize)
937                         {
938                           RTMP_SendCtrl(&server->rc, 0x1b, 0, 0);
939                           sendit = 0;
940                         }
941 #else
942                         /* The session will certainly fail right after this */
943                         RTMP_Log(RTMP_LOGERROR, "%s, server requested SWF verification, need CRYPTO support! ", __FUNCTION__);
944 #endif
945                     }
946                   else if (server->f_cur && (
947                        pc.m_packetType == RTMP_PACKET_TYPE_AUDIO ||
948                        pc.m_packetType == RTMP_PACKET_TYPE_VIDEO ||
949                        pc.m_packetType == RTMP_PACKET_TYPE_INFO ||
950                        pc.m_packetType == RTMP_PACKET_TYPE_FLASH_VIDEO) &&
951                        RTMP_ClientPacket(&server->rc, &pc))
952                     {
953                       int len = WriteStream(&buf, &buflen, &server->stamp, &pc);
954                       if (len > 0 && fwrite(buf, 1, len, server->f_cur->f_file) != len)
955                         goto cleanup;
956                     }
957                   else if (pc.m_packetType == RTMP_PACKET_TYPE_FLEX_MESSAGE ||
958                            pc.m_packetType == RTMP_PACKET_TYPE_INVOKE)
959                     {
960                       if (ServePacket(server, 1, &pc) && server->f_cur)
961                         {
962                           fclose(server->f_cur->f_file);
963                           server->f_cur->f_file = NULL;
964                           server->f_cur = NULL;
965                         }
966                     }
967                 }
968               if (sendit && RTMP_IsConnected(&server->rs))
969                 RTMP_SendChunk(&server->rs, &rk);
970               if (RTMPPacket_IsReady(&pc))
971                   RTMPPacket_Free(&pc);
972               break;
973             }
974         }
975       if (!RTMP_IsConnected(&server->rs) && RTMP_IsConnected(&server->rc)
976         && !server->f_cur)
977         RTMP_Close(&server->rc);
978     }
979 
980 cleanup:
981   RTMP_LogPrintf("Closing connection... ");
982   RTMP_Close(&server->rs);
983   RTMP_Close(&server->rc);
984   while (server->f_head)
985     {
986       Flist *fl = server->f_head;
987       server->f_head = fl->f_next;
988       if (fl->f_file)
989         fclose(fl->f_file);
990       free(fl);
991     }
992   server->f_tail = NULL;
993   server->f_cur = NULL;
994   free(buf);
995   /* Should probably be done by RTMP_Close() ... */
996   server->rc.Link.hostname.av_val = NULL;
997   server->rc.Link.tcUrl.av_val = NULL;
998   server->rc.Link.swfUrl.av_val = NULL;
999   server->rc.Link.pageUrl.av_val = NULL;
1000   server->rc.Link.app.av_val = NULL;
1001   server->rc.Link.auth.av_val = NULL;
1002   server->rc.Link.flashVer.av_val = NULL;
1003   RTMP_LogPrintf("done!\n\n");
1004 
1005 quit:
1006   if (server->state == STREAMING_IN_PROGRESS)
1007     server->state = STREAMING_ACCEPTING;
1008 
1009   TFRET();
1010 }
1011 
1012 TFTYPE
serverThread(void * arg)1013 serverThread(void *arg)
1014 {
1015   STREAMING_SERVER *server = arg;
1016   server->state = STREAMING_ACCEPTING;
1017 
1018   while (server->state == STREAMING_ACCEPTING)
1019     {
1020       struct sockaddr_in addr;
1021       socklen_t addrlen = sizeof(struct sockaddr_in);
1022       STREAMING_SERVER *srv2 = malloc(sizeof(STREAMING_SERVER));
1023       int sockfd =
1024 	accept(server->socket, (struct sockaddr *) &addr, &addrlen);
1025 
1026       if (sockfd > 0)
1027 	{
1028 #ifdef linux
1029           struct sockaddr_in dest;
1030 	  char destch[16];
1031           socklen_t destlen = sizeof(struct sockaddr_in);
1032 	  getsockopt(sockfd, SOL_IP, SO_ORIGINAL_DST, &dest, &destlen);
1033           strcpy(destch, inet_ntoa(dest.sin_addr));
1034 	  RTMP_Log(RTMP_LOGDEBUG, "%s: accepted connection from %s to %s\n", __FUNCTION__,
1035 	      inet_ntoa(addr.sin_addr), destch);
1036 #else
1037 	  RTMP_Log(RTMP_LOGDEBUG, "%s: accepted connection from %s\n", __FUNCTION__,
1038 	      inet_ntoa(addr.sin_addr));
1039 #endif
1040 	  *srv2 = *server;
1041 	  srv2->socket = sockfd;
1042 	  /* Create a new thread and transfer the control to that */
1043 	  ThreadCreate(doServe, srv2);
1044 	  RTMP_Log(RTMP_LOGDEBUG, "%s: processed request\n", __FUNCTION__);
1045 	}
1046       else
1047 	{
1048 	  RTMP_Log(RTMP_LOGERROR, "%s: accept failed", __FUNCTION__);
1049 	}
1050     }
1051   server->state = STREAMING_STOPPED;
1052   TFRET();
1053 }
1054 
1055 STREAMING_SERVER *
startStreaming(const char * address,int port)1056 startStreaming(const char *address, int port)
1057 {
1058   struct sockaddr_in addr;
1059   int sockfd, tmp;
1060   STREAMING_SERVER *server;
1061 
1062   sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1063   if (sockfd == -1)
1064     {
1065       RTMP_Log(RTMP_LOGERROR, "%s, couldn't create socket", __FUNCTION__);
1066       return 0;
1067     }
1068 
1069   tmp = 1;
1070   setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
1071 				(char *) &tmp, sizeof(tmp) );
1072 
1073   addr.sin_family = AF_INET;
1074   addr.sin_addr.s_addr = inet_addr(address);	//htonl(INADDR_ANY);
1075   addr.sin_port = htons(port);
1076 
1077   if (bind(sockfd, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)) ==
1078       -1)
1079     {
1080       RTMP_Log(RTMP_LOGERROR, "%s, TCP bind failed for port number: %d", __FUNCTION__,
1081 	  port);
1082       return 0;
1083     }
1084 
1085   if (listen(sockfd, 10) == -1)
1086     {
1087       RTMP_Log(RTMP_LOGERROR, "%s, listen failed", __FUNCTION__);
1088       closesocket(sockfd);
1089       return 0;
1090     }
1091 
1092   server = (STREAMING_SERVER *) calloc(1, sizeof(STREAMING_SERVER));
1093   server->socket = sockfd;
1094 
1095   ThreadCreate(serverThread, server);
1096 
1097   return server;
1098 }
1099 
1100 void
stopStreaming(STREAMING_SERVER * server)1101 stopStreaming(STREAMING_SERVER * server)
1102 {
1103   assert(server);
1104 
1105   if (server->state != STREAMING_STOPPED)
1106     {
1107       int fd = server->socket;
1108       server->socket = 0;
1109       if (server->state == STREAMING_IN_PROGRESS)
1110 	{
1111 	  server->state = STREAMING_STOPPING;
1112 
1113 	  // wait for streaming threads to exit
1114 	  while (server->state != STREAMING_STOPPED)
1115 	    msleep(1);
1116 	}
1117 
1118       if (fd && closesocket(fd))
1119 	RTMP_Log(RTMP_LOGERROR, "%s: Failed to close listening socket, error %d",
1120 	    __FUNCTION__, GetSockError());
1121 
1122       server->state = STREAMING_STOPPED;
1123     }
1124 }
1125 
1126 
1127 void
sigIntHandler(int sig)1128 sigIntHandler(int sig)
1129 {
1130   RTMP_ctrlC = TRUE;
1131   RTMP_LogPrintf("Caught signal: %d, cleaning up, just a second...\n", sig);
1132   if (rtmpServer)
1133     stopStreaming(rtmpServer);
1134   signal(SIGINT, SIG_DFL);
1135 }
1136 
1137 int
main(int argc,char ** argv)1138 main(int argc, char **argv)
1139 {
1140   int nStatus = RD_SUCCESS;
1141 
1142   // rtmp streaming server
1143   char DEFAULT_RTMP_STREAMING_DEVICE[] = "0.0.0.0";	// 0.0.0.0 is any device
1144 
1145   char *rtmpStreamingDevice = DEFAULT_RTMP_STREAMING_DEVICE;	// streaming device, default 0.0.0.0
1146   int nRtmpStreamingPort = 1935;	// port
1147 
1148   RTMP_LogPrintf("RTMP Proxy Server %s\n", RTMPDUMP_VERSION);
1149   RTMP_LogPrintf("(c) 2010 Andrej Stepanchuk, Howard Chu; license: GPL\n\n");
1150 
1151   RTMP_debuglevel = RTMP_LOGINFO;
1152 
1153   if (argc > 1 && !strcmp(argv[1], "-z"))
1154     RTMP_debuglevel = RTMP_LOGALL;
1155 
1156   signal(SIGINT, sigIntHandler);
1157 #ifndef WIN32
1158   signal(SIGPIPE, SIG_IGN);
1159 #endif
1160 
1161 #ifdef _DEBUG
1162   netstackdump = fopen("netstackdump", "wb");
1163   netstackdump_read = fopen("netstackdump_read", "wb");
1164 #endif
1165 
1166   InitSockets();
1167 
1168   // start text UI
1169   ThreadCreate(controlServerThread, 0);
1170 
1171   // start http streaming
1172   if ((rtmpServer =
1173        startStreaming(rtmpStreamingDevice, nRtmpStreamingPort)) == 0)
1174     {
1175       RTMP_Log(RTMP_LOGERROR, "Failed to start RTMP server, exiting!");
1176       return RD_FAILED;
1177     }
1178   RTMP_LogPrintf("Streaming on rtmp://%s:%d\n", rtmpStreamingDevice,
1179 	    nRtmpStreamingPort);
1180 
1181   while (rtmpServer->state != STREAMING_STOPPED)
1182     {
1183       sleep(1);
1184     }
1185   RTMP_Log(RTMP_LOGDEBUG, "Done, exiting...");
1186 
1187   free(rtmpServer);
1188 
1189   CleanupSockets();
1190 
1191 #ifdef _DEBUG
1192   if (netstackdump != 0)
1193     fclose(netstackdump);
1194   if (netstackdump_read != 0)
1195     fclose(netstackdump_read);
1196 #endif
1197   return nStatus;
1198 }
1199