I’ve been using CI/CD pipelines for years now - GitLab CI, GitHub Actions, Jenkins, you name it. Like most developers, I treated them as configuration files that “just work.” You write a YAML file, push it, and somehow your code gets built, tested, and deployed. But I never really understood what was happening behind the scenes.

That changed when I needed to set up CI/CD for an air-gapped environment at work - no access to GitHub Actions or GitLab’s hosted runners. I needed to understand how these tools actually work under the hood so I could build something custom. That’s when I realized: pipeline runners are just orchestration tools that execute jobs in isolated environments.

In this post, I’ll show you how to build a complete CI/CD pipeline runner from scratch in Python. We’ll implement the core features you use every day: stages, parallel execution, job dependencies, and artifact passing. By the end, you’ll understand exactly how GitLab Runner and GitHub Actions work internally.

What Actually IS a CI/CD Pipeline?

Before we start coding, let’s understand what a CI/CD pipeline actually does.

A CI/CD pipeline is an automated workflow that takes your code from commit to deployment. It’s defined as a series of jobs organized into stages:

stages:
  - build
  - test
  - deploy

build-job:
  stage: build
  script:
    - python -m build

test-job:
  stage: test
  script:
    - pytest tests/

deploy-job:
  stage: deploy
  script:
    - ./deploy.sh

When you push code, a pipeline runner (like GitLab Runner or GitHub Actions) does this:

  1. Parses the pipeline configuration file
  2. Creates a dependency graph of jobs
  3. Executes jobs in isolated environments (usually containers)
  4. Streams logs back to you in real-time
  5. Passes artifacts between jobs
  6. Reports success or failure

The key insight: a pipeline runner is just a job orchestrator. It figures out what to run, in what order, and handles the execution.

Let’s build one ourselves.

Understanding Pipeline Components

Every pipeline has three main components:

1. Stages

Stages define the execution order. Jobs in the same stage can run in parallel, but stages run sequentially:

Stage 1: build     → Stage 2: test        → Stage 3: deploy
  [build-app]        [unit-test]             [deploy-prod]
                     [integration-test]

2. Jobs

Jobs are the actual work units. Each job:

  • Runs in an isolated environment (container)
  • Executes a series of shell commands
  • Can depend on other jobs
  • Can produce artifacts for other jobs

3. Artifacts

Artifacts are files produced by one job that other jobs need. For example:

  • Build job produces dist/ folder
  • Test jobs need dist/ to run tests
  • Deploy job needs dist/ to deploy

Now that we understand the concepts, let’s start building.

Building Our Pipeline Runner

Version 1: Single Job Executor

Let’s start with the absolute basics - executing a single job in a Docker container:

#!/usr/bin/env python3
"""
Pipeline Runner v1: Single job executor
Executes one job in a Docker container and streams logs
"""

import yaml
import subprocess
import sys
from pathlib import Path


class Job:
    """Represents a single pipeline job."""

    def __init__(self, name, config):
        self.name = name
        self.image = config.get('image', 'python:3.11')
        self.script = config.get('script', [])
        self.stage = config.get('stage', 'test')

    def __repr__(self):
        return f"Job({self.name}, stage={self.stage})"


class JobExecutor:
    """Executes a job in a Docker container."""

    def __init__(self, workspace):
        self.workspace = Path(workspace).resolve()

    def run(self, job):
        """Execute a job and stream output."""
        print(f"\n{'='*60}")
        print(f"[{job.name}] Starting job...")
        print(f"[{job.name}] Image: {job.image}")
        print(f"[{job.name}] Stage: {job.stage}")
        print(f"{'='*60}\n")

        # Combine all script commands into one shell command
        script = ' && '.join(job.script)

        # Build docker run command
        cmd = [
            'docker', 'run',
            '--rm',  # Remove container after execution
            '-v', f'{self.workspace}:/workspace',  # Mount workspace
            '-w', '/workspace',  # Set working directory
            job.image,
            'sh', '-c', script
        ]

        try:
            # Run and stream output in real-time
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1
            )

            # Stream output line by line
            for line in process.stdout:
                print(f"[{job.name}] {line}", end='')

            process.wait()

            if process.returncode == 0:
                print(f"\n[{job.name}] ✓ Job completed successfully\n")
                return True
            else:
                print(f"\n[{job.name}] ✗ Job failed with exit code {process.returncode}\n")
                return False

        except Exception as e:
            print(f"\n[{job.name}] ✗ Error: {e}\n")
            return False


class Pipeline:
    """Represents a pipeline with jobs."""

    def __init__(self, config_file):
        self.config_file = Path(config_file)
        self.config = self._load_config()
        self.jobs = self._parse_jobs()

    def _load_config(self):
        """Load and parse YAML configuration."""
        with open(self.config_file) as f:
            return yaml.safe_load(f)

    def _parse_jobs(self):
        """Parse jobs from configuration."""
        jobs = []
        for job_name, job_config in self.config.items():
            if job_name != 'stages' and isinstance(job_config, dict):
                jobs.append(Job(job_name, job_config))
        return jobs

    def run(self, workspace='.'):
        """Execute all jobs sequentially."""
        print(f"\nStarting pipeline from {self.config_file}")
        print(f"Found {len(self.jobs)} job(s)\n")

        executor = JobExecutor(workspace)

        for job in self.jobs:
            success = executor.run(job)
            if not success:
                print("Pipeline failed!")
                return False

        print("✓ Pipeline completed successfully!")
        return True


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python runner_v1.py <pipeline.yml>")
        sys.exit(1)

    pipeline = Pipeline(sys.argv[1])
    success = pipeline.run()
    sys.exit(0 if success else 1)

Testing Version 1

Create a simple pipeline config:

# pipeline.yml
build-job:
  image: python:3.11
  script:
    - echo "Building application..."
    - python --version
    - pip install --quiet build
    - echo "Build complete!"

Run it:

python runner_v1.py pipeline.yml

You should see output like:

Starting pipeline from pipeline.yml
Found 1 job(s)

