Firewall

Contents

Firewall#

Use iptables to block IP addresses by country for inbound ports on a server. This is a whitelist: what is not explicitly allowed is blocked. This is useful, for example, to filter most SSH bruteforce attacks while leaving a webserver freely available. This is certainly not the most efficient approach at filtering because you can easily end with thousands of iptables rules that need to be scanned one by one.

See also

  • A collection of scripts I have written and/or adapted that I currently use on my systems as automated tasks [1]

  • Simple shell script for GNU/Linux, built on iptables, which is able to filter incoming packets based on accepted port numbers and countries. It is aimed to SOHO users. [2]

  • Linux Iptables Just Block By Country - nixCraft [3]

  • Simple stateful firewall - ArchWiki [4]

  • iptables - ArchWiki [5]

  • 25 Most Frequently Used Linux IPTables Rules Examples [6]

Setup#

  1. install the dependencies

    apt-get install iptables python3-yaml python3-requests iptables-persistent
    
  2. answer Yes to all questions

  3. install fpyutils. See reference

  4. create the jobs directories. See reference

    mkdir -p /home/jobs/{scripts,services}/by-user/root
    
  5. create the script

    /home/jobs/scripts/by-user/root/iptables_geoport.py#
    #!/usr/bin/env python3
    #
    # iptables_geoport.py
    #
    # Copyright (C) 2020-2022 Franco Masotti (franco \D\o\T masotti {-A-T-} tutanota \D\o\T com)
    #
    # This program is free software; you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation; either version 2 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with this program; if not, write to the Free Software Foundation, Inc.,
    # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
    
    # OLD NOTICES
    # ===========
    # See url for more info - http://www.cyberciti.biz/faq/?p=3402
    # Author: nixCraft <www.cyberciti.biz> under GPL v.2.0+
    # Post Author: frnmst (Franco Masotti) (franco \D\o\T masotti {-A-T-} tutanota \D\o\T com)
    # New version heavily based on https://wiki.archlinux.org/index.php/Simple_stateful_firewall
    #   https://wiki.archlinux.org/index.php/Iptables
    #   and a little on http://www.thegeekstuff.com/2011/06/iptables-rules-examples/ as well as nixCraft for the bash stuff.
    # ===========
    r"""iptables_geoport.py."""
    
    import copy
    import ipaddress
    import os
    import pathlib
    import shlex
    import sys
    import urllib
    
    import fpyutils
    import requests
    import yaml
    
    
    class UserNotRoot(Exception):
        r"""The user running the script is not root."""
    
    
    ##################
    # Basic commands #
    ##################
    
    
    def reset_rules():
        r"""Reset the chains and the tables."""
        # https://wiki.archlinux.org/index.php/Iptables#Resetting_rules
        #
        # Copyright (C) 2020 Arch Wiki contributors.
        # Permission is granted to copy, distribute and/or modify this document
        # under the terms of the GNU Free Documentation License, Version 1.3
        # or any later version published by the Free Software Foundation;
        # with no Invariant Sections, no Front-Cover Texts, and no Back-Cover Texts.
        # A copy of the license is included in the section entitled "GNU
        # Free Documentation License".
        commands = dict()
        commands['flush_tcp_chain'] = 'iptables --flush TCP'
        commands['flush_udp_chain'] = 'iptables --flush UDP'
        commands['flush_input_chain'] = 'iptables --flush INPUT'
        commands['flush_output_chain'] = 'iptables --flush OUTPUT'
        commands['flush_logging_chain'] = 'iptables --flush LOGGING'
    
        commands['delete_tcp_chain'] = 'iptables --delete-chain TCP'
        commands['delete_udp_chain'] = 'iptables --delete-chain UDP'
        commands['delete_logging_chain'] = 'iptables --delete-chain LOGGING'
    
        commands['flush_mangle_table'] = 'iptables --table mangle --flush'
        commands['delete_mangle_chain'] = 'iptables --table mangle --delete-chain'
        commands['flush_raw_table'] = 'iptables --table raw --flush'
        commands['delete_raw_chain'] = 'iptables --table raw --delete-chain'
        commands['flush_security_table'] = 'iptables --table security --flush'
        commands[
            'delete_security_chain'] = 'iptables --table security --delete-chain'
        commands['accept_input'] = 'iptables --policy INPUT ACCEPT'
        commands['accept_forward'] = 'iptables --policy FORWARD ACCEPT'
        commands['accept_output'] = 'iptables --policy OUTPUT ACCEPT'
    
        # sys._getframe().f_code.co_name is the function name.
        fix_dict_keys(commands, sys._getframe().f_code.co_name, '__')
        return commands
    
    
    def initialize_basic_chains() -> dict:
        r"""Apply some basic rules for a single machine."""
        # https://wiki.archlinux.org/index.php/Simple_stateful_firewall#Firewall_for_a_single_machine
        #
        # Copyright (C) 2020 Arch Wiki contributors.
        # Permission is granted to copy, distribute and/or modify this document
        # under the terms of the GNU Free Documentation License, Version 1.3
        # or any later version published by the Free Software Foundation;
        # with no Invariant Sections, no Front-Cover Texts, and no Back-Cover Texts.
        # A copy of the license is included in the section entitled "GNU
        # Free Documentation License".
        commands = dict()
    
        # Output traffic is NOT filtered.
        commands['output_chain'] = 'iptables --policy OUTPUT ACCEPT'
    
        # Create two user defined chains that will define tcp an udp protocol rules later.
        commands['tcp_chain'] = 'iptables --new-chain TCP'
        commands['udp_chain'] = 'iptables --new-chain UDP'
    
        # For a single machine, however, we simply set the policy of the FORWARD chain to DROP and move on
        commands['drop_forward'] = 'iptables --policy FORWARD DROP'
    
        # The first rule added to the INPUT chain will allow traffic that belongs
        # to established connections, or new valid traffic that is related to these
        # connections such as ICMP errors, or echo replies.
        commands[
            'allow_realted_established'] = 'iptables --append INPUT --match conntrack --ctstate RELATED,ESTABLISHED --jump ACCEPT'
    
        # loopback interface INPUT traffic enabled for ping and debugging stuff.
        commands[
            'loopback'] = 'iptables --append INPUT --in-interface lo --jump ACCEPT'
    
        # Drop all invalid INPUT (i.e. damaged) packets.
        # To do this connection must be tracked (conntrack)
        # and connection state (cstate) is set to INVALID.
        commands[
            'invalid_input'] = 'iptables --append INPUT --match conntrack --ctstate INVALID --jump DROP'
    
        # Allow icmp type 8 (i.e. ping) to all interfaces.
        commands[
            'ping'] = 'iptables --append INPUT --protocol icmp --icmp-type 8 --match conntrack --ctstate NEW --jump ACCEPT'
    
        # TCP snd UDP chains are connected to INPUT chains.
        # These two user-defined chains will manage the ports.
        # Remember that tcp uses SYN to initialize a connection, unlike UDP
        commands[
            'connect_tcp_chain'] = 'iptables --append INPUT --protocol tcp --syn -m conntrack --ctstate NEW --jump TCP'
        commands[
            'connect_udp_chain'] = 'iptables --append INPUT --protocol udp -m conntrack --ctstate NEW --jump UDP'
    
        fix_dict_keys(commands, sys._getframe().f_code.co_name, '__')
        return commands
    
    
    def initialize_logging_chain() -> dict:
        r"""Create the logging chain."""
        # https://wiki.archlinux.org/index.php/Iptables#Logging
        #
        # Copyright (C) 2020 Arch Wiki contributors.
        # Permission is granted to copy, distribute and/or modify this document
        # under the terms of the GNU Free Documentation License, Version 1.3
        # or any later version published by the Free Software Foundation;
        # with no Invariant Sections, no Front-Cover Texts, and no Back-Cover Texts.
        # A copy of the license is included in the section entitled "GNU
        # Free Documentation License".
        commands = dict()
    
        commands['logging_chain'] = 'iptables --new-chain LOGGING'
        commands[
            'connect_logging_chain'] = 'iptables --append INPUT --jump LOGGING'
        commands[
            'logging_limit'] = 'iptables --append LOGGING --match limit --limit 2/hour --limit-burst 10 --jump LOG'
    
        fix_dict_keys(commands, sys._getframe().f_code.co_name, '__')
        return commands
    
    
    def initialize_blocking_rules(drop_packets: bool = True,
                                  logging: bool = True) -> dict:
        r"""Initialize blocking rules."""
        # https://wiki.archlinux.org/index.php/Simple_stateful_firewall#Firewall_for_a_single_machine
        #
        # Copyright (C) 2020 Arch Wiki contributors.
        # Permission is granted to copy, distribute and/or modify this document
        # under the terms of the GNU Free Documentation License, Version 1.3
        # or any later version published by the Free Software Foundation;
        # with no Invariant Sections, no Front-Cover Texts, and no Back-Cover Texts.
        # A copy of the license is included in the section entitled "GNU
        # Free Documentation License".
        commands = dict()
    
        if logging:
            chain = 'LOGGING'
        else:
            chain = 'INPUT'
        if drop_packets:
            # Drop everything.
            commands['drop'] = 'iptables --append ' + chain + ' --jump DROP'
        else:
            # RFC compilant.
            commands[
                'rfc_tcp'] = 'iptables --append ' + chain + ' --protocol tcp --jump REJECT --reject-with tcp-rst'
            commands[
                'rfc_udp'] = 'iptables --append ' + chain + ' --protocol udp --jump REJECT --reject-with icmp-port-unreachable'
            # Other protocols are usually not used, so REJECT those packets with icmp-proto-unreachable.
            commands[
                'proto_unreachable'] = 'iptables --append ' + chain + ' --jump REJECT --reject-with icmp-proto-unreachable'
    
        fix_dict_keys(commands, sys._getframe().f_code.co_name, '__')
        return commands
    
    
    def set_accepted_rules(ports: dict, accepted_ips: dict) -> dict:
        r"""Set accepted rules."""
        assert_input_ports_struct(ports)
        assert_accepted_ips_struct(accepted_ips)
    
        commands = dict()
        # O(ports*chains*ips) <= O(ips^3) because of how this script works.
        i = 0
        for port in ports:
            source = ports[port]['source']
            ips = accepted_ips[source]
            chains, protocols = get_chains_and_protocols(ports[port]['protocol'])
            for w, chain in enumerate(chains):
                for ip in ips:
                    commands[str(i)] = generate_accepted_rule_command(
                        chain, protocols[w], port, ip)
                    i += 1
    
        fix_dict_keys(commands, sys._getframe().f_code.co_name, '__')
        return commands
    
    
    def set_patch_rules(rules: list) -> dict:
        r"""Pass raw commands directly."""
        for r in rules:
            if not isinstance(r, str):
                raise TypeError
    
        commands = dict()
        for i, rule in enumerate(rules):
            commands[str(i)] = rule
    
        fix_dict_keys(commands, sys._getframe().f_code.co_name, '__')
        return commands
    
    
    def initialize_drop_rules() -> dict:
        r"""Initialize drop rules."""
        commands = dict()
        commands['drop'] = 'iptables --policy INPUT DROP'
    
        fix_dict_keys(commands, sys._getframe().f_code.co_name, '__')
        return commands
    
    
    #########
    # Utils #
    #########
    
    
    def get_chains_and_protocols(protocol: str) -> tuple:
        r"""Compute the iptables chain and protocol values."""
        if protocol not in ['tcp', 'udp', 'all']:
            raise ValueError
    
        chains = list()
        protocols = list()
        if protocol == 'tcp':
            chains.append('TCP')
            protocols.append('tcp')
        elif protocol == 'udp':
            chains.append('UDP')
            protocols.append('udp')
        elif protocol == 'all':
            chains.append('TCP')
            chains.append('UDP')
            protocols.append('tcp')
            protocols.append('udp')
    
        return chains, protocols
    
    
    def generate_accepted_rule_command(chain: str, protocol: str, port: str,
                                       remote_ip: str) -> str:
        r"""Generate a single command for the accepted rules."""
        check_port(port)
        check_ip_address(remote_ip)
    
        return ('iptables --append ' + chain + ' --protocol ' + protocol +
                ' --dport ' + port + ' --source ' + remote_ip + ' --jump ACCEPT')
    
    
    def fix_dict_keys(dictionary: dict, prefix: str, separator: str):
        r"""Fix the keys of a dictionary by adding a prefix and separator."""
        d = copy.deepcopy(dictionary)
        for key in d:
            dictionary[prefix + separator + key] = d[key]
            del dictionary[key]
    
    
    def load_zone_file(zone_file: str) -> list:
        r"""Load zone file."""
        zones = list()
        with open(zone_file) as f:
            line = f.readline().strip()
            while line:
                check_ip_address(line)
                zones.append(line)
                line = f.readline().strip()
    
        return zones
    
    
    def load_zone_files(zone_files: list) -> list:
        r"""Load all the zone files content in a flat data structure."""
        for zf in zone_files:
            if not isinstance(zf, str):
                raise TypeError
    
        zones = list()
        for zf in zone_files:
            zones += load_zone_file(zf)
    
        return zones
    
    
    def get_filename_from_url(url: str) -> str:
        r"""Use some tricks to get the filemame from a URL."""
        return pathlib.PurePath(urllib.parse.urlparse(url).path).name
    
    
    def download_zone_file(url: str, dst_directory: str) -> str:
        r"""Save the zone file."""
        filename = get_filename_from_url(url)
        pathlib.Path(dst_directory).mkdir(mode=0o700, parents=True, exist_ok=True)
        full_path = str(pathlib.Path(dst_directory, filename))
        try:
            r = requests.get(url, timeout=60)
            with open(full_path, 'w') as f:
                f.write(r.text)
        except requests.ConnectionError:
            pass
    
        return full_path
    
    
    def download_zone_files(urls: list, cache_directory: str) -> list:
        r"""Download multiple zone files."""
        for u in urls:
            if not isinstance(u, str):
                raise TypeError
    
        files = list()
        for u in urls:
            files.append(download_zone_file(u, cache_directory))
    
        return files
    
    
    def update_accepted_ips_structure(accepted_ips: dict, zones: list):
        r"""Update some data structures."""
        assert_accepted_ips_struct(accepted_ips)
        assert_zones_struct(zones)
    
        accepted_ips['wan'] = zones
        accepted_ips['all'] = accepted_ips['lan'] + accepted_ips['wan']
    
    
    def get_packet_policy(invalid_packet_policy: str) -> bool:
        r"""Update a variable."""
        if invalid_packet_policy not in ['polite', 'rude']:
            raise ValueError
    
        drop = True
        if invalid_packet_policy == 'rude':
            drop = True
        elif invalid_packet_policy == 'polite':
            drop = False
        return drop
    
    
    ##############
    # Assertions #
    ##############
    
    
    def check_ip_address(ip: str):
        r"""Verify that we are dealing with a network address."""
        ipaddress.ip_network(ip, strict=True)
    
    
    def check_port(port: str):
        r"""Check that the input port is a valid port number."""
        if not port.isdigit():
            raise TypeError
        if not (int(port) >= 0 and int(port) <= (2**16) - 1):
            raise ValueError
    
    
    def assert_zones_struct(zones: list):
        r"""Check that the data structure is a list of ip addresses."""
        for z in zones:
            if not isinstance(z, str):
                raise TypeError
            check_ip_address(z)
    
    
    def assert_accepted_ips_struct(accepted_ips: dict):
        r"""Check that the data structure is a list of ip addresses."""
        for ips in accepted_ips:
            if not isinstance(accepted_ips[ips], list):
                raise TypeError
            for j in accepted_ips[ips]:
                if not isinstance(j, str):
                    raise TypeError
                check_ip_address(j)
    
    
    def assert_input_ports_struct(ports: dict):
        r"""Check that the data structure is correct."""
        for port in ports:
            check_port(port)
            if not isinstance(ports[port], dict):
                raise TypeError
            if 'source' not in ports[port]:
                raise ValueError
            if 'protocol' not in ports[port]:
                raise ValueError
            if ports[port]['source'] not in ['lan', 'wan', 'all']:
                raise ValueError
    
    
    ############
    # Pipeline #
    ############
    
    if __name__ == '__main__':
        if os.getuid() != 0:
            raise UserNotRoot
    
        # Load the configuration.
        configuration_file = shlex.quote(sys.argv[1])
        config = yaml.load(open(configuration_file), Loader=yaml.SafeLoader)
        dry_run = config['dry_run']
        cache_directory = config['cache_directory']
        zone_files = config['accepted_ips']['wan']
        accepted_ips = dict()
        accepted_ips['lan'] = config['accepted_ips']['lan']
        accepted_ips['wan'] = list()
        patch_rules = config['patch_rules']
        set_patch_rules_first = config['set_patch_rules_first']
        input_ports = config['input_ports']
        logging = config['logging_enabled']
        invalid_packet_policy = config['invalid_packet_policy']
        drop_packets = get_packet_policy(invalid_packet_policy)
    
        # Get the data.
        zones = load_zone_files(download_zone_files(zone_files, cache_directory))
        update_accepted_ips_structure(accepted_ips, zones)
    
        # Apply the rules.
        reset = reset_rules()
        basic_chains = initialize_basic_chains()
        logging_chain = initialize_logging_chain()
        blocking_rules = initialize_blocking_rules(drop_packets, logging)
        rules = set_accepted_rules(input_ports, accepted_ips)
        patch = set_patch_rules(patch_rules)
        drop_by_default = initialize_drop_rules()
    
        # Merge the rules.
        if set_patch_rules_first:
            commands = {
                **reset,
                **basic_chains,
                **logging_chain,
                **blocking_rules,
                **patch,
                **rules,
                **drop_by_default
            }
        else:
            commands = {
                **reset,
                **basic_chains,
                **logging_chain,
                **blocking_rules,
                **rules,
                **patch,
                **drop_by_default
            }
    
        # Apply the rules.
        for c in commands:
            fpyutils.shell.execute_command_live_output(commands[c],
                                                       dry_run=dry_run)
    
  6. create a configuration file

    /home/jobs/scripts/by-user/root/iptables_geoport.yaml#
    #
    # iptables_geoport.yaml
    #
    # Copyright (C) 2020-2022 Franco Masotti (franco \D\o\T masotti {-A-T-} tutanota \D\o\T com)
    #
    # This program is free software; you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation; either version 2 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with this program; if not, write to the Free Software Foundation, Inc.,
    # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
    
    # If set to true print the commands that would be executed.
    dry_run: true
    
    logging_enabled: true
    
    # {rude,polite}
    invalid_packet_policy: 'rude'
    
    # The path where the ip addresses list will be saved.
    cache_directory: './.cache'
    
    # Have a look at https://www.ipdeny.com/ipblocks/
    accepted_ips:
      wan:
        - 'https://www.ipdeny.com/ipblocks/data/countries/it.zone'
      lan:
        - '192.168.0.0/24'
    
    # If set to 'true' the "patch rules" will be applied before the "input ports" rules.
    set_patch_rules_first: true
    
    # Raw rules that override the default ones.
    # Use "[]" if you do not need patch rules.
    patch_rules:
      # Web server.
      - 'iptables -A TCP -p tcp --dport 80 -j ACCEPT'
      - 'iptables -A TCP -p tcp --dport 443 -j ACCEPT'
    
      # SSH LAN only.
      - 'iptables -A TCP -s 192.168.0.0/24 -p tcp -m tcp --dport 22 -j ACCEPT'
    
      # SMTP.
      - 'iptables -A TCP -p tcp --dport 465 -j ACCEPT'
    
      # IMAP.
      - 'iptables -A TCP -p tcp --dport 993 -j ACCEPT'
    
      # Avahi
      - 'iptables -A UDP -p udp -m udp --dport 5353 -j ACCEPT'
      - 'iptables -A INPUT -p udp --dport 5353 -j ACCEPT'
      - 'iptables -A OUTPUT -p udp --dport 5353 -j ACCEPT'
    
      # KDE Connect.
      - 'iptables -A UDP -p udp --dport 1714:1764 -j ACCEPT'
      - 'iptables -A TCP -p tcp --dport 1714:1764 -j ACCEPT'
    
    # source:       {lan,wan,all}
    # protocol:     {tcp,udp,all}
    input_ports:
      '2222':
        source:     'lan'
        protocol:   'tcp'
      '2223':
        source:     'lan'
        protocol:   'tcp'
      '5555':
        source:     'lan'
        protocol:   'tcp'
      '53':
        source:     'lan'
        protocol:   'all'
    
      # CUPS.
      '631':
        source:     'lan'
        protocol:   'tcp'
    
      '8100':
        source:     'lan'
        protocol:   'tcp'
    
      # Required for SANE.
      '6566':
        source:     'lan'
        protocol:   'all'
    
      # Move SSH rules here for performance reasons.
      '22':
        source:     'wan'
        protocol:   'tcp'
    

    Note

    Rules are scanned sequentially. Move frequently used rules upper in the file to improve performance.

  7. use this Systemd service unit file

    /home/jobs/services/by-user/root/iptables-geoport.service#
    [Unit]
    Description=Apply the iptables geoport rules
    Wants=network-online.target
    After=network-online.target
    Requires=netfilter-persistent.service
    After=netfilter-persistent.service
    
    [Service]
    Type=simple
    # Answer Yes (default value) to dpkg-reconfigure.
    ExecStart=/usr/bin/bash -c '/home/jobs/scripts/by-user/root/iptables_geoport.py /home/jobs/scripts/by-user/root/iptables_geoport.yaml && [ -f /sbin/dpkg-reconfigure ] && /sbin/dpkg-reconfigure --frontend noninteractive iptables-persistent'
    
    User=root
    Group=root
    
  8. fix the permissions

    chmod 700 -R /home/jobs/scripts/by-user/iptables_geoport.*
    chmod 700 -R /home/jobs/services/by-user/root
    
  9. run the deploy script

