Source code for frisbee

#!/usr/bin/env python
import copy
import json
import logging
import os
import random
from importlib import import_module
from typing import ClassVar
from typing import Dict
from typing import List
import namesgenerator
from frisbee.utils import gen_logger
from frisbee.utils import str_datetime
from frisbee.utils import now_time
from concurrent.futures import ProcessPoolExecutor, as_completed


os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'


def dyn_loader(module: str, kwargs: str):
    """Dynamically load a specific module instance.

    The purpose of this function is to peek into the modules directory and
    load up the parsing module the user has specified. The benefit this brings
    is that we have no hardcoding for any module identification. Drop it into
    the folder and it's callable.
    """
    package_directory: str = os.path.dirname(os.path.abspath(__file__))
    modules: str = package_directory + "/modules"
    module = module + ".py"
    if module not in os.listdir(modules):
        raise Exception("Module %s is not valid" % module)
    module_name: str = module[:-3]
    import_path: str = "%s.%s" % ('frisbee.modules', module_name)
    imported = import_module(import_path)
    obj = getattr(imported, 'Module')
    return obj(**kwargs)


def collect(job):
    """Collect based on the job order.

    Ideally this would be part of the Frisbee class, but futures are not a fan
    of referencing self and prefer to have their targets outside of the class
    space.
    """
    print("Job: %s" % str(job))
    engine = dyn_loader(job['engine'], job)
    job['start_time'] = now_time()
    results = engine.search()
    job['end_time'] = now_time()
    duration: str = str((job['end_time'] - job['start_time']).seconds)
    job['duration'] = duration
    job.update({'results': results})
    return job


[docs]class Frisbee: """Class to interact with the core code.""" NAME: ClassVar[str] = "Frisbee" def __init__(self, project: str = namesgenerator.get_random_name(), log_level: int = logging.INFO, save: bool = False): """Creation. The moons and the planets are there.""" self.project: str = project self.project += "_%d" % (random.randint(100000, 999999)) self._log: logging.Logger = gen_logger(self.NAME, log_level) self.output: bool = save self.folder: str = os.getcwd() self._config_bootstrap() self._processed: List = list() self.results: List = list() self.saved: List = list()
[docs] def _reset(self) -> None: """Reset some of the state in the class for multi-searches.""" self.project: str = namesgenerator.get_random_name() self.project += "_%d" % (random.randint(100000, 999999)) self._processed: List = list() self.results: List = list()
[docs] def _config_bootstrap(self) -> None: """Handle the basic setup of the tool prior to user control. Bootstrap will load all the available modules for searching and set them up for use by this main class. """ if self.output: self.folder: str = os.getcwd() + "/results" if not os.path.exists(self.folder): os.mkdir(self.folder) self.folder += "/" + self.project os.mkdir(self.folder)
[docs] def _progressive_save(self, job) -> None: """Save output to a dictionary as results stream in. Depending on the options used, Frisbee can run for quite a long time. Each individual job is written after its completed and includes the findings along with the job details. """ self._log.info("Saving results to '%s'" % self.folder) path: str = self.folder + "/" if job['domain'] in self.saved: return job['start_time'] = str_datetime(job['start_time']) job['end_time'] = str_datetime(job['end_time']) jid: int = random.randint(100000, 999999) filename: str = "%s_%s_%d_job.json" % (self.project, job['domain'], jid) handle = open(path + filename, 'w') handle.write(json.dumps(job, indent=4)) handle.close() filename = "%s_%s_%d_emails.txt" % (self.project, job['domain'], jid) handle = open(path + filename, 'w') for email in job['results']['emails']: handle.write(email + "\n") handle.close() self.saved.append(job['domain'])
[docs] def search(self, jobs: List[Dict[str, str]], executor=None) -> None: """Perform searches based on job orders.""" if not isinstance(jobs, list): raise Exception("Jobs must be of type list.") self._log.info("Project: %s" % self.project) self._log.info("Processing jobs: %d", len(jobs)) if not executor: # Reuse the same executor pool when processing greedy jobs executor = ProcessPoolExecutor() futures = [executor.submit(collect, job) for job in jobs] for future in as_completed(futures): output = future.result() output.update({'project': self.project}) self._processed.append(output['domain']) self.results.append(output) self._progressive_save(output) if output['greedy']: bonus_jobs: List = list() observed: List = list() for item in output['results']['emails']: part_split = item.split('@') if len(part_split) == 1: continue found: str = item.split('@')[1] if found in self._processed or found in observed: continue observed.append(found) base: Dict = dict() base['limit'] = output['limit'] base['modifier'] = output['modifier'] base['engine'] = output['engine'] base['greedy'] = False base['domain'] = found bonus_jobs.append(base) if bonus_jobs: self.search(bonus_jobs, executor=executor) self._log.info("All jobs processed")
[docs] def get_results(self) -> List: """Return results from the search.""" return self.results