============================================================
[build-job] Starting job...
[build-job] Image: python:3.11
[build-job] Stage: test
============================================================

[build-job] Building application...
[build-job] Python 3.11.9
[build-job] Build complete!

[build-job] ✓ Job completed successfully

✓ Pipeline completed successfully!

Great! We can execute a single job. But real pipelines have multiple stages. Let’s add that.

Version 2: Multi-Stage Pipeline with Sequential Execution

Now let’s add support for stages. Jobs in different stages should run in order:

#!/usr/bin/env python3
"""
Pipeline Runner v2: Multi-stage support
Executes jobs stage by stage in sequential order
"""

import yaml
import subprocess
import sys
from pathlib import Path
from collections import defaultdict


class Job:
    """Represents a single pipeline job."""

    def __init__(self, name, config):
        self.name = name
        self.image = config.get('image', 'python:3.11')
        self.script = config.get('script', [])
        self.stage = config.get('stage', 'test')

    def __repr__(self):
        return f"Job({self.name}, stage={self.stage})"


class JobExecutor:
    """Executes a job in a Docker container."""

    def __init__(self, workspace):
        self.workspace = Path(workspace).resolve()

    def run(self, job):
        """Execute a job and stream output."""
        print(f"[{job.name}] Starting job...")

        script = ' && '.join(job.script)

        cmd = [
            'docker', 'run', '--rm',
            '-v', f'{self.workspace}:/workspace',
            '-w', '/workspace',
            job.image,
            'sh', '-c', script
        ]

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1
            )

            for line in process.stdout:
                print(f"[{job.name}] {line}", end='')

            process.wait()

            if process.returncode == 0:
                print(f"[{job.name}] ✓ Job completed successfully")
                return True
            else:
                print(f"[{job.name}] ✗ Job failed with exit code {process.returncode}")
                return False

        except Exception as e:
            print(f"[{job.name}] ✗ Error: {e}")
            return False


class Pipeline:
    """Represents a pipeline with stages and jobs."""

    def __init__(self, config_file):
        self.config_file = Path(config_file)
        self.config = self._load_config()
        self.stages = self.config.get('stages', ['test'])
        self.jobs = self._parse_jobs()

    def _load_config(self):
        """Load and parse YAML configuration."""
        with open(self.config_file) as f:
            return yaml.safe_load(f)

    def _parse_jobs(self):
        """Parse jobs from configuration."""
        jobs = []
        for job_name, job_config in self.config.items():
            if job_name != 'stages' and isinstance(job_config, dict):
                jobs.append(Job(job_name, job_config))
        return jobs

    def _group_jobs_by_stage(self):
        """Group jobs by their stage."""
        stages = defaultdict(list)
        for job in self.jobs:
            stages[job.stage].append(job)
        return stages

    def run(self, workspace='.'):
        """Execute pipeline stage by stage."""
        print(f"\n{'='*60}")
        print(f"Starting pipeline: {self.config_file.name}")
        print(f"Stages: {' → '.join(self.stages)}")
        print(f"Total jobs: {len(self.jobs)}")
        print(f"{'='*60}\n")

        executor = JobExecutor(workspace)
        stages_with_jobs = self._group_jobs_by_stage()

        # Execute stages in order
        for stage in self.stages:
            stage_jobs = stages_with_jobs.get(stage, [])

            if not stage_jobs:
                continue

            print(f"\n{'─'*60}")
            print(f"Stage: {stage} ({len(stage_jobs)} job(s))")
            print(f"{'─'*60}\n")

            # Execute all jobs in this stage (sequentially for now)
            for job in stage_jobs:
                success = executor.run(job)
                if not success:
                    print(f"\n✗ Pipeline failed at stage '{stage}'")
                    return False

        print(f"\n{'='*60}")
        print("✓ Pipeline completed successfully!")
        print(f"{'='*60}\n")
        return True


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python runner_v2.py <pipeline.yml>")
        sys.exit(1)

    pipeline = Pipeline(sys.argv[1])
    success = pipeline.run()
    sys.exit(0 if success else 1)

Testing Version 2

Create a multi-stage pipeline:

# pipeline.yml
stages:
  - build
  - test
  - deploy

build-job:
  stage: build
  image: python:3.11
  script:
    - echo "Building application..."
    - mkdir -p dist
    - echo "v1.0.0" > dist/version.txt

test-job:
  stage: test
  image: python:3.11
  script:
    - echo "Running tests..."
    - python -c "print('All tests passed!')"

deploy-job:
  stage: deploy
  image: alpine:latest
  script:
    - echo "Deploying application..."
    - cat dist/version.txt
    - echo "Deployment complete!"

Run it:

python runner_v2.py pipeline.yml

Now we have stages! But notice that if you have multiple jobs in the test stage, they still run sequentially. In real CI/CD, jobs in the same stage run in parallel. Let’s add that.

Version 3: Parallel Job Execution

Real pipeline runners execute jobs in the same stage in parallel. Let’s implement this using Python’s multiprocessing:

#!/usr/bin/env python3
"""
Pipeline Runner v3: Parallel execution
Executes jobs within a stage in parallel using multiprocessing
"""

import yaml
import subprocess
import sys
from pathlib import Path
from collections import defaultdict
from multiprocessing import Pool, Manager
from functools import partial


class Job:
    """Represents a single pipeline job."""

    def __init__(self, name, config):
        self.name = name
        self.image = config.get('image', 'python:3.11')
        self.script = config.get('script', [])
        self.stage = config.get('stage', 'test')

    def __repr__(self):
        return f"Job({self.name}, stage={self.stage})"


