# Slixmpp: The Slick XMPP Library
# Copyright (C) 2011 Nathanael C. Fritz
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import datetime as dt
from slixmpp import Message
from slixmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID
from slixmpp.plugins.xep_0004 import Form
from slixmpp.plugins import xep_0082
[docs]
class Event(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'event'
plugin_attrib = 'pubsub_event'
interfaces = set()
[docs]
class EventItem(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'item'
plugin_attrib = name
interfaces = {'id', 'payload', 'node', 'publisher'}
[docs]
def set_payload(self, value):
self.xml.append(value)
[docs]
def get_payload(self):
children = list(self.xml)
if len(children) > 0:
return children[0]
[docs]
def del_payload(self):
for child in self.xml:
self.xml.remove(child)
[docs]
class EventRetract(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'retract'
plugin_attrib = name
interfaces = {'id'}
[docs]
class EventItems(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'items'
plugin_attrib = name
interfaces = {'node'}
[docs]
class EventCollection(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'collection'
plugin_attrib = name
interfaces = {'node'}
[docs]
class EventAssociate(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'associate'
plugin_attrib = name
interfaces = {'node'}
[docs]
class EventDisassociate(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'disassociate'
plugin_attrib = name
interfaces = {'node'}
[docs]
class EventConfiguration(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'configuration'
plugin_attrib = name
interfaces = {'node'}
[docs]
class EventPurge(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'purge'
plugin_attrib = name
interfaces = {'node'}
[docs]
class EventDelete(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'delete'
plugin_attrib = name
interfaces = {'node', 'redirect'}
[docs]
def set_redirect(self, uri):
del self['redirect']
redirect = ET.Element('{%s}redirect' % self.namespace)
redirect.attrib['uri'] = uri
self.xml.append(redirect)
[docs]
def get_redirect(self):
redirect = self.xml.find('{%s}redirect' % self.namespace)
if redirect is not None:
return redirect.attrib.get('uri', '')
return ''
[docs]
def del_redirect(self):
redirect = self.xml.find('{%s}redirect' % self.namespace)
if redirect is not None:
self.xml.remove(redirect)
[docs]
class EventSubscription(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub#event'
name = 'subscription'
plugin_attrib = name
interfaces = {'node', 'expiry', 'jid', 'subid', 'subscription'}
[docs]
def get_expiry(self):
expiry = self._get_attr('expiry')
if expiry.lower() == 'presence':
return expiry
return xep_0082.parse(expiry)
[docs]
def set_expiry(self, value):
if isinstance(value, dt.datetime):
value = xep_0082.format_datetime(value)
self._set_attr('expiry', value)
[docs]
def set_jid(self, value):
self._set_attr('jid', str(value))
[docs]
def get_jid(self):
return JID(self._get_attr('jid'))
register_stanza_plugin(Message, Event)
register_stanza_plugin(Event, EventCollection)
register_stanza_plugin(Event, EventConfiguration)
register_stanza_plugin(Event, EventPurge)
register_stanza_plugin(Event, EventDelete)
register_stanza_plugin(Event, EventItems)
register_stanza_plugin(Event, EventSubscription)
register_stanza_plugin(EventCollection, EventAssociate)
register_stanza_plugin(EventCollection, EventDisassociate)
register_stanza_plugin(EventConfiguration, Form)
register_stanza_plugin(EventItems, EventItem, iterable=True)
register_stanza_plugin(EventItems, EventRetract, iterable=True)