#
# coding=utf-8
"""utils - Utility functions for cupage"""
# Copyright © 2009-2014 James Rowe <jnrowe@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import datetime
import json
import os
import re
import socket
import sys
try:
# For Python 3
from urllib import robotparser
import urllib.parse as urlparse
except ImportError:
import robotparser # NOQA
import urlparse # NOQA
import blessings
import httplib2
from .i18n import _
# Terminal handle shared by the message colouring helpers in this module
T = blessings.Terminal()

# Decide which CA certificate bundle to expose as the module-level
# ``CA_CERTS`` name.  ``SYSTEM_CERTS`` and ``CURL_CERTS`` record where the
# bundle came from when the fallback probing below is used.
try:
    # httplib2 0.8 and above support setting certs via ca_certs_locater module,
    # making this dirty mess even dirtier
    # NOTE(review): ``assert`` is stripped under ``python -O``, so the version
    # gate is skipped in that mode and only the import decides the branch.
    assert [int(i) for i in httplib2.__version__.split('.')] >= [0, 8]
    import ca_certs_locater
except (AssertionError, ImportError):
    # Fallback: httplib2 is too old or ``ca_certs_locater`` is not installed.
    # Directory that holds the bundle httplib2 is configured to use
    _HTTPLIB2_BUNDLE = os.path.realpath(os.path.dirname(httplib2.CA_CERTS))
    # True when that bundle lives outside the httplib2 package directory
    # (presumably a distribution patched to use system certs -- TODO confirm)
    SYSTEM_CERTS = \
        not _HTTPLIB2_BUNDLE.startswith(os.path.dirname(httplib2.__file__))
    # ``None`` leaves the choice of bundle to whoever consumes ``CA_CERTS``
    CA_CERTS = None
    CURL_CERTS = False
    if not SYSTEM_CERTS and sys.platform.startswith('linux'):
        # Probe known Linux bundle locations, first existing file wins
        for cert_file in ['/etc/ssl/certs/ca-certificates.crt',
                          '/etc/pki/tls/certs/ca-bundle.crt']:
            if os.path.exists(cert_file):
                CA_CERTS = cert_file
                SYSTEM_CERTS = True
                break
    elif not SYSTEM_CERTS and sys.platform.startswith('freebsd'):
        if os.path.exists('/usr/local/share/certs/ca-root-nss.crt'):
            CA_CERTS = '/usr/local/share/certs/ca-root-nss.crt'
            SYSTEM_CERTS = True
    elif os.path.exists(os.getenv('CURL_CA_BUNDLE', '')):
        # Honour a bundle named by the ``CURL_CA_BUNDLE`` environment variable
        CA_CERTS = os.getenv('CURL_CA_BUNDLE')
        CURL_CERTS = True
else:
    # NOTE(review): ``SYSTEM_CERTS`` and ``CURL_CERTS`` are not assigned on
    # this path; verify nothing reads them when ``ca_certs_locater`` is used.
    CA_CERTS = ca_certs_locater.get()
def parse_timedelta(delta):
    """Parse human readable frequency.

    A frequency is a non-negative integer or decimal number, optionally
    followed by spaces, then a single case-insensitive unit letter: ``h``
    (hours), ``d`` (days), ``w`` (weeks), ``m`` (months, counted as 28 days)
    or ``y`` (years, counted as 365 days).

    :param str delta: Frequency to parse
    :rtype: datetime.timedelta
    :return: Interval described by ``delta``
    :raise ValueError: If ``delta`` is not a valid frequency
    """
    # Raw string, as ``\d`` and ``\.`` are invalid escapes in a plain literal
    match = re.match(r'^(\d+(?:\.\d+)?) *([hdwmy])$', delta, re.IGNORECASE)
    if not match:
        raise ValueError("Invalid 'frequency' value")
    value, units = match.groups()
    # hours per hour/day/week/month/year
    hours_per_unit = {'h': 1, 'd': 24, 'w': 168, 'm': 672, 'y': 8760}
    hours = float(value) * hours_per_unit[units.lower()]
    return datetime.timedelta(hours=hours)
def sort_packages(packages):
    """Order package list according to version number.

    Each name is keyed on the runs of digits it contains, compared as
    integers, so ``1.10`` sorts after ``1.9``.  Names that produce the same
    key keep their input order.

    :param list packages: Packages to sort
    :rtype: list
    :return: New list holding ``packages`` in ascending version order
    """
    # Still only 'Good Enough™': every digit run in the name takes part,
    # which handles the common case of varying component length.  Comparing
    # integers rather than single characters is what keeps 10 above 9.
    return sorted(packages,
                  key=lambda s: [int(n) for n in re.findall(r'\d+', s)])