class JobExecutor:
    """Executes a job in a Docker container."""

    def __init__(self, workspace):
        self.workspace = Path(workspace).resolve()

    def run(self, job, output_queue=None):
        """Execute a job and stream output."""

        def log(msg):
            """Log a message (to queue if available, else stdout)."""
            if output_queue:
                output_queue.put(msg)
            else:
                print(msg)

        log(f"[{job.name}] Starting job...")

        script = ' && '.join(job.script)

        cmd = [
            'docker', 'run', '--rm',
            '-v', f'{self.workspace}:/workspace',
            '-w', '/workspace',
            job.image,
            'sh', '-c', script
        ]

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1
            )

            for line in process.stdout:
                log(f"[{job.name}] {line.rstrip()}")

            process.wait()

            if process.returncode == 0:
                log(f"[{job.name}] ✓ Job completed successfully")
                return (job.name, True, None)
            else:
                error_msg = f"Exit code {process.returncode}"
                log(f"[{job.name}] ✗ Job failed: {error_msg}")
                return (job.name, False, error_msg)

        except Exception as e:
            error_msg = str(e)
            log(f"[{job.name}] ✗ Error: {error_msg}")
            return (job.name, False, error_msg)


def run_job_parallel(job, workspace, output_queue):
    """Helper function for parallel execution."""
    executor = JobExecutor(workspace)
    return executor.run(job, output_queue)


class Pipeline:
    """Represents a pipeline with stages and parallel job execution."""

    def __init__(self, config_file):
        self.config_file = Path(config_file)
        self.config = self._load_config()
        self.stages = self.config.get('stages', ['test'])
        self.jobs = self._parse_jobs()

    def _load_config(self):
        """Load and parse YAML configuration."""
        with open(self.config_file) as f:
            return yaml.safe_load(f)

    def _parse_jobs(self):
        """Parse jobs from configuration."""
        jobs = []
        for job_name, job_config in self.config.items():
            if job_name != 'stages' and isinstance(job_config, dict):
                jobs.append(Job(job_name, job_config))
        return jobs

    def _group_jobs_by_stage(self):
        """Group jobs by their stage."""
        stages = defaultdict(list)
        for job in self.jobs:
            stages[job.stage].append(job)
        return stages

    def run(self, workspace='.'):
        """Execute pipeline with parallel job execution per stage."""
        print(f"\n{'='*60}")
        print(f"Starting pipeline: {self.config_file.name}")
        print(f"Stages: {' → '.join(self.stages)}")
        print(f"Total jobs: {len(self.jobs)}")
        print(f"{'='*60}\n")

        workspace = Path(workspace).resolve()
        stages_with_jobs = self._group_jobs_by_stage()

        # Execute stages in order
        for stage in self.stages:
            stage_jobs = stages_with_jobs.get(stage, [])

            if not stage_jobs:
                continue

            print(f"\n{'─'*60}")
            print(f"Stage: {stage} ({len(stage_jobs)} job(s))")
            print(f"{'─'*60}\n")

            if len(stage_jobs) == 1:
                # Single job - run directly
                executor = JobExecutor(workspace)
                job_name, success, error = executor.run(stage_jobs[0])
                if not success:
                    print(f"\n✗ Pipeline failed at stage '{stage}'")
                    return False
            else:
                # Multiple jobs - run in parallel
                manager = Manager()
                output_queue = manager.Queue()

                # Create a partial function with workspace and queue
                run_func = partial(run_job_parallel, workspace=workspace, output_queue=output_queue)

                # Execute jobs in parallel
                with Pool(processes=len(stage_jobs)) as pool:
                    # Start async execution
                    results = pool.map_async(run_func, stage_jobs)

                    # Print output as it arrives
                    while True:
                        if results.ready() and output_queue.empty():
                            break

                        if not output_queue.empty():
                            print(output_queue.get())

                    # Get results
                    job_results = results.get()

                    # Check if all jobs succeeded
                    if not all(success for _, success, _ in job_results):
                        failed_jobs = [name for name, success, _ in job_results if not success]
                        print(f"\n✗ Pipeline failed at stage '{stage}'")
                        print(f"  Failed jobs: {', '.join(failed_jobs)}")
                        return False

        print(f"\n{'='*60}")
        print("✓ Pipeline completed successfully!")
        print(f"{'='*60}\n")
        return True


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python runner_v3.py <pipeline.yml>")
        sys.exit(1)

    pipeline = Pipeline(sys.argv[1])
    success = pipeline.run()
    sys.exit(0 if success else 1)

Testing Version 3

Create a pipeline with parallel jobs:

# pipeline.yml
stages:
  - build
  - test

build-job:
  stage: build
  image: python:3.11
  script:
    - echo "Building..."
    - sleep 2

unit-tests:
  stage: test
  image: python:3.11
  script:
    - echo "Running unit tests..."
    - sleep 3
    - echo "Unit tests passed!"

integration-tests:
  stage: test
  image: python:3.11
  script:
    - echo "Running integration tests..."
    - sleep 3
    - echo "Integration tests passed!"

Run it and notice that unit-tests and integration-tests run simultaneously!

Now we have parallel execution, but there’s a problem: what if test jobs need artifacts from the build job? We need dependency management and artifact passing.

Version 4: Dependencies and Artifacts

This is where it gets interesting. We need to:

  1. Build a dependency graph (which jobs need which other jobs)
  2. Execute jobs in topological order (respecting dependencies)
  3. Pass artifacts between jobs
#!/usr/bin/env python3
"""
Pipeline Runner v4: Dependencies and Artifacts
Supports job dependencies, topological sorting, and artifact passing
"""

import yaml
import subprocess
import sys
import shutil
from pathlib import Path
from collections import defaultdict, deque
from multiprocessing import Pool, Manager
from functools import partial


class Job:
    """Represents a single pipeline job."""

    def __init__(self, name, config):
        self.name = name
        self.image = config.get('image', 'python:3.11')
        self.script = config.get('script', [])
        self.stage = config.get('stage', 'test')
        self.artifacts = config.get('artifacts', {}).get('paths', [])
        self.needs = config.get('needs', [])  # Job dependencies

    def __repr__(self):
        return f"Job({self.name}, stage={self.stage})"


