# Source code for know_your_ip.know_your_ip

#!/usr/bin/env python

from __future__ import annotations

import argparse
import logging
import re
import signal
import sys
import time
from collections import defaultdict
from csv import DictWriter
from functools import partial
from multiprocessing import Pool
from pathlib import Path
from typing import Any

import geoip2.database
import geoip2.webservice
import requests
import shodan
from bs4 import BeautifulSoup

from .config import KnowYourIPConfig
from .config import load_config as load_modern_config
from .ping import quiet_ping
from .traceroute import os_traceroute

# Raise the threshold of the logger named "requests" so that only WARNING and
# above from that logger are emitted.
logging.getLogger("requests").setLevel(logging.WARNING)

# Log file handed to logging.basicConfig() by setup_logger(). The path is
# relative, so it is resolved against the current working directory.
LOG_FILE = Path("know_your_ip.log")

# Upper bound on attempts made by the retry loops of the lookup functions below.
MAX_RETRIES = 5

# Official AbuseIPDB category codes (as of 2024)
# Source: https://docs.abuseipdb.com/ and https://www.abuseipdb.com/categories
# Keys are the numeric category IDs written as strings: abuseipdb_api() converts
# each ID with str() before looking it up in this mapping.
ABUSEIPDB_CATEGORIES = {
    "1": "DNS Compromise",
    "2": "DNS Poisoning",
    "3": "Fraud Orders",
    "4": "DDoS Attack",
    "5": "FTP Brute-Force",
    "6": "Ping of Death",
    "7": "Phishing",
    "8": "Fraud VoIP",
    "9": "Open Proxy",
    "10": "Web Spam",
    "11": "Email Spam",
    "12": "Blog Spam",
    "13": "VPN IP",
    "14": "Port Scan",
    "15": "Hacking",
    "16": "SQL Injection",
    "17": "Spoofing",
    "18": "Brute Force",
    "19": "Bad Web Bot",
    "20": "Exploited Host",
    "21": "Web App Attack",
    "22": "SSH",
    "23": "IoT Targeted",
}


def setup_logger() -> None:
    """Configure root logging: a file log plus a plain console echo.

    The file handler (created by ``logging.basicConfig``) writes to
    ``LOG_FILE``, truncating it on start, with a timestamped format. A second
    stream handler is attached to the root logger and prints only the bare
    message. Both operate at ``INFO`` level.
    """
    file_settings = {
        "level": logging.INFO,
        "format": "%(asctime)s %(levelname)-8s %(message)s",
        "datefmt": "%m-%d %H:%M",
        "filename": LOG_FILE,
        "filemode": "w",
    }
    logging.basicConfig(**file_settings)

    echo = logging.StreamHandler()
    echo.setLevel(logging.INFO)
    echo.setFormatter(logging.Formatter("%(message)s"))

    root_logger = logging.getLogger("")
    root_logger.addHandler(echo)


def table_to_list(table: Any) -> list[list[str]]:
    """Return the rows of a BeautifulSoup table as a list of cell-text lists.

    The table is first expanded into a row/column grid by
    ``table_to_2d_dict`` and then read back row by row via ``iter_2d_dict``.
    """
    return [*iter_2d_dict(table_to_2d_dict(table))]


def table_to_2d_dict(table: Any) -> dict[int, dict[int, str]]:
    """Expand a BeautifulSoup table into a ``{row: {column: text}}`` grid.

    Cells carrying ``colspan`` / ``rowspan`` attributes are copied into every
    grid position they cover, and a cell whose natural column is already
    taken (by a span from an earlier cell) is shifted right to the first free
    column of its row.
    """
    grid = defaultdict(defaultdict)
    for row_idx, tr in enumerate(table.find_all("tr")):
        for start, cell in enumerate(tr.find_all(["td", "th"])):
            width = int(cell.get("colspan", 1))
            height = int(cell.get("rowspan", 1))
            text = cell.text

            # Slide right past positions already filled by earlier spans.
            column = start
            while row_idx in grid and column in grid[row_idx]:
                column += 1

            for r in range(row_idx, row_idx + height):
                for c in range(column, column + width):
                    grid[r][c] = text
    return grid


def iter_2d_dict(dct: dict[int, dict[int, str]]) -> list[str]:
    """Yield one list of cell values per row, rows and columns in key order."""
    for _, columns in sorted(dct.items()):
        yield [value for _, value in sorted(columns.items())]