def robots_test(http, url, name, user_agent='*'):
    """Check whether a given URL is blocked by ``robots.txt``.

    Only URLs whose scheme starts with ``http`` are checked.  A 4xx response
    for :file:`robots.txt` is treated as "no restrictions".  Failures are
    reported on standard output.

    :param http: :class:`httplib2.Http` object to use for requests
    :param str url: URL to check
    :param name: Site name being checked
    :param str user_agent: User agent to check in :file:`robots.txt`
    :return: ``False`` if the URL is blocked or :file:`robots.txt` could not
        be fetched, ``None`` otherwise
    """
    parsed = urlparse.urlparse(url, 'http')
    if not parsed.scheme.startswith('http'):
        return None

    robots_url = '%(scheme)s://%(netloc)s/robots.txt' % parsed._asdict()
    robots = robotparser.RobotFileParser(robots_url)
    try:
        headers, content = http.request(robots_url)
    except httplib2.ServerNotFoundError:
        print(fail(_('Domain name lookup failed for %s') % name))
        return False
    except socket.timeout:
        print(fail(_('Socket timed out on %s') % name))
        return False

    # Ignore errors 4xx errors for robots.txt
    if str(headers.status).startswith('4'):
        return None

    # On Python 3 the body arrives as ``bytes`` while the robots parser only
    # accepts text lines.  On Python 2 ``bytes`` is ``str``, so the body is
    # passed through untouched there.
    if isinstance(content, bytes) and not isinstance(content, str):
        content = content.decode('utf-8', 'replace')
    robots.parse(content.splitlines())
    if not robots.can_fetch(user_agent, url):
        print(fail(_("Can't check %s, blocked by robots.txt") % name))
        return False
    return None
def _format_info(text, colour):
    """Prefix ``text`` with a highlighted ``*`` marker and colour it.

    The marker is styled with the terminal's ``bold_white_on_<colour>``
    attribute and the text with its ``bright_<colour>`` attribute.

    :param str text: Text to format
    :param str colour: Colour name used to build the terminal attributes
    :rtype: str
    :return: Marker and text, separated by a single space
    """
    marker_style = getattr(T, 'bold_white_on_%s' % colour)
    text_style = getattr(T, 'bright_%s' % colour)
    return '%s %s' % (marker_style('*'), text_style(text))
def success(text):
    """Render a success message, coloured green where possible.

    :param str text: Text to format
    :rtype: str
    :return: Formatted message
    """
    colour = 'green'
    return _format_info(text, colour)
def fail(text):
    """Render a failure message, coloured red where possible.

    :param str text: Text to format
    :rtype: str
    :return: Formatted message
    """
    colour = 'red'
    return _format_info(text, colour)
def warn(text):
    """Render a warning message, coloured yellow where possible.

    :param str text: Text to format
    :rtype: str
    :return: Formatted message
    """
    colour = 'yellow'
    return _format_info(text, colour)
class CupageEncoder(json.JSONEncoder):
    """Custom JSON encoding for supporting ``datetime`` objects."""

    def default(self, obj):
        """Handle ``datetime`` objects when encoding as JSON.

        Any object offering a usable ``isoformat`` method is encoded as the
        string that method returns.  Everything else falls through to
        :meth:`~json.JSONEncoder.default`, which raises ``TypeError``.

        :param obj: Object to encode
        :return: JSON-serialisable replacement for ``obj``
        :raise TypeError: If ``obj`` can not be encoded
        """
        # Look the method up first: a missing attribute raises
        # ``AttributeError``, not ``TypeError``, and must not leak to callers
        isoformat = getattr(obj, 'isoformat', None)
        if isoformat is not None:
            try:
                return isoformat()
            except TypeError:
                # ``isoformat`` exists but can't be called without arguments
                pass
        return json.JSONEncoder.default(self, obj)
def json_to_datetime(obj):
    """Parse ``checked`` datetimes from ``cupage`` databases.

    Intended for use as a JSON ``object_hook``.  When ``obj`` has a
    ``checked`` key its value is replaced, in place, with a
    :class:`datetime.datetime`; ISO-8601 strings with or without fractional
    seconds and numeric timestamps are understood.  Values that are neither
    a string nor convertible with ``float()``, such as ``None``, become
    ``None``.

    :see: `json.JSONDecoder`
    :param obj: Object to decode
    :return: ``obj``, with any ``checked`` value converted
    :raise ValueError: If ``checked`` is a string in an unknown format
    """
    if 'checked' not in obj:
        return obj

    value = obj['checked']
    try:
        try:
            result = datetime.datetime.strptime(value,
                                                '%Y-%m-%dT%H:%M:%S.%f')
        except ValueError:
            # ``isoformat()`` drops the fraction when microsecond is zero
            result = datetime.datetime.strptime(value, '%Y-%m-%dT%H:%M:%S')
    except TypeError:
        # Not a string at all
        try:
            # <0.7 compatibility
            result = datetime.datetime.fromtimestamp(float(value))
        except TypeError:
            result = None
    obj['checked'] = result
    return obj
def charset_from_headers(headers):
    """Parse charset from headers.

    The ``charset`` parameter of the ``content-type`` header is matched
    case-insensitively, and quotes around its value are dropped.  The value
    itself is returned exactly as sent.

    :param httplib2.Response headers: Response headers
    :rtype: str
    :return: Defined encoding, or default to ISO-8859-1
    """
    content_type = headers.get('content-type', "")
    # Parameter names are case-insensitive and values may be quoted
    match = re.search(r'''charset=["']?([^"' ;]+)''', content_type,
                      re.IGNORECASE)
    if match:
        return match.group(1)
    return "iso-8859-1"