class ArtifactManager:
    """Manages artifact storage and retrieval."""

    def __init__(self, workspace):
        self.workspace = Path(workspace).resolve()
        self.artifact_dir = self.workspace / '.pipeline_artifacts'
        self.artifact_dir.mkdir(exist_ok=True)

    def save_artifacts(self, job_name, artifact_paths):
        """Save artifacts from a job."""
        if not artifact_paths:
            return

        job_artifact_dir = self.artifact_dir / job_name
        job_artifact_dir.mkdir(exist_ok=True)

        for artifact_path in artifact_paths:
            src = self.workspace / artifact_path
            if src.exists():
                dst = job_artifact_dir / artifact_path
                dst.parent.mkdir(parents=True, exist_ok=True)

                if src.is_dir():
                    shutil.copytree(src, dst, dirs_exist_ok=True)
                else:
                    shutil.copy2(src, dst)

                print(f"  Saved artifact: {artifact_path}")

    def load_artifacts(self, job_names):
        """Load artifacts from dependent jobs."""
        for job_name in job_names:
            job_artifact_dir = self.artifact_dir / job_name
            if not job_artifact_dir.exists():
                continue

            # Copy artifacts to workspace
            for item in job_artifact_dir.rglob('*'):
                if item.is_file():
                    rel_path = item.relative_to(job_artifact_dir)
                    dst = self.workspace / rel_path
                    dst.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(item, dst)

    def cleanup(self):
        """Remove all artifacts."""
        if self.artifact_dir.exists():
            shutil.rmtree(self.artifact_dir)


class JobExecutor:
    """Executes a job in a Docker container."""

    def __init__(self, workspace, artifact_manager):
        self.workspace = Path(workspace).resolve()
        self.artifact_manager = artifact_manager

    def run(self, job, output_queue=None):
        """Execute a job and stream output."""

        def log(msg):
            if output_queue:
                output_queue.put(msg)
            else:
                print(msg)

        log(f"[{job.name}] Starting job...")

        # Load artifacts from dependencies
        if job.needs:
            log(f"[{job.name}] Loading artifacts from: {', '.join(job.needs)}")
            self.artifact_manager.load_artifacts(job.needs)

        script = ' && '.join(job.script)

        cmd = [
            'docker', 'run', '--rm',
            '-v', f'{self.workspace}:/workspace',
            '-w', '/workspace',
            job.image,
            'sh', '-c', script
        ]

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1
            )

            for line in process.stdout:
                log(f"[{job.name}] {line.rstrip()}")

            process.wait()

            if process.returncode == 0:
                # Save artifacts
                if job.artifacts:
                    log(f"[{job.name}] Saving artifacts...")
                    self.artifact_manager.save_artifacts(job.name, job.artifacts)

                log(f"[{job.name}] ✓ Job completed successfully")
                return (job.name, True, None)
            else:
                error_msg = f"Exit code {process.returncode}"
                log(f"[{job.name}] ✗ Job failed: {error_msg}")
                return (job.name, False, error_msg)

        except Exception as e:
            error_msg = str(e)
            log(f"[{job.name}] ✗ Error: {error_msg}")
            return (job.name, False, error_msg)


def run_job_parallel(job, workspace, artifact_manager, output_queue):
    """Helper function for parallel execution."""
    executor = JobExecutor(workspace, artifact_manager)
    return executor.run(job, output_queue)


class Pipeline:
    """Represents a pipeline with dependency-aware execution."""

    def __init__(self, config_file):
        self.config_file = Path(config_file)
        self.config = self._load_config()
        self.stages = self.config.get('stages', ['test'])
        self.jobs = self._parse_jobs()

    def _load_config(self):
        """Load and parse YAML configuration."""
        with open(self.config_file) as f:
            return yaml.safe_load(f)

    def _parse_jobs(self):
        """Parse jobs from configuration."""
        jobs = []
        for job_name, job_config in self.config.items():
            if job_name != 'stages' and isinstance(job_config, dict):
                jobs.append(Job(job_name, job_config))
        return jobs

    def _topological_sort(self, jobs):
        """
        Sort jobs in topological order based on dependencies.
        Returns list of job groups where each group can run in parallel.
        """
        # Build adjacency list and in-degree count
        job_map = {job.name: job for job in jobs}
        in_degree = {job.name: 0 for job in jobs}
        adjacency = defaultdict(list)

        for job in jobs:
            for dep in job.needs:
                if dep in job_map:
                    adjacency[dep].append(job.name)
                    in_degree[job.name] += 1

        # Find jobs with no dependencies
        queue = deque([name for name, degree in in_degree.items() if degree == 0])
        execution_order = []

        while queue:
            # All jobs in current queue can run in parallel
            current_batch = list(queue)
            execution_order.append([job_map[name] for name in current_batch])

            queue.clear()

            # Process current batch
            for job_name in current_batch:
                for dependent in adjacency[job_name]:
                    in_degree[dependent] -= 1
                    if in_degree[dependent] == 0:
                        queue.append(dependent)

        # Check for cycles
        if sum(len(batch) for batch in execution_order) != len(jobs):
            raise ValueError("Circular dependency detected in job dependencies")

        return execution_order

    def _group_jobs_by_stage(self):
        """Group jobs by their stage."""
        stages = defaultdict(list)
        for job in self.jobs:
            stages[job.stage].append(job)
        return stages

    def _execute_job_batch(self, jobs, workspace, artifact_manager):
        """Execute a batch of jobs in parallel."""
        if len(jobs) == 1:
            # Single job - run directly
            executor = JobExecutor(workspace, artifact_manager)
            job_name, success, error = executor.run(jobs[0])
            return [(job_name, success, error)]
        else:
            # Multiple jobs - run in parallel
            manager = Manager()
            output_queue = manager.Queue()

            # Create a partial function
            run_func = partial(
                run_job_parallel,
                workspace=workspace,
                artifact_manager=artifact_manager,
                output_queue=output_queue
            )

            # Execute jobs in parallel
            with Pool(processes=len(jobs)) as pool:
                results = pool.map_async(run_func, jobs)

                # Print output as it arrives
                while True:
                    if results.ready() and output_queue.empty():
                        break

                    if not output_queue.empty():
                        print(output_queue.get())

                return results.get()

    def run(self, workspace='.'):
        """Execute pipeline with dependency resolution."""
        print(f"\n{'='*60}")
        print(f"Starting pipeline: {self.config_file.name}")
        print(f"Stages: {' → '.join(self.stages)}")
        print(f"Total jobs: {len(self.jobs)}")
        print(f"{'='*60}\n")

        workspace = Path(workspace).resolve()
        artifact_manager = ArtifactManager(workspace)
        stages_with_jobs = self._group_jobs_by_stage()

        try:
            # Execute stages in order
            for stage in self.stages:
                stage_jobs = stages_with_jobs.get(stage, [])

                if not stage_jobs:
                    continue

                print(f"\n{'─'*60}")
                print(f"Stage: {stage} ({len(stage_jobs)} job(s))")
                print(f"{'─'*60}\n")

                # Sort jobs by dependencies
                try:
                    execution_batches = self._topological_sort(stage_jobs)
                except ValueError as e:
                    print(f"✗ Error: {e}")
                    return False

                # Execute batches in order
                for batch in execution_batches:
                    job_results = self._execute_job_batch(batch, workspace, artifact_manager)

                    # Check if all jobs succeeded
                    if not all(success for _, success, _ in job_results):
                        failed_jobs = [name for name, success, _ in job_results if not success]
                        print(f"\n✗ Pipeline failed at stage '{stage}'")
                        print(f"  Failed jobs: {', '.join(failed_jobs)}")
                        return False

            print(f"\n{'='*60}")
            print("✓ Pipeline completed successfully!")
            print(f"{'='*60}\n")
            return True

        finally:
            # Cleanup artifacts
            artifact_manager.cleanup()


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python runner_v4.py <pipeline.yml>")
        sys.exit(1)

    pipeline = Pipeline(sys.argv[1])
    success = pipeline.run()
    sys.exit(0 if success else 1)