SANE#

To be able to use a remote scanner with SANE using this setup you need to follow these steps

See also

  • SANE - ArchWiki [8]

  • Secure use of iptables and connection tracking helpers [7]

  1. open TCP and UDP ports 6566

    /home/jobs/scripts/by-user/root/iptables_geoport.yaml (extract)#
    65# source:       {lan,wan,all}
    66# protocol:     {tcp,udp,all}
    67input_ports:
    68  '2222':
    69    source:     'lan'
    70    protocol:   'tcp'
    71  '2223':
    72    source:     'lan'
    73    protocol:   'tcp'
    74  '5555':
    75    source:     'lan'
    76    protocol:   'tcp'
    77  '53':
    78    source:     'lan'
    79    protocol:   'all'
    80
    81  # CUPS.
    82  '631':
    83    source:     'lan'
    84    protocol:   'tcp'
    85
    86  '8100':
    87    source:     'lan'
    88    protocol:   'tcp'
    89
    90  # Required for SANE.
    91  '6566':
    92    source:     'lan'
    93    protocol:   'all'
    94
    95  # Move SSH rules here for performance reasons.
    96  '22':
    97    source:     'wan'
    98    protocol:   'tcp'
    
  2. add this file to load the kernel module

    /etc/modprobe.d/nf_conntrack.conf#
    options nf_conntrack nf_conntrack_helper=1
    
  3. add this file to load the kernel module

    /etc/modules-load.d/nf_conntrack_sane.conf#
    nf_conntrack_sane
    
  4. reboot

  5. check if the rules are active: 1 should be returned by the cat command

    cat /proc/sys/net/netfilter/nf_conntrack_helper
    1
    

Footnotes