Contents

🕵️ IP2ROOT Partie 1 : Déploiement automatisé d'un C2 via Python et Docker SDK

Introduction

Durant cette série de trois articles nous mettrons en lumière différentes briques techniques de notre projet, ip2root.

Contexte

ip2root est un projet que nous menons au cours de notre 5ème année du cycle Ingénieur Cyberdéfense à l’ENSIBS. Les objectifs du projet sont les suivants :

  • fournir aux équipes de pentest ou Red Team, un outil clé en main permettant d’exploiter automatiquement des vulnérabilités communes afin de gagner du temps
  • A partir d’une adresse réseau ou une plage réseau fournie en entrée, l’outil retourne :
    • un accès en ligne de commande (privilégié si l’escalade de privilège a fonctionné) aux machines compromises
    • un rapport d’exploitation, le tout sans aucune interaction de l’utilisateur

Le projet est open-source et disponible sur Github : https://github.com/ip2root/ip2root

Organisation

Les trois articles de cette série aborderont les thématiques suivantes :

  1. Déploiement automatisé d’un C2 (Serveur de commande et de contrôle) via Python et Docker SDK
  2. Reconnaissance des services grâce aux outils Nmap ainsi que Masscan et corrélation avec les exploits implémentés pour obtenir un accès intial
  3. Enumérer les possibilités d’élévation de privilèges sous Linux et Windows avec Python

Command and Control (C2)

Glossaire

C2 : Une infrastructure Commande et Contrôle, aussi appelée C2 ou C&C, est l’infrastructure utilisée par les attaquants pour maintenir la communication avec des appareils compromis, à la suite d’une première exploitation. (A noter que nous ne souhaitons pas aider les attaquants mais nous avons besoin de nous placer du côté offensif afin de déceler les failles d’un SI donné)

Stager : Une charge utile à executer sur la machine compromise afin que cette dernière puisse recevoir et exécuter les commandes du C2.

Listener : Outil permettant de recevoir des connexions sur un port donné.


Entrons dans le vif du sujet, comment avons-nous choisi notre C2 ? Comment avons-nous automatisé son déploiement ?

Besoins et objectifs du C2

L’outil était initialement prévu pour gérer une seule adresse réseau et donc un seul accès en ligne de commande. Nous sommes alors partis sur un simple listener. Nous avons rapidement observé les limites de cette méthode :

  • complexité d’envoi de fichiers
  • complexité de gestion de plusieurs accès
  • sujets à des bugs (mineurs)

En plus de ces problèmes, nous avons décidé de permettre à l’utilisateur de fournir une plage réseau en entrée. Il nous faut donc pouvoir gérer plusieurs accès en ligne de commande et ceci n’est pas possible avec un simple listener. Pour ce faire, nous avons décidé de déployer un C2 permettant de gérer tous nos accès en ligne de commande.

Le C2 doit répondre à ces besoins afin qu’il corresponde au projet :

  • API exposée -> interagir facilement via notre outil
  • Stagers compatibles linux et windows -> garder l’exhaustivité de notre outil
  • Déployable via Docker -> simplifier l’utilisation de l’outil
  • Intuitif -> tous les utilisateurs de l’outil ne sont pas forcément familiers avec les C2

Après avoir épluché les différentes offres présentes sur le marché, nous avons (notamment) le choix entre Covenant et PS Empire.

Nous choisissons PS Empire car nous avons une (petite) expérience sur ce dernier.

Automatisation du déploiement

Il est maintenant l’heure de mettre les mains dans le cambouis.

La première étape est de vérifier si l’utilisateur ne possède pas déjà un C2 de déployé sur sa machine (dans le cas ou notre outil a déjà été utilisé par ce dernier).

Nous commençons par importer la librairie docker avec pip3 install docker et import docker dans notre code.

Dans notre code, nous pouvons désormais initialiser le client Docker :

1
client = docker.from_env()

On itère dans la liste de ceux présents afin de chercher si un conteneur empire existe déjà :

