diff options
author | cvs2svn <admin@example.com> | 2009-10-21 00:02:02 +0000 |
---|---|---|
committer | cvs2svn <admin@example.com> | 2009-10-21 00:02:02 +0000 |
commit | 97a97ca3358eb48de3eb7a222e487e800566569f (patch) | |
tree | 97c920d0225a1c9773a3bce2207f261d7d230123 /tools/udp_pes_scheduler.c | |
parent | a61961358c5a2ec92340b3f8e056bab55438f103 (diff) | |
download | xineliboutput-CVS.tar.gz xineliboutput-CVS.tar.bz2 |
This commit was manufactured by cvs2svn to create branch 'CVS'.CVS
Diffstat (limited to 'tools/udp_pes_scheduler.c')
-rw-r--r-- | tools/udp_pes_scheduler.c | 938 |
1 files changed, 0 insertions, 938 deletions
diff --git a/tools/udp_pes_scheduler.c b/tools/udp_pes_scheduler.c deleted file mode 100644 index d46a99a8..00000000 --- a/tools/udp_pes_scheduler.c +++ /dev/null @@ -1,938 +0,0 @@ -/* - * udp_pes_scheduler.h: PES scheduler for UDP/RTP streams - * - * See the main source file 'xineliboutput.c' for copyright information and - * how to reach the author. - * - * $Id: udp_pes_scheduler.c,v 1.52 2009-08-19 12:19:43 phintuka Exp $ - * - */ - -//#define LOG_UDP_RATE -//#define LOG_RESEND -//#define LOG_SCR - -#define __STDC_FORMAT_MACROS -#define __STDC_CONSTANT_MACROS -#include <inttypes.h> - -#include <stdint.h> -#include <stdlib.h> -#include <errno.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/ioctl.h> - -#include <vdr/config.h> -#include <vdr/tools.h> -#include <vdr/videodir.h> - -#include "../logdefs.h" // logging -#include "../config.h" // configuration data -#include "../xine_input_vdr_net.h" // frame headers and constants - -#include "ts.h" -#include "pes.h" -#include "udp_buffer.h" -#include "time_pts.h" -#include "cxsocket.h" -#include "sap.h" // SAP - Session Announcement Protocol -#include "sdp.h" // SDP - Session Description Protocol -#include "rtcp.h" // RTCP - -#include "udp_pes_scheduler.h" - - -#ifdef LOG_RESEND -# define LOGRESEND LOGDBG -#else -# define LOGRESEND(x...) -#endif - -#ifdef LOG_SCR -# define LOGSCR LOGDBG -#else -# define LOGSCR(x...) -#endif - -/* - * constants - */ - -const uint MAX_QUEUE_SIZE = 64; // ~ 65 ms with typical DVB stream -const uint MAX_LIVE_QUEUE_SIZE = (64+60); // ~ 100 ms with typical DVB stream -const uint HARD_LIMIT = (4*1024); // ~ 40 Mbit/s === 4 Mb/s - -const uint MAX_BURST_BYTES = 32768; // 32 kb -const uint MAX_BURST_FRAMES = 15; // 15 UDP packets - -const uint RTCP_MIN_INTERVAL = 45000; // 500 ms (pts units) - -// initial burst length after seek (500ms = ~13 video frames) -const int64_t INITIAL_BURST_TIME = INT64_C(45000); // pts units (90kHz) - -// assume seek when when pts difference between two frames exceeds this (2,5 seconds) -const int64_t JUMP_LIMIT_TIME = INT64_C(225000); // pts units (90kHz) - -const uint SCHEDULER_MIN_DELAY_MS = 3; -const uint SCHEDULER_MAX_DELAY_MS = 20; - -static inline int64_t abs64(int64_t val) { return val<0 ? -val : val; } - -cUdpScheduler::cUdpScheduler() -{ - - // Scheduler data - - m_ScrSource = eScrDetect; - m_CurrentAudioVtime = 0; - m_CurrentVideoVtime = 0; - m_CurrentPcr = 0; - - m_MasterClock.Set(INT64_C(0)); - - m_BurstBytes = 0; - m_BurstFrames = 0; - - m_Master = false; - m_TrickSpeed = false; - - // RTP - - srandom(time(NULL) ^ getpid()); - - m_ssrc = random(); - LOGDBG("RTP SSRC: 0x%08x", m_ssrc); - m_LastRtcpTime = 0; - m_Frames = 0; - m_Octets = 0; - m_RtpScr.Set((int64_t)random()); - - m_fd_sap = -1; - - // Queuing - - int i; - for(i=0; i<MAX_UDP_HANDLES; i++) - m_Handles[i] = -1; - - m_BackLog = new cUdpBackLog; - - m_QueueNextSeq = 0; - m_QueuePending = 0; - - // Thread - - Start(); -} - -cUdpScheduler::~cUdpScheduler() -{ - m_Lock.Lock(); - - Cancel(-1); - m_Cond.Broadcast(); - m_CondWait.Signal(); - m_Lock.Unlock(); - - Cancel(3); - - if(m_fd_rtcp.open() || m_fd_rtp.open()) - Send_SAP(false); - - CLOSESOCKET(m_fd_sap); - - delete m_BackLog; -} - -void cUdpScheduler::Scheduler_Sleep(int ms) -{ - m_CondWait.Wait(ms); - m_BurstBytes = 0; - m_BurstFrames = 0; -} - -bool cUdpScheduler::AddRtp(void) -{ - cMutexLock ml(&m_Lock); - - if(m_fd_rtcp.open()) { - LOGERR("cUdpScheduler::AddHandle: RTCP socket already open !"); - Send_SAP(false); - m_fd_rtcp.close(); - } - - /* need new ssrc */ - m_ssrc = random(); - LOGDBG("RTP SSRC: 0x%08x", m_ssrc); - - // - // RTP - // - if(! m_fd_rtp.create(cxSocket::estDGRAM)) { - LOGERR("socket() failed (UDP/RTP multicast)"); - return false; - } - - // Set buffer sizes - m_fd_rtp.set_buffers(KILOBYTE(256), 2048); - - // Set multicast socket options - if(!m_fd_rtp.set_multicast(xc.remote_rtp_ttl)) { - m_fd_rtp.close(); - return false; - } - - if(xc.remote_local_ip[0]) { - struct sockaddr_in name; - name.sin_family = AF_INET; - name.sin_addr.s_addr = inet_addr(xc.remote_local_ip); - if(name.sin_addr.s_addr == INADDR_NONE) - LOGERR("Local address %s is invalid", xc.remote_local_ip); - name.sin_port = htons(xc.remote_rtp_port); - if (bind(m_fd_rtp.handle(), (struct sockaddr *)&name, sizeof(name)) < 0) - LOGERR("bind(%s:%d) failed for udp/rtp multicast", xc.remote_local_ip, xc.remote_rtp_port); -#if 0 - struct ip_mreqn mreqn; - mreqn.imr_multiaddr.s_addr = inet_addr(xc.remote_rtp_addr); - mreqn.imr_address.s_addr = inet_addr(xc.remote_local_ip); /* IP address of local interface */ - //mreqn.imr_ifindex = ; /* interface index */ - if(setsockopt(m_fd_rtp.handle(), IPPROTO_IP, IP_MULTICAST_IF, &mreqn, sizeof(mreqn))) - LOGERR("setting multicast source address/interface failed"); -#endif - } - - // Connect to multicast address - if(!m_fd_rtp.connect(xc.remote_rtp_addr, xc.remote_rtp_port) && - errno != EINPROGRESS) { - LOGERR("connect(fd_rtp) failed. Address=%s, port=%d", - xc.remote_rtp_addr, xc.remote_rtp_port); - m_fd_rtp.close(); - return false; - } - - // Set to non-blocking mode - m_fd_rtp.set_blocking(false); - - // - // RTCP - // - if(! m_fd_rtcp.create(cxSocket::estDGRAM)) - LOGERR("socket() failed (RTCP multicast)"); - - m_fd_rtcp.set_buffers(16384, 16384); - if(!m_fd_rtcp.set_multicast(xc.remote_rtp_ttl)) - m_fd_rtcp.close(); - - if(xc.remote_local_ip[0]) { - struct sockaddr_in name; - name.sin_family = AF_INET; - name.sin_addr.s_addr = inet_addr(xc.remote_local_ip); - name.sin_port = htons(xc.remote_rtp_port+1); - if (bind(m_fd_rtcp.handle(), (struct sockaddr *)&name, sizeof(name)) < 0) - LOGERR("bind(%s:%d) failed for udp/rtp multicast", xc.remote_local_ip, xc.remote_rtp_port); -#if 0 - struct ip_mreqn mreqn; - mreqn.imr_multiaddr.s_addr = inet_addr(xc.remote_rtp_addr); - mreqn.imr_address.s_addr = inet_addr(xc.remote_local_ip); /* IP address of local interface */ - //mreqn.imr_ifindex = ; /* interface index */ - if(setsockopt(m_fd_rtp.handle(), IPPROTO_IP, IP_MULTICAST_IF, &mreqn, sizeof(mreqn))) - LOGERR("setting multicast source address/interface failed"); -#endif - } - - /* RTCP port (RFC 1889) */ - if(!m_fd_rtcp.connect(xc.remote_rtp_addr, xc.remote_rtp_port + 1) && - errno != EINPROGRESS) { - LOGERR("connect(fd_rtcp) failed. Address=%s, port=%d", - xc.remote_rtp_addr, xc.remote_rtp_port + - (xc.remote_rtp_port&1)?-1:1); - m_fd_rtcp.close(); - } - - // Set to non-blocking mode - m_fd_rtcp.set_blocking(false); - - // Finished - - if(!AddHandle(m_fd_rtp)) - LOGERR("cUdpScheduler::AddHandle(fd_rtp) failed"); - - Send_SAP(true); - - return true; -} - -bool cUdpScheduler::AddHandle(int fd) -{ - cMutexLock ml(&m_Lock); - - int i; - - for(i=0; i<MAX_UDP_HANDLES; i++) - if(m_Handles[i] < 0 || m_Handles[i] == fd) { - m_Handles[i] = fd; - - /* query socket send buffer size */ - m_wmem[i] = 0x10000; /* default to 64k */ - socklen_t l = sizeof(int); - if(getsockopt(m_Handles[i], SOL_SOCKET, SO_SNDBUF, &m_wmem[i], &l)) - LOGERR("getsockopt(SO_SNDBUF) failed"); - m_wmem[i] /= 2; /* man 7 socket */ - - m_Cond.Broadcast(); - - return true; - } - - return false; -} - -void cUdpScheduler::RemoveRtp(void) -{ - cMutexLock ml(&m_Lock); - - if(m_fd_rtp.open() || m_fd_rtcp.open()) { - Send_SAP(false); - - RemoveHandle(m_fd_rtp); - - m_fd_rtp.close(); - m_fd_rtcp.close(); - CLOSESOCKET(m_fd_sap); - } -} - -void cUdpScheduler::RemoveHandle(int fd) -{ - cMutexLock ml(&m_Lock); - - int i; - for(i=0; i<MAX_UDP_HANDLES; i++) - if(m_Handles[i] == fd) - break; - - for(; i<MAX_UDP_HANDLES-1; i++) - m_Handles[i] = m_Handles[i+1]; - - m_Handles[MAX_UDP_HANDLES-1] = -1; - - if(m_Handles[0] < 0) { - // No clients left ... - - // Flush all buffers - m_QueueNextSeq = 0; - m_QueuePending = 0; - - m_BackLogDeleteMutex.Lock(); - delete m_BackLog; - m_BackLog = new cUdpBackLog; - m_BackLogDeleteMutex.Unlock(); - - m_Frames = 0; - m_Octets = 0; - } -} - -int cUdpScheduler::Poll(int TimeoutMs, bool Master) -{ - cMutexLock ml(&m_Lock); - - m_Master = Master; - - if(m_Handles[0] < 0) { - // no clients, so we can eat all data we are given ... - return DEFAULT_POLL_SIZE; - } - - uint limit = m_Master ? MAX_QUEUE_SIZE : MAX_LIVE_QUEUE_SIZE; - if(m_QueuePending >= limit) { - uint64_t WaitEnd = cTimeMs::Now(); - if(TimeoutMs >= 0) - WaitEnd += (uint64_t)TimeoutMs; - - while (cTimeMs::Now() < WaitEnd && - Running() && - m_QueuePending >= limit) - m_Cond.TimedWait(m_Lock, 5); - } - - return limit > m_QueuePending ? limit - m_QueuePending : 0; -} - -bool cUdpScheduler::Flush(int TimeoutMs) -{ - cMutexLock ml(&m_Lock); - - if(m_Handles[0] < 0) - return true; - - if(m_QueuePending > 0) { - uint64_t WaitEnd = cTimeMs::Now(); - if(TimeoutMs >= 0) - WaitEnd += (uint64_t)TimeoutMs; - - while (cTimeMs::Now() < WaitEnd && - Running() && - m_QueuePending > 0) - m_Cond.TimedWait(m_Lock, 5); - } - return m_QueuePending == 0; -} - -void cUdpScheduler::Clear(void) -{ - cMutexLock ml(&m_Lock); - - m_BackLog->Clear(m_QueuePending); - - m_QueuePending = 0; - m_Cond.Broadcast(); - - m_ScrSource = eScrDetect; -} - -void cUdpScheduler::Pause(bool On) -{ - cMutexLock ml(&m_Lock); - - if(On) - m_MasterClock.Pause(); - else - m_MasterClock.Resume(); - - m_TrickSpeed = false; -} - -void cUdpScheduler::TrickSpeed(const int Multiplier) -{ - cMutexLock ml(&m_Lock); - -#ifdef LOG_SCR - if(Multiplier == 1 || Multiplier == -1) { - LOGMSG("UDP clock --> normal"); - } else if(Multiplier < 0) - LOGMSG("UDP clock --> %dx", -Multiplier); - else - LOGMSG("UDP clock --> 1/%d", Multiplier); -#endif - - m_MasterClock.TrickSpeed(Multiplier); - - m_TrickSpeed = (Multiplier==-1 || Multiplier==1) ? false : true; -} - -void cUdpScheduler::SetScrSpeed(const int Speed) -{ - cMutexLock ml(&m_Lock); - - m_MasterClock.SetScrSpeed(Speed); - m_RtpScr.SetScrSpeed(Speed); -} - -bool cUdpScheduler::Queue(uint64_t StreamPos, const uchar *Data, int Length) -{ - cMutexLock ml(&m_Lock); - - if(m_Handles[0] < 0) - return true; - - uint limit = m_Master ? MAX_QUEUE_SIZE : MAX_LIVE_QUEUE_SIZE; - if(m_QueuePending >= limit) - return false; - - m_BackLog->MakeFrame(StreamPos, Data, Length); - m_QueuePending++; - - m_Cond.Broadcast(); - - return true; -} - -void cUdpScheduler::QueuePadding(void) -{ - cMutexLock ml(&m_Lock); - - if (m_Handles[0] < 0) - return; - - if (m_QueuePending > 2) - return; - - QueuePaddingInternal(); - - m_Cond.Broadcast(); -} - -void cUdpScheduler::QueuePaddingInternal(void) -{ - static unsigned const char Padding[] = {0x00,0x00,0x01,0xBE,0x00,0x02,0xff,0xff}; - - int PrevSeq = (m_QueueNextSeq + UDP_BUFFER_SIZE - 1) & UDP_BUFFER_MASK; - stream_rtp_header_impl_t *Frame = m_BackLog->Get(PrevSeq); - if (Frame) { - int PrevLen = m_BackLog->PayloadSize(PrevSeq); - uint64_t Pos = ntohll(Frame->hdr_ext.pos) + PrevLen - 8; - m_BackLog->MakeFrame(Pos, Padding, 8); - } else - m_BackLog->MakeFrame(0, Padding, 8); - - m_QueuePending++; -} - -int cUdpScheduler::CalcElapsedVtime(int64_t pts, ScrSource_t ScrSource) -{ - int64_t diff = 0; - - switch (ScrSource) { - - case eScrFromVideo: - diff = pts - m_CurrentVideoVtime; - if (diff > JUMP_LIMIT_TIME || (-diff) > JUMP_LIMIT_TIME) { // 1 s (must be > GOP) - // RESET - LOGSCR("cUdpScheduler SCR RESET (Video jump %lld->%lld)", m_CurrentVideoVtime, pts); - m_CurrentVideoVtime = pts; - - // Use video pts for sync only in audioless trickspeeds - // (audio has smaller, constant and increasing intervals) - if (m_TrickSpeed) - m_MasterClock.Set(m_CurrentVideoVtime + INITIAL_BURST_TIME); - - return -1; - } - if (diff < 0) /* ignore small negative differences (B/P frames are sent out-of-order) */ - diff = 0; - else - m_CurrentVideoVtime = pts; - break; - - case eScrFromAudio: - diff = abs64(pts - m_CurrentAudioVtime); - if (diff > JUMP_LIMIT_TIME) { // 1 sec - // RESET - LOGSCR("cUdpScheduler SCR RESET (Audio jump %lld->%lld)", m_CurrentAudioVtime, pts); - m_CurrentAudioVtime = pts; - m_MasterClock.Set(m_CurrentAudioVtime + INITIAL_BURST_TIME); - return -1; - } - m_CurrentAudioVtime = pts; - break; - - case eScrFromPcr: - diff = pts - m_CurrentPcr; - if (diff > JUMP_LIMIT_TIME || diff < 0) { // 1 sec - // RESET - LOGSCR("cUdpScheduler RESET (PCR jump %lld->%lld)", m_CurrentPcr, pts); - m_CurrentPcr = pts; - m_MasterClock.Set(m_CurrentPcr + INITIAL_BURST_TIME); - return -1; - } - m_CurrentPcr = pts; - break; - - default: break; - } - - return (int) diff; -} - -void cUdpScheduler::Send_RTCP(void) -{ - if(!m_fd_rtcp.open()) - return; - - uint64_t scr = m_RtpScr.Now(); - - if(scr > (m_LastRtcpTime + RTCP_MIN_INTERVAL)) { - uint8_t frame[2048], *content = frame; - char hostname[64] = ""; - rtcp_packet_t *msg = (rtcp_packet_t *)content; - struct timeval tv; - - gettimeofday(&tv, NULL); - gethostname(hostname, sizeof(hostname)-1); - hostname[sizeof(hostname)-1] = 0; - - // SR (Sender report) - msg->hdr.raw[0] = 0x81; // RTP version = 2, Report count = 1 */ - msg->hdr.ptype = RTCP_SR; - msg->hdr.length = htons(6); // length 6 dwords - - msg->sr.ssrc = htonl(m_ssrc); - msg->sr.ntp_sec = htonl(tv.tv_sec + 0x83AA7E80); - msg->sr.ntp_frac = htonl((uint32_t)((double)tv.tv_usec*(double)(1LL<<32)*1.0e-6)); - msg->sr.rtp_ts = htonl((uint32_t)(scr & 0xffffffff)); - msg->sr.psent = htonl((uint32_t)(m_Frames & 0xffffffff)); - msg->sr.osent = htonl((uint32_t)(m_Octets & 0xffffffff)); - - content += sizeof(rtcp_common_t) + sizeof(rtcp_sr_t); - msg = (rtcp_packet_t *)content; - - // SDES - msg->hdr.raw[0] = 0x81; // RTP version = 2, Report count = 1 */ - msg->hdr.ptype = RTCP_SDES; - msg->hdr.count = 1; - - msg->sdes.ssrc = m_ssrc; - msg->sdes.item[0].type = RTCP_SDES_CNAME; - sprintf(msg->sdes.item[0].data, "VDR@%s:%d%c%c%c", - hostname[0] ? hostname : xc.remote_rtp_addr, - xc.remote_rtp_port, 0, 0, 0); - msg->sdes.item[0].length = strlen(msg->sdes.item[0].data); - msg->hdr.length = htons(1 + 1 + ((msg->sdes.item[0].length - 2) + 3) / 4); - - content += sizeof(rtcp_common_t) + 4*ntohs(msg->hdr.length); - msg = (rtcp_packet_t *)content; - - // Send -#ifndef LOG_RTCP - (void) m_fd_rtcp.send(frame, content - frame); -#else - LOGMSG("RTCP send (%d)", m_fd_rtcp.send(frame, content - frame)); - for(int i=0; i<content-frame; i+=16) - LOGMSG("%02X %02X %02X %02X %02X %02X %02X %02X " - "%02X %02X %02X %02X %02X %02X %02X %02X " - " %c%c%c%c%c%c%c%c %c%c%c%c%c%c%c%c", - frame[i+0],frame[i+1],frame[i+2],frame[i+3], - frame[i+4],frame[i+5],frame[i+6],frame[i+7], - frame[i+8],frame[i+9],frame[i+10],frame[i+11], - frame[i+12],frame[i+13],frame[i+14],frame[i+15], - frame[i+0],frame[i+1],frame[i+2],frame[i+3], - frame[i+4],frame[i+5],frame[i+6],frame[i+7], - frame[i+8],frame[i+9],frame[i+10],frame[i+11], - frame[i+12],frame[i+13],frame[i+14],frame[i+15]); -#endif - - m_LastRtcpTime = scr; - } -} - - -void cUdpScheduler::Send_SAP(bool Announce) -{ - if(!xc.remote_rtp_sap || !m_fd_rtp.open()) - return; - - char ip[64] = ""; - uint32_t local_addr = m_fd_rtp.get_local_address(ip); - - if(!local_addr) - return; - - uint32_t payload_type = VDRVERSNUM > 10702 ? SDP_PAYLOAD_MPEG_TS : SDP_PAYLOAD_MPEG_PES; - const char *sdp_descr = vdr_sdp_description(ip, - 2001, - xc.listen_port, - xc.remote_rtp_addr, - payload_type, - m_ssrc, - xc.remote_rtp_port, - xc.remote_rtp_ttl); - if(!sdp_descr) - return; - -#if 1 - /* store copy of SDP data */ - if(m_fd_sap < 0) { - cString fname = AddDirectory(VideoDirectory, - cString::sprintf("xineliboutput@%s.sdp", - ip)); - FILE *fp = fopen(fname, "w"); - if(fp) { - fprintf(fp, "%s", sdp_descr); - fclose(fp); - } - } -#endif - - sap_pdu_t *pdu = sap_create_pdu(local_addr, - Announce, - (m_ssrc >> 16 | m_ssrc) & 0xffff, - "application/sdp", - sdp_descr); - - if(!sap_send_pdu(&m_fd_sap, pdu, 0)) - LOGERR("SAP/SDP announce failed"); - free(pdu); - - if(!Announce) - CLOSESOCKET(m_fd_sap); -} - -#ifdef LOG_SCR -static const char ScrSourceName[][6] = {"???", "VIDEO", "PS1", "AUDIO", "PCR"}; -#endif - -void cUdpScheduler::Schedule(const uchar *Data, int Length) -{ - int64_t pts = NO_PTS; - int elapsed = 0; - ScrSource_t ScrSource = eScrDetect; - - // Get timestamp from data - - if (DATA_IS_TS(Data)) { - if (ts_get_pcr_n(Data, Length/TS_SIZE, &pts)) { - LOGSCR("UDP PCR: %"PRId64, pts); - ScrSource = eScrFromPcr; - elapsed = CalcElapsedVtime(pts, eScrFromPcr); - } - } else /* if (DATA_IS_PES(Data)) */ { - if (PES_HAS_PTS(Data)) { - ScrSource = IS_VIDEO_PACKET(Data) ? eScrFromVideo : IS_AUDIO_PACKET(Data) ? eScrFromAudio : eScrDetect; - pts = pes_get_pts(Data, Length); - elapsed = CalcElapsedVtime(pts, ScrSource); - } - } - - if (m_ScrSource < ScrSource) - m_ScrSource = ScrSource; - - if(elapsed > 0) { - - int64_t now = m_MasterClock.Now(); - int64_t SendTime = ScrSource==eScrFromVideo ? m_CurrentVideoVtime : - ScrSource==eScrFromAudio ? m_CurrentAudioVtime : - ScrSource==eScrFromPcr ? m_CurrentPcr : now; - - LOGSCR("PTS: %lld (%s) elapsed %d ms (PID %02x)", pts, ScrSourceStr[ScrSource], - pts_to_ms(elapsed), DATA_IS_TS(Data) ? TS_PID(Data) : Data[3]); - - // - // Detect discontinuity (against SCR) - // - if (now > SendTime && (now - SendTime) > JUMP_LIMIT_TIME) { - LOGSCR("cUdpScheduler MasterClock init (was in past) %s", ScrSourceStr[ScrSource]); - now = SendTime + INITIAL_BURST_TIME; - m_MasterClock.Set(now); - } else if (now < SendTime && (SendTime - now) > JUMP_LIMIT_TIME) { - LOGSCR("cUdpScheduler MasterClock init (was in future) %s", ScrSourceStr[ScrSource]); - now = SendTime + INITIAL_BURST_TIME; - m_MasterClock.Set(now); - } - - // - // Delay - // - while (SendTime > now) { - - uint delay_ms = pts_to_ms(SendTime - now); - if (delay_ms < SCHEDULER_MIN_DELAY_MS) - break; - - LOGSCR("cUdpScheduler sleeping %d ms (time ref: %s, beat interval %d ms)", - delay_ms, ScrSourceStr[ScrSource], pts_to_ms(elapsed)); - - if (delay_ms > SCHEDULER_MAX_DELAY_MS) - delay_ms = SCHEDULER_MAX_DELAY_MS; - - Scheduler_Sleep(delay_ms); - - now = m_MasterClock.Now(); - } - } -} - -void cUdpScheduler::Action(void) -{ - /* UDP Scheduler needs high priority */ - const int priority = -5; - SetPriority(priority); - errno = 0; - if ((nice(priority) == -1) && errno) - LOGDBG("cUdpScheduler: Can't nice to value: %d", priority); - - m_Lock.Lock(); - - while (Running()) { - - if(m_Handles[0] < 0) { - m_Cond.TimedWait(m_Lock, 5000); - continue; - } - - // Wait until we have outgoing data in queue - if(m_QueuePending <= 0) { - m_BurstFrames = m_BurstBytes = 0; - m_Cond.TimedWait(m_Lock, 100); - if(m_QueuePending <= 0) { - // Still nothing... - // Send padding frame once in 100ms so clients can detect - // possible missing frames and server shutdown - QueuePaddingInternal(); - } - continue; // to check Running() - } - - // Take next frame from queue - stream_rtp_header_impl_t *frame = m_BackLog->Get(m_QueueNextSeq); - int PayloadSize = m_BackLog->PayloadSize(m_QueueNextSeq); - int UdpPacketLen = PayloadSize + sizeof(stream_udp_header_t); - int RtpPacketLen = PayloadSize + sizeof(stream_rtp_header_impl_t); - - m_QueueNextSeq = (m_QueueNextSeq + 1) & UDP_BUFFER_MASK; - m_QueuePending--; - - m_Cond.Broadcast(); - m_BackLogDeleteMutex.Lock(); /* ensure frame will not be deleted from queue */ - m_Lock.Unlock(); - - // Schedule frame - if(m_Master) - Schedule(frame->payload, PayloadSize); - - // Need some bandwidth limit for ex. sequence of still frames when - // moving cutting marks very fast (no audio or PTS available). - // Hard limit for used bandwidth: - // - ~1 frames/ms & 8kb/ms -> 8mb/s -> ~ 80 Mbit/s ( / client) - // - max burst 15 frames or 32 kb - m_BurstFrames ++; - m_BurstBytes += PayloadSize; - if (m_BurstFrames >= MAX_BURST_FRAMES && m_BurstBytes >= MAX_BURST_BYTES) { - Scheduler_Sleep(4); -#ifdef LOG_UDP_RATE - static uint64_t dbg_timer = cTimeMs::Now(); - static int dbg_bytes = 0; - dbg_bytes += bytes; - if (dbg_timer + 60000 <= cTimeMs::Now()) { - LOGDBG("UDP rate: %4d Kbps (queue %d)", dbg_bytes/(60*1024/8), m_QueuePending); - dbg_bytes = 0; - dbg_timer = cTimeMs::Now(); - } -#endif - } - - /* tag frame with ssrc and timestamp */ - frame->rtp_hdr.ts = htonl((uint32_t)(m_RtpScr.Now() & 0xffffffff)); - frame->rtp_hdr.ssrc = htonl(m_ssrc); - - /* deliver to all active sockets */ - for(int i=0; i<MAX_UDP_HANDLES && m_Handles[i]>=0; i++) { - - // - // use TIOCOUTQ ioctl instead of poll/select. - // - poll/select for UDP/RTP may return true even when queue - // is (almost) full - // - kernel silently drops frames it cant send - // -> poll() + send() just causes frames to be dropped - // - uint size = 0; - if(!ioctl(m_Handles[i], TIOCOUTQ, &size)) { - if(size >= (m_wmem[i] - 2*RtpPacketLen)) { - LOGMSG("cUdpScheduler: kernel transmit queue > ~%dkb (max %dkb) ! (master=%d)", - (m_wmem[i] - 2*RtpPacketLen)/1024, m_wmem[i]/1024, m_Master); - Scheduler_Sleep(2); - } - } else { - if(m_QueuePending > (MAX_QUEUE_SIZE-5)) - LOGDBG("cUdpScheduler: kernel transmit queue > ~30kb ! (master=%d ; Queue=%d)", - m_Master, m_QueuePending); - Scheduler_Sleep(2); - } - - if(m_Handles[i] == m_fd_rtp.handle()) { - if(send(m_Handles[i], frame, RtpPacketLen, 0) <= 0) - LOGERR("cUdpScheduler: UDP/RTP send() failed !"); - } else { - /* UDP: send without rtp header */ - if (send(m_Handles[i], RTP_UDP_PAYLOAD(frame), UdpPacketLen, 0) <= 0) - LOGERR("cUdpScheduler: UDP send() failed !"); - } - } - - m_BackLogDeleteMutex.Unlock(); /* release queue */ - m_Lock.Lock(); - - m_Frames ++; - m_Octets += PayloadSize; - if(m_fd_rtcp.open() && (m_Frames & 0xff) == 1) { // every 256th frame - Send_RTCP(); -#if 0 - if((m_Frames & 0xff00) == 0) // every 65536th frame (~ 2 min) - Send_SAP(); -#else - if((m_Frames & 0x0300) == 0) // every 1024th frame (~ 2...4 sec) - Send_SAP(); -#endif - } - } - - m_Lock.Unlock(); -} - -void cUdpScheduler::ReSend(int fd, uint64_t Pos, int Seq1, int Seq2) -{ - if(fd < 0) /* no re-send for RTP */ - return; - - struct { - stream_udp_header_t hdr; - char payload[64-sizeof(stream_udp_header_t)]; - } udp_ctrl = {{(uint64_t)INT64_C(-1), (uint16_t)-1}, {0}}; - - // Handle buffer wrap - if(Seq1 > Seq2) - Seq2 += UDP_BUFFER_SIZE; - - cMutexLock ml(&m_Lock); // keeps also scheduler thread suspended ... - - if(Seq2-Seq1 > 64) { - LOGDBG("cUdpScheduler::ReSend: requested range too large (%d-%d)", Seq1, Seq2); - snprintf(udp_ctrl.payload, sizeof(udp_ctrl.payload), - "UDP MISSING %d-%d %" PRIu64, - Seq1, (Seq2 & UDP_BUFFER_MASK), Pos); - send(fd, &udp_ctrl, sizeof(udp_ctrl), 0); - return; - } - - // re-send whole range - for(; Seq1 <= Seq2; Seq1++) { - - // Wait if kernel queue is full - int size = 0; - if(!ioctl(fd, TIOCOUTQ, &size)) - if(size > ((0x10000)/2 - 2048)) { // assume 64k kernel buffer - LOGDBG("cUdpScheduler::ReSend: kernel transmit queue > ~30kb !"); - Scheduler_Sleep(2); - } - - stream_rtp_header_impl_t *frame = m_BackLog->Get(Seq1); - - if(frame) { - if(ntohull(frame->hdr_ext.pos) - Pos < 100000) { - send(fd, - RTP_UDP_PAYLOAD(frame), - m_BackLog->PayloadSize(Seq1) + sizeof(stream_udp_header_t), - 0); - LOGRESEND("cUdpScheduler::ReSend: %d (%d bytes) @%lld sent", - Seq1, m_BackLog->PayloadSize(Seq1), Pos); - Pos = ntohull(frame->hdr_ext.pos) + m_BackLog->PayloadSize(Seq1); - continue; - } else { - // buffer has been lost long time ago... - LOGRESEND("cUdpScheduler::ReSend: Requested position does not match " - "(%lld ; has %lld)", Pos, ntohll(frame->hdr_ext.pos)); - } - } else { - LOGRESEND("cUdpScheduler::ReSend: %d @%lld missing", Seq1, Pos); - } - - // buffer has been lost - send packet missing info - - LOGRESEND("cUdpScheduler::ReSend: missing %d-%d @%d (hdr 0x%llx 0x%x)", - Seq1, Seq1, Pos, udp_ctrl.hdr.pos, udp_ctrl.hdr.seq); - - int Seq0 = Seq1; - for(; Seq1 < Seq2; Seq1++) { - stream_rtp_header_impl_t *frame = m_BackLog->Get(Seq1+1); - if(frame && (ntohull(frame->hdr_ext.pos) - Pos < 100000)) - break; - } - - snprintf(udp_ctrl.payload, sizeof(udp_ctrl.payload), - "UDP MISSING %d-%d %" PRIu64, - Seq0, (Seq1 & UDP_BUFFER_MASK), Pos); - - send(fd, &udp_ctrl, sizeof(udp_ctrl), 0); - } -} |