Testing Version 4

Create a pipeline with dependencies and artifacts:

# pipeline.yml
stages:
  - build
  - test
  - deploy

build-app:
  stage: build
  image: python:3.11
  script:
    - echo "Building application..."
    - mkdir -p dist
    - echo "app-v1.0.0" > dist/app.txt
    - echo "Build complete!"
  artifacts:
    paths:
      - dist/

unit-tests:
  stage: test
  image: python:3.11
  needs:
    - build-app
  script:
    - echo "Running unit tests..."
    - ls -la dist/
    - cat dist/app.txt
    - echo "Tests passed!"

integration-tests:
  stage: test
  image: python:3.11
  needs:
    - build-app
  script:
    - echo "Running integration tests..."
    - cat dist/app.txt
    - echo "Integration tests passed!"

deploy-prod:
  stage: deploy
  image: alpine:latest
  needs:
    - unit-tests
    - integration-tests
  script:
    - echo "Deploying to production..."
    - cat dist/app.txt
    - echo "Deployed!"

Run it:

python runner_v4.py pipeline.yml

Notice how:

  • build-app runs first
  • unit-tests and integration-tests run in parallel (both depend on build-app)
  • deploy-prod runs only after both test jobs complete
  • Artifacts (the dist/ folder) are passed between jobs!

This is getting close to a real CI/CD runner. Let’s add one more version with production-ready features.

Version 5: Production-Ready Features

Let’s add the final touches: environment variables, branch filtering, timeouts, and better error handling:

#!/usr/bin/env python3
"""
Pipeline Runner v5: Production-ready
Complete CI/CD runner with all features
"""

import yaml
import subprocess
import sys
import shutil
import os
import re
from pathlib import Path
from collections import defaultdict, deque
from multiprocessing import Pool, Manager
from functools import partial
import time


def get_current_branch():
    """Get the current git branch."""
    try:
        result = subprocess.run(
            ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
            capture_output=True,
            text=True,
            timeout=5
        )
        return result.stdout.strip() if result.returncode == 0 else 'main'
    except:
        return 'main'


def substitute_variables(text, variables):
    """Substitute ${VAR} style variables in text."""
    if not isinstance(text, str):
        return text

    for key, value in variables.items():
        text = text.replace(f'$}', str(value))
        text = text.replace(f'${key}', str(value))

    return text


class Job:
    """Represents a single pipeline job."""

    def __init__(self, name, config, global_variables=None):
        self.name = name
        self.image = config.get('image', 'python:3.11')
        self.script = config.get('script', [])
        self.stage = config.get('stage', 'test')
        self.artifacts = config.get('artifacts', {}).get('paths', [])
        self.needs = config.get('needs', [])
        self.only = config.get('only', [])  # Branch filter
        self.timeout = config.get('timeout', 3600)  # Default 1 hour

        # Substitute variables in image and script
        variables = global_variables or {}
        self.image = substitute_variables(self.image, variables)
        self.script = [substitute_variables(cmd, variables) for cmd in self.script]

    def should_run(self, branch):
        """Check if job should run on current branch."""
        if not self.only:
            return True
        return branch in self.only

    def __repr__(self):
        return f"Job({self.name}, stage={self.stage})"


class ArtifactManager:
    """Manages artifact storage and retrieval."""

    def __init__(self, workspace):
        self.workspace = Path(workspace).resolve()
        self.artifact_dir = self.workspace / '.pipeline_artifacts'
        self.artifact_dir.mkdir(exist_ok=True)

    def save_artifacts(self, job_name, artifact_paths):
        """Save artifacts from a job."""
        if not artifact_paths:
            return

        job_artifact_dir = self.artifact_dir / job_name
        job_artifact_dir.mkdir(exist_ok=True)

        saved_count = 0
        for artifact_path in artifact_paths:
            src = self.workspace / artifact_path
            if src.exists():
                dst = job_artifact_dir / artifact_path
                dst.parent.mkdir(parents=True, exist_ok=True)

                if src.is_dir():
                    shutil.copytree(src, dst, dirs_exist_ok=True)
                else:
                    shutil.copy2(src, dst)

                saved_count += 1

        return saved_count

    def load_artifacts(self, job_names):
        """Load artifacts from dependent jobs."""
        loaded_count = 0
        for job_name in job_names:
            job_artifact_dir = self.artifact_dir / job_name
            if not job_artifact_dir.exists():
                continue

            for item in job_artifact_dir.rglob('*'):
                if item.is_file():
                    rel_path = item.relative_to(job_artifact_dir)
                    dst = self.workspace / rel_path
                    dst.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(item, dst)
                    loaded_count += 1

        return loaded_count

    def cleanup(self):
        """Remove all artifacts."""
        if self.artifact_dir.exists():
            shutil.rmtree(self.artifact_dir)


