Temps de lecture : 8 minutes

Vos applications et middlewares produisent continuellement de précieux logs qui sont en général écrits dans des fichiers, éparpillés dans votre SI. Il est essentiel pour mener des analyses approfondies de les collecter et de les centraliser dans un système comme Elassandra – une distribution améliorée de Elasticsearch.

Si la centralisation des logs applicatifs est largement documentée et donc rapide à mettre en place, celle des équipements réseaux l’est beaucoup moins. Pour remédier à ce manque, nous allons voir étape par étape comment tirer profit des logs générés par votre pare-feu pour monitorer le trafic réseau de votre entreprise.

Vue d’ensemble

La solution que nous allons présenter est basée sur ELK. La suite de cet article assume que vous soyez déjà familier avec cette technologie.

Vue d'ensemble de la plateforme de logs

La procédure suivante a été testée sur un pare-feu de la marque Stormshield, mais devrait fonctionner avec tous les modèles capables d’envoyer leurs journaux via le protocole Syslog.

Étape 1 : Collecte des logs du pare-feu

À première vue, le pare-feu ressemble à une boite noire, pas très bavarde.

Un pare-feu stormshield
Heureusement, son interface de configuration permet d’activer l’envoi de traces réseaux vers un serveur distant, via le protocole Syslog.

Nous créons une VM ou un conteneur Linux qui va réceptionner les traces envoyées par le pare-feu et les persister dans un fichier en attendant de les traiter plus sérieusement. Pour cela nous installons Rsyslog :

# RedHat
yum install rsyslog

# Debian
apt-get install rsyslog

# Alpine
apk add rsyslog

et nous le configurons de la façon suivante :

# /etc/syslog-ng/conf.d/stormshield.conf

source stormshield {
    udp(port(514));
};
destination hosts {
    file("/var/log/remote/firewall/traces.log"  owner(root)
    group(root) perm(0600) dir_perm(0700) create_dirs(yes));
};
log { source(stormshield); destination(hosts); }

Après démarrage de Rsyslog, si tout est bien configuré, le fichier /var/log/remote/firewall/traces.log contiendra les logs du pare-feu. La suite de la procédure ne consiste plus qu’en la configuration d’un pipeline ELK, classique.

Étape 2 : Transport et transformation avec Filebeat et Logstash

Nous utilisons le couple Filebeat / Logstash pour transformer les logs et les transporter vers notre cluster Elassandra (ou Elasticsearch).

La configuration de Filebeat est très simple :

# /etc/filebeat/filebeat.yml
filebeat.prospectors:
- input_type: log
  paths:
  - /var/log/remote/firewall/traces.log
  fields:
    logstash_dest: firewall
  fields_under_root: true

output.logstash:
  hosts: ["localhost:5044"]

La configuration de Logstash est plus complexe car les traces obscures arrivant du pare-feu doivent être transformées en documents JSON intelligibles, enrichis d’informations utiles comme la géolocalisation des adresses IP.

Le fichier Logstash suivant est valide pour les pare-feux de la marque Stormshield qui envoient leurs traces au format WELF (WebTrends Enhanced Log file Format). Dans le cas où votre pare-feu n’utilise pas le même format, vous allez devoir écrire votre propre script Logstash.

input {
  beats {
    port => 5044
  }
}

