diff options
author | schmirl <schmirl> | 2009-02-13 10:39:20 +0000 |
---|---|---|
committer | schmirl <schmirl> | 2009-02-13 10:39:20 +0000 |
commit | 78410ea5761eab03a7bc33852e85621594df7254 (patch) | |
tree | 38b0238e9797d4ab97fee50c822518ff460def16 /server | |
parent | c26b89f9c287d64915b94cc56fb0e4e709d235a4 (diff) | |
download | vdr-plugin-streamdev-78410ea5761eab03a7bc33852e85621594df7254.tar.gz vdr-plugin-streamdev-78410ea5761eab03a7bc33852e85621594df7254.tar.bz2 |
Added IGMP multicast server
Modified Files:
CONTRIBUTORS HISTORY Makefile README po/de_DE.po po/fi_FI.po
po/fr_FR.po po/it_IT.po po/ru_RU.po server/component.c
server/component.h server/connection.c server/connection.h
server/livefilter.c server/server.c server/setup.c
server/setup.h server/streamer.c server/streamer.h
streamdev/streamdevhosts.conf tools/socket.c tools/socket.h
Added Files:
patches/vdr-cap_net_raw.diff server/componentIGMP.c
server/componentIGMP.h server/connectionIGMP.c
server/connectionIGMP.h
Diffstat (limited to 'server')
-rw-r--r-- | server/component.c | 5 | ||||
-rw-r--r-- | server/component.h | 6 | ||||
-rw-r--r-- | server/componentIGMP.c | 447 | ||||
-rw-r--r-- | server/componentIGMP.h | 62 | ||||
-rw-r--r-- | server/connection.c | 5 | ||||
-rw-r--r-- | server/connection.h | 4 | ||||
-rw-r--r-- | server/connectionIGMP.c | 64 | ||||
-rw-r--r-- | server/connectionIGMP.h | 45 | ||||
-rw-r--r-- | server/livefilter.c | 5 | ||||
-rw-r--r-- | server/server.c | 9 | ||||
-rw-r--r-- | server/setup.c | 25 | ||||
-rw-r--r-- | server/setup.h | 6 | ||||
-rw-r--r-- | server/streamer.c | 25 | ||||
-rw-r--r-- | server/streamer.h | 6 |
14 files changed, 691 insertions, 23 deletions
diff --git a/server/component.c b/server/component.c index 1a584b5..70d861a 100644 --- a/server/component.c +++ b/server/component.c @@ -1,13 +1,14 @@ /* - * $Id: component.c,v 1.3 2005/05/09 20:22:29 lordjaxom Exp $ + * $Id: component.c,v 1.4 2009/02/13 10:39:22 schmirl Exp $ */ #include "server/component.h" #include "server/connection.h" cServerComponent::cServerComponent(const char *Protocol, const char *ListenIp, - uint ListenPort): + uint ListenPort, int Type, int IpProto): m_Protocol(Protocol), + m_Listen(Type, IpProto), m_ListenIp(ListenIp), m_ListenPort(ListenPort) { diff --git a/server/component.h b/server/component.h index 8703348..7efd4ba 100644 --- a/server/component.h +++ b/server/component.h @@ -1,5 +1,5 @@ /* - * $Id: component.h,v 1.2 2005/05/09 20:22:29 lordjaxom Exp $ + * $Id: component.h,v 1.3 2009/02/13 10:39:22 schmirl Exp $ */ #ifndef VDR_STREAMDEV_SERVERS_COMPONENT_H @@ -17,8 +17,8 @@ class cServerConnection; class cServerComponent: public cListObject { private: - cTBSocket m_Listen; const char *m_Protocol; + cTBSocket m_Listen; const char *m_ListenIp; uint m_ListenPort; @@ -27,7 +27,7 @@ protected: virtual cServerConnection *NewClient(void) = 0; public: - cServerComponent(const char *Protocol, const char *ListenIp, uint ListenPort); + cServerComponent(const char *Protocol, const char *ListenIp, uint ListenPort, int Type = SOCK_STREAM, int IpProto = 0); virtual ~cServerComponent(); /* Starts listening on the specified Port, override if you want to do things diff --git a/server/componentIGMP.c b/server/componentIGMP.c new file mode 100644 index 0000000..946c513 --- /dev/null +++ b/server/componentIGMP.c @@ -0,0 +1,447 @@ +/* + * $Id: componentIGMP.c,v 1.1 2009/02/13 10:39:22 schmirl Exp $ + */ +#include <netinet/ip.h> +#include <netinet/igmp.h> + +#include "server/componentIGMP.h" +#include "server/connectionIGMP.h" +#include "server/setup.h" + +#ifndef IGMP_ALL_HOSTS +#define IGMP_ALL_HOSTS htonl(0xE0000001L) +#endif +#ifndef IGMP_ALL_ROUTER +#define IGMP_ALL_ROUTER htonl(0xE0000002L) +#endif + +// IGMP parameters according to RFC2236. All time values in seconds. +#define IGMP_ROBUSTNESS 2 +#define IGMP_QUERY_INTERVAL 125 +#define IGMP_QUERY_RESPONSE_INTERVAL 10 +#define IGMP_GROUP_MEMBERSHIP_INTERVAL (2 * IGMP_QUERY_INTERVAL + IGMP_QUERY_RESPONSE_INTERVAL) +#define IGMP_OTHER_QUERIER_PRESENT_INTERVAL (2 * IGMP_QUERY_INTERVAL + IGMP_QUERY_RESPONSE_INTERVAL / 2) +#define IGMP_STARTUP_QUERY_INTERVAL (IGMP_QUERY_INTERVAL / 4) +#define IGMP_STARTUP_QUERY_COUNT IGMP_ROBUSTNESS +// This value is 1/10 sec. RFC default is 10. Reduced to minimum to free unused channels ASAP +#define IGMP_LAST_MEMBER_QUERY_INTERVAL_TS 1 +#define IGMP_LAST_MEMBER_QUERY_COUNT IGMP_ROBUSTNESS + +// operations on struct timeval +#define TV_CMP(a, cmp, b) (a.tv_sec == b.tv_sec ? a.tv_usec cmp b.tv_usec : a.tv_sec cmp b.tv_sec) +#define TV_SET(tv) (tv.tv_sec || tv.tv_usec) +#define TV_CLR(tv) memset(&tv, 0, sizeof(tv)) +#define TV_CPY(dst, src) memcpy(&dst, &src, sizeof(dst)) +#define TV_ADD(dst, ts) dst.tv_sec += ts / 10; dst.tv_usec += (ts % 10) * 100000; if (dst.tv_usec >= 1000000) { dst.tv_usec -= 1000000; dst.tv_sec++; } + +class cMulticastGroup: public cListObject +{ +public: + cConnectionIGMP *connection; + in_addr_t group; + in_addr_t reporter; + struct timeval timeout; + struct timeval v1timer; + struct timeval retransmit; + + cMulticastGroup(in_addr_t Group); +}; + +cMulticastGroup::cMulticastGroup(in_addr_t Group) : + connection(NULL), + group(Group), + reporter(0) +{ + TV_CLR(timeout); + TV_CLR(v1timer); + TV_CLR(retransmit); +} + +void logIGMP(uint8_t type, struct in_addr Src, struct in_addr Dst, struct in_addr Grp) +{ + const char* msg; + switch (type) { + case IGMP_MEMBERSHIP_QUERY: msg = "membership query"; break; + case IGMP_V1_MEMBERSHIP_REPORT: msg = "V1 membership report"; break; + case IGMP_V2_MEMBERSHIP_REPORT: msg = "V2 membership report"; break; + case IGMP_V2_LEAVE_GROUP: msg = "leave group"; break; + default: msg = "unknown"; break; + } + char* s = strdup(inet_ntoa(Src)); + char* d = strdup(inet_ntoa(Dst)); + dsyslog("streamdev-server IGMP: Received %s from %s (dst %s) for %s", msg, s, d, inet_ntoa(Grp)); + free(s); + free(d); +} + +/* Taken from http://tools.ietf.org/html/rfc1071 */ +uint16_t inetChecksum(uint16_t *addr, int count) +{ + uint32_t sum = 0; + while (count > 1) { + sum += *addr++; + count -= 2; + } + + if( count > 0 ) + sum += * (uint8_t *) addr; + + while (sum>>16) + sum = (sum & 0xffff) + (sum >> 16); + + return ~sum; +} + +cComponentIGMP::cComponentIGMP(void): + cServerComponent("IGMP", "0.0.0.0", 0, SOCK_RAW, IPPROTO_IGMP), + cThread("IGMP timeout handler"), + m_BindIp(inet_addr(StreamdevServerSetup.IGMPBindIP)), + m_MaxChannelNumber(0), + m_StartupQueryCount(IGMP_STARTUP_QUERY_COUNT), + m_Querier(true) +{ +} + +cComponentIGMP::~cComponentIGMP(void) +{ +} + +cMulticastGroup* cComponentIGMP::FindGroup(in_addr_t Group) const +{ + cMulticastGroup *group = m_Groups.First(); + while (group && group->group != Group) + group = m_Groups.Next(group); + return group; +} + +bool cComponentIGMP::Initialize(void) +{ + if (cServerComponent::Initialize() && IGMPMembership(IGMP_ALL_ROUTER)) + { + for (cChannel *channel = Channels.First(); channel; channel = Channels.Next(channel)) + { + if (channel->GroupSep()) + continue; + int num = channel->Number(); + if (!IGMPMembership(htonl(MULTICAST_PRIV_MIN + num))) + break; + m_MaxChannelNumber = num; + } + if (m_MaxChannelNumber == 0) + { + IGMPMembership(IGMP_ALL_ROUTER, false); + esyslog("streamdev-server IGMP: no multicast group joined"); + } + else + { + Start(); + } + } + return m_MaxChannelNumber > 0; +} + +void cComponentIGMP::Destruct(void) +{ + if (m_MaxChannelNumber > 0) + { + Cancel(3); + for (cChannel *channel = Channels.First(); channel; channel = Channels.Next(channel)) + { + if (channel->GroupSep()) + continue; + int num = channel->Number(); + if (num > m_MaxChannelNumber) + break; + IGMPMembership(htonl(MULTICAST_PRIV_MIN + num), false); + } + IGMPMembership(IGMP_ALL_ROUTER, false); + } + m_MaxChannelNumber = 0; + cServerComponent::Destruct(); +} + +cServerConnection *cComponentIGMP::NewClient(void) +{ + return new cConnectionIGMP("IGMP", StreamdevServerSetup.IGMPClientPort, (eStreamType) StreamdevServerSetup.IGMPStreamType); +} + +cServerConnection* cComponentIGMP::Accept(void) +{ + ssize_t recv_len; + int ip_hdrlen, ip_datalen; + struct ip *ip; + struct igmp *igmp; + + while ((recv_len = ::recvfrom(Socket(), m_ReadBuffer, sizeof(m_ReadBuffer), 0, NULL, NULL)) < 0 && errno == EINTR) + errno = 0; + + if (recv_len < 0) { + esyslog("streamdev-server IGMP: read failed: %m"); + return NULL; + } + else if (recv_len < (ssize_t) sizeof(struct ip)) { + esyslog("streamdev-server IGMP: IP packet too short"); + return NULL; + } + + ip = (struct ip*) m_ReadBuffer; + + // filter out my own packets + if (ip->ip_src.s_addr == m_BindIp) + return NULL; + + ip_hdrlen = ip->ip_hl << 2; +#ifdef __FreeBSD__ + ip_datalen = ip->ip_len; +#else + ip_datalen = ntohs(ip->ip_len) - ip_hdrlen; +#endif + if (ip->ip_p != IPPROTO_IGMP) { + esyslog("streamdev-server IGMP: Unexpected protocol %hhu", ip->ip_p); + return NULL; + } + if (recv_len < ip_hdrlen + IGMP_MINLEN) { + esyslog("streamdev-server IGMP: packet too short"); + return NULL; + } + igmp = (struct igmp*) (m_ReadBuffer + ip_hdrlen); + uint16_t chksum = igmp->igmp_cksum; + igmp->igmp_cksum = 0; + if (chksum != inetChecksum((uint16_t *)igmp, ip_datalen)) + { + esyslog("INVALID CHECKSUM %d %d %d %d 0x%x 0x%x", ntohs(ip->ip_len), ip_hdrlen, ip_datalen, recv_len, chksum, inetChecksum((uint16_t *)igmp, ip_datalen)); + return NULL; + } + logIGMP(igmp->igmp_type, ip->ip_src, ip->ip_dst, igmp->igmp_group); + return ProcessMessage(igmp, igmp->igmp_group.s_addr, ip->ip_src.s_addr); +} + +cServerConnection* cComponentIGMP::ProcessMessage(struct igmp *Igmp, in_addr_t Group, in_addr_t Sender) +{ + cServerConnection* conn = NULL; + cMulticastGroup* group; + LOCK_THREAD; + switch (Igmp->igmp_type) { + case IGMP_MEMBERSHIP_QUERY: + if (ntohl(Sender) < ntohl(m_BindIp)) + IGMPStartOtherQuerierPresentTimer(); + break; + case IGMP_V1_MEMBERSHIP_REPORT: + case IGMP_V2_MEMBERSHIP_REPORT: + group = FindGroup(Group); + if (!group) { + group = new cMulticastGroup(Group); + m_Groups.Add(group); + } + if (!group->connection) { + IGMPStartMulticast(group); + conn = group->connection; + } + IGMPStartTimer(group, Sender); + if (Igmp->igmp_type == IGMP_V1_MEMBERSHIP_REPORT) + IGMPStartV1HostTimer(group); + break; + case IGMP_V2_LEAVE_GROUP: + group = FindGroup(Group); + if (group && !TV_SET(group->v1timer)) { + if (group->reporter == Sender) { + IGMPStartTimerAfterLeave(group, m_Querier ? IGMP_LAST_MEMBER_QUERY_INTERVAL_TS : Igmp->igmp_code); + if (m_Querier) + IGMPSendGroupQuery(group); + IGMPStartRetransmitTimer(group); + } + m_CondWait.Signal(); + } + break; + default: + break; + } + return conn; +} + +void cComponentIGMP::Action() +{ + while (Running()) { + struct timeval now; + struct timeval next; + + gettimeofday(&now, NULL); + TV_CPY(next, now); + next.tv_sec += IGMP_QUERY_INTERVAL; + + cMulticastGroup *del = NULL; + { + LOCK_THREAD; + if (TV_CMP(m_GeneralQueryTimer, <, now)) { + dsyslog("General Query"); + IGMPSendGeneralQuery(); + IGMPStartGeneralQueryTimer(); + } + if (TV_CMP(next, >, m_GeneralQueryTimer)) + TV_CPY(next, m_GeneralQueryTimer); + + for (cMulticastGroup *group = m_Groups.First(); group; group = m_Groups.Next(group)) { + if (TV_CMP(group->timeout, <, now)) { + IGMPStopMulticast(group); + IGMPClearRetransmitTimer(group); + if (del) + m_Groups.Del(del); + del = group; + } + else if (m_Querier && TV_SET(group->retransmit) && TV_CMP(group->retransmit, <, now)) { + IGMPSendGroupQuery(group); + IGMPStartRetransmitTimer(group); + if (TV_CMP(next, >, group->retransmit)) + TV_CPY(next, group->retransmit); + } + else if (TV_SET(group->v1timer) && TV_CMP(group->v1timer, <, now)) { + TV_CLR(group->v1timer); + } + else { + if (TV_CMP(next, >, group->timeout)) + TV_CPY(next, group->timeout); + if (TV_SET(group->retransmit) && TV_CMP(next, >, group->retransmit)) + TV_CPY(next, group->retransmit); + if (TV_SET(group->v1timer) && TV_CMP(next, >, group->v1timer)) + TV_CPY(next, group->v1timer); + } + } + if (del) + m_Groups.Del(del); + } + + int sleep = (next.tv_sec - now.tv_sec) * 1000; + sleep += (next.tv_usec - now.tv_usec) / 1000; + if (next.tv_usec < now.tv_usec) + sleep += 1000; + dsyslog("Sleeping %d ms", sleep); + m_CondWait.Wait(sleep); + } +} + +bool cComponentIGMP::IGMPMembership(in_addr_t Group, bool Add) +{ + struct ip_mreqn mreq; + mreq.imr_multiaddr.s_addr = Group; + mreq.imr_address.s_addr = INADDR_ANY; + mreq.imr_ifindex = 0; + if (setsockopt(Socket(), IPPROTO_IP, Add ? IP_ADD_MEMBERSHIP : IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) + { + esyslog("streamdev-server IGMP: unable to %s %s: %m", Add ? "join" : "leave", inet_ntoa(mreq.imr_multiaddr)); + if (errno == ENOBUFS) + esyslog("consider increasing sys.net.ipv4.igmp_max_memberships"); + return false; + } + return true; +} + +void cComponentIGMP::IGMPSendQuery(in_addr_t Group, int Timeout) +{ + struct sockaddr_in dst; + struct igmp query; + + dst.sin_family = AF_INET; + dst.sin_port = IPPROTO_IGMP; + dst.sin_addr.s_addr = Group; + query.igmp_type = IGMP_MEMBERSHIP_QUERY; + query.igmp_code = Timeout * 10; + query.igmp_cksum = 0; + query.igmp_group.s_addr = (Group == IGMP_ALL_HOSTS) ? 0 : Group; + query.igmp_cksum = inetChecksum((uint16_t *) &query, sizeof(query)); + + for (int i = 0; i < 5 && ::sendto(Socket(), &query, sizeof(query), 0, (sockaddr*)&dst, sizeof(dst)) == -1; i++) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + esyslog("streamdev-server IGMP: unable to query group %s: %m", inet_ntoa(dst.sin_addr)); + break; + } + cCondWait::SleepMs(10); + } +} + +// Querier state actions +void cComponentIGMP::IGMPStartGeneralQueryTimer() +{ + m_Querier = true; + if (m_StartupQueryCount) { + gettimeofday(&m_GeneralQueryTimer, NULL); + m_GeneralQueryTimer.tv_sec += IGMP_STARTUP_QUERY_INTERVAL; + m_StartupQueryCount--; + } + else { + gettimeofday(&m_GeneralQueryTimer, NULL); + m_GeneralQueryTimer.tv_sec += IGMP_QUERY_INTERVAL; + } +} + +void cComponentIGMP::IGMPStartOtherQuerierPresentTimer() +{ + m_Querier = false; + m_StartupQueryCount = 0; + gettimeofday(&m_GeneralQueryTimer, NULL); + m_GeneralQueryTimer.tv_sec += IGMP_OTHER_QUERIER_PRESENT_INTERVAL; +} + +void cComponentIGMP::IGMPSendGeneralQuery() +{ + IGMPSendQuery(IGMP_ALL_HOSTS, IGMP_QUERY_RESPONSE_INTERVAL); +} + +// Group state actions +void cComponentIGMP::IGMPStartTimer(cMulticastGroup* Group, in_addr_t Member) +{ + gettimeofday(&Group->timeout, NULL); + Group->timeout.tv_sec += IGMP_GROUP_MEMBERSHIP_INTERVAL; + TV_CLR(Group->retransmit); + Group->reporter = Member; + +} + +void cComponentIGMP::IGMPStartV1HostTimer(cMulticastGroup* Group) +{ + gettimeofday(&Group->v1timer, NULL); + Group->v1timer.tv_sec += IGMP_GROUP_MEMBERSHIP_INTERVAL; +} + +void cComponentIGMP::IGMPStartTimerAfterLeave(cMulticastGroup* Group, unsigned int MaxResponseTimeTs) +{ + //Group->Update(time(NULL) + MaxResponseTime * IGMP_LAST_MEMBER_QUERY_COUNT / 10); + MaxResponseTimeTs *= IGMP_LAST_MEMBER_QUERY_COUNT; + gettimeofday(&Group->timeout, NULL); + TV_ADD(Group->timeout, MaxResponseTimeTs); + TV_CLR(Group->retransmit); + Group->reporter = 0; +} + +void cComponentIGMP::IGMPStartRetransmitTimer(cMulticastGroup* Group) +{ + gettimeofday(&Group->retransmit, NULL); + TV_ADD(Group->retransmit, IGMP_LAST_MEMBER_QUERY_INTERVAL_TS); +} + +void cComponentIGMP::IGMPClearRetransmitTimer(cMulticastGroup* Group) +{ + TV_CLR(Group->retransmit); +} + +void cComponentIGMP::IGMPSendGroupQuery(cMulticastGroup* Group) +{ + IGMPSendQuery(Group->group, IGMP_LAST_MEMBER_QUERY_INTERVAL_TS); +} + +void cComponentIGMP::IGMPStartMulticast(cMulticastGroup* Group) +{ + in_addr_t g = ntohl(Group->group); + if (g > MULTICAST_PRIV_MIN && g <= MULTICAST_PRIV_MAX) { + cChannel *channel = Channels.GetByNumber(g - MULTICAST_PRIV_MIN); + Group->connection = (cConnectionIGMP*) NewClient(); + if (!Group->connection->Start(channel, Group->group)) { + DELETENULL(Group->connection); + } + } +} + +void cComponentIGMP::IGMPStopMulticast(cMulticastGroup* Group) +{ + if (Group->connection) + Group->connection->Stop(); +} diff --git a/server/componentIGMP.h b/server/componentIGMP.h new file mode 100644 index 0000000..09d8fde --- /dev/null +++ b/server/componentIGMP.h @@ -0,0 +1,62 @@ +/* + * $Id: componentIGMP.h,v 1.1 2009/02/13 10:39:22 schmirl Exp $ + */ + +#ifndef VDR_STREAMDEV_IGMPSERVER_H +#define VDR_STREAMDEV_IGMPSERVER_H + +#include <sys/time.h> +#include <time.h> +#include <vdr/thread.h> +#include "server/component.h" + +class cConnectionIGMP; +class cMulticastGroup; + +class cComponentIGMP: public cServerComponent, public cThread { +private: + char m_ReadBuffer[2048]; + cList<cMulticastGroup> m_Groups; + in_addr_t m_BindIp; + int m_MaxChannelNumber; + struct timeval m_GeneralQueryTimer; + int m_StartupQueryCount; + bool m_Querier; + cCondWait m_CondWait; + + cMulticastGroup* FindGroup(in_addr_t Group) const; + + /* Add or remove local host to multicast group */ + bool IGMPMembership(in_addr_t Group, bool Add = true); + void IGMPSendQuery(in_addr_t Group, int Timeout); + + cServerConnection* ProcessMessage(struct igmp *Igmp, in_addr_t Group, in_addr_t Sender); + + void IGMPStartGeneralQueryTimer(); + void IGMPStartOtherQuerierPresentTimer(); + void IGMPSendGeneralQuery(); + + void IGMPStartTimer(cMulticastGroup* Group, in_addr_t Member); + void IGMPStartV1HostTimer(cMulticastGroup* Group); + void IGMPStartTimerAfterLeave(cMulticastGroup* Group, unsigned int MaxResponseTime); + void IGMPStartRetransmitTimer(cMulticastGroup* Group); + void IGMPClearRetransmitTimer(cMulticastGroup* Group); + void IGMPSendGroupQuery(cMulticastGroup* Group); + void IGMPStartMulticast(cMulticastGroup* Group); + void IGMPStopMulticast(cMulticastGroup* Group); + + virtual void Action(); + +protected: + virtual cServerConnection *NewClient(void); + +public: + virtual bool Initialize(void); + virtual void Destruct(void); + virtual cServerConnection* Accept(void); + + cComponentIGMP(void); + ~cComponentIGMP(void); +}; + +#endif // VDR_STREAMDEV_IGMPSERVER_H diff --git a/server/connection.c b/server/connection.c index 3e00aa7..74b2783 100644 --- a/server/connection.c +++ b/server/connection.c @@ -1,5 +1,5 @@ /* - * $Id: connection.c,v 1.11 2008/04/08 14:18:18 schmirl Exp $ + * $Id: connection.c,v 1.12 2009/02/13 10:39:22 schmirl Exp $ */ #include "server/connection.h" @@ -12,7 +12,8 @@ #include <stdarg.h> #include <errno.h> -cServerConnection::cServerConnection(const char *Protocol): +cServerConnection::cServerConnection(const char *Protocol, int Type): + cTBSocket(Type), m_Protocol(Protocol), m_DeferClose(false), m_Pending(false), diff --git a/server/connection.h b/server/connection.h index 69c24fe..2c28a09 100644 --- a/server/connection.h +++ b/server/connection.h @@ -1,5 +1,5 @@ /* - * $Id: connection.h,v 1.6 2008/10/14 11:05:47 schmirl Exp $ + * $Id: connection.h,v 1.7 2009/02/13 10:39:22 schmirl Exp $ */ #ifndef VDR_STREAMDEV_SERVER_CONNECTION_H @@ -44,7 +44,7 @@ protected: public: /* If you derive, specify a short string such as HTTP for Protocol, which will be displayed in error messages */ - cServerConnection(const char *Protocol); + cServerConnection(const char *Protocol, int Type = SOCK_STREAM); virtual ~cServerConnection(); /* If true, any client IP will be accepted */ diff --git a/server/connectionIGMP.c b/server/connectionIGMP.c new file mode 100644 index 0000000..dc08798 --- /dev/null +++ b/server/connectionIGMP.c @@ -0,0 +1,64 @@ +/* + * $Id: connectionIGMP.c,v 1.1 2009/02/13 10:39:22 schmirl Exp $ + */ + +#include <ctype.h> + +#include "server/connectionIGMP.h" +#include "server/server.h" +#include "server/setup.h" +#include <vdr/channels.h> + +cConnectionIGMP::cConnectionIGMP(const char* Name, int ClientPort, eStreamType StreamType) : + cServerConnection(Name, SOCK_DGRAM), + m_LiveStreamer(NULL), + m_ClientPort(ClientPort), + m_StreamType(StreamType) +{ +} + +cConnectionIGMP::~cConnectionIGMP() +{ + delete m_LiveStreamer; +} + +bool cConnectionIGMP::Start(cChannel *Channel, in_addr_t Dst) +{ + if (Channel != NULL) { + cDevice *device = GetDevice(Channel, 0); + if (device != NULL) { + device->SwitchChannel(Channel, false); + struct in_addr ip; + ip.s_addr = Dst; + if (Connect(inet_ntoa(ip), m_ClientPort)) { + m_LiveStreamer = new cStreamdevLiveStreamer(0); + if (m_LiveStreamer->SetChannel(Channel, m_StreamType)) { + m_LiveStreamer->SetDevice(device); + if (!SetDSCP()) + LOG_ERROR_STR("unable to set DSCP sockopt"); + Dprintf("streamer start\n"); + m_LiveStreamer->Start(this); + return true; + } + else + esyslog("streamdev-server IGMP: SetDevice failed"); + DELETENULL(m_LiveStreamer); + } + else + esyslog("streamdev-server IGMP: Connect failed: %m"); + } + else + esyslog("streamdev-server IGMP: GetDevice failed"); + } + else + esyslog("streamdev-server IGMP: Channel not found"); + return false; +} + +void cConnectionIGMP::Stop() +{ + if (m_LiveStreamer) { + m_LiveStreamer->Stop(); + DELETENULL(m_LiveStreamer); + } +} diff --git a/server/connectionIGMP.h b/server/connectionIGMP.h new file mode 100644 index 0000000..90abd58 --- /dev/null +++ b/server/connectionIGMP.h @@ -0,0 +1,45 @@ +/* + * $Id: connectionIGMP.h,v 1.1 2009/02/13 10:39:22 schmirl Exp $ + */ + +#ifndef VDR_STREAMDEV_SERVERS_CONNECTIONIGMP_H +#define VDR_STREAMDEV_SERVERS_CONNECTIONIGMP_H + +#include "connection.h" +#include "server/livestreamer.h" + +#include <tools/select.h> + +#define MULTICAST_PRIV_MIN ((uint32_t) 0xefff0000) +#define MULTICAST_PRIV_MAX ((uint32_t) 0xeffffeff) + +class cStreamdevLiveStreamer; + +class cConnectionIGMP: public cServerConnection { +private: + cStreamdevLiveStreamer *m_LiveStreamer; + int m_ClientPort; + eStreamType m_StreamType; + +public: + cConnectionIGMP(const char* Name, int ClientPort, eStreamType StreamType); + virtual ~cConnectionIGMP(); + + bool Start(cChannel *Channel, in_addr_t Dst); + void Stop(); + + /* Not used here */ + virtual bool Command(char *Cmd) { return false; } + + virtual void Attach(void) { if (m_LiveStreamer != NULL) m_LiveStreamer->Attach(); } + virtual void Detach(void) { if (m_LiveStreamer != NULL) m_LiveStreamer->Detach(); } + + virtual bool Abort(void) const; +}; + +inline bool cConnectionIGMP::Abort(void) const +{ + return !m_LiveStreamer || m_LiveStreamer->Abort(); +} + +#endif // VDR_STREAMDEV_SERVERS_CONNECTIONIGMP_H diff --git a/server/livefilter.c b/server/livefilter.c index ac9c284..730fedc 100644 --- a/server/livefilter.c +++ b/server/livefilter.c @@ -1,14 +1,11 @@ /* - * $Id: livefilter.c,v 1.5 2008/04/07 14:27:31 schmirl Exp $ + * $Id: livefilter.c,v 1.6 2009/02/13 10:39:22 schmirl Exp $ */ #include "server/livefilter.h" #include "server/streamer.h" #include "common.h" -#ifndef TS_SIZE -# define TS_SIZE 188 -#endif #ifndef TS_SYNC_BYTE # define TS_SYNC_BYTE 0x47 #endif diff --git a/server/server.c b/server/server.c index bc5e83f..1bdb20a 100644 --- a/server/server.c +++ b/server/server.c @@ -1,10 +1,11 @@ /* - * $Id: server.c,v 1.9 2008/10/31 12:19:57 schmirl Exp $ + * $Id: server.c,v 1.10 2009/02/13 10:39:22 schmirl Exp $ */ #include "server/server.h" #include "server/componentVTP.h" #include "server/componentHTTP.h" +#include "server/componentIGMP.h" #include "server/setup.h" #include <vdr/tools.h> @@ -36,6 +37,12 @@ void cStreamdevServer::Initialize(void) if (m_Instance == NULL) { if (StreamdevServerSetup.StartVTPServer) Register(new cComponentVTP); if (StreamdevServerSetup.StartHTTPServer) Register(new cComponentHTTP); + if (StreamdevServerSetup.StartIGMPServer) { + if (strcmp(StreamdevServerSetup.IGMPBindIP, "0.0.0.0") == 0) + esyslog("streamdev-server: Not starting IGMP. IGMP must be bound to a local IP"); + else + Register(new cComponentIGMP); + } m_Instance = new cStreamdevServer; } diff --git a/server/setup.c b/server/setup.c index 6867cdb..710b1fa 100644 --- a/server/setup.c +++ b/server/setup.c @@ -1,5 +1,5 @@ /* - * $Id: setup.c,v 1.5 2009/01/16 11:35:44 schmirl Exp $ + * $Id: setup.c,v 1.6 2009/02/13 10:39:22 schmirl Exp $ */ #include <vdr/menuitems.h> @@ -16,10 +16,14 @@ cStreamdevServerSetup::cStreamdevServerSetup(void) { StartHTTPServer = true; HTTPServerPort = 3000; HTTPStreamType = stTS; + StartIGMPServer = false; + IGMPClientPort = 1234; + IGMPStreamType = stTS; SuspendMode = smAlways; AllowSuspend = false; strcpy(VTPBindIP, "0.0.0.0"); strcpy(HTTPBindIP, "0.0.0.0"); + strcpy(IGMPBindIP, "0.0.0.0"); } bool cStreamdevServerSetup::SetupParse(const char *Name, const char *Value) { @@ -31,6 +35,10 @@ bool cStreamdevServerSetup::SetupParse(const char *Name, const char *Value) { else if (strcmp(Name, "HTTPServerPort") == 0) HTTPServerPort = atoi(Value); else if (strcmp(Name, "HTTPStreamType") == 0) HTTPStreamType = atoi(Value); else if (strcmp(Name, "HTTPBindIP") == 0) strcpy(HTTPBindIP, Value); + else if (strcmp(Name, "StartIGMPServer") == 0) StartIGMPServer = atoi(Value); + else if (strcmp(Name, "IGMPClientPort") == 0) IGMPClientPort = atoi(Value); + else if (strcmp(Name, "IGMPStreamType") == 0) IGMPStreamType = atoi(Value); + else if (strcmp(Name, "IGMPBindIP") == 0) strcpy(IGMPBindIP, Value); else if (strcmp(Name, "SuspendMode") == 0) SuspendMode = atoi(Value); else if (strcmp(Name, "AllowSuspend") == 0) AllowSuspend = atoi(Value); else return false; @@ -55,7 +63,11 @@ cStreamdevServerMenuSetupPage::cStreamdevServerMenuSetupPage(void) { AddShortEdit(tr("HTTP Server Port"), m_NewSetup.HTTPServerPort); AddTypeEdit (tr("HTTP Streamtype"), m_NewSetup.HTTPStreamType); AddIpEdit (tr("Bind to IP"), m_NewSetup.HTTPBindIP); - + AddCategory (tr("Multicast Streaming Server")); + AddBoolEdit (tr("Start IGMP Server"), m_NewSetup.StartIGMPServer); + AddShortEdit(tr("Multicast Client Port"), m_NewSetup.IGMPClientPort); + AddTypeEdit (tr("Multicast Streamtype"), m_NewSetup.IGMPStreamType); + AddIpEdit (tr("Bind to IP"), m_NewSetup.IGMPBindIP); SetCurrent(Get(1)); } @@ -69,7 +81,10 @@ void cStreamdevServerMenuSetupPage::Store(void) { || strcmp(m_NewSetup.VTPBindIP, StreamdevServerSetup.VTPBindIP) != 0 || m_NewSetup.StartHTTPServer != StreamdevServerSetup.StartHTTPServer || m_NewSetup.HTTPServerPort != StreamdevServerSetup.HTTPServerPort - || strcmp(m_NewSetup.HTTPBindIP, StreamdevServerSetup.HTTPBindIP) != 0) { + || strcmp(m_NewSetup.HTTPBindIP, StreamdevServerSetup.HTTPBindIP) != 0 + || m_NewSetup.StartIGMPServer != StreamdevServerSetup.StartIGMPServer + || m_NewSetup.IGMPClientPort != StreamdevServerSetup.IGMPClientPort + || strcmp(m_NewSetup.IGMPBindIP, StreamdevServerSetup.IGMPBindIP) != 0) { restart = true; cStreamdevServer::Destruct(); } @@ -82,6 +97,10 @@ void cStreamdevServerMenuSetupPage::Store(void) { SetupStore("HTTPServerPort", m_NewSetup.HTTPServerPort); SetupStore("HTTPStreamType", m_NewSetup.HTTPStreamType); SetupStore("HTTPBindIP", m_NewSetup.HTTPBindIP); + SetupStore("StartIGMPServer", m_NewSetup.StartIGMPServer); + SetupStore("IGMPClientPort", m_NewSetup.IGMPClientPort); + SetupStore("IGMPStreamType", m_NewSetup.IGMPStreamType); + SetupStore("IGMPBindIP", m_NewSetup.IGMPBindIP); SetupStore("SuspendMode", m_NewSetup.SuspendMode); SetupStore("AllowSuspend", m_NewSetup.AllowSuspend); diff --git a/server/setup.h b/server/setup.h index ff27618..d2b1592 100644 --- a/server/setup.h +++ b/server/setup.h @@ -1,5 +1,5 @@ /* - * $Id: setup.h,v 1.1 2004/12/30 22:44:21 lordjaxom Exp $ + * $Id: setup.h,v 1.2 2009/02/13 10:39:22 schmirl Exp $ */ #ifndef VDR_STREAMDEV_SETUPSERVER_H @@ -20,6 +20,10 @@ struct cStreamdevServerSetup { int HTTPServerPort; int HTTPStreamType; char HTTPBindIP[20]; + int StartIGMPServer; + int IGMPClientPort; + int IGMPStreamType; + char IGMPBindIP[20]; int SuspendMode; int AllowSuspend; }; diff --git a/server/streamer.c b/server/streamer.c index 63776cf..9795cc6 100644 --- a/server/streamer.c +++ b/server/streamer.c @@ -1,5 +1,5 @@ /* - * $Id: streamer.c,v 1.17 2008/10/22 11:59:32 schmirl Exp $ + * $Id: streamer.c,v 1.18 2009/02/13 10:39:22 schmirl Exp $ */ #include <vdr/ringbuffer.h> @@ -55,16 +55,33 @@ void cStreamdevWriter::Action(void) if (sel.CanWrite(*m_Socket)) { int written; - if ((written = m_Socket->Write(block + offset, count)) == -1) { - esyslog("ERROR: streamdev-server: couldn't send data: %m"); + int pkgsize = count; + // SOCK_DGRAM indicates multicast + if (m_Socket->Type() == SOCK_DGRAM) { + // don't fragment multicast packets + // max. payload on standard local ethernet is 1416 to 1456 bytes + // and some STBs expect complete TS packets + // so let's always limit to 7 * TS_SIZE = 1316 + if (pkgsize > 7 * TS_SIZE) + pkgsize = 7 * TS_SIZE; + else + pkgsize -= pkgsize % TS_SIZE; + } + if ((written = m_Socket->Write(block + offset, pkgsize)) == -1) { + esyslog("ERROR: streamdev-server: couldn't send %d bytes: %m", pkgsize); break; } + + // statistics if (count > max) max = count; offset += written; count -= written; - if (count == 0) { + + // less than one TS packet left: + // delete what we've written so far and get next chunk + if (count < TS_SIZE) { m_Streamer->Del(offset); block = NULL; } diff --git a/server/streamer.h b/server/streamer.h index 9deec8d..20323b7 100644 --- a/server/streamer.h +++ b/server/streamer.h @@ -1,5 +1,5 @@ /* - * $Id: streamer.h,v 1.9 2008/10/22 11:59:32 schmirl Exp $ + * $Id: streamer.h,v 1.10 2009/02/13 10:39:22 schmirl Exp $ */ #ifndef VDR_STREAMDEV_STREAMER_H @@ -12,6 +12,10 @@ class cTBSocket; class cStreamdevStreamer; +#ifndef TS_SIZE +#define TS_SIZE 188 +#endif + #define STREAMERBUFSIZE MEGABYTE(4) #define WRITERBUFSIZE KILOBYTE(256) |