Source code for backscatter

#!/usr/bin/env python
"""Abstract API over the Backscatter API."""
import json
import logging
import requests


__author__ = "Brandon Dixon"
__copyright__ = "Copyright, Backscatter"
__credits__ = ["Brandon Dixon"]
__license__ = "MIT"
__maintainer__ = "Brandon Dixon"
__email__ = "brandon@backscatter.io"
__status__ = "BETA"


class RequestFailure(Exception):
    """Exception to capture a failed request."""
    pass


class InvalidResponse(Exception):
    """Exception to capture a failed response parse."""
    pass


def valid_date(date):
    """Check the input date and ensure it matches the format."""
    import datetime
    try:
        datetime.datetime.strptime(date, '%Y-%m-%d')
    except ValueError:
        raise ValueError("Incorrect data format, should be YYYY-MM-DD")


def valid_ip(ip_address, strict=True):
    """Check if the IP address is valid."""
    if not ip_address:
        return False
    import socket
    try:
        socket.inet_aton(ip_address)
        return True
    except socket.error:
        if strict:
            raise ValueError("Invalid IP address")
        return False


def detect_query(query):
    """Attempt to identify the query type being made."""
    if valid_ip(query) and query.count('.') == 3:
        return 'ip'
    if '/' in query:
        return 'network'
    if not query.isdigit():
        return 'country'
    return None


[docs]class Backscatter: """Abstract interface for Backscatter.""" NAME = "Backscatter" LOG_LEVEL = logging.DEBUG BASE_URL = "https://api.backscatter.io" CLIENT_VERSION = 1 API_VERSION = "v0" EP_ENRICHMENT = "enrichment/{query_type}" EP_OBSERVATIONS = "observations/{query_type}" EP_TRENDS_POPULAR = "trends/popular/{trend_type}" VALID_ENRICHMENT = ['ip', 'asn', 'network', 'port'] VALID_OBSERVATIONS = ['ip', 'asn', 'country', 'network', 'port'] VALID_TRENDS = ['ip', 'asn', 'country', 'network', 'port'] def __init__(self, api_key, version=API_VERSION, log_level=LOG_LEVEL, proxies=None, headers=None): """Init the object.""" self._log = self._logger() self.api_key = api_key self.version = version self._proxies = proxies self._headers = headers self.set_log_level(log_level)
[docs] def _logger(self): """Create a logger to be used between processes. :returns: Logging instance. """ import sys logger = logging.getLogger(self.NAME) logger.setLevel(self.LOG_LEVEL) shandler = logging.StreamHandler(sys.stdout) fmt = '\033[1;32m%(levelname)-5s %(module)s:%(funcName)s():' fmt += '%(lineno)d %(asctime)s\033[0m| %(message)s' shandler.setFormatter(logging.Formatter(fmt)) logger.addHandler(shandler) return logger
[docs] def set_log_level(self, level): """Set the log level.""" to_set = 0 if level == 20: to_set = logging.INFO if level == 10: to_set = logging.DEBUG if level == 40: to_set = logging.ERROR self._log.setLevel(to_set)
[docs] def _request(self, endpoint, params=dict(), data=None): """Handle the requesting of information from the API.""" client_value = "Python Backscatter v%s" % (str(self.CLIENT_VERSION)) headers = {'X-Request-Client': client_value, 'X-API-Key': self.api_key} if self._headers: headers.update(self._headers) url = '/'.join([self.BASE_URL, self.API_VERSION, endpoint]) kwargs = {'url': url, 'headers': headers, 'timeout': 30, 'params': params, 'data': data} if self._proxies: kwargs.update({'proxies': self._proxies}) response = requests.get(**kwargs) if response.status_code not in range(200, 299): raise RequestFailure(response.status_code, response.content) try: loaded = json.loads(response.content) except Exception as error: raise InvalidResponse(error) return loaded
[docs] def get_observations(self, query, query_type=None, scope=None): """Get observations based on a specific query value. :param query: Value to search with :type query: str :param query_type: Type of observation search to run :type query_type: str :param scope: Days of history to search back from today :type scope: int :return: Listing of observations from Backscatter :rtype: dict """ if not query_type: query_type = detect_query(query_type) if not query_type: raise Exception("Ambiguous query type found, please specify using the 'query_type' argument.") if query_type not in self.VALID_OBSERVATIONS: message = "Invalid observation type. Must be of: %s" % (', '.join(self.VALID_OBSERVATIONS)) raise RequestFailure(message) endpoint = self.EP_OBSERVATIONS.format(query_type=query_type) params = {'query': query, 'scope': scope} return self._request(endpoint, params=params)
[docs] def enrich(self, query, query_type=None): """Enrich a specific value with additional context. :param query: Value to search with :type query: str :param query_type: Type of observation search to run :type query_type: str :return: Enrichment information for the query :rtype: dict """ if not query_type: query_type = detect_query(query) if not query_type: raise Exception("Ambiguous query type found, please specify using the 'query_type' argument.") if query_type not in self.VALID_ENRICHMENT: message = "Invalid enrichment type. Must be of: %s" % (', '.join(self.VALID_ENRICHMENT)) raise RequestFailure(message) endpoint = self.EP_ENRICHMENT.format(query_type=query_type) params = {'query': query} return self._request(endpoint, params=params)