filter {
  # extract the timestamp
  grok {
    match => {
      "message" => "%{MONTH}%{SPACE}%{MONTHDAY}%{SPACE}%{HOUR}:%{MINUTE}:%{SECOND}%{SPACE}%{IPV4:fw_ip}%{SPACE}%{NUMBER}%{SPACE}%{TIMESTAMP_ISO8601:timestamp_match}"
    }
  }
  date {
    match => [ "timestamp_match", "ISO8601"]
    remove_field => "timestamp_match"
  }

  # extract key=values pairs
  kv {
    trim_key => "\s"
    value_split => "="

    # remove unwanted fields
    # caution: " " before "id" is not a space but byte-mark-order character
    remove_field => ["id", "message", "startime", "time", "host", "source", "type", "input_type" ]
  }

  # split the CPU values if logtype is "monitor"
  if [logtype] == "monitor" {
    mutate {
      split => { "CPU" => "," }
      add_field => { "[cpu][kernel]" => "%{[CPU][0]}" }
      add_field => { "[cpu][user]" => "%{[CPU][1]}" }
      add_field => { "[cpu][interupt]" => "%{[CPU][2]}" }
    }
    mutate {
      convert => {
        "[cpu][user]" => "float"
        "[cpu][kernel]" => "float"
        "[cpu][interupt]" => "float"
      }
    }
  }

  # add a label for priorities
  translate {
    field => "pri"
    destination => "pri_label"
    dictionary => [
      "0", "emergency",
      "1", "alert",
      "2", "critical",
      "3", "error",
      "4", "warning",
      "5", "notice",
      "6", "information",
      "7", "debug"
    ]
  }

  # add a label for classifications
  translate {
    field => "classification"
    destination => "classification"
    dictionary => [
      "0", "application",
      "1", "malware",
      "2", "protection"
    ]
    override => true
  }

  # convert types
  mutate {
    convert => {
      "dstport" => "integer"
      "srcport" => "integer"
      "origdstport" => "integer"
      "modsrcport" => "integer"
      "pri" => "integer"
      "ipv" => "integer"
      "slotlevel" => "integer"
      "rcvd" => "integer"
      "sent" => "integer"
      "ruleid" => "integer"
      "duration" => "float"
    }
  }

  # geolocalize IP addresses SRC
  if "" in [src] {
    cidr {
      address => [ "%{[src]}" ]
      network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128","169.254.0.0/16", "fe80::/10","224.0.0.0/4", "ff00::/8","255.255.255.255/32" ]
      add_tag => "src_private"
    }
    if "src_private" not in [tags] {
      geoip {
        add_tag => "src_geoip"
        source => "src"
        target => "tmp_geoip"
        add_field => { "src_location" => "%{[tmp_geoip][location][lon]}" }
        add_field => { "src_location" => "%{[tmp_geoip][location][lat]}" }
      }
      mutate {
        convert => { "src_location" => "float" }
      }
      mutate {
        remove_field => "tmp_geoip"
      }
    }
  }

  # geolocalize IP addresses DST
  if "" in [dst] {
    cidr {
      address => [ "%{[dst]}" ]
      network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128","169.254.0.0/16", "fe80::/10","224.0.0.0/4", "ff00::/8","255.255.255.255/32" ]
      add_tag => "dst_private"
    }
    if "dst_private" not in [tags] {
      geoip {
        add_tag => "dst_geoip"
        source => "dst"
        target => "tmp_geoip"
        add_field => { "dst_location" => "%{[tmp_geoip][location][lon]}" }
        add_field => { "dst_location" => "%{[tmp_geoip][location][lat]}" }
      }
      mutate {
        convert => { "dst_location" => "float" }
      }
      mutate {
        remove_field => "tmp_geoip"
      }
    }
  }

  # geolocalize IP addresses ORIGDST
  if "" in [origdst] {
    cidr {
      address => [ "%{[origdst]}" ]
      network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128","169.254.0.0/16", "fe80::/10","224.0.0.0/4", "ff00::/8","255.255.255.255/32" ]
      add_tag => "origdst_private"
    }
    if "origdst_private" not in [tags] {
      geoip {
        add_tag => "origdst_geoip"
        source => "origdst"
        target => "tmp_geoip"
        add_field => { "origdst_location" => "%{[tmp_geoip][location][lon]}" }
        add_field => { "origdst_location" => "%{[tmp_geoip][location][lat]}" }
      }
      mutate {
        convert => { "origdst_location" => "float" }
      }
      mutate {
        remove_field => "tmp_geoip"
      }
    }
  }

  # geolocalize IP addresses MODSRC
  if "" in [modsrc] {
    cidr {
      address => [ "%{[modsrc]}" ]
      network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128","169.254.0.0/16", "fe80::/10","224.0.0.0/4", "ff00::/8","255.255.255.255/32" ]
      add_tag => "modsrc_private"
    }
    if "modsrc_private" not in [tags] {
      geoip {
        add_tag => "modsrc_geoip"
        source => "modsrc"
        target => "tmp_geoip"
        add_field => { "modsrc_location" => "%{[tmp_geoip][location][lon]}" }
        add_field => { "modsrc_location" => "%{[tmp_geoip][location][lat]}" }
      }
      mutate {
        convert => { "modsrc_location" => "float" }
      }
      mutate {
        remove_field => "tmp_geoip"
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => true
    index => "firewall_%{logtype}_%{+YYYY_MM}"
    document_type => "logs"
  }
  stdout { codec => rubydebug }
}

Revenons sur la géolocalisation des adresses IP :

if "" in [src] {
  cidr {
    address => [ "%{[src]}" ]
    network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128","169.254.0.0/16", "fe80::/10","224.0.0.0/4", "ff00::/8","255.255.255.255/32" ]
    add_tag => "src_private"
  }
  if "src_private" not in [tags] {
    geoip {
      add_tag => "src_geoip"
      source => "src"
      target => "tmp_geoip"
      add_field => { "src_location" => "%{[tmp_geoip][location][lon]}" }
      add_field => { "src_location" => "%{[tmp_geoip][location][lat]}" }
    }
    mutate {
      convert => { "src_location" => "float" }
    }
    mutate {
      remove_field => "tmp_geoip"
    }
  }
}

Les adresses IP internes sont exclues avec le filtre cidr, le reste est géolocalisé en utilisant le plugin Geoip filter qui interroge la base Geolite2 embarquée localement. Cette opération est répétée pour les 4 champs des adresses IP : src, dst, modsrc et, origdst.

Ne démarrez pas Logstash tout de suite, il faut d’abord préparer notre instance Elassandra.

Étape 3 : Configuration des indexes Elassandra

La différence entre Elasticsearch et Elassandra réside principalement dans la façon dont les données sont distribuées. Un cluster Elassandra fonctionne en mode peer-to-peer : tous les replicas peuvent enregistrer les écritures, éliminant ainsi le single-point-of-write de Elasticsearch. Par ailleurs, Elassandra économise de l’espace de stockage en utilisant un format binaire plus compressé pour la sérialisation des données sources.

Dans la configuration Logstash, nous avons indiqué le nom de l’index à utiliser : firewall_%{logtype}_%{+YYYY_MM}. C’est un pattern, cela signifie qu’un nouvel index sera créé chaque mois.

Le typage des champs Elasticsearch est inféré automatiquement à l’insertion du premier document. Néanmoins, nous aimerions être certains que les adresses IP et les coordonnées GPS soient reconnues comme telles. Nous allons donc définir un mapping par défaut pour tous nos futurs indexes.

Nous utilisons pour cela l’API template d’Elasticsearch :

curl -X PUT -H Content-Type:application/json localhost:9200/_template/firewall -d'
{
  "template": "firewall_*",
  "version": 50001,
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "properties": {
        "src_location": {
          "type": "geo_point"
        },
        "dst_location": {
          "type": "geo_point"
        },
        "origdst_location": {
          "type": "geo_point"
        },
        "modsrc_location": {
          "type": "geo_point"
        },
        "src": {
          "type": "ip"
        },
        "dst": {
          "type": "ip"
        },
        "origdst": {
          "type": "ip"
        },
        "modsrc": {
          "type": "ip"
        }
      }
    }
  }
}'