def flatten_dict(dd: Any, separator: str = "_", prefix: str = "") -> dict[str, Any]:
    """Collapse a nested dictionary into a single level.

    Keys along each path are joined with ``separator``. A non-dict value is
    returned as a one-entry mapping keyed by ``prefix``; nested empty dicts
    contribute no entries.
    """
    if not isinstance(dd, dict):
        return {prefix: dd}

    flat = {}
    for key, value in dd.items():
        for sub_key, leaf in flatten_dict(value, separator, key).items():
            name = prefix + separator + sub_key if prefix else sub_key
            flat[name] = leaf
    return flat


def clean_colname(name: str) -> str:
    """Turn an arbitrary label into a lower-case identifier-like column name.

    Every non-word character becomes ``_``, a leading digit gets a ``_``
    inserted in front of it, runs of underscores collapse to one, and the
    result is lower-cased.
    """
    substituted = re.sub(r"\W|^(?=\d)", "_", name)
    collapsed = re.sub("_+", "_", substituted)
    return collapsed.lower()


def load_config(config_file: str | Path | None = None) -> KnowYourIPConfig:
    """Load the configuration through the Pydantic-based loader.

    Args:
        config_file: Location of the configuration file. A ``str`` is
            converted to ``Path`` first; ``None`` is passed through so the
            underlying loader can search its standard locations.

    Returns:
        Typed and validated configuration object.
    """
    path = Path(config_file) if isinstance(config_file, str) else config_file
    return load_modern_config(path)