class JobExecutor:
    """Executes a job in a Docker container."""

    def __init__(self, workspace, artifact_manager):
        self.workspace = Path(workspace).resolve()
        self.artifact_manager = artifact_manager

    def run(self, job, output_queue=None):
        """Execute a job with timeout and proper error handling."""

        def log(msg):
            if output_queue:
                output_queue.put(msg)
            else:
                print(msg)

        start_time = time.time()
        log(f"[{job.name}] Starting job...")
        log(f"[{job.name}] Image: {job.image}")

        # Load artifacts from dependencies
        if job.needs:
            log(f"[{job.name}] Loading artifacts from dependencies...")
            count = self.artifact_manager.load_artifacts(job.needs)
            if count > 0:
                log(f"[{job.name}] Loaded {count} artifact file(s)")

        script = ' && '.join(job.script)

        cmd = [
            'docker', 'run', '--rm',
            '-v', f'{self.workspace}:/workspace',
            '-w', '/workspace',
            job.image,
            'sh', '-c', script
        ]

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1
            )

            # Read output with timeout
            start = time.time()
            for line in process.stdout:
                if time.time() - start > job.timeout:
                    process.kill()
                    log(f"[{job.name}] ✗ Job timed out after {job.timeout}s")
                    return (job.name, False, "Timeout")

                log(f"[{job.name}] {line.rstrip()}")

            process.wait()

            if process.returncode == 0:
                # Save artifacts
                if job.artifacts:
                    log(f"[{job.name}] Saving artifacts...")
                    count = self.artifact_manager.save_artifacts(job.name, job.artifacts)
                    if count > 0:
                        log(f"[{job.name}] Saved {count} artifact(s)")

                duration = time.time() - start_time
                log(f"[{job.name}] ✓ Job completed successfully ({duration:.1f}s)")
                return (job.name, True, None)
            else:
                error_msg = f"Exit code {process.returncode}"
                log(f"[{job.name}] ✗ Job failed: {error_msg}")
                return (job.name, False, error_msg)

        except Exception as e:
            error_msg = str(e)
            log(f"[{job.name}] ✗ Error: {error_msg}")
            return (job.name, False, error_msg)


def run_job_parallel(job, workspace, artifact_manager, output_queue):
    """Helper function for parallel execution."""
    executor = JobExecutor(workspace, artifact_manager)
    return executor.run(job, output_queue)


class Pipeline:
    """Complete pipeline runner with all features."""

    def __init__(self, config_file):
        self.config_file = Path(config_file)
        self.config = self._load_config()
        self.stages = self.config.get('stages', ['test'])
        self.variables = self.config.get('variables', {})
        self.jobs = self._parse_jobs()
        self.current_branch = get_current_branch()

    def _load_config(self):
        """Load and parse YAML configuration."""
        with open(self.config_file) as f:
            return yaml.safe_load(f)

    def _parse_jobs(self):
        """Parse jobs from configuration."""
        jobs = []
        for job_name, job_config in self.config.items():
            if job_name not in ['stages', 'variables'] and isinstance(job_config, dict):
                jobs.append(Job(job_name, job_config, self.variables))
        return jobs

    def _topological_sort(self, jobs):
        """Sort jobs in topological order based on dependencies."""
        job_map = {job.name: job for job in jobs}
        in_degree = {job.name: 0 for job in jobs}
        adjacency = defaultdict(list)

        for job in jobs:
            for dep in job.needs:
                if dep in job_map:
                    adjacency[dep].append(job.name)
                    in_degree[job.name] += 1

        queue = deque([name for name, degree in in_degree.items() if degree == 0])
        execution_order = []

        while queue:
            current_batch = list(queue)
            execution_order.append([job_map[name] for name in current_batch])
            queue.clear()

            for job_name in current_batch:
                for dependent in adjacency[job_name]:
                    in_degree[dependent] -= 1
                    if in_degree[dependent] == 0:
                        queue.append(dependent)

        if sum(len(batch) for batch in execution_order) != len(jobs):
            raise ValueError("Circular dependency detected in job dependencies")

        return execution_order

    def _group_jobs_by_stage(self):
        """Group jobs by their stage."""
        stages = defaultdict(list)
        for job in self.jobs:
            if job.should_run(self.current_branch):
                stages[job.stage].append(job)
        return stages

    def _execute_job_batch(self, jobs, workspace, artifact_manager):
        """Execute a batch of jobs in parallel."""
        if len(jobs) == 1:
            executor = JobExecutor(workspace, artifact_manager)
            job_name, success, error = executor.run(jobs[0])
            return [(job_name, success, error)]
        else:
            manager = Manager()
            output_queue = manager.Queue()

            run_func = partial(
                run_job_parallel,
                workspace=workspace,
                artifact_manager=artifact_manager,
                output_queue=output_queue
            )

            with Pool(processes=len(jobs)) as pool:
                results = pool.map_async(run_func, jobs)

                while True:
                    if results.ready() and output_queue.empty():
                        break

                    if not output_queue.empty():
                        print(output_queue.get())

                return results.get()

    def run(self, workspace='.'):
        """Execute complete pipeline."""
        print(f"\n{'='*60}")
        print(f"Pipeline Runner v5")
        print(f"{'='*60}")
        print(f"Config: {self.config_file.name}")
        print(f"Branch: {self.current_branch}")
        print(f"Stages: {' → '.join(self.stages)}")
        print(f"Total jobs: {len(self.jobs)}")
        if self.variables:
            print(f"Variables: {', '.join(f'{k}={v}' for k, v in self.variables.items())}")
        print(f"{'='*60}\n")

        workspace = Path(workspace).resolve()
        artifact_manager = ArtifactManager(workspace)
        stages_with_jobs = self._group_jobs_by_stage()

        # Count jobs that will run
        total_jobs = sum(len(jobs) for jobs in stages_with_jobs.values())
        if total_jobs == 0:
            print("No jobs to run on this branch.")
            return True

        pipeline_start = time.time()

        try:
            for stage in self.stages:
                stage_jobs = stages_with_jobs.get(stage, [])

                if not stage_jobs:
                    continue

                print(f"\n{'─'*60}")
                print(f"Stage: {stage} ({len(stage_jobs)} job(s))")
                print(f"{'─'*60}\n")

                try:
                    execution_batches = self._topological_sort(stage_jobs)
                except ValueError as e:
                    print(f"✗ Error: {e}")
                    return False

                for batch in execution_batches:
                    job_results = self._execute_job_batch(batch, workspace, artifact_manager)

                    if not all(success for _, success, _ in job_results):
                        failed_jobs = [name for name, success, _ in job_results if not success]
                        print(f"\n{'='*60}")
                        print(f"✗ Pipeline failed at stage '{stage}'")
                        print(f"  Failed jobs: {', '.join(failed_jobs)}")
                        print(f"{'='*60}\n")
                        return False

            duration = time.time() - pipeline_start
            print(f"\n{'='*60}")
            print(f"✓ Pipeline completed successfully!")
            print(f"  Duration: {duration:.1f}s")
            print(f"  Jobs executed: {total_jobs}")
            print(f"{'='*60}\n")
            return True

        finally:
            artifact_manager.cleanup()


