Source code for slixmpp.plugins.xep_0404.mix_anon

# Slixmpp: The Slick XMPP Library
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from typing import (
    Dict,
    Optional,
    Set,
    Tuple,
)

from slixmpp import JID, Message, Iq
from slixmpp.exceptions import IqError, IqTimeout
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.matcher import MatchXPath
from slixmpp.xmlstream.handler import Callback
from slixmpp.plugins.xep_0404 import stanza
from slixmpp.plugins.xep_0004.stanza import Form


NODES = [
    'urn:xmpp:mix:nodes:jidmap',
]


[docs] class XEP_0404(BasePlugin): '''XEP-0404: MIX JID Hidden Channels''' name = 'xep_0404' description = 'XEP-0404: MIX-ANON' dependencies = {'xep_0369'} stanza = stanza namespace = stanza.NS def plugin_init(self) -> None: stanza.register_plugins()
[docs] async def get_anon_raw(self, channel: JID, *, ifrom: Optional[JID] = None, **pubsubkwargs) -> Iq: """ Get the jid-participant mapping result (raw). :param JID channel: MIX channel JID """ return await self.xmpp['xep_0030'].get_items( channel.bare, ifrom=ifrom, **pubsubkwargs )
[docs] async def get_anon_by_jid(self, channel: JID, *, ifrom: Optional[JID] = None, **pubsubkwargs) -> Dict[JID, str]: """ Get the jid-participant mapping, by JID :param JID channel: MIX channel JID """ raw = await self.get_anon_raw(channel, ifrom=ifrom, **pubsubkwargs) mapping = {} for item in raw['pubsub']['items']: mapping[item['anon_participant']['jid']] = item['id'] return mapping
[docs] async def get_anon_by_id(self, channel: JID, *, ifrom: Optional[JID] = None, **pubsubkwargs) -> Dict[str, JID]: """ Get the jid-participant mapping, by participant id :param JID channel: MIX channel JID """ raw = await self.get_anon_raw(channel, ifrom=ifrom, **pubsubkwargs) mapping = {} for item in raw['pubsub']['items']: mapping[item['id']] = item['anon_participant']['jid'] return mapping
[docs] async def get_preferences(self, channel: JID, *, ifrom: Optional[JID] = None, **iqkwargs) -> Form: """ Get channel preferences with default values. :param JID channel: MIX channel JID """ iq = self.xmpp.make_iq_get(ito=channel.bare, ifrom=ifrom) iq.enable('user_preference') prefs_stanza = await iq.send(**iqkwargs) return prefs_stanza['user_preference']['form']
[docs] async def set_preferences(self, channel: JID, form: Form, *, ifrom: Optional[JID] = None, **iqkwargs) -> Form: """ Set channel preferences :param JID channel: MIX channel JID :param Form form: A 0004 form with updated preferences """ iq = self.xmpp.make_iq_set(ito=channel.bare, ifrom=ifrom) iq['user_preference']['form'] = form prefs_result = await iq.send(**iqkwargs) return prefs_result['user_preference']['form']