summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authorschmirl <schmirl>2009-02-13 10:39:20 +0000
committerschmirl <schmirl>2009-02-13 10:39:20 +0000
commit78410ea5761eab03a7bc33852e85621594df7254 (patch)
tree38b0238e9797d4ab97fee50c822518ff460def16 /server
parentc26b89f9c287d64915b94cc56fb0e4e709d235a4 (diff)
downloadvdr-plugin-streamdev-78410ea5761eab03a7bc33852e85621594df7254.tar.gz
vdr-plugin-streamdev-78410ea5761eab03a7bc33852e85621594df7254.tar.bz2
Added IGMP multicast server
Modified Files: CONTRIBUTORS HISTORY Makefile README po/de_DE.po po/fi_FI.po po/fr_FR.po po/it_IT.po po/ru_RU.po server/component.c server/component.h server/connection.c server/connection.h server/livefilter.c server/server.c server/setup.c server/setup.h server/streamer.c server/streamer.h streamdev/streamdevhosts.conf tools/socket.c tools/socket.h Added Files: patches/vdr-cap_net_raw.diff server/componentIGMP.c server/componentIGMP.h server/connectionIGMP.c server/connectionIGMP.h
Diffstat (limited to 'server')
-rw-r--r--server/component.c5
-rw-r--r--server/component.h6
-rw-r--r--server/componentIGMP.c447
-rw-r--r--server/componentIGMP.h62
-rw-r--r--server/connection.c5
-rw-r--r--server/connection.h4
-rw-r--r--server/connectionIGMP.c64
-rw-r--r--server/connectionIGMP.h45
-rw-r--r--server/livefilter.c5
-rw-r--r--server/server.c9
-rw-r--r--server/setup.c25
-rw-r--r--server/setup.h6
-rw-r--r--server/streamer.c25
-rw-r--r--server/streamer.h6
14 files changed, 691 insertions, 23 deletions
diff --git a/server/component.c b/server/component.c
index 1a584b5..70d861a 100644
--- a/server/component.c
+++ b/server/component.c
@@ -1,13 +1,14 @@
/*
- * $Id: component.c,v 1.3 2005/05/09 20:22:29 lordjaxom Exp $
+ * $Id: component.c,v 1.4 2009/02/13 10:39:22 schmirl Exp $
*/
#include "server/component.h"
#include "server/connection.h"
cServerComponent::cServerComponent(const char *Protocol, const char *ListenIp,
- uint ListenPort):
+ uint ListenPort, int Type, int IpProto):
m_Protocol(Protocol),
+ m_Listen(Type, IpProto),
m_ListenIp(ListenIp),
m_ListenPort(ListenPort)
{
diff --git a/server/component.h b/server/component.h
index 8703348..7efd4ba 100644
--- a/server/component.h
+++ b/server/component.h
@@ -1,5 +1,5 @@
/*
- * $Id: component.h,v 1.2 2005/05/09 20:22:29 lordjaxom Exp $
+ * $Id: component.h,v 1.3 2009/02/13 10:39:22 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_SERVERS_COMPONENT_H
@@ -17,8 +17,8 @@ class cServerConnection;
class cServerComponent: public cListObject {
private:
- cTBSocket m_Listen;
const char *m_Protocol;
+ cTBSocket m_Listen;
const char *m_ListenIp;
uint m_ListenPort;
@@ -27,7 +27,7 @@ protected:
virtual cServerConnection *NewClient(void) = 0;
public:
- cServerComponent(const char *Protocol, const char *ListenIp, uint ListenPort);
+ cServerComponent(const char *Protocol, const char *ListenIp, uint ListenPort, int Type = SOCK_STREAM, int IpProto = 0);
virtual ~cServerComponent();
/* Starts listening on the specified Port, override if you want to do things
diff --git a/server/componentIGMP.c b/server/componentIGMP.c
new file mode 100644
index 0000000..946c513
--- /dev/null
+++ b/server/componentIGMP.c
@@ -0,0 +1,447 @@
+/*
+ * $Id: componentIGMP.c,v 1.1 2009/02/13 10:39:22 schmirl Exp $
+ */
+#include <netinet/ip.h>
+#include <netinet/igmp.h>
+
+#include "server/componentIGMP.h"
+#include "server/connectionIGMP.h"
+#include "server/setup.h"
+
+#ifndef IGMP_ALL_HOSTS
+#define IGMP_ALL_HOSTS htonl(0xE0000001L)
+#endif
+#ifndef IGMP_ALL_ROUTER
+#define IGMP_ALL_ROUTER htonl(0xE0000002L)
+#endif
+
+// IGMP parameters according to RFC2236. All time values in seconds.
+#define IGMP_ROBUSTNESS 2
+#define IGMP_QUERY_INTERVAL 125
+#define IGMP_QUERY_RESPONSE_INTERVAL 10
+#define IGMP_GROUP_MEMBERSHIP_INTERVAL (2 * IGMP_QUERY_INTERVAL + IGMP_QUERY_RESPONSE_INTERVAL)
+#define IGMP_OTHER_QUERIER_PRESENT_INTERVAL (2 * IGMP_QUERY_INTERVAL + IGMP_QUERY_RESPONSE_INTERVAL / 2)
+#define IGMP_STARTUP_QUERY_INTERVAL (IGMP_QUERY_INTERVAL / 4)
+#define IGMP_STARTUP_QUERY_COUNT IGMP_ROBUSTNESS
+// This value is 1/10 sec. RFC default is 10. Reduced to minimum to free unused channels ASAP
+#define IGMP_LAST_MEMBER_QUERY_INTERVAL_TS 1
+#define IGMP_LAST_MEMBER_QUERY_COUNT IGMP_ROBUSTNESS
+
+// operations on struct timeval
+#define TV_CMP(a, cmp, b) (a.tv_sec == b.tv_sec ? a.tv_usec cmp b.tv_usec : a.tv_sec cmp b.tv_sec)
+#define TV_SET(tv) (tv.tv_sec || tv.tv_usec)
+#define TV_CLR(tv) memset(&tv, 0, sizeof(tv))
+#define TV_CPY(dst, src) memcpy(&dst, &src, sizeof(dst))
+#define TV_ADD(dst, ts) dst.tv_sec += ts / 10; dst.tv_usec += (ts % 10) * 100000; if (dst.tv_usec >= 1000000) { dst.tv_usec -= 1000000; dst.tv_sec++; }
+
+class cMulticastGroup: public cListObject
+{
+public:
+ cConnectionIGMP *connection;
+ in_addr_t group;
+ in_addr_t reporter;
+ struct timeval timeout;
+ struct timeval v1timer;
+ struct timeval retransmit;
+
+ cMulticastGroup(in_addr_t Group);
+};
+
+cMulticastGroup::cMulticastGroup(in_addr_t Group) :
+ connection(NULL),
+ group(Group),
+ reporter(0)
+{
+ TV_CLR(timeout);
+ TV_CLR(v1timer);
+ TV_CLR(retransmit);
+}
+
+void logIGMP(uint8_t type, struct in_addr Src, struct in_addr Dst, struct in_addr Grp)
+{
+ const char* msg;
+ switch (type) {
+ case IGMP_MEMBERSHIP_QUERY: msg = "membership query"; break;
+ case IGMP_V1_MEMBERSHIP_REPORT: msg = "V1 membership report"; break;
+ case IGMP_V2_MEMBERSHIP_REPORT: msg = "V2 membership report"; break;
+ case IGMP_V2_LEAVE_GROUP: msg = "leave group"; break;
+ default: msg = "unknown"; break;
+ }
+ char* s = strdup(inet_ntoa(Src));
+ char* d = strdup(inet_ntoa(Dst));
+ dsyslog("streamdev-server IGMP: Received %s from %s (dst %s) for %s", msg, s, d, inet_ntoa(Grp));
+ free(s);
+ free(d);
+}
+
+/* Taken from http://tools.ietf.org/html/rfc1071 */
+uint16_t inetChecksum(uint16_t *addr, int count)
+{
+ uint32_t sum = 0;
+ while (count > 1) {
+ sum += *addr++;
+ count -= 2;
+ }
+
+ if( count > 0 )
+ sum += * (uint8_t *) addr;
+
+ while (sum>>16)
+ sum = (sum & 0xffff) + (sum >> 16);
+
+ return ~sum;
+}
+
+cComponentIGMP::cComponentIGMP(void):
+ cServerComponent("IGMP", "0.0.0.0", 0, SOCK_RAW, IPPROTO_IGMP),
+ cThread("IGMP timeout handler"),
+ m_BindIp(inet_addr(StreamdevServerSetup.IGMPBindIP)),
+ m_MaxChannelNumber(0),
+ m_StartupQueryCount(IGMP_STARTUP_QUERY_COUNT),
+ m_Querier(true)
+{
+}
+
+cComponentIGMP::~cComponentIGMP(void)
+{
+}
+
+cMulticastGroup* cComponentIGMP::FindGroup(in_addr_t Group) const
+{
+ cMulticastGroup *group = m_Groups.First();
+ while (group && group->group != Group)
+ group = m_Groups.Next(group);
+ return group;
+}
+
+bool cComponentIGMP::Initialize(void)
+{
+ if (cServerComponent::Initialize() && IGMPMembership(IGMP_ALL_ROUTER))
+ {
+ for (cChannel *channel = Channels.First(); channel; channel = Channels.Next(channel))
+ {
+ if (channel->GroupSep())
+ continue;
+ int num = channel->Number();
+ if (!IGMPMembership(htonl(MULTICAST_PRIV_MIN + num)))
+ break;
+ m_MaxChannelNumber = num;
+ }
+ if (m_MaxChannelNumber == 0)
+ {
+ IGMPMembership(IGMP_ALL_ROUTER, false);
+ esyslog("streamdev-server IGMP: no multicast group joined");
+ }
+ else
+ {
+ Start();
+ }
+ }
+ return m_MaxChannelNumber > 0;
+}
+
+void cComponentIGMP::Destruct(void)
+{
+ if (m_MaxChannelNumber > 0)
+ {
+ Cancel(3);
+ for (cChannel *channel = Channels.First(); channel; channel = Channels.Next(channel))
+ {
+ if (channel->GroupSep())
+ continue;
+ int num = channel->Number();
+ if (num > m_MaxChannelNumber)
+ break;
+ IGMPMembership(htonl(MULTICAST_PRIV_MIN + num), false);
+ }
+ IGMPMembership(IGMP_ALL_ROUTER, false);
+ }
+ m_MaxChannelNumber = 0;
+ cServerComponent::Destruct();
+}
+
+cServerConnection *cComponentIGMP::NewClient(void)
+{
+ return new cConnectionIGMP("IGMP", StreamdevServerSetup.IGMPClientPort, (eStreamType) StreamdevServerSetup.IGMPStreamType);
+}
+
+cServerConnection* cComponentIGMP::Accept(void)
+{
+ ssize_t recv_len;
+ int ip_hdrlen, ip_datalen;
+ struct ip *ip;
+ struct igmp *igmp;
+
+ while ((recv_len = ::recvfrom(Socket(), m_ReadBuffer, sizeof(m_ReadBuffer), 0, NULL, NULL)) < 0 && errno == EINTR)
+ errno = 0;
+
+ if (recv_len < 0) {
+ esyslog("streamdev-server IGMP: read failed: %m");
+ return NULL;
+ }
+ else if (recv_len < (ssize_t) sizeof(struct ip)) {
+ esyslog("streamdev-server IGMP: IP packet too short");
+ return NULL;
+ }
+
+ ip = (struct ip*) m_ReadBuffer;
+
+ // filter out my own packets
+ if (ip->ip_src.s_addr == m_BindIp)
+ return NULL;
+
+ ip_hdrlen = ip->ip_hl << 2;
+#ifdef __FreeBSD__
+ ip_datalen = ip->ip_len;
+#else
+ ip_datalen = ntohs(ip->ip_len) - ip_hdrlen;
+#endif
+ if (ip->ip_p != IPPROTO_IGMP) {
+ esyslog("streamdev-server IGMP: Unexpected protocol %hhu", ip->ip_p);
+ return NULL;
+ }
+ if (recv_len < ip_hdrlen + IGMP_MINLEN) {
+ esyslog("streamdev-server IGMP: packet too short");
+ return NULL;
+ }
+ igmp = (struct igmp*) (m_ReadBuffer + ip_hdrlen);
+ uint16_t chksum = igmp->igmp_cksum;
+ igmp->igmp_cksum = 0;
+ if (chksum != inetChecksum((uint16_t *)igmp, ip_datalen))
+ {
+ esyslog("INVALID CHECKSUM %d %d %d %d 0x%x 0x%x", ntohs(ip->ip_len), ip_hdrlen, ip_datalen, recv_len, chksum, inetChecksum((uint16_t *)igmp, ip_datalen));
+ return NULL;
+ }
+ logIGMP(igmp->igmp_type, ip->ip_src, ip->ip_dst, igmp->igmp_group);
+ return ProcessMessage(igmp, igmp->igmp_group.s_addr, ip->ip_src.s_addr);
+}
+
+cServerConnection* cComponentIGMP::ProcessMessage(struct igmp *Igmp, in_addr_t Group, in_addr_t Sender)
+{
+ cServerConnection* conn = NULL;
+ cMulticastGroup* group;
+ LOCK_THREAD;
+ switch (Igmp->igmp_type) {
+ case IGMP_MEMBERSHIP_QUERY:
+ if (ntohl(Sender) < ntohl(m_BindIp))
+ IGMPStartOtherQuerierPresentTimer();
+ break;
+ case IGMP_V1_MEMBERSHIP_REPORT:
+ case IGMP_V2_MEMBERSHIP_REPORT:
+ group = FindGroup(Group);
+ if (!group) {
+ group = new cMulticastGroup(Group);
+ m_Groups.Add(group);
+ }
+ if (!group->connection) {
+ IGMPStartMulticast(group);
+ conn = group->connection;
+ }
+ IGMPStartTimer(group, Sender);
+ if (Igmp->igmp_type == IGMP_V1_MEMBERSHIP_REPORT)
+ IGMPStartV1HostTimer(group);
+ break;
+ case IGMP_V2_LEAVE_GROUP:
+ group = FindGroup(Group);
+ if (group && !TV_SET(group->v1timer)) {
+ if (group->reporter == Sender) {
+ IGMPStartTimerAfterLeave(group, m_Querier ? IGMP_LAST_MEMBER_QUERY_INTERVAL_TS : Igmp->igmp_code);
+ if (m_Querier)
+ IGMPSendGroupQuery(group);
+ IGMPStartRetransmitTimer(group);
+ }
+ m_CondWait.Signal();
+ }
+ break;
+ default:
+ break;
+ }
+ return conn;
+}
+
+void cComponentIGMP::Action()
+{
+ while (Running()) {
+ struct timeval now;
+ struct timeval next;
+
+ gettimeofday(&now, NULL);
+ TV_CPY(next, now);
+ next.tv_sec += IGMP_QUERY_INTERVAL;
+
+ cMulticastGroup *del = NULL;
+ {
+ LOCK_THREAD;
+ if (TV_CMP(m_GeneralQueryTimer, <, now)) {
+ dsyslog("General Query");
+ IGMPSendGeneralQuery();
+ IGMPStartGeneralQueryTimer();
+ }
+ if (TV_CMP(next, >, m_GeneralQueryTimer))
+ TV_CPY(next, m_GeneralQueryTimer);
+
+ for (cMulticastGroup *group = m_Groups.First(); group; group = m_Groups.Next(group)) {
+ if (TV_CMP(group->timeout, <, now)) {
+ IGMPStopMulticast(group);
+ IGMPClearRetransmitTimer(group);
+ if (del)
+ m_Groups.Del(del);
+ del = group;
+ }
+ else if (m_Querier && TV_SET(group->retransmit) && TV_CMP(group->retransmit, <, now)) {
+ IGMPSendGroupQuery(group);
+ IGMPStartRetransmitTimer(group);
+ if (TV_CMP(next, >, group->retransmit))
+ TV_CPY(next, group->retransmit);
+ }
+ else if (TV_SET(group->v1timer) && TV_CMP(group->v1timer, <, now)) {
+ TV_CLR(group->v1timer);
+ }
+ else {
+ if (TV_CMP(next, >, group->timeout))
+ TV_CPY(next, group->timeout);
+ if (TV_SET(group->retransmit) && TV_CMP(next, >, group->retransmit))
+ TV_CPY(next, group->retransmit);
+ if (TV_SET(group->v1timer) && TV_CMP(next, >, group->v1timer))
+ TV_CPY(next, group->v1timer);
+ }
+ }
+ if (del)
+ m_Groups.Del(del);
+ }
+
+ int sleep = (next.tv_sec - now.tv_sec) * 1000;
+ sleep += (next.tv_usec - now.tv_usec) / 1000;
+ if (next.tv_usec < now.tv_usec)
+ sleep += 1000;
+ dsyslog("Sleeping %d ms", sleep);
+ m_CondWait.Wait(sleep);
+ }
+}
+
+bool cComponentIGMP::IGMPMembership(in_addr_t Group, bool Add)
+{
+ struct ip_mreqn mreq;
+ mreq.imr_multiaddr.s_addr = Group;
+ mreq.imr_address.s_addr = INADDR_ANY;
+ mreq.imr_ifindex = 0;
+ if (setsockopt(Socket(), IPPROTO_IP, Add ? IP_ADD_MEMBERSHIP : IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
+ {
+ esyslog("streamdev-server IGMP: unable to %s %s: %m", Add ? "join" : "leave", inet_ntoa(mreq.imr_multiaddr));
+ if (errno == ENOBUFS)
+ esyslog("consider increasing sys.net.ipv4.igmp_max_memberships");
+ return false;
+ }
+ return true;
+}
+
+void cComponentIGMP::IGMPSendQuery(in_addr_t Group, int Timeout)
+{
+ struct sockaddr_in dst;
+ struct igmp query;
+
+ dst.sin_family = AF_INET;
+ dst.sin_port = IPPROTO_IGMP;
+ dst.sin_addr.s_addr = Group;
+ query.igmp_type = IGMP_MEMBERSHIP_QUERY;
+ query.igmp_code = Timeout * 10;
+ query.igmp_cksum = 0;
+ query.igmp_group.s_addr = (Group == IGMP_ALL_HOSTS) ? 0 : Group;
+ query.igmp_cksum = inetChecksum((uint16_t *) &query, sizeof(query));
+
+ for (int i = 0; i < 5 && ::sendto(Socket(), &query, sizeof(query), 0, (sockaddr*)&dst, sizeof(dst)) == -1; i++) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ esyslog("streamdev-server IGMP: unable to query group %s: %m", inet_ntoa(dst.sin_addr));
+ break;
+ }
+ cCondWait::SleepMs(10);
+ }
+}
+
+// Querier state actions
+void cComponentIGMP::IGMPStartGeneralQueryTimer()
+{
+ m_Querier = true;
+ if (m_StartupQueryCount) {
+ gettimeofday(&m_GeneralQueryTimer, NULL);
+ m_GeneralQueryTimer.tv_sec += IGMP_STARTUP_QUERY_INTERVAL;
+ m_StartupQueryCount--;
+ }
+ else {
+ gettimeofday(&m_GeneralQueryTimer, NULL);
+ m_GeneralQueryTimer.tv_sec += IGMP_QUERY_INTERVAL;
+ }
+}
+
+void cComponentIGMP::IGMPStartOtherQuerierPresentTimer()
+{
+ m_Querier = false;
+ m_StartupQueryCount = 0;
+ gettimeofday(&m_GeneralQueryTimer, NULL);
+ m_GeneralQueryTimer.tv_sec += IGMP_OTHER_QUERIER_PRESENT_INTERVAL;
+}
+
+void cComponentIGMP::IGMPSendGeneralQuery()
+{
+ IGMPSendQuery(IGMP_ALL_HOSTS, IGMP_QUERY_RESPONSE_INTERVAL);
+}
+
+// Group state actions
+void cComponentIGMP::IGMPStartTimer(cMulticastGroup* Group, in_addr_t Member)
+{
+ gettimeofday(&Group->timeout, NULL);
+ Group->timeout.tv_sec += IGMP_GROUP_MEMBERSHIP_INTERVAL;
+ TV_CLR(Group->retransmit);
+ Group->reporter = Member;
+
+}
+
+void cComponentIGMP::IGMPStartV1HostTimer(cMulticastGroup* Group)
+{
+ gettimeofday(&Group->v1timer, NULL);
+ Group->v1timer.tv_sec += IGMP_GROUP_MEMBERSHIP_INTERVAL;
+}
+
+void cComponentIGMP::IGMPStartTimerAfterLeave(cMulticastGroup* Group, unsigned int MaxResponseTimeTs)
+{
+ //Group->Update(time(NULL) + MaxResponseTime * IGMP_LAST_MEMBER_QUERY_COUNT / 10);
+ MaxResponseTimeTs *= IGMP_LAST_MEMBER_QUERY_COUNT;
+ gettimeofday(&Group->timeout, NULL);
+ TV_ADD(Group->timeout, MaxResponseTimeTs);
+ TV_CLR(Group->retransmit);
+ Group->reporter = 0;
+}
+
+void cComponentIGMP::IGMPStartRetransmitTimer(cMulticastGroup* Group)
+{
+ gettimeofday(&Group->retransmit, NULL);
+ TV_ADD(Group->retransmit, IGMP_LAST_MEMBER_QUERY_INTERVAL_TS);
+}
+
+void cComponentIGMP::IGMPClearRetransmitTimer(cMulticastGroup* Group)
+{
+ TV_CLR(Group->retransmit);
+}
+
+void cComponentIGMP::IGMPSendGroupQuery(cMulticastGroup* Group)
+{
+ IGMPSendQuery(Group->group, IGMP_LAST_MEMBER_QUERY_INTERVAL_TS);
+}
+
+void cComponentIGMP::IGMPStartMulticast(cMulticastGroup* Group)
+{
+ in_addr_t g = ntohl(Group->group);
+ if (g > MULTICAST_PRIV_MIN && g <= MULTICAST_PRIV_MAX) {
+ cChannel *channel = Channels.GetByNumber(g - MULTICAST_PRIV_MIN);
+ Group->connection = (cConnectionIGMP*) NewClient();
+ if (!Group->connection->Start(channel, Group->group)) {
+ DELETENULL(Group->connection);
+ }
+ }
+}
+
+void cComponentIGMP::IGMPStopMulticast(cMulticastGroup* Group)
+{
+ if (Group->connection)
+ Group->connection->Stop();
+}
diff --git a/server/componentIGMP.h b/server/componentIGMP.h
new file mode 100644
index 0000000..09d8fde
--- /dev/null
+++ b/server/componentIGMP.h
@@ -0,0 +1,62 @@
+/*
+ * $Id: componentIGMP.h,v 1.1 2009/02/13 10:39:22 schmirl Exp $
+ */
+
+#ifndef VDR_STREAMDEV_IGMPSERVER_H
+#define VDR_STREAMDEV_IGMPSERVER_H
+
+#include <sys/time.h>
+#include <time.h>
+#include <vdr/thread.h>
+#include "server/component.h"
+
+class cConnectionIGMP;
+class cMulticastGroup;
+
+class cComponentIGMP: public cServerComponent, public cThread {
+private:
+ char m_ReadBuffer[2048];
+ cList<cMulticastGroup> m_Groups;
+ in_addr_t m_BindIp;
+ int m_MaxChannelNumber;
+ struct timeval m_GeneralQueryTimer;
+ int m_StartupQueryCount;
+ bool m_Querier;
+ cCondWait m_CondWait;
+
+ cMulticastGroup* FindGroup(in_addr_t Group) const;
+
+ /* Add or remove local host to multicast group */
+ bool IGMPMembership(in_addr_t Group, bool Add = true);
+ void IGMPSendQuery(in_addr_t Group, int Timeout);
+
+ cServerConnection* ProcessMessage(struct igmp *Igmp, in_addr_t Group, in_addr_t Sender);
+
+ void IGMPStartGeneralQueryTimer();
+ void IGMPStartOtherQuerierPresentTimer();
+ void IGMPSendGeneralQuery();
+
+ void IGMPStartTimer(cMulticastGroup* Group, in_addr_t Member);
+ void IGMPStartV1HostTimer(cMulticastGroup* Group);
+ void IGMPStartTimerAfterLeave(cMulticastGroup* Group, unsigned int MaxResponseTime);
+ void IGMPStartRetransmitTimer(cMulticastGroup* Group);
+ void IGMPClearRetransmitTimer(cMulticastGroup* Group);
+ void IGMPSendGroupQuery(cMulticastGroup* Group);
+ void IGMPStartMulticast(cMulticastGroup* Group);
+ void IGMPStopMulticast(cMulticastGroup* Group);
+
+ virtual void Action();
+
+protected:
+ virtual cServerConnection *NewClient(void);
+
+public:
+ virtual bool Initialize(void);
+ virtual void Destruct(void);
+ virtual cServerConnection* Accept(void);
+
+ cComponentIGMP(void);
+ ~cComponentIGMP(void);
+};
+
+#endif // VDR_STREAMDEV_IGMPSERVER_H
diff --git a/server/connection.c b/server/connection.c
index 3e00aa7..74b2783 100644
--- a/server/connection.c
+++ b/server/connection.c
@@ -1,5 +1,5 @@
/*
- * $Id: connection.c,v 1.11 2008/04/08 14:18:18 schmirl Exp $
+ * $Id: connection.c,v 1.12 2009/02/13 10:39:22 schmirl Exp $
*/
#include "server/connection.h"
@@ -12,7 +12,8 @@
#include <stdarg.h>
#include <errno.h>
-cServerConnection::cServerConnection(const char *Protocol):
+cServerConnection::cServerConnection(const char *Protocol, int Type):
+ cTBSocket(Type),
m_Protocol(Protocol),
m_DeferClose(false),
m_Pending(false),
diff --git a/server/connection.h b/server/connection.h
index 69c24fe..2c28a09 100644
--- a/server/connection.h
+++ b/server/connection.h
@@ -1,5 +1,5 @@
/*
- * $Id: connection.h,v 1.6 2008/10/14 11:05:47 schmirl Exp $
+ * $Id: connection.h,v 1.7 2009/02/13 10:39:22 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_SERVER_CONNECTION_H
@@ -44,7 +44,7 @@ protected:
public:
/* If you derive, specify a short string such as HTTP for Protocol, which
will be displayed in error messages */
- cServerConnection(const char *Protocol);
+ cServerConnection(const char *Protocol, int Type = SOCK_STREAM);
virtual ~cServerConnection();
/* If true, any client IP will be accepted */
diff --git a/server/connectionIGMP.c b/server/connectionIGMP.c
new file mode 100644
index 0000000..dc08798
--- /dev/null
+++ b/server/connectionIGMP.c
@@ -0,0 +1,64 @@
+/*
+ * $Id: connectionIGMP.c,v 1.1 2009/02/13 10:39:22 schmirl Exp $
+ */
+
+#include <ctype.h>
+
+#include "server/connectionIGMP.h"
+#include "server/server.h"
+#include "server/setup.h"
+#include <vdr/channels.h>
+
+cConnectionIGMP::cConnectionIGMP(const char* Name, int ClientPort, eStreamType StreamType) :
+ cServerConnection(Name, SOCK_DGRAM),
+ m_LiveStreamer(NULL),
+ m_ClientPort(ClientPort),
+ m_StreamType(StreamType)
+{
+}
+
+cConnectionIGMP::~cConnectionIGMP()
+{
+ delete m_LiveStreamer;
+}
+
+bool cConnectionIGMP::Start(cChannel *Channel, in_addr_t Dst)
+{
+ if (Channel != NULL) {
+ cDevice *device = GetDevice(Channel, 0);
+ if (device != NULL) {
+ device->SwitchChannel(Channel, false);
+ struct in_addr ip;
+ ip.s_addr = Dst;
+ if (Connect(inet_ntoa(ip), m_ClientPort)) {
+ m_LiveStreamer = new cStreamdevLiveStreamer(0);
+ if (m_LiveStreamer->SetChannel(Channel, m_StreamType)) {
+ m_LiveStreamer->SetDevice(device);
+ if (!SetDSCP())
+ LOG_ERROR_STR("unable to set DSCP sockopt");
+ Dprintf("streamer start\n");
+ m_LiveStreamer->Start(this);
+ return true;
+ }
+ else
+ esyslog("streamdev-server IGMP: SetDevice failed");
+ DELETENULL(m_LiveStreamer);
+ }
+ else
+ esyslog("streamdev-server IGMP: Connect failed: %m");
+ }
+ else
+ esyslog("streamdev-server IGMP: GetDevice failed");
+ }
+ else
+ esyslog("streamdev-server IGMP: Channel not found");
+ return false;
+}
+
+void cConnectionIGMP::Stop()
+{
+ if (m_LiveStreamer) {
+ m_LiveStreamer->Stop();
+ DELETENULL(m_LiveStreamer);
+ }
+}
diff --git a/server/connectionIGMP.h b/server/connectionIGMP.h
new file mode 100644
index 0000000..90abd58
--- /dev/null
+++ b/server/connectionIGMP.h
@@ -0,0 +1,45 @@
+/*
+ * $Id: connectionIGMP.h,v 1.1 2009/02/13 10:39:22 schmirl Exp $
+ */
+
+#ifndef VDR_STREAMDEV_SERVERS_CONNECTIONIGMP_H
+#define VDR_STREAMDEV_SERVERS_CONNECTIONIGMP_H
+
+#include "connection.h"
+#include "server/livestreamer.h"
+
+#include <tools/select.h>
+
+#define MULTICAST_PRIV_MIN ((uint32_t) 0xefff0000)
+#define MULTICAST_PRIV_MAX ((uint32_t) 0xeffffeff)
+
+class cStreamdevLiveStreamer;
+
+class cConnectionIGMP: public cServerConnection {
+private:
+ cStreamdevLiveStreamer *m_LiveStreamer;
+ int m_ClientPort;
+ eStreamType m_StreamType;
+
+public:
+ cConnectionIGMP(const char* Name, int ClientPort, eStreamType StreamType);
+ virtual ~cConnectionIGMP();
+
+ bool Start(cChannel *Channel, in_addr_t Dst);
+ void Stop();
+
+ /* Not used here */
+ virtual bool Command(char *Cmd) { return false; }
+
+ virtual void Attach(void) { if (m_LiveStreamer != NULL) m_LiveStreamer->Attach(); }
+ virtual void Detach(void) { if (m_LiveStreamer != NULL) m_LiveStreamer->Detach(); }
+
+ virtual bool Abort(void) const;
+};
+
+inline bool cConnectionIGMP::Abort(void) const
+{
+ return !m_LiveStreamer || m_LiveStreamer->Abort();
+}
+
+#endif // VDR_STREAMDEV_SERVERS_CONNECTIONIGMP_H
diff --git a/server/livefilter.c b/server/livefilter.c
index ac9c284..730fedc 100644
--- a/server/livefilter.c
+++ b/server/livefilter.c
@@ -1,14 +1,11 @@
/*
- * $Id: livefilter.c,v 1.5 2008/04/07 14:27:31 schmirl Exp $
+ * $Id: livefilter.c,v 1.6 2009/02/13 10:39:22 schmirl Exp $
*/
#include "server/livefilter.h"
#include "server/streamer.h"
#include "common.h"
-#ifndef TS_SIZE
-# define TS_SIZE 188
-#endif
#ifndef TS_SYNC_BYTE
# define TS_SYNC_BYTE 0x47
#endif
diff --git a/server/server.c b/server/server.c
index bc5e83f..1bdb20a 100644
--- a/server/server.c
+++ b/server/server.c
@@ -1,10 +1,11 @@
/*
- * $Id: server.c,v 1.9 2008/10/31 12:19:57 schmirl Exp $
+ * $Id: server.c,v 1.10 2009/02/13 10:39:22 schmirl Exp $
*/
#include "server/server.h"
#include "server/componentVTP.h"
#include "server/componentHTTP.h"
+#include "server/componentIGMP.h"
#include "server/setup.h"
#include <vdr/tools.h>
@@ -36,6 +37,12 @@ void cStreamdevServer::Initialize(void)
if (m_Instance == NULL) {
if (StreamdevServerSetup.StartVTPServer) Register(new cComponentVTP);
if (StreamdevServerSetup.StartHTTPServer) Register(new cComponentHTTP);
+ if (StreamdevServerSetup.StartIGMPServer) {
+ if (strcmp(StreamdevServerSetup.IGMPBindIP, "0.0.0.0") == 0)
+ esyslog("streamdev-server: Not starting IGMP. IGMP must be bound to a local IP");
+ else
+ Register(new cComponentIGMP);
+ }
m_Instance = new cStreamdevServer;
}
diff --git a/server/setup.c b/server/setup.c
index 6867cdb..710b1fa 100644
--- a/server/setup.c
+++ b/server/setup.c
@@ -1,5 +1,5 @@
/*
- * $Id: setup.c,v 1.5 2009/01/16 11:35:44 schmirl Exp $
+ * $Id: setup.c,v 1.6 2009/02/13 10:39:22 schmirl Exp $
*/
#include <vdr/menuitems.h>
@@ -16,10 +16,14 @@ cStreamdevServerSetup::cStreamdevServerSetup(void) {
StartHTTPServer = true;
HTTPServerPort = 3000;
HTTPStreamType = stTS;
+ StartIGMPServer = false;
+ IGMPClientPort = 1234;
+ IGMPStreamType = stTS;
SuspendMode = smAlways;
AllowSuspend = false;
strcpy(VTPBindIP, "0.0.0.0");
strcpy(HTTPBindIP, "0.0.0.0");
+ strcpy(IGMPBindIP, "0.0.0.0");
}
bool cStreamdevServerSetup::SetupParse(const char *Name, const char *Value) {
@@ -31,6 +35,10 @@ bool cStreamdevServerSetup::SetupParse(const char *Name, const char *Value) {
else if (strcmp(Name, "HTTPServerPort") == 0) HTTPServerPort = atoi(Value);
else if (strcmp(Name, "HTTPStreamType") == 0) HTTPStreamType = atoi(Value);
else if (strcmp(Name, "HTTPBindIP") == 0) strcpy(HTTPBindIP, Value);
+ else if (strcmp(Name, "StartIGMPServer") == 0) StartIGMPServer = atoi(Value);
+ else if (strcmp(Name, "IGMPClientPort") == 0) IGMPClientPort = atoi(Value);
+ else if (strcmp(Name, "IGMPStreamType") == 0) IGMPStreamType = atoi(Value);
+ else if (strcmp(Name, "IGMPBindIP") == 0) strcpy(IGMPBindIP, Value);
else if (strcmp(Name, "SuspendMode") == 0) SuspendMode = atoi(Value);
else if (strcmp(Name, "AllowSuspend") == 0) AllowSuspend = atoi(Value);
else return false;
@@ -55,7 +63,11 @@ cStreamdevServerMenuSetupPage::cStreamdevServerMenuSetupPage(void) {
AddShortEdit(tr("HTTP Server Port"), m_NewSetup.HTTPServerPort);
AddTypeEdit (tr("HTTP Streamtype"), m_NewSetup.HTTPStreamType);
AddIpEdit (tr("Bind to IP"), m_NewSetup.HTTPBindIP);
-
+ AddCategory (tr("Multicast Streaming Server"));
+ AddBoolEdit (tr("Start IGMP Server"), m_NewSetup.StartIGMPServer);
+ AddShortEdit(tr("Multicast Client Port"), m_NewSetup.IGMPClientPort);
+ AddTypeEdit (tr("Multicast Streamtype"), m_NewSetup.IGMPStreamType);
+ AddIpEdit (tr("Bind to IP"), m_NewSetup.IGMPBindIP);
SetCurrent(Get(1));
}
@@ -69,7 +81,10 @@ void cStreamdevServerMenuSetupPage::Store(void) {
|| strcmp(m_NewSetup.VTPBindIP, StreamdevServerSetup.VTPBindIP) != 0
|| m_NewSetup.StartHTTPServer != StreamdevServerSetup.StartHTTPServer
|| m_NewSetup.HTTPServerPort != StreamdevServerSetup.HTTPServerPort
- || strcmp(m_NewSetup.HTTPBindIP, StreamdevServerSetup.HTTPBindIP) != 0) {
+ || strcmp(m_NewSetup.HTTPBindIP, StreamdevServerSetup.HTTPBindIP) != 0
+ || m_NewSetup.StartIGMPServer != StreamdevServerSetup.StartIGMPServer
+ || m_NewSetup.IGMPClientPort != StreamdevServerSetup.IGMPClientPort
+ || strcmp(m_NewSetup.IGMPBindIP, StreamdevServerSetup.IGMPBindIP) != 0) {
restart = true;
cStreamdevServer::Destruct();
}
@@ -82,6 +97,10 @@ void cStreamdevServerMenuSetupPage::Store(void) {
SetupStore("HTTPServerPort", m_NewSetup.HTTPServerPort);
SetupStore("HTTPStreamType", m_NewSetup.HTTPStreamType);
SetupStore("HTTPBindIP", m_NewSetup.HTTPBindIP);
+ SetupStore("StartIGMPServer", m_NewSetup.StartIGMPServer);
+ SetupStore("IGMPClientPort", m_NewSetup.IGMPClientPort);
+ SetupStore("IGMPStreamType", m_NewSetup.IGMPStreamType);
+ SetupStore("IGMPBindIP", m_NewSetup.IGMPBindIP);
SetupStore("SuspendMode", m_NewSetup.SuspendMode);
SetupStore("AllowSuspend", m_NewSetup.AllowSuspend);
diff --git a/server/setup.h b/server/setup.h
index ff27618..d2b1592 100644
--- a/server/setup.h
+++ b/server/setup.h
@@ -1,5 +1,5 @@
/*
- * $Id: setup.h,v 1.1 2004/12/30 22:44:21 lordjaxom Exp $
+ * $Id: setup.h,v 1.2 2009/02/13 10:39:22 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_SETUPSERVER_H
@@ -20,6 +20,10 @@ struct cStreamdevServerSetup {
int HTTPServerPort;
int HTTPStreamType;
char HTTPBindIP[20];
+ int StartIGMPServer;
+ int IGMPClientPort;
+ int IGMPStreamType;
+ char IGMPBindIP[20];
int SuspendMode;
int AllowSuspend;
};
diff --git a/server/streamer.c b/server/streamer.c
index 63776cf..9795cc6 100644
--- a/server/streamer.c
+++ b/server/streamer.c
@@ -1,5 +1,5 @@
/*
- * $Id: streamer.c,v 1.17 2008/10/22 11:59:32 schmirl Exp $
+ * $Id: streamer.c,v 1.18 2009/02/13 10:39:22 schmirl Exp $
*/
#include <vdr/ringbuffer.h>
@@ -55,16 +55,33 @@ void cStreamdevWriter::Action(void)
if (sel.CanWrite(*m_Socket)) {
int written;
- if ((written = m_Socket->Write(block + offset, count)) == -1) {
- esyslog("ERROR: streamdev-server: couldn't send data: %m");
+ int pkgsize = count;
+ // SOCK_DGRAM indicates multicast
+ if (m_Socket->Type() == SOCK_DGRAM) {
+ // don't fragment multicast packets
+ // max. payload on standard local ethernet is 1416 to 1456 bytes
+ // and some STBs expect complete TS packets
+ // so let's always limit to 7 * TS_SIZE = 1316
+ if (pkgsize > 7 * TS_SIZE)
+ pkgsize = 7 * TS_SIZE;
+ else
+ pkgsize -= pkgsize % TS_SIZE;
+ }
+ if ((written = m_Socket->Write(block + offset, pkgsize)) == -1) {
+ esyslog("ERROR: streamdev-server: couldn't send %d bytes: %m", pkgsize);
break;
}
+
+ // statistics
if (count > max)
max = count;
offset += written;
count -= written;
- if (count == 0) {
+
+ // less than one TS packet left:
+ // delete what we've written so far and get next chunk
+ if (count < TS_SIZE) {
m_Streamer->Del(offset);
block = NULL;
}
diff --git a/server/streamer.h b/server/streamer.h
index 9deec8d..20323b7 100644
--- a/server/streamer.h
+++ b/server/streamer.h
@@ -1,5 +1,5 @@
/*
- * $Id: streamer.h,v 1.9 2008/10/22 11:59:32 schmirl Exp $
+ * $Id: streamer.h,v 1.10 2009/02/13 10:39:22 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_STREAMER_H
@@ -12,6 +12,10 @@
class cTBSocket;
class cStreamdevStreamer;
+#ifndef TS_SIZE
+#define TS_SIZE 188
+#endif
+
#define STREAMERBUFSIZE MEGABYTE(4)
#define WRITERBUFSIZE KILOBYTE(256)