def main():
    """CLI entry point."""
    if len(sys.argv) < 2:
        print("Pipeline Runner v5 - A minimal CI/CD pipeline runner")
        print("\nUsage:")
        print("  python runner.py <pipeline.yml> [workspace]")
        print("\nExample:")
        print("  python runner.py .gitlab-ci.yml")
        print("  python runner.py pipeline.yml /path/to/workspace")
        sys.exit(1)

    config_file = sys.argv[1]
    workspace = sys.argv[2] if len(sys.argv) > 2 else '.'

    if not Path(config_file).exists():
        print(f"Error: Config file '{config_file}' not found")
        sys.exit(1)

    try:
        pipeline = Pipeline(config_file)
        success = pipeline.run(workspace)
        sys.exit(0 if success else 1)
    except Exception as e:
        print(f"Fatal error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()

Testing Version 5

Create a complete pipeline with all features:

# pipeline.yml
stages:
  - build
  - test
  - deploy

variables:
  APP_VERSION: "1.0.0"
  PYTHON_IMAGE: "python:3.11"

build-app:
  stage: build
  image: ${PYTHON_IMAGE}
  script:
    - echo "Building version ${APP_VERSION}..."
    - mkdir -p dist
    - echo "app-${APP_VERSION}" > dist/app.txt
    - echo "Build complete!"
  artifacts:
    paths:
      - dist/

unit-tests:
  stage: test
  image: ${PYTHON_IMAGE}
  needs:
    - build-app
  script:
    - echo "Running unit tests..."
    - cat dist/app.txt
    - sleep 2
    - echo "Unit tests passed!"

integration-tests:
  stage: test
  image: ${PYTHON_IMAGE}
  needs:
    - build-app
  script:
    - echo "Running integration tests..."
    - cat dist/app.txt
    - sleep 2
    - echo "Integration tests passed!"

deploy-staging:
  stage: deploy
  image: alpine:latest
  needs:
    - unit-tests
    - integration-tests
  script:
    - echo "Deploying to staging..."
    - cat dist/app.txt
    - echo "Deployed to staging!"

deploy-production:
  stage: deploy
  image: alpine:latest
  needs:
    - unit-tests
    - integration-tests
  only:
    - main
  script:
    - echo "Deploying to production..."
    - cat dist/app.txt
    - echo "Deployed to production!"

Run it:

python runner.py pipeline.yml

You’ll see:

  • Variable substitution (${APP_VERSION}, ${PYTHON_IMAGE})
  • Dependency-based execution order
  • Parallel test execution
  • Branch filtering (deploy-production only on main)
  • Execution time tracking
  • Proper artifact passing

We now have a fully functional CI/CD pipeline runner!

Testing Your Pipeline Runner

Let’s test it with a real Python project. Create this structure:

my-project/
├── pipeline.yml
├── src/
│   └── calculator.py
└── tests/
    └── test_calculator.py

src/calculator.py:

def add(a, b):
    return a + b

def multiply(a, b):
    return a * b

tests/test_calculator.py:

import sys
sys.path.insert(0, 'src')

from calculator import add, multiply

def test_add():
    assert add(2, 3) == 5
    assert add(-1, 1) == 0

def test_multiply():
    assert multiply(3, 4) == 12
    assert multiply(0, 5) == 0

if __name__ == "__main__":
    test_add()
    test_multiply()
    print("All tests passed!")

pipeline.yml:

stages:
  - test
  - build
  - deploy

variables:
  PYTHON_VERSION: "3.11"

lint-code:
  stage: test
  image: python:${PYTHON_VERSION}
  script:
    - echo "Linting code..."
    - python -m py_compile src/*.py
    - echo "Lint passed!"

unit-tests:
  stage: test
  image: python:${PYTHON_VERSION}
  script:
    - echo "Running unit tests..."
    - python tests/test_calculator.py
    - echo "Tests passed!"

build-package:
  stage: build
  image: python:${PYTHON_VERSION}
  needs:
    - lint-code
    - unit-tests
  script:
    - echo "Building package..."
    - mkdir -p dist
    - cp -r src dist/
    - echo "v1.0.0" > dist/VERSION
  artifacts:
    paths:
      - dist/

deploy-app:
  stage: deploy
  image: alpine:latest
  needs:
    - build-package
  script:
    - echo "Deploying application..."
    - ls -la dist/
    - cat dist/VERSION
    - echo "Deployment complete!"

Run your pipeline:

python runner.py pipeline.yml

You’ll see:

  1. lint-code and unit-tests run in parallel
  2. build-package waits for both to complete
  3. Artifacts (dist/) are passed to deploy-app
  4. Total execution time is optimized through parallelization

What We Built vs. What Production Runners Do

Our pipeline runner demonstrates the core concepts, but production tools like GitLab Runner and GitHub Actions have many more features:

What we built:

  • ✅ Multi-stage pipelines
  • ✅ Parallel job execution within stages
  • ✅ Job dependencies (needs/dependencies)
  • ✅ Artifact passing between jobs
  • ✅ Variable substitution
  • ✅ Branch filtering (only: branches)
  • ✅ Job timeouts
  • ✅ Real-time log streaming
  • ✅ Topological sorting for dependencies
  • ✅ Docker container isolation

What production runners add:

  • Distributed execution: Run jobs across multiple machines
  • Caching: Cache dependencies (node_modules, pip packages) between runs
  • Matrix builds: Run same job with different parameters (Python 3.9, 3.10, 3.11)
  • Webhooks: Trigger on git push, PR, tag events
  • Secrets management: Secure credential storage and injection
  • Web UI: Visual pipeline visualization and logs
  • Docker-in-Docker: Build Docker images within jobs
  • Service containers: Database/Redis for integration tests
  • Retry mechanisms: Automatic retry on transient failures
  • Manual triggers: Approval gates for deployments
  • Resource management: CPU/memory limits per job
  • Security features: Isolation, sandboxing, permission control

Understanding the Architecture

Our runner has four main components:

1. Configuration Parser

Reads YAML and builds job objects with all metadata (stage, dependencies, artifacts, etc.)

2. Dependency Resolver

Uses topological sorting to determine execution order. This ensures jobs run only after their dependencies complete.

3. Job Scheduler

Groups jobs into batches that can run in parallel. Uses Python’s multiprocessing for parallel execution.

4. Job Executor

Spawns Docker containers, mounts workspace, executes scripts, streams logs, and manages artifacts.

The flow:

YAML Config → Parse Jobs → Build Dependency Graph →
Topological Sort → Execute Batches → Save Artifacts → Report Results

Real-World Use Cases

You could actually use this runner for:

  1. Local CI/CD testing: Test your pipeline config before pushing
  2. Air-gapped environments: No external CI/CD access
  3. Custom workflows: Company-specific requirements
  4. Learning tool: Understand how CI/CD works internally
  5. Embedded systems: Limited connectivity to cloud runners
  6. On-premise deployments: Full control over execution environment

Performance Considerations

Our runner is reasonably efficient, but here’s what impacts performance:

What makes it fast:

  • Parallel execution within stages
  • Dependency-aware scheduling (no unnecessary waiting)
  • Docker container reuse
  • Minimal artifact copying

What could slow it down:

  • Docker image pulls (first time)
  • Large artifacts being copied
  • Many jobs in sequence (long critical path)
  • Heavy multiprocessing overhead for tiny jobs

Optimizations you could add:

  • Cache Docker images
  • Compress artifacts
  • Parallel artifact downloads
  • Job scheduling across machines

Extending the Runner

Here are features you could add:

1. Caching

build:
  cache:
    key: ${CI_COMMIT_REF}
    paths:
      - node_modules/

2. Matrix Builds

test:
  image: python:${VERSION}
  matrix:
    VERSION: ["3.9", "3.10", "3.11"]

3. Service Containers

integration-test:
  services:
    - postgres:13
    - redis:6

4. Manual Triggers

deploy:
  when: manual  # Require manual approval

5. Retry Logic

flaky-test:
  retry: 2  # Retry up to 2 times

Conclusion

By building this CI/CD pipeline runner, we’ve demystified how GitLab Runner and GitHub Actions work internally. The core concepts aren’t that complicated:

  • Parse configuration into job objects
  • Build a dependency graph to understand execution order
  • Use topological sorting to schedule jobs correctly
  • Execute jobs in containers for isolation
  • Pass artifacts between dependent jobs
  • Stream logs for visibility

GitLab’s innovation wasn’t inventing these concepts - it was packaging them into a developer-friendly tool with great UX, distributed execution, and enterprise features.

Understanding these fundamentals makes you a better DevOps engineer. When pipelines are slow, you’ll know why. When jobs fail mysteriously, you’ll know where to look. When you need custom CI/CD, you’ll know how to build it.

Further Learning

If you want to dive deeper:

  • GitLab Runner source code: Written in Go, shows production implementation
  • GitHub Actions runner: Open source, similar concepts
  • Tekton: Kubernetes-native pipelines
  • Drone CI: Simple, container-focused CI/CD
  • Argo Workflows: DAG-based workflow engine for Kubernetes

I also recommend reading about:

  • Directed Acyclic Graphs (DAGs): Foundation of job dependencies
  • Topological sorting: Algorithm for dependency resolution
  • Container orchestration: Kubernetes, Docker Swarm
  • Workflow engines: Apache Airflow, Prefect

Next Steps

Want to take this further? Here are some ideas:

  1. Add a web UI: Use Flask + WebSockets for real-time pipeline visualization
  2. Implement caching: Cache pip/npm packages between runs
  3. Add distributed execution: Run jobs across multiple machines
  4. Build a webhook server: Trigger on git push events
  5. Create a VS Code extension: Visualize pipelines in your editor

Let me know in the comments what you’d like to see next!


Announcements

  • If you’re interested in more systems programming and DevOps content, follow me on Twitter/X where I share what I’m learning.
  • I’m available for Python and DevOps consulting - if you need help with CI/CD, automation, or infrastructure, reach out via email.


If you found this helpful, share it on X and tag me @muhammad_o7 - I’d love to hear your thoughts! You can also connect with me on LinkedIn.

Want to be notified about posts like this? Subscribe to my RSS feed or leave your email here