204 lines
7.3 KiB
Python
204 lines
7.3 KiB
Python
import os
|
|
import re
|
|
import json
|
|
|
|
import sublime
|
|
|
|
from ..console_write import console_write
|
|
from ..open_compat import open_compat, read_compat
|
|
from ..package_io import read_package_file
|
|
from ..cache import get_cache
|
|
from ..ca_certs import get_system_ca_bundle_path
|
|
from .no_ca_cert_exception import NoCaCertException
|
|
from .downloader_exception import DownloaderException
|
|
|
|
|
|
class CertProvider(object):
|
|
"""
|
|
A base downloader that provides access to a ca-bundle for validating
|
|
SSL certificates.
|
|
"""
|
|
|
|
def check_certs(self, domain, timeout):
|
|
"""
|
|
Ensures that the SSL CA cert for a domain is present on the machine
|
|
|
|
:param domain:
|
|
The domain to ensure there is a CA cert for
|
|
|
|
:param timeout:
|
|
The int timeout for downloading the CA cert from the channel
|
|
|
|
:raises:
|
|
NoCaCertException: when a suitable CA cert could not be found
|
|
|
|
:return:
|
|
The CA cert bundle path
|
|
"""
|
|
|
|
# Try to use the system CA bundle
|
|
ca_bundle_path = get_system_ca_bundle_path(self.settings)
|
|
if ca_bundle_path:
|
|
return ca_bundle_path
|
|
|
|
# If the system bundle did not work, fall back to our CA distribution
|
|
# system. Hopefully this will be going away soon.
|
|
if self.settings.get('debug'):
|
|
console_write(u'Unable to find system CA cert bundle, falling back to certs provided by Package Control')
|
|
|
|
cert_match = False
|
|
|
|
certs_list = get_cache('*.certs', self.settings.get('certs', {}))
|
|
|
|
ca_bundle_path = os.path.join(sublime.packages_path(), 'User', 'Package Control.ca-bundle')
|
|
if not os.path.exists(ca_bundle_path) or os.stat(ca_bundle_path).st_size == 0:
|
|
bundle_contents = read_package_file('Package Control', 'Package Control.ca-bundle', True)
|
|
if not bundle_contents:
|
|
raise NoCaCertException(u'Unable to copy distributed Package Control.ca-bundle', domain)
|
|
with open_compat(ca_bundle_path, 'wb') as f:
|
|
f.write(bundle_contents)
|
|
|
|
cert_info = certs_list.get(domain)
|
|
if cert_info:
|
|
cert_match = self.locate_cert(cert_info[0],
|
|
cert_info[1], domain, timeout)
|
|
|
|
wildcard_info = certs_list.get('*')
|
|
if wildcard_info:
|
|
cert_match = self.locate_cert(wildcard_info[0],
|
|
wildcard_info[1], domain, timeout) or cert_match
|
|
|
|
if not cert_match:
|
|
raise NoCaCertException(u'No CA certs available for %s' % domain, domain)
|
|
|
|
return ca_bundle_path
|
|
|
|
def locate_cert(self, cert_id, location, domain, timeout):
|
|
"""
|
|
Makes sure the SSL cert specified has been added to the CA cert
|
|
bundle that is present on the machine
|
|
|
|
:param cert_id:
|
|
The identifier for CA cert(s). For those provided by the channel
|
|
system, this will be an md5 of the contents of the cert(s). For
|
|
user-provided certs, this is something they provide.
|
|
|
|
:param location:
|
|
An http(s) URL, or absolute filesystem path to the CA cert(s)
|
|
|
|
:param domain:
|
|
The domain to ensure there is a CA cert for
|
|
|
|
:param timeout:
|
|
The int timeout for downloading the CA cert from the channel
|
|
|
|
:return:
|
|
If the cert specified (by cert_id) is present on the machine and
|
|
part of the Package Control.ca-bundle file in the User package folder
|
|
"""
|
|
|
|
ca_list_path = os.path.join(sublime.packages_path(), 'User', 'Package Control.ca-list')
|
|
if not os.path.exists(ca_list_path) or os.stat(ca_list_path).st_size == 0:
|
|
list_contents = read_package_file('Package Control', 'Package Control.ca-list')
|
|
if not list_contents:
|
|
raise NoCaCertException(u'Unable to copy distributed Package Control.ca-list', domain)
|
|
with open_compat(ca_list_path, 'w') as f:
|
|
f.write(list_contents)
|
|
|
|
ca_certs = []
|
|
with open_compat(ca_list_path, 'r') as f:
|
|
ca_certs = json.loads(read_compat(f))
|
|
|
|
if not cert_id in ca_certs:
|
|
if str(location) != '':
|
|
if re.match('^https?://', location):
|
|
contents = self.download_cert(cert_id, location, domain,
|
|
timeout)
|
|
else:
|
|
contents = self.load_cert(cert_id, location, domain)
|
|
if contents:
|
|
self.save_cert(cert_id, contents)
|
|
return True
|
|
return False
|
|
return True
|
|
|
|
def download_cert(self, cert_id, url, domain, timeout):
|
|
"""
|
|
Downloads CA cert(s) from a URL
|
|
|
|
:param cert_id:
|
|
The identifier for CA cert(s). For those provided by the channel
|
|
system, this will be an md5 of the contents of the cert(s). For
|
|
user-provided certs, this is something they provide.
|
|
|
|
:param url:
|
|
An http(s) URL to the CA cert(s)
|
|
|
|
:param domain:
|
|
The domain to ensure there is a CA cert for
|
|
|
|
:param timeout:
|
|
The int timeout for downloading the CA cert from the channel
|
|
|
|
:return:
|
|
The contents of the CA cert(s)
|
|
"""
|
|
|
|
cert_downloader = self.__class__(self.settings)
|
|
if self.settings.get('debug'):
|
|
console_write(u"Downloading CA cert for %s from \"%s\"" % (domain, url), True)
|
|
return cert_downloader.download(url,
|
|
'Error downloading CA certs for %s.' % domain, timeout, 1)
|
|
|
|
def load_cert(self, cert_id, path, domain):
|
|
"""
|
|
Copies CA cert(s) from a file path
|
|
|
|
:param cert_id:
|
|
The identifier for CA cert(s). For those provided by the channel
|
|
system, this will be an md5 of the contents of the cert(s). For
|
|
user-provided certs, this is something they provide.
|
|
|
|
:param path:
|
|
The absolute filesystem path to a file containing the CA cert(s)
|
|
|
|
:param domain:
|
|
The domain name the cert is for
|
|
|
|
:return:
|
|
The contents of the CA cert(s)
|
|
"""
|
|
|
|
if os.path.exists(path):
|
|
if self.settings.get('debug'):
|
|
console_write(u"Copying CA cert for %s from \"%s\"" % (domain, path), True)
|
|
with open_compat(path, 'rb') as f:
|
|
return f.read()
|
|
else:
|
|
raise NoCaCertException(u"Unable to find CA cert for %s at \"%s\"" % (domain, path), domain)
|
|
|
|
def save_cert(self, cert_id, contents):
|
|
"""
|
|
Saves CA cert(s) to the Package Control.ca-bundle
|
|
|
|
:param cert_id:
|
|
The identifier for CA cert(s). For those provided by the channel
|
|
system, this will be an md5 of the contents of the cert(s). For
|
|
user-provided certs, this is something they provide.
|
|
|
|
:param contents:
|
|
The contents of the CA cert(s)
|
|
"""
|
|
|
|
|
|
ca_bundle_path = os.path.join(sublime.packages_path(), 'User', 'Package Control.ca-bundle')
|
|
with open_compat(ca_bundle_path, 'ab') as f:
|
|
f.write(b"\n" + contents)
|
|
|
|
ca_list_path = os.path.join(sublime.packages_path(), 'User', 'Package Control.ca-list')
|
|
with open_compat(ca_list_path, 'r') as f:
|
|
ca_certs = json.loads(read_compat(f))
|
|
ca_certs.append(cert_id)
|
|
with open_compat(ca_list_path, 'w') as f:
|
|
f.write(json.dumps(ca_certs, indent=4))
|