# -*- coding: utf-8 -*-
#
# LinOTP - the open source solution for two factor authentication
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#
# This file is part of LinOTP server.
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License, version 3, as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the
# GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# E-mail: linotp@lsexperts.de
# Contact: www.linotp.org
# Support: www.lsexperts.de
#
""" contains utility functions """
from linotp.lib.error import ParameterError
from linotp.lib.config import getFromConfig
from pylons import request
from pylons import config
from pylons.controllers.util import abort
import binascii
import os
import string
from linotp.lib.crypt import urandom
from linotp.lib.crypt import geturandom
import pkg_resources
import logging
log = logging.getLogger(__name__)
from linotp.lib.selftest import isSelfTest
from netaddr import IPAddress, IPNetwork
import re
SESSION_KEY_LENGTH = 32
optional = True
required = False
[docs]def get_version_number():
'''
returns the linotp version
'''
return pkg_resources.get_distribution("linotp").version
[docs]def get_version():
'''
This returns the version, that is displayed in the WebUI and self service portal.
'''
version = get_version_number()
return "LinOTP %s" % version
[docs]def get_copyright_info():
'''
This returns the copyright information displayed in the WebUI and selfservice portal.
'''
return "2010-2014 LSE Leading Security Experts GmbH"
[docs]def getParam(param, which, optional=True):
"""
getParam()
input:
- param (hash set): the set, which contains all parameters
- which (lteral): the entry lookup
- optional (boolean): defines if this parameter is optional or not
- an exception is thrown if the parameter is required
- otherwise: nothing done!
return:
- the value (literal) of the parameter if exists or nothing
in case the parameter is optional, otherwise throw an exception
"""
ret = None
if param.has_key(which):
ret = param[which]
else:
if (optional == False):
raise ParameterError("Missing parameter: %r" % which, id=905)
return ret
[docs]def getLowerParams(param):
ret = {}
for key in param:
lkey = key.lower()
# strip the session parameter!
if "session" != lkey:
lval = param[key]
ret[lkey] = lval
log.debug("[getLowerParams] Parameter key:%s=%s", lkey, lval)
return ret
[docs]def getUserRealm(param, optional):
user = ""
realm = ""
return (user, realm)
[docs]def uniquify(doubleList):
# uniquify the realm list
uniqueList = []
for e in doubleList:
if e.lower() not in uniqueList:
uniqueList.append(e.lower())
return uniqueList
[docs]def generate_otpkey(key_size=20):
'''
generates the HMAC key of keysize. Should be 20 or 32
THe key is returned as a hexlified string
'''
log.debug("generating key of size %s" % key_size)
return binascii.hexlify(geturandom(key_size))
[docs]def generate_password(size=6, characters=string.ascii_lowercase + string.ascii_uppercase + string.digits):
return ''.join(urandom.choice(characters) for x in range(size))
[docs]def check_session():
'''
This function checks the session cookie for management API
and compares it to the session parameter
'''
if isSelfTest():
return
# check if the client is in the allowed IP range
no_session_clients = [c.strip() for c in config.get("linotpNoSessionCheck", "").split(",")]
client = request.environ.get('REMOTE_ADDR', None)
log.debug("[check_session] checking %s in %s" % (client, no_session_clients))
for network in no_session_clients:
if not network:
continue
try:
if IPAddress(client) in IPNetwork(network):
log.debug("[check_session] skipping session check since client %s in allowed: %s" % (client, no_session_clients))
return
except Exception as ex:
log.warning("[check_session] misconfiguration in linotpNoSessionCheck: %r - %r" % (network, ex))
if request.path.lower() == '/admin/getsession':
log.debug('[check_session] requesting a new session cookie')
else:
cookie = request.cookies.get('admin_session')
session = request.params.get('session')
# doing any other request, we need to check the session!
log.debug("[check_session]: session: %s" % session)
log.debug("[check_session]: cookie: %s" % cookie)
if session is None or session == "" or session != cookie:
log.error("[check_session] The request did not pass a valid session!")
abort(401, "You have no valid session!")
pass
[docs]def check_selfservice_session():
'''
This function checks the session cookie for the
selfservcice session
'''
res = True
cookie = None
session = None
log.debug(request.path.lower())
# All functions starting with /selfservice/user are data functions and protected
# by the session key
if request.path.lower()[:17] != "/selfservice/user":
log.info('[check_selfservice_session] nothing to check')
else:
try:
cookie = request.cookies.get('linotp_selfservice')[0:40]
session = request.params.get('session')[0:40]
except Exception as e:
log.warning("[check_selfservice_session] failed to check selfservice session: %r" % e)
res = False
log.info("[check_selfservice_session]: session: %s" % session)
log.info("[check_selfservice_session]: cookie: %s" % cookie)
if session is None or session != cookie:
log.error("[check_selfservice_session] The request %s did not pass a valid session!" % request.url)
res = False
return res
[docs]def remove_session_from_param(param):
'''
Some low level functions like the userlisting do not like to have a
session parameter in the param dictionary.
So we remove the session from the params.
'''
return_param = {}
for key in param.keys():
if "session" != key.lower():
return_param[key] = param[key]
return return_param
#########################################################################################
# Client overwriting stuff
#
[docs]def get_client_from_request():
'''
This function returns the client as it is passed in the HTTP Request.
This is the very HTTP client, that contacts the LinOTP server.
'''
client = request.environ.get('REMOTE_ADDR', None)
log.debug("[get_client_from_request] got the client %s" % client)
return client
[docs]def get_client_from_param():
'''
This function returns the client, that is passed with the GET/POST parameter "client"
'''
param = request.params
client = getParam(param, "client", optional)
log.debug("[get_client_from_param] got the client %s" % client)
return client
[docs]def get_client():
'''
This function returns the client.
It first tries to get the client as it is passed as the HTTP Client via REMOTE_ADDR.
If this client Address is in a list, that is allowed to overwrite its client address (like e.g.
a FreeRADIUS server, which will always pass the FreeRADIUS address but not the address of the
RADIUS client) it checks for the existance of the client parameter.
'''
may_overwrite = []
over_client = getFromConfig("mayOverwriteClient", "")
log.debug("[get_client] config entry mayOverwriteClient: %s" % over_client)
try:
may_overwrite = [ c.strip() for c in over_client.split(',') ]
except Exception as e:
log.warning("[get_client] evaluating config entry 'mayOverwriteClient': %r" % e)
client = get_client_from_request()
log.debug("[get_client] got the original client %s" % client)
if client in may_overwrite or client == None:
log.debug("[get_client] client %s may overwrite!" % client)
if get_client_from_param():
client = get_client_from_param()
log.debug("[get_client] client overwritten to %s" % client)
log.debug("[get_client] returning %s" % client)
return client
[docs]def normalize_activation_code(activationcode, upper=True, convert_o=True, convert_0=True):
'''
This normalizes the activation code.
1. lower letters are capitaliezed
2. Oh's in the last two characters are turned to zeros
3. zeros in in the first-2 characters are turned to Ohs
'''
if upper:
activationcode = activationcode.upper()
if convert_o:
activationcode = activationcode[:-2] + activationcode[-2:].replace("O", "0")
if convert_0:
activationcode = activationcode[:-2].replace("0", "O") + activationcode[-2:]
return activationcode
[docs]def is_valid_fqdn(hostname, split_port=False):
'''
Checks if the hostname is a valid FQDN
'''
if split_port:
hostname = hostname.split(':')[0]
if len(hostname) > 255:
return False
if hostname[-1:] == ".":
hostname = hostname[:-1] # strip exactly one dot from the right, if present
allowed = re.compile("(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
return all(allowed.match(x) for x in hostname.split("."))
[docs]def remove_empty_lines(doc):
'''
remove empty lines from the input document
@param doc: documemt containing long multiline text
@type doc: string
@return: data without empty lines
@rtype: string
'''
data = '\n'.join([line for line in doc.split('\n') if line.strip() != ''])
return data
##
## Modhex calculations for Yubikey
##
hexHexChars = '0123456789abcdef'
modHexChars = 'cbdefghijklnrtuv'
hex2ModDict = dict(zip(hexHexChars, modHexChars))
mod2HexDict = dict(zip(modHexChars, hexHexChars))
[docs]def modhex_encode(s):
return ''.join(
[ hex2ModDict[c] for c in s.encode('hex') ]
)
# end def modhex_encode
[docs]def modhex_decode(m):
return ''.join(
[ mod2HexDict[c] for c in m ]
).decode('hex')
# end def modhex_decode
[docs]def checksum(msg):
crc = 0xffff;
for i in range(0, len(msg) / 2):
b = int(msg[i * 2] + msg[(i * 2) + 1], 16)
crc = crc ^ (b & 0xff)
for j in range(0, 8):
n = crc & 1
crc = crc >> 1
if n != 0:
crc = crc ^ 0x8408
return crc