1
2
3
4
5
6
for i in range(len(client.containers.list(all))):
        container = client.containers.get(client.containers.list(all)[i].__getattribute__('short_id'))
        if "empire" in container.attrs['Config']['Image']:
            print('[+] Detected an existing C2 container')
            is_up = True
            client.containers.list(all)[i].start()

Et on le démarre s’il existe : start().

Si aucun conteneur Empire n’existe, nous appelons la fonction permettant d’en déployer un de zéro :

1
infos = deploy_c2()

On crée et démarre le conteneur avec les paramètres souhaités :

  • Image : bcsecurity/empire:latest
  • Ports : 1337:1337, 5000:5000 et 8888:8888
  • Nom : empire
  • Interactif : oui
  • Détaché : oui

Ce qui nous donne en Python avec Docker SDK :

1
container = client.containers.run(image='bcsecurity/empire:latest', ports={'1337/tcp':1337, '5000/tcp':5000, '8888/tcp':8888}, name='empire', tty=True, detach=True)

Une fois que nous avons un conteneur déployé, que ce soit parce qu’il l’était déjà ou après l’avoir créé, il nous faut récupérer le token permettant d’interagir avec l’API REST.

Nous appelons la fonction permettant de récupérer le token de la sorte :

1
 token = c2_token()

Dans la fonction associée, nous commençons par définir l’endpoint de l’API à joindre ainsi que les en-têtes de requête et paramètres à fournir :

1
2
3
4
5
def c2_token():
    url_c2 = 'https://localhost:1337/api/admin/login'
    headers = {"Content-Type": "application/json"}
    param = {"username":"empireadmin", "password":"password123"}
    r_c2 = requests.post(url_c2, headers=headers, json=param, verify=False)

Puis, nous récupérons le token dans la réponse de la requête POST :

1
2
3
json_token = json.loads(r_c2.text)
token = json_token['token']
return token

Maintenant que nous avons récupéré le token, nous pouvons communiquer plus en détail avec l’API du C2.

Afin que le C2 soit opérationnel il nous reste deux tâches à effectuer :

  • Créer un listener (permet de récupérer l’accès déclenché par l’exploitation d’une vulnérabilité)
  • Créer une fonction permettant de générer un stager adapté (charge utile permettant de joindre le serveur de contrôle via le listener)

Pour la première tâche, comme précédemment, nous spécifions l’endpoint de l’API à joindre avec les paramètres requis pour la création d’un listener (nom et port et adresse réseau) :

1
2
3
4
 url_listener = 'https://localhost:1337/api/listeners/http?token={0}'.format(str(token))
param_listener = {"Name":"CLIHTTP", "Port":"8888", "Host":"{0}".format(LOCAL_IP)}
headers = {"Content-Type": "application/json"}
requests.post(url_listener, headers=headers, json=param_listener, verify=False)

Ici, notre listener se nommera CLIHTTP et sera disponible sur le port 8888.

Pour la fonction permettant de générer les stagers, nous définissons les mêmes prérequis pour effectuer la requête sur notre C2 :

1
2
3
4
url_stager = 'https://localhost:1337/api/stagers?token={0}'.format(token)
param_stager = {"StagerName":"{0}".format(system), "Listener":"CLIHTTP"}
headers_stager = {"Content-Type": "application/json"}
r_stager = requests.post(url_stager, headers=headers_stager, json=param_stager, verify=False)

Le nom du stager choisi ("StagerName":"{0}".format(system)) est défini par le paramètre OS entré dans le fichier de configuration de la vulnérabilité exploitée : soit multi/bash pour un Linux, soit multi/launcher pour un Windows. On associe le stager à notre listener : CLIHTTP.

On récupère le payload et on l’encode en base64 afin de faciliter l’envoi du payload (retour à la ligne, caractères spéciaux, etc.) :

1
2
3
4
5
payload = json.loads(r_stager.text)
payload = payload[system]['Output']
message_bytes = payload.encode('ascii')
rs_b64 = base64.b64encode(message_bytes)
return rs_b64

Une fois tout ceci fait, nous avons finalisé le déploiement automatique et transparent du C2 pour l’utilisateur.

Client

Pour administrer le C2 et commander les machines infectées qui sont inscrites dessus, il faut un client.

Nous utiliserons Starkiller. Toujours dans l’optique de faciliter la vie de l’utilisateur, nous vérifions s’il existe déjà dans le répertoire /tmp de notre utilisateur, sinon, nous le téléchargeons depuis github:

1
2
3
4
5
6
 STARKILLER_PATH = '/tmp/starkiller'
if not os.path.exists(STARKILLER_PATH):
    STARKILLER_URL = 'https://github.com/BC-SECURITY/Starkiller/releases/download/v1.10.0/starkiller-1.10.0.AppImage'
    print('[+] Downloading starkiller in {} from {}'.format(STARKILLER_PATH, STARKILLER_URL))
    r_github = requests.get(STARKILLER_URL, allow_redirects=True)
    open(STARKILLER_PATH, 'wb').write(r_github.content)

Une fois qu’il est téléchargé, nous appliquons les droits d’exécution sur le binaire puis nous le démarrons.

1
2
subprocess.Popen(["chmod", "+x", STARKILLER_PATH])
subprocess.Popen([STARKILLER_PATH])

L’utilisateur n’a alors plus qu’à entrer ses identifiants pour administrer le C2 : https://md.floppy.sh/uploads/1ca17bc5-4576-42a2-b1d2-c13514009c53.png


Conclusion

Pour conclure, nous avons pu mettre en oeuvre l’automatisation du déploiement d’un C2 pour notre projet à l’aide de Python, Docker SDK et de PS Empire. Cela permet de faciliter la vie de l’utilisateur et de lui faire gagner du temps lors d’audits de sécurité.

Le code complet est disponible sur le Github du projet à l’adresse suivante : https://github.com/ip2root/ip2root/blob/dev/c2/c2.py

A noter que l’implémentation du C2 est encore en développement et l’outil n’est pas encore complètement compatible avec. La version fonctionnelle de notre outil (sans le C2 pour l’instant) est disponible ici : https://github.com/ip2root/ip2root Le prochain article traitera de la partie Reconnaissance des services grâce aux outils Nmap et Masscan et corrélation avec les exploits implémentés pour obtenir un accès intial avec les fichiers de configuration Python

Annexe

Code complet

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import docker
import os
import subprocess
import requests
import json
import base64
from time import sleep
import urllib3
import sys
import more_itertools
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def c2(LOCAL_IP) -> None | str:
    check_docker()
    is_up = False
    client = docker.from_env()

    for i in range(len(client.containers.list(all))):
        container = client.containers.get(client.containers.list(all)[i].__getattribute__('short_id'))
        if "empire" in container.attrs['Config']['Image']:
            print('[+] Detected an existing C2 container')
            is_up = True
            client.containers.list(all)[i].start()
            for c in more_itertools.ncycles(['|', '/', '-', '\\'], 100):
                logs = '   ' + str(container.logs(tail=1).decode('utf-8'))
                sys.stdout.write('\033[2K\r[+] Starting the Docker... ' + c)
                if "WARNING: your terminal doesn't support" not in logs:
                    sys.stdout.write((logs.replace('\r', '')).replace('\n', ''))
                sys.stdout.flush()
                sleep(0.1)
                if 'Plugin csharpserver ran successfully!' in container.logs(tail=3).decode('utf-8'):
                    print('\n[+] C2 started successfully from existing docker (ID: {0})'.format(client.containers.list(all)[i].short_id))
                    break
            token = c2_token()
            print('[+] C2 token : {0}'.format(token))
            print('[+] Listening on port 8888 (CLIHTTP)')
            return client.containers.list(all)[i].short_id, token
    if not is_up:
        infos = deploy_c2(LOCAL_IP)
        return infos

def check_docker():
    res_docker = subprocess.check_output('which docker', shell=True, universal_newlines=True)
    if 'docker' not in res_docker:
        print('[-] Docker is not installed on your system. Please intall it from https://docs.docker.com')
        exit
    res_group = subprocess.check_output('id -nG "$(whoami)" | grep -qw "docker" && echo 1 || echo 0 && id -nG "$(whoami)" | grep -qw "root" && echo 1 || echo 0', shell=True, universal_newlines=True)
    if '1' not in res_group:
        print('[-] Your current user is not part of the docker group. Add it or start ip2root with a user that is part of the docker group.')
        exit

def starkiller():
    STARKILLER_PATH = '/tmp/starkiller'
    if not os.path.exists(STARKILLER_PATH):
        STARKILLER_URL = 'https://github.com/BC-SECURITY/Starkiller/releases/download/v1.10.0/starkiller-1.10.0.AppImage'
        print('[+] Downloading starkiller in {} from {}'.format(STARKILLER_PATH, STARKILLER_URL))
        r_github = requests.get(STARKILLER_URL, allow_redirects=True)
        open(STARKILLER_PATH, 'wb').write(r_github.content)
    subprocess.Popen(["chmod", "+x", STARKILLER_PATH])
    subprocess.Popen([STARKILLER_PATH])

def deploy_c2(LOCAL_IP):
    print('[+] Deploying C2 container')
    C2_LISTENER_PORT = 8888
    client = docker.from_env()
    container = client.containers.run(image='bcsecurity/empire:latest', ports={'1337/tcp':1337, '5000/tcp':5000, '{}/tcp'.format(C2_LISTENER_PORT):C2_LISTENER_PORT}, name='empire', tty=True, detach=True)
    for c in more_itertools.ncycles(['|', '/', '-', '\\'], 100):
        logs = '   ' + str(container.logs(tail=1).decode('utf-8'))
        sys.stdout.write('\033[2K\r[+] Starting the Docker... ' + c)
        sys.stdout.write((logs.replace('\r', '').replace('\n', '')))
        sys.stdout.flush()
        sleep(0.1)
        if 'Plugin csharpserver ran successfully!' in logs:
            print('\n[+] C2 created successfully')
            break
    token = c2_token()
    c2_listener(token, LOCAL_IP)
    print('[+] C2 container created successfully (Docker ID : {0})'.format(container.short_id))
    print('[+] Listener created on port {} (CLIHTTP)'.format(C2_LISTENER_PORT))
    return container.short_id, token

def c2_token():
    url_c2 = 'https://localhost:1337/api/admin/login'
    headers = {"Content-Type": "application/json"}
    param = {"username":"empireadmin", "password":"password123"}
    r_c2 = requests.post(url_c2, headers=headers, json=param, verify=False)
    json_token = json.loads(r_c2.text)
    token = json_token['token']
    return token

def c2_listener(token, LOCAL_IP):
    url_listener = 'https://localhost:1337/api/listeners/http?token={0}'.format(str(token))
    param_listener = {"Name":"CLIHTTP", "Port":"8888", "Host":"{0}".format(LOCAL_IP)}
    headers = {"Content-Type": "application/json"}
    requests.post(url_listener, headers=headers, json=param_listener, verify=False)

def get_stager(system, token):
    if system == 'linux':
        system = 'multi/bash'
    elif system == 'windows':
        system = 'multi/launcher'
    else:
        return "Error: OS not defined."

    url_stager = 'https://localhost:1337/api/stagers?token={0}'.format(token)
    param_stager = {"StagerName":"{0}".format(system), "Listener":"CLIHTTP"}
    headers_stager = {"Content-Type": "application/json"}
    r_stager = requests.post(url_stager, headers=headers_stager, json=param_stager, verify=False)
    payload = json.loads(r_stager.text)
    payload = payload[system]['Output']
    message_bytes = payload.encode('ascii')
    rs_b64 = base64.b64encode(message_bytes)

    return rs_b64