def maxmind_geocode_ip(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get location of IP address from Maxmind City database (GeoLite2-City.mmdb).

    Args:
        config: Typed configuration object; ``config.maxmind.db_path`` is the
            directory containing ``GeoLite2-City.mmdb``.
        ip: an IP address

    Returns:
        dict: Geolocation data. The raw city record is flattened with ``.``
        as separator and every key is prefixed with ``maxmind.``.

    Raises:
        Nothing is caught here: errors from opening the database or from the
        lookup itself propagate to the caller.

    Notes:
        There are other Maxmind databases including:

        * Country Database (GeoLite2-Country.mmdb)
        * Anonymous IP Database (GeoIP2-Anonymous-IP.mmdb)
        * Connection-Type Database (GeoIP2-Connection-Type.mmdb)
        * Domain Database (GeoIP2-Domain.mmdb)
        * ISP Database (GeoIP2-ISP.mmdb)
    """
    db_file = config.maxmind.db_path / "GeoLite2-City.mmdb"
    # The context manager closes the database even when the lookup raises;
    # previously a failed lookup leaked the open reader.
    with geoip2.database.Reader(db_file) as reader:
        # NOTE(review): relies on the model's ``raw`` attribute; newer geoip2
        # releases may expose ``to_dict()`` instead -- confirm pinned version.
        raw = reader.city(ip).raw
    flat = flatten_dict(raw, separator=".")
    return {f"maxmind.{key}": value for key, value in flat.items()}
def geonames_timezone(
    config: KnowYourIPConfig, lat: float, lng: float
) -> dict[str, Any]:
    """Get timezone for a latitude/longitude from GeoNames.

    Args:
        config: Typed configuration object (``config.geonames.username`` is
            sent as the ``username`` request parameter).
        lat (float): latitude
        lng (float): longitude

    Returns:
        dict: GeoNames data, every response key prefixed with ``geonames.``.
        Empty when all attempts fail.

    Notes:
        Please visit `this link <http://www.geonames.org/export/ws-overview.html>`_
        for more information about GeoNames.org Web Services.

        e.g. URL: http://api.geonames.org/timezone?lat=47.01&lng=10.2&username=demo

        Limit: 30,000 credits daily limit per application (identified by the
        parameter 'username'), the hourly limit is 2000 credits. A credit is
        a web service request hit for most services. An exception is thrown
        when the limit is exceeded.

        At most ``MAX_RETRIES`` attempts are made. A non-200 response counts
        as a failed attempt (it is logged and retried with the same linear
        back-off as a transport error) so the loop is always bounded.

    Example:
        geonames_timezone(config, 32.0617, 118.7778)
    """
    data: dict[str, Any] = {}
    payload = {"lat": lat, "lng": lng, "username": config.geonames.username}
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            r = requests.get(
                "http://api.geonames.org/timezoneJSON", params=payload, timeout=30
            )
            if r.status_code == 200:
                for key, value in r.json().items():
                    data[f"geonames.{key}"] = value
                break
            logging.warning(f"geonames_timezone: HTTP status {r.status_code}")
        except (requests.RequestException, ValueError, KeyError) as e:
            logging.warning(f"geonames_timezone: {e}")
        # Linear back-off; no point sleeping after the final attempt.
        if attempt < MAX_RETRIES:
            time.sleep(attempt)
    return data
def tzwhere_timezone(config: KnowYourIPConfig, lat: float, lng: float) -> str | None:
    """Look up the timezone name of a latitude/longitude with the tzwhere package.

    Args:
        config: Typed configuration object (not read by this function).
        lat (float): latitude
        lng (float): longitude

    Returns:
        Whatever ``tzNameAt`` returns for the coordinates (annotated as a
        timezone name string or ``None``).

    Example:
        tzwhere_timezone(config, 32.0617, 118.7778)
    """
    # Imported lazily so the package is only needed when this lookup runs.
    from tzwhere import tzwhere as tzwhere_module

    # A new finder is constructed on every call; nothing is cached.
    return tzwhere_module.tzwhere().tzNameAt(lat, lng)
def abuseipdb_api(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get abuse information for an IP address from the AbuseIPDB API (v2 ``check``).

    Args:
        config: Configuration object; ``config.abuseipdb.api_key`` is sent in
            the ``Key`` header and ``config.abuseipdb.days`` as
            ``maxAgeInDays``.
        ip: IP address to check for abuse reports.

    Returns:
        Dictionary with these keys on a successful lookup:

        - abuseipdb.abuse_confidence_score
        - abuseipdb.country_code
        - abuseipdb.usage_type
        - abuseipdb.isp
        - abuseipdb.domain
        - abuseipdb.is_public
        - abuseipdb.is_whitelisted
        - abuseipdb.total_reports
        - abuseipdb.num_distinct_users
        - abuseipdb.last_reported_at
        - abuseipdb.categories: ``|``-joined human-readable category names
          (e.g. "SSH|Brute Force"), empty string when none are known.

        On HTTP 429 / 401 only ``abuseipdb.status`` is set ("rate_limited" /
        "auth_failed"). Empty when every attempt fails.

    Note:
        Numeric category IDs are translated with ``ABUSEIPDB_CATEGORIES``;
        IDs missing from that table are skipped. Categories are gathered
        from a top-level ``categories`` list (if present) and from each
        entry of the verbose ``reports`` list, de-duplicated in first-seen
        order.

    References:
        https://www.abuseipdb.com/api.html
        https://docs.abuseipdb.com/

    Example:
        >>> config = KnowYourIPConfig()
        >>> config.abuseipdb.api_key = "your_api_key"
        >>> config.abuseipdb.days = 90
        >>> result = abuseipdb_api(config, '222.186.30.49')
        >>> print(result.get('abuseipdb.categories', 'Clean'))
        SSH|Brute Force
    """
    out: dict[str, Any] = {}
    headers = {"Key": config.abuseipdb.api_key, "Accept": "application/json"}
    params = {
        "ipAddress": ip,
        "maxAgeInDays": config.abuseipdb.days,
        "verbose": "",
    }

    retry = 0
    while retry < MAX_RETRIES:
        try:
            r = requests.get(
                "https://api.abuseipdb.com/api/v2/check",
                headers=headers,
                params=params,
                timeout=30,
            )
            if r.status_code == 200:
                response = r.json()
                if "data" in response:
                    data = response["data"]
                    out = {}

                    # Core fields. The v2 API names the score
                    # "abuseConfidenceScore"; the older spelling is kept as a
                    # fallback so either payload shape yields a value.
                    out["abuseipdb.abuse_confidence_score"] = data.get(
                        "abuseConfidenceScore",
                        data.get("abuseConfidencePercentage", 0),
                    )
                    out["abuseipdb.country_code"] = data.get("countryCode")
                    out["abuseipdb.usage_type"] = data.get("usageType")
                    out["abuseipdb.isp"] = data.get("isp")
                    out["abuseipdb.domain"] = data.get("domain")
                    out["abuseipdb.is_public"] = data.get("isPublic")
                    out["abuseipdb.is_whitelisted"] = data.get("isWhitelisted")
                    out["abuseipdb.total_reports"] = data.get("totalReports", 0)
                    out["abuseipdb.num_distinct_users"] = data.get(
                        "numDistinctUsers", 0
                    )
                    out["abuseipdb.last_reported_at"] = data.get("lastReportedAt")

                    # Category IDs: top-level list plus the per-report lists
                    # that the verbose response carries.
                    cat_ids = list(data.get("categories") or [])
                    for report in data.get("reports") or []:
                        if isinstance(report, dict):
                            cat_ids.extend(report.get("categories") or [])
                    category_names = [
                        ABUSEIPDB_CATEGORIES[str(cat_id)]
                        for cat_id in dict.fromkeys(cat_ids)
                        if str(cat_id) in ABUSEIPDB_CATEGORIES
                    ]
                    out["abuseipdb.categories"] = "|".join(category_names)
                break
            elif r.status_code == 429:
                logging.warning("AbuseIPDB rate limit exceeded")
                out["abuseipdb.status"] = "rate_limited"
                break
            elif r.status_code == 401:
                logging.error("AbuseIPDB authentication failed - check API key")
                out["abuseipdb.status"] = "auth_failed"
                break
            else:
                logging.warning(
                    f"AbuseIPDB API returned status {r.status_code}: {r.text}"
                )
                retry += 1
                if retry < MAX_RETRIES:
                    time.sleep(retry * 2)  # linear back-off: 2s, 4s, 6s, ...
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a 200 response whose body is not valid JSON.
            logging.warning(f"abuseipdb_api request error: {e}")
            retry += 1
            if retry < MAX_RETRIES:
                time.sleep(retry)

    return out
def abuseipdb_web(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get information from `AbuseIPDB website <https://www.abuseipdb.com/>`_

    The check page for ``ip`` is downloaded and scraped:

    * every two-column row of the first table becomes
      ``abuseipdb.<cleaned label>``;
    * ``abuseipdb.found`` is 0 when the summary block contains
      "was not found", otherwise 1;
    * when the summary reports a non-zero count and a second table exists,
      its rows are stored ``|``-joined, one per line, in
      ``abuseipdb.history``.

    Args:
        config: Typed configuration object (not read by this function).
        ip (str): an IP address

    Returns:
        dict: AbuseIPDB information; empty when every attempt fails.

    Notes:
        At most ``MAX_RETRIES`` attempts are made; a non-200 response or a
        parsing error counts as a failed attempt with linear back-off.
        Missing page elements are tolerated (the corresponding keys are
        simply left out) instead of raising out of the function.

    References:
        e.g. http://www.abuseipdb.com/check/94.31.29.154

    Example:
        abuseipdb_web(config, '222.186.30.49')
    """
    data: dict[str, Any] = {}
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.get("http://www.abuseipdb.com/check/" + ip, timeout=30)
            if resp.status_code == 200:
                soup = BeautifulSoup(resp.text, "lxml")
                tables = soup.select("table")

                # First table: label/value pairs.
                if tables:
                    for cells in table_to_list(tables[0]):
                        if len(cells) >= 2:
                            col = clean_colname(cells[0].strip())
                            data["abuseipdb." + col] = cells[1]

                wells = soup.select("div#body div.well")
                if wells:
                    summary = wells[0].text
                    data["abuseipdb.found"] = 0 if "was not found" in summary else 1

                    reported = re.search(r"was reported (\d+) time", summary)
                    count = int(reported.group(1)) if reported else 0
                    # Second table (when present) holds the report history.
                    if count and len(tables) > 1:
                        data["abuseipdb.history"] = "\n".join(
                            "|".join(cells) for cells in table_to_list(tables[1])
                        )
                else:
                    logging.warning(
                        f"abuseipdb_web: summary block not found for {ip}"
                    )
                break
            logging.warning(f"abuseipdb_web: HTTP status {resp.status_code}")
        except (
            requests.RequestException,
            ValueError,
            KeyError,
            IndexError,
            TypeError,
        ) as e:
            logging.warning(f"abuseipdb_web: {e}")
        # Linear back-off; no point sleeping after the final attempt.
        if attempt < MAX_RETRIES:
            time.sleep(attempt)
    return data
def ipvoid_scan(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get Blacklist information from `IPVoid website <http://www.ipvoid.com/ip-blacklist-check>`_

    The result page is scraped: two-column rows of the first table become
    ``ipvoid.<cleaned label>`` and, when a second table exists, the names in
    rows flagged with an ``i.text-danger`` element are stored ``|``-joined
    in ``ipvoid.alerts``.

    Args:
        config: Typed configuration object (not read by this function).
        ip (str): an IP address

    Returns:
        dict: IPVoid information; empty when every attempt fails or the page
        contains no tables.

    Notes:
        At most ``MAX_RETRIES`` attempts are made; a non-200 response or a
        parsing error counts as a failed attempt with linear back-off.

    Example:
        ipvoid_scan(config, '222.186.30.49')
    """
    data: dict[str, Any] = {}
    for attempt in range(1, MAX_RETRIES + 1):
        data = {}  # start every attempt from a clean slate
        try:
            resp = requests.post(
                "http://www.ipvoid.com/ip-blacklist-check/", data={"ip": ip}, timeout=30
            )
            if resp.status_code == 200:
                tables = BeautifulSoup(resp.text, "lxml").select("table")
                if not tables:
                    logging.warning(f"ipvoid_scan: no result tables for {ip}")
                    break

                for cells in table_to_list(tables[0]):
                    if len(cells) >= 2:
                        data["ipvoid." + clean_colname(cells[0])] = cells[1]

                # Second table lists the individual blacklist engines.
                if len(tables) > 1:
                    alerts = []
                    for tr in tables[1].select("tr"):
                        tds = tr.select("td")
                        if len(tds) == 2 and tds[0].select("i.text-danger"):
                            alerts.append(tds[0].text.strip())
                    data["ipvoid.alerts"] = "|".join(alerts)
                break
            logging.warning(f"ipvoid_scan: HTTP status {resp.status_code}")
        except (
            requests.RequestException,
            ValueError,
            KeyError,
            IndexError,
            TypeError,
        ) as e:
            logging.warning(f"ipvoid_scan: {e}")
        # Linear back-off; no point sleeping after the final attempt.
        if attempt < MAX_RETRIES:
            time.sleep(attempt)
    return data
def apivoid_api(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get information from APIVoid `IP Reputation API <https://www.apivoid.com/api/ip-reputation/>`_

    Args:
        config: Typed configuration object (``config.apivoid.api_key`` is
            sent as the ``key`` request parameter).
        ip (str): an IP address

    Returns:
        dict: IP Reputation API information. The ``data.report`` section of
        the response is flattened with ``.`` as separator, list values are
        ``|``-joined, and every key is prefixed with ``apivoid.``. Empty when
        every attempt fails.

    Notes:
        Must register and get 25 free API credits valid for 30 days.

        At most ``MAX_RETRIES`` attempts are made; a non-200 response, a
        transport error, or a response missing the expected keys counts as a
        failed attempt with linear back-off.

    Example:
        apivoid_api(config, '222.186.30.49')
    """
    url = "https://endpoint.apivoid.com/iprep/v1/pay-as-you-go/"
    params = {"ip": ip, "key": config.apivoid.api_key}
    data: dict[str, Any] = {}
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            r = requests.get(url, params=params, timeout=30)
            if r.status_code == 200:
                out = r.json()
                logging.info(
                    f"apivoid_api: credit_remained: {out['credits_remained']:.2f}, estimated_queries: {out['estimated_queries']}"
                )
                report = flatten_dict(out["data"]["report"], separator=".")
                for key, value in report.items():
                    if isinstance(value, list):
                        value = "|".join([str(i) for i in value])
                    data["apivoid." + key] = value
                break
            logging.warning(f"apivoid_api: HTTP status {r.status_code}")
        except (requests.RequestException, ValueError, KeyError) as e:
            logging.warning(f"apivoid_api: {e}")
        # Linear back-off; no point sleeping after the final attempt.
        if attempt < MAX_RETRIES:
            time.sleep(attempt)
    return data
[docs] def censys_api(config: KnowYourIPConfig, ip: str) -> dict[str, Any]: """Get information from Censys Platform API. Note: Legacy Censys Search v1/v2 APIs are deprecated as of 2025. This uses the new Censys Platform API with updated authentication. Args: config: Typed configuration object containing Censys settings. ip: IP address to query. Returns: Dictionary containing Censys data with 'censys.' prefixed keys. Rate Limits: Free tier: 250 requests/month, 1 request per 2.5 seconds References: https://search.censys.io/api https://docs.censys.com/reference/get-started Example: >>> config.censys.enabled = True >>> config.censys.api_key = "your_api_key" >>> result = censys_api(config, '8.8.8.8') >>> print(result.get('censys.autonomous_system.name')) """ data = {} if not config.censys.api_key: logging.warning("Censys API key not configured") return data headers = { "Authorization": f"Bearer {config.censys.api_key}", "Content-Type": "application/json", } retry = 0 while retry < MAX_RETRIES: try: # Use the new Platform API endpoint for host lookup url = f"{config.censys.api_url}/v2/hosts/{ip}" r = requests.get(url, headers=headers, timeout=30) match r.status_code: case 200: result = r.json() if "result" in result: host_data = result["result"] # Extract key information with flattened keys data["censys.ip"] = host_data.get("ip") # Autonomous System information if "autonomous_system" in host_data: asn_data = host_data["autonomous_system"] data["censys.asn"] = asn_data.get("asn") data["censys.as_name"] = asn_data.get("name") data["censys.as_country_code"] = asn_data.get( "country_code" ) # Location information if "location" in host_data: loc_data = host_data["location"] data["censys.country"] = loc_data.get("country") data["censys.country_code"] = loc_data.get("country_code") data["censys.city"] = loc_data.get("city") data["censys.timezone"] = loc_data.get("timezone") # Services information if "services" in host_data and host_data["services"]: services = 
host_data["services"] ports = [ str(s.get("port", "")) for s in services if "port" in s ] data["censys.ports"] = "|".join(ports) if ports else "" protocols = [ s.get("transport_protocol", "") for s in services if "transport_protocol" in s ] data["censys.protocols"] = "|".join( {p for p in protocols if p} ) break case 404: logging.info(f"IP {ip} not found in Censys database") data["censys.status"] = "not_found" break case 429: logging.warning("Censys rate limit exceeded") data["censys.status"] = "rate_limited" break case 401 | 403: logging.error("Censys authentication failed - check API key") data["censys.status"] = "auth_failed" break case _: logging.warning( f"Censys API returned status {r.status_code}: {r.text}" ) retry += 1 if retry < MAX_RETRIES: time.sleep(retry * 2) # Exponential backoff except requests.RequestException as e: logging.warning(f"censys_api request error: {e}") retry += 1 if retry < MAX_RETRIES: time.sleep(retry) return data
def shodan_api(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get information from Shodan.

    The host record is flattened with ``_`` as separator, list values are
    ``|``-joined, and every key is prefixed with ``shodan.``.

    Args:
        config: Typed configuration object (``config.shodan.api_key`` is
            used to build the client).
        ip (str): an IP address

    Returns:
        dict: Shodan information; empty when the API reports an error
        (which is logged as a warning).

    Example:
        shodan_api(config, '222.186.30.49')
    """
    client = shodan.Shodan(config.shodan.api_key)
    data = {}
    try:
        host = flatten_dict(client.host(ip))
        for key, value in host.items():
            if isinstance(value, list):
                value = "|".join(str(item) for item in value)
            data["shodan." + key] = value
    except shodan.APIError as e:
        logging.warning(f"shodan_api(ip={ip}): {e}")
    return data
def virustotal_api(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get information from VirusTotal API v3.

    Args:
        config: Typed configuration object containing VirusTotal settings
            (``config.virustotal.api_key`` is sent as ``x-apikey``).
        ip: IP address to analyze.

    Returns:
        Dictionary containing VirusTotal analysis results with keys:

        - virustotal.harmless / malicious / suspicious / undetected:
          detection counts (only when the response has analysis stats)
        - virustotal.asn: Autonomous System Number
        - virustotal.as_owner: AS owner name
        - virustotal.country: Country code
        - virustotal.network: Network range
        - virustotal.reputation: Reputation score
        - virustotal.categories: ``|``-joined threat categories (if available)

        On 404 / 429 / 401 / 403 only ``virustotal.status`` is set
        ("not_found", "rate_limited", "auth_failed"). Empty when no API key
        is configured or every attempt fails.

    Note:
        VirusTotal API v3 Rate Limits:

        * Public API: 500 requests/day, 4 requests/minute
        * Premium API: Higher limits based on subscription

        Unexpected status codes and transport errors are retried up to
        ``MAX_RETRIES`` times with a linearly growing pause.

    References:
        https://developers.virustotal.com/reference/ip-info
        https://docs.virustotal.com/reference/overview

    Example:
        >>> config = KnowYourIPConfig()
        >>> config.virustotal.api_key = "your_api_key"
        >>> result = virustotal_api(config, '8.8.8.8')
        >>> print(result['virustotal.reputation'])
        530
    """
    data = {}

    if not config.virustotal.api_key:
        logging.warning("VirusTotal API key not configured")
        return data

    url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
    headers = {"x-apikey": config.virustotal.api_key}

    attempts = 0
    while attempts < MAX_RETRIES:
        try:
            r = requests.get(url, headers=headers, timeout=30)
            status = r.status_code

            if status == 200:
                payload = r.json()
                if "data" in payload:
                    attrs = payload["data"].get("attributes", {})

                    # Detection counters
                    if "last_analysis_stats" in attrs:
                        stats = attrs["last_analysis_stats"]
                        for verdict in (
                            "harmless",
                            "malicious",
                            "suspicious",
                            "undetected",
                        ):
                            data["virustotal." + verdict] = stats.get(verdict, 0)

                    # Network details
                    for field in ("asn", "as_owner", "country", "network"):
                        data["virustotal." + field] = attrs.get(field)

                    data["virustotal.reputation"] = attrs.get("reputation", 0)

                    # Categories may arrive as a mapping or as a list
                    if "categories" in attrs:
                        cats = attrs["categories"]
                        if isinstance(cats, dict):
                            data["virustotal.categories"] = "|".join(cats.keys())
                        elif isinstance(cats, list):
                            data["virustotal.categories"] = "|".join(
                                str(c) for c in cats
                            )
                break

            if status == 404:
                logging.info(f"IP {ip} not found in VirusTotal database")
                data["virustotal.status"] = "not_found"
                break

            if status == 429:
                logging.warning("VirusTotal rate limit exceeded")
                data["virustotal.status"] = "rate_limited"
                break

            if status in (401, 403):
                logging.error("VirusTotal authentication failed - check API key")
                data["virustotal.status"] = "auth_failed"
                break

            # Any other status: log it and try again after a growing pause.
            logging.warning(f"VirusTotal API returned status {status}: {r.text}")
            attempts += 1
            if attempts < MAX_RETRIES:
                time.sleep(attempts * 2)

        except requests.RequestException as e:
            logging.warning(f"virustotal_api request error: {e}")
            attempts += 1
            if attempts < MAX_RETRIES:
                time.sleep(attempts)

    return data
def ping(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get information using Ping (ICMP protocol).

    Args:
        config: Typed configuration object (``config.ping.count`` and
            ``config.ping.timeout`` are read).
        ip (str): an IP address

    Returns:
        dict: Ping statistics information. ``ping.count`` and
        ``ping.timeout`` are always present; ``ping.max``, ``ping.min``,
        ``ping.avg`` and ``ping.percent_loss`` are added when ``quiet_ping``
        returns a truthy result.

    Notes:
        Ping function is based on a pure python ping implementation using
        raw socket and you must have root (on Linux) or Admin (on Windows)
        privileges to run.

    Example:
        ping(config, '222.186.30.49')
    """
    settings = config.ping
    data = {"ping.count": settings.count, "ping.timeout": settings.timeout}

    stat = quiet_ping(ip, timeout=settings.timeout, count=settings.count)
    if not stat:
        return data

    data["ping.max"] = stat[0]
    data["ping.min"] = stat[1]
    data["ping.avg"] = stat[2]
    # The fourth element is scaled by 100 to express it as a percentage.
    data["ping.percent_loss"] = stat[3] * 100
    return data
def traceroute(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get information using traceroute.

    Args:
        config: Typed configuration object (``config.traceroute.max_hops``
            is read).
        ip (str): an IP address

    Returns:
        dict: ``traceroute.max_hops`` (the configured limit) and
        ``traceroute.hops`` (whatever ``os_traceroute`` returns).

    Notes:
        Currently traceroute uses the operating system command traceroute
        on Linux and tracert on Windows.

    Example:
        traceroute(config, '222.186.30.49')
    """
    hops = os_traceroute(ip, max_hops=config.traceroute.max_hops)
    return {
        "traceroute.max_hops": config.traceroute.max_hops,
        "traceroute.hops": hops,
    }
def init_worker():
    """Make the calling process ignore SIGINT.

    Passed as the ``initializer`` of the worker pool in ``main``.
    """
    ignore = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore)
def query_ip(config: KnowYourIPConfig, ip: str) -> dict[str, Any]:
    """Get all information of IP address.

    Runs every lookup whose ``config.<service>.enabled`` flag is set, merges
    the results, and keeps only the keys listed in ``config.output.columns``.

    Args:
        config: Typed configuration object.
        ip (str): an IP address

    Returns:
        dict: Information on the given IP address. Values that have an
        ``encode`` method (strings) are returned UTF-8 encoded as ``bytes``;
        everything else is returned unchanged. If a lookup raises, the error
        is logged and whatever was collected up to that point is still
        returned.

    Notes:
        The timezone lookups (GeoNames, tzwhere) need the coordinates
        produced by the MaxMind lookup; when those are unavailable the
        timezone lookups are skipped with a warning.

    Example:
        query_ip(config, '222.186.30.49')
    """
    data: dict[str, Any] = {"ip": ip}
    try:
        if config.ping.enabled:
            data.update(ping(config, ip))
        if config.traceroute.enabled:
            data.update(traceroute(config, ip))

        # Geolocation is best-effort: a failure here must not prevent the
        # reputation lookups below from running.
        try:
            lat = lng = None
            if config.maxmind.enabled:
                out = maxmind_geocode_ip(config, ip)
                data.update(out)
                lat = out.get("maxmind.location.latitude")
                lng = out.get("maxmind.location.longitude")

            if config.geonames.enabled or config.tzwhere.enabled:
                if lat is None or lng is None:
                    logging.warning(
                        f"query_ip(ip={ip}): no MaxMind coordinates available, "
                        "skipping timezone lookups"
                    )
                else:
                    if config.geonames.enabled:
                        data.update(geonames_timezone(config, lat, lng))
                    if config.tzwhere.enabled:
                        data["tzwhere.timezone"] = tzwhere_timezone(config, lat, lng)
        except Exception as e:
            logging.error(e)

        if config.abuseipdb.enabled:
            data.update(abuseipdb_api(config, ip))
            data.update(abuseipdb_web(config, ip))
        if config.ipvoid.enabled:
            data.update(ipvoid_scan(config, ip))
        if config.apivoid.enabled:
            data.update(apivoid_api(config, ip))
        if config.censys.enabled:
            data.update(censys_api(config, ip))
        if config.shodan.enabled:
            data.update(shodan_api(config, ip))
        if config.virustotal.enabled:
            data.update(virustotal_api(config, ip))
    except Exception as e:
        logging.error(e)
        import traceback

        traceback.print_exc()

    # Column selection happens outside the try block so that a failure in a
    # late lookup does not throw away what the earlier ones collected.
    udata: dict[str, Any] = {}
    columns = config.output.columns
    for key, value in data.items():
        if key not in columns:
            continue
        # Encode to UTF-8 where possible, fall back to the original value.
        try:
            udata[key] = value.encode("utf-8")
        except (AttributeError, UnicodeEncodeError):
            udata[key] = value
    return udata
def main() -> None:
    """Command-line entry point: query IP addresses and write a CSV report.

    IP addresses come from the positional arguments or, when ``--file`` is
    given, from that file (one per line; blank lines and lines starting with
    ``#`` are ignored; the file replaces any positional addresses). The
    addresses are processed in batches of ``--max-conn`` by a worker pool and
    one CSV row is written per address, using ``config.output.columns`` as
    the field names.

    ``--from`` / ``--to`` select a half-open range of row indexes
    (``--to 0`` means "until the end"). A batch that fails is logged and
    skipped so the run always makes progress. Ctrl-C stops the run.
    """
    setup_logger()

    parser = argparse.ArgumentParser(description="Know Your IP")
    parser.add_argument("ip", nargs="*", help="IP Address(es)")
    parser.add_argument("-f", "--file", help="List of IP addresses file")
    parser.add_argument("-c", "--config", help="Configuration file (TOML format)")
    parser.add_argument(
        "-o", "--output", default="output.csv", help="Output CSV file name"
    )
    parser.add_argument(
        "-n", "--max-conn", type=int, default=5, help="Max concurrent connections"
    )
    parser.add_argument(
        "--from", default=0, type=int, dest="from_row", help="From row number"
    )
    parser.add_argument("--to", default=0, type=int, help="To row number")
    parser.add_argument(
        "-v", "--verbose", dest="verbose", action="store_true", help="Verbose mode"
    )
    parser.add_argument(
        "--no-header",
        dest="header",
        action="store_false",
        help="Output without header at the first row",
    )
    parser.set_defaults(header=True)
    parser.set_defaults(verbose=False)

    args = parser.parse_args()

    if args.file is None and len(args.ip) == 0:
        parser.error("at least one of IP address and --file is required")
    if args.max_conn < 1:
        # A batch size below 1 would never advance through the list.
        parser.error("--max-conn must be at least 1")

    # Load configuration
    config = load_config(args.config)

    if args.file:
        with open(args.file) as f:
            args.ip = [
                entry
                for entry in (line.strip() for line in f)
                if entry and not entry.startswith("#")
            ]

    partial_query_ip = partial(query_ip, config)
    total = len(args.ip)

    pool = Pool(processes=args.max_conn, initializer=init_worker)
    try:
        # newline="" lets the csv module control line endings itself.
        with open(args.output, "w", newline="") as out_file:
            writer = DictWriter(out_file, fieldnames=config.output.columns)
            if args.header:
                writer.writeheader()

            row = max(args.from_row, 0)
            while row < total:
                if args.to != 0 and row >= args.to:
                    logging.info(f"Stop at row {row}")
                    break
                logging.info(f"Row: {row}")

                # Never let a batch run past the requested end row.
                end = row + args.max_conn
                if args.to != 0:
                    end = min(end, args.to)
                ips = args.ip[row:end]

                try:
                    results = pool.map(partial_query_ip, ips)
                    for data in results:
                        edata = {}
                        for k, v in data.items():
                            if v is None:
                                continue
                            try:
                                edata[k] = v.decode("utf-8")
                            except (AttributeError, UnicodeDecodeError):
                                edata[k] = v
                        writer.writerow(edata)
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logging.error(e)
                    if args.verbose:
                        import traceback

                        traceback.print_exc()

                # Advance even when the batch failed; otherwise the same
                # batch would be retried forever.
                row += len(ips)
    finally:
        # All work is submitted synchronously via map(), so nothing is
        # pending here; terminate + join releases the worker processes.
        pool.terminate()
        pool.join()


if __name__ == "__main__":
    sys.exit(main())