A record of solving data contamination caused by misusing a parent class attribute in production security testing

Author: Senior Internet Architect

To make port scanning more efficient, the scanning strategy usually probes the commonly used ports first and records the ones that are open in a property named all_open_ports on the parent class.

During subsequent testing, each port involved is checked against all_open_ports. If it is not there, the port is probed. If the probe shows the port is open, the test continues and the port is recorded in all_open_ports, so the next time the same port comes up the probe does not have to be repeated.

However, security testing is multi-threaded, and all_open_ports is effectively a shared variable. When tests run at the same time against two different test environments, their data contaminates each other, which affects the accuracy of the final results.

To solve this, the way all_open_ports is stored and accessed needs to be redesigned so that data stays independent and consistent in a multi-threaded environment.

All the code in the blog post is collected in the blogger's GitHub repository: http://github.com/sid10t/project-actual-code/tree/main

Reproducing the problem

First, create a parent class, define a class attribute all_open_ports to record the ports known to be open, and add a method check_port() to simulate port detection, as follows:

class Parent:
    all_open_ports = set()

    def __init__(self, args):
        self.all_open_ports.update(args.get("open_ports", []))

    def check_port(self, port):
        # Port scanning omitted...
        if port not in self.all_open_ports:
            self.all_open_ports.add(port)
            print(f"{port} in all_open_ports, {self.all_open_ports}")

Create a child class that inherits from the parent class, and construct the scan() method to simulate the security testing process, as follows:

import threading

from parent import Parent


class Child(Parent):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")

    def scan(self):
        print(threading.current_thread().name, self.all_open_ports)
        self.check_port(self.port)

Finally, create a test case, instantiate two Child objects, and run the object method scan() in a multithreaded manner to reproduce the scene, as shown below:

def test_thread():
    c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
    t1 = threading.Thread(target=c1.scan, name="Child_1")
    t1.start()
    t1.join()

    c2 = Child({"port": 5001, "open_ports": [80, 3306, 5000]})
    t2 = threading.Thread(target=c2.scan, name="Child_2")
    t2.start()
    t2.join()

    print("All tasks have finished!")           

Results: [screenshot of the program output]

Root cause analysis

The root cause is that, across threads, all_open_ports is effectively a shared variable, so data from different tests pollutes each other and affects the accuracy of the final results.

Because all_open_ports is a class attribute defined on the parent class, it belongs to the class itself rather than to any single instance, and it is shared by every instance of the parent class and of all its subclasses. Any of them can read and update it.

Whenever a child instance is created with an open_ports argument, those ports are added to the all_open_ports set. The check_port method in the parent class then checks whether a given port is in the set and adds it if it is not. Both operations mutate the set in place through self.all_open_ports, which resolves to the class attribute, so every subclass instance reads and updates the same set.
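
The sharing described above can be seen in isolation with a minimal sketch. The names Base, Derived, and shared are hypothetical and only stand in for Parent, Child, and all_open_ports:

```python
class Base:
    shared = set()  # class attribute: one object stored on the class itself

    def __init__(self, items):
        # Attribute lookup falls through to Base.shared,
        # so update() mutates the single class-level set.
        self.shared.update(items)


class Derived(Base):
    pass


a = Derived([1, 2])
b = Derived([3])

same_object = a.shared is b.shared is Base.shared
print(same_object)       # True
print(sorted(b.shared))  # [1, 2, 3]

# Assignment (as opposed to in-place mutation) creates an instance
# attribute on `a` that shadows the class attribute.
a.shared = set()
shadowed = a.shared is not Base.shared
print(shadowed)          # True
```

In-place mutation through self reaches the class-level object, while assignment through self creates a separate per-instance attribute.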

Now let's change the code to also print id(self.all_open_ports), to confirm that the same object is being used, as follows:

def scan(self):
    print(threading.current_thread().name, self.all_open_ports, "id:", id(self.all_open_ports))
    self.check_port(self.port)

Results: [screenshot of the program output]

So how can the problem be solved? There are three candidates:

  • reinitialize all_open_ports;
  • context management with contextvars;
  • thread-local variables with threading.local;

Reinitialize all_open_ports

The quickest fix is to reinitialize all_open_ports in __init__. The catch is that each Child object then has its own separate all_open_ports set instead of sharing one, so ports end up being probed more than once, which is contrary to the original design.

First, create a test case to see how the all_open_ports set currently behaves, as shown below:

def test_init_set():
    c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
    c2 = Child({"port": 3002, "open_ports": [80, 443, 3306]})
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
    c1.scan()
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
    c2.scan()
    print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)           

Results: [screenshot of the program output]

The results show that all_open_ports currently behaves as a shared variable: every Child object shares the same set, and this holds across threads as well.

In this case, modify the __init__ code in the parent class so that the all_open_ports collection is reinitialized at __init__, as follows:

def __init__(self, args):
    self.all_open_ports = set()
    self.all_open_ports.update(args.get("open_ports", []))           

Results: [screenshot of the program output]

The results show that the all_open_ports sets of c1 and c2 are now completely independent: adding a port through c1 does not affect c2, which avoids the data pollution. However, ports already tested by c1 are tested again by c2, which defeats the original purpose of all_open_ports, namely improving efficiency.

Context management: contextvars

The conclusion first: this approach did not seem to work. I am not sure whether my reasoning is at fault, and I would welcome any advice!

Results: [screenshot of the program output]

The code looks like this:

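import contextvars
import threading
import time


def print_prefix():
    # Timestamp and thread name, same format as ParentLocal.print_prefix further down.
    return f"[{time.strftime('%H:%M:%S', time.localtime())} {threading.current_thread().name}]"


class ParentContext: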
    all_open_ports = contextvars.ContextVar("all_open_ports", default=set())

    def __init__(self, args):
        open_ports = self.all_open_ports.get()
        open_ports.update(args.get("open_ports", []))
        self.all_open_ports.set(open_ports)

    def check_port(self, port):
        all_open_ports_ = self.all_open_ports.get()
        if port not in all_open_ports_:
            all_open_ports_.add(port)
            self.all_open_ports.set(all_open_ports_)
            print(f"{print_prefix()} Port {port} is added to all_open_ports, {self.all_open_ports.get()}")


class ChildContext(ParentContext):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")

    def scan(self, port=None):
        self.check_port(port or self.port)


def test_contextvars(open_ports, port):
    c1 = ChildContext({"port": port, "open_ports": open_ports})
    c1.scan()


if __name__ == '__main__':
    t1 = threading.Thread(target=test_contextvars, name="Child_1", args=([80, 3306, 5000], 5001,))
    t2 = threading.Thread(target=test_contextvars, name="Child_2", args=([22, 3306, 6000], 6001,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()           
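
One likely explanation for why this version does not isolate the threads: the ContextVar is created with default=set(), and that default is a single set object. In a thread that has not called set() yet, get() returns that very object, so update() and add() mutate the one set that every thread sees, and the later set() call merely stores the same object again. Below is a minimal sketch of a variant that copies before mutating. The names ParentContextFixed, _open_ports, worker, and results are hypothetical and not from the original code:

```python
import contextvars
import threading

# No mutable default: a default object would be the same one in every context.
_open_ports = contextvars.ContextVar("all_open_ports")


class ParentContextFixed:
    def __init__(self, args):
        # Copy the current value (or start empty) so nothing shared is mutated.
        ports = set(_open_ports.get(set()))
        ports.update(args.get("open_ports", []))
        _open_ports.set(ports)

    def check_port(self, port):
        ports = _open_ports.get()
        if port not in ports:
            # Store a new set instead of mutating the existing one in place.
            _open_ports.set(ports | {port})


results = {}


def worker(name, open_ports, port):
    scanner = ParentContextFixed({"open_ports": open_ports})
    scanner.check_port(port)
    results[name] = _open_ports.get()


t1 = threading.Thread(target=worker, args=("Child_1", [80, 3306, 5000], 5001))
t2 = threading.Thread(target=worker, args=("Child_2", [22, 3306, 6000], 6001))
for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()

print(sorted(results["Child_1"]))  # [80, 3306, 5000, 5001]
print(sorted(results["Child_2"]))  # [22, 3306, 6000, 6001]
```

In this sketch a value set in one thread is invisible to the other, while two instances created in the same thread still build on the same value, because they run in the same context.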

Thread-local variables: threading.local

threading.local is a class in the Python standard library that provides thread-local storage. Each thread gets its own independent copy of the attributes stored on a threading.local object, isolated from every other thread.

When multiple threads run at the same time, each can read and modify its own thread-local values without affecting the others. This is useful when threads run the same code and use the same variable name, but each thread's data has to stay independent.
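
The isolation can be shown with a minimal sketch. The names store, seen, and worker are hypothetical:

```python
import threading

store = threading.local()  # one object, with separate attribute storage per thread
seen = {}


def worker(name, value):
    store.value = value     # visible only inside the current thread
    seen[name] = store.value


threads = [threading.Thread(target=worker, args=(f"T{i}", i)) for i in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

main_has_value = hasattr(store, "value")
print(sorted(seen.items()))  # [('T1', 1), ('T2', 2)]
print(main_has_value)        # False: the main thread never assigned store.value
```

Each worker sees only the value it assigned, and the main thread sees no value attribute at all.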

Next, we create a parent class, ParentLocal, and use threading.local() to store the collection all_open_ports, as follows:

import threading
import time


class ParentLocal:
    local = threading.local()

    def __init__(self, args):
        self.local.all_open_ports = getattr(self.local, "all_open_ports", set())
        self.local.all_open_ports.update(args.get("open_ports", []))

    def check_port(self, port):
        if port not in self.local.all_open_ports:
            self.local.all_open_ports.add(port)
            print(f"{self.print_prefix()} Port {port} is added to all_open_ports, {self.local.all_open_ports}")

    def print_prefix(self):
        return f"[{time.strftime('%H:%M:%S', time.localtime())} {threading.current_thread().name}]"           

In the above code, the __init__ method of ParentLocal reads self.local.all_open_ports via getattr(). If the attribute does not exist yet in the current thread, an empty set is created with set() and assigned to self.local.all_open_ports. The update() method then adds the ports in args.get("open_ports", []) to self.local.all_open_ports.

With ParentLocal, multiple instances can be created in a multi-threaded environment, and each thread has its own independent all_open_ports set, shared by the instances created in that thread. This way, data belonging to different threads does not interfere.

The child class is the same as before apart from its name and parent class, with the following code:

class ChildLocal(ParentLocal):
    def __init__(self, args):
        super().__init__(args)
        self.port = args.get("port")

    def scan(self, port=None):
        self.check_port(port or self.port)

In the above code, ChildLocal inherits from ParentLocal, which gives it access to the parent's self.local.all_open_ports set. ChildLocal instances within the same thread therefore share data, without being affected by ChildLocal instances in other threads.

Write the test code like this:

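import random
import threading
import time


def generate_random_numbers(n):
    # Helper not shown in the original post; assumed here to return n random port numbers.
    return [random.randint(1024, 9999) for _ in range(n)]


def test_local(open_ports, port):
    c1 = ChildLocal({"port": port, "open_ports": open_ports})
    c1.scan()
    args = {"port": generate_random_numbers(1)[0], "open_ports": generate_random_numbers(3)}
    print(threading.current_thread().name, args)
    # A second instance in the same thread shares the thread's all_open_ports set.
    c2 = ChildLocal(args)
    c2.scan()
    time.sleep(3)
    c1.scan(random.randint(8000, 9999))


if __name__ == '__main__':
    t1 = threading.Thread(target=test_local, name="Child_1", args=([80, 3306, 5000], 5001,))
    t2 = threading.Thread(target=test_local, name="Child_2", args=([22, 3306, 6000], 6001,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()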

Results: [screenshot of the program output]

As expected, the ChildLocal instances inside the Child_1 thread share one all_open_ports set with each other, and so do those inside the Child_2 thread, but instances in different threads do not share data.

Note that the threading.local object itself has the same id in every thread, because there is only one such object: it is created once, as a class attribute of ParentLocal. What is per-thread is the attribute storage behind it, so each thread sees its own set of attributes on that one object.

In other words, self.local refers to the same object in every thread, while self.local.all_open_ports is a different set in each thread, created the first time that thread runs __init__.
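
A quick way to check this is to record, from two threads, the id() of the local object and the set stored on it. This is a minimal sketch; Holder, recorded, and worker are hypothetical names:

```python
import threading


class Holder:
    local = threading.local()  # created once, when the class body runs


recorded = {}


def worker(name):
    Holder.local.ports = set()  # a fresh set in this thread's own storage
    # Keep a reference to the set so it stays alive and the comparison is meaningful.
    recorded[name] = (id(Holder.local), Holder.local.ports)


threads = [threading.Thread(target=worker, args=(n,)) for n in ("A", "B")]
for t in threads:
    t.start()
for t in threads:
    t.join()

same_local = recorded["A"][0] == recorded["B"][0]
different_sets = recorded["A"][1] is not recorded["B"][1]
print(same_local)      # True: both threads see the one threading.local object
print(different_sets)  # True: each thread stored its own set on it
```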

Postscript

Fortunately, we discovered the problem in time and it did not cause a security incident. We are sharing this experience in the hope that it helps other development teams improve the security and stability of their systems.

Author: sidiot

Link: https://juejin.cn/post/7384326861964476425