La ligne "index.refresh_interval": "5s" permet d’économiser des ressources en diminuant le délai de rafraîchissement des indexes (par défaut 1 seconde).

Tous les composants – Rsyslog, Filebeat, Logstash et Elassandra – peuvent maintenant être démarrés.

Étape 4 : Visualisation avec Kibana

Il nous reste la partie la plus amusante : concevoir des visualisations et analyser les données avec Kibana.

À titre d’exemple voici quelques visualisations que l’on peut assembler pour créer un dashboard interactif :

  • L’évolution du trafic par rapport au temps

Visualisation du trafic par rapport au temps

  • La répartition géographique des adresses IP externes

Visualisation de la répartition géographique

  • Le top 5 des adresses IP sources et destinataires

Visualisation du top 5 des adresses IP

  • Le nombre d’adresses IP détectées par VLAN

Visualisation du nombre d’adresses IP par VLAN

  • La répartition des types d’événements

Visualisation de la repartition des types d'évenements

Haute disponibilité

Elassandra est un fork d’Elasticsearch modifié pour fonctionner à l’intérieur de Cassandra, le moteur NoSQL massivement distribué utilisé par Netflix, Uber ou encore Spotify.

Nous avons déployé Elassandra sur 2 serveurs, en prenant soin d’avoir un facteur de réplication cassandra à 2 (chaque noeud héberge 100% des données). Avec un Elasticsearch standard, le risque de ce genre de configuration à 2 nœuds est d’avoir un split brain, c’est à dire que les 2 nœuds Elasticsearch perdant temporairement la connectivité, se mettent à écrire chacun de leur coté des données dans les mêmes shards. Après l’incident, comme il est impossible de fusionner les documents des shards Elasticsearch ayant divergé, cela conduit à une perte de données. Dans le cas d’Elassandra, Cassandra rejoue les hint handoff, c’est à dire les données mises en attente durant 3h à destination de l’autre noeud. Si l’incident a duré plus de 3h, un cassandra repair resynchronisera les deux noeuds, et dans les deux cas, on ne pert pas de logs !

Conclusion

La suite ELK est aussi bien adaptée à la centralisation des logs applicatifs que des traces réseaux, moyennant l’intégration d’un serveur Syslog et le développement de scripts Logstash spécifiques.

Une fois les logs centralisées dans Elassandra, Kibana nous permet de construire des tableaux de bord interactifs pour faire du monitoring ou du troubleshooting. Il est également possible d’utiliser des outils comme Elastalert pour détecter des comportements anormaux (pannes, attaques DDOS, localisations suspectes, etc) et envoyer des alertes automatiquement.

Par ailleurs, pour sécuriser l’accès à ces logs, nous avons déployé le plugin Elassandra Enterprise qui assure le chiffrement TLS des connexions, l’authentification et la traçabilité des accès Elasticsearch. Pour nos administrateurs, l’utilisation de Kibana requière alors une authentification Cassandra pour accéder aux indexes Elasticsearch.