Apache Airflow

Apache Airflow es una plataforma de código abierto creada por Airbnb y gestionada por la Apache Software Foundation, que permite diseñar, programar y monitorizar flujos de trabajo (workflows) programables. Airflow no ejecuta código por sí mismo, sino que coordina cuándo, cómo y en qué orden deben ejecutarse tareas definidas por el usuario. Es una herramienta fundamental en entornos de ingeniería de datos moderna, donde se requiere que procesos complejos se ejecuten en etapas secuenciales o paralelas con condiciones lógicas.

Airflow es útil porque permite automatizar y gestionar pipelines de datos complejos con lógica condicional, dependencias entre tareas, manejo de errores, reintentos, programación periódica y visibilidad total desde una interfaz web. Su diseño modular y extensible permite integrarse con servicios en la nube, bases de datos, APIs, sistemas de ficheros, y más. Además, está pensado para escalar horizontalmente y adaptarse tanto a pequeños scripts diarios como a grandes flujos de datos empresariales.

Arquitectura

Airflow se compone de varios servicios que trabajan juntos:

  • 🪁 Scheduler: Escanea los DAGs y programa las tareas según su definición temporal y dependencias.
  • 🪁 Webserver: Proporciona la interfaz web para visualizar el estado de los DAGs, logs, ejecuciones pasadas y detalles de las tareas.
  • 🪁 Worker(s): Ejecutan las tareas que han sido programadas. Pueden escalarse horizontalmente.
  • 🪁 Metadata Database: Guarda toda la información sobre DAGs, tareas, logs, estados y configuraciones. Usa PostgreSQL o MySQL.
  • 🪁 Triggerer (a partir de Airflow 2): Gestiona los Deferrable Operators y reduce el consumo de recursos al esperar eventos externos.

Componentes Fundamentales

DAGs (Directed Acyclic Graphs)

Los DAGs son la unidad principal de Airflow. Representan un flujo de trabajo como un grafo dirigido sin ciclos. Cada nodo es una tarea, y las aristas representan dependencias entre ellas. Están definidos en archivos Python que se cargan dinámicamente.

from airflow import DAG from datetime import datetime dag = DAG( dag_id="mi_pipeline", start_date=datetime(2024, 1, 1), schedule_interval="@daily" )
Tasks y Operators

Una Task es una unidad de trabajo. Se define a través de un Operator, que es una plantilla que encapsula una acción específica.

  • 📑 PythonOperator: ejecuta una función Python.
  • 📑 BashOperator: ejecuta comandos de bash.
  • 📑 EmailOperator: envía correos.
  • 📑 DockerOperator: ejecuta tareas dentro de un contenedor Docker.
  • 📑 KubernetesPodOperator: ejecuta pods en Kubernetes.
Sensors

Los Sensors son tareas que esperan a que ocurra una condición externa. Por ejemplo, un archivo aparezca, una tabla esté disponible, o una API responda. Existen sensores como FileSensor, S3KeySensor, ExternalTaskSensor.

Deferrable Operators y Triggerer

A partir de Airflow 2, se introdujeron los Deferrable Operators, que permiten suspender una tarea en espera de un evento sin ocupar un worker. Estas tareas son gestionadas por el Triggerer, que utiliza async IO para mantenerlas vivas de manera eficiente.

XCom (Cross-Communication)

XCom (Cross-Communication) es el mecanismo de Airflow para compartir pequeños datos entre tareas de un mismo DAG. Los datos se almacenan en la base de datos de Airflow (tabla xcom).

📄 Puedes “empujar” un valor desde una tarea y “jalarlo” desde otra.

context['ti'].xcom_push(key='clave', value='valor', execution_date=otra_fecha) valor = context['ti'].xcom_pull(key='clave', task_ids='tarea_origen')
  • 🔸xcom_push()
  • 📄 Se usa para enviar datos. Internamente, Airflow ejecuta:

    XCom.set( key="mi_clave", value="mi_valor", task_id=task_instance.task_id, dag_id=task_instance.dag_id, execution_date=task_instance.execution_date # ¡Importante! )
    • execution_date: Siempre se guarda en XCom y determina a qué ejecución (DAGRun) pertenece el dato.
    • ∘ Puedes sobrescribir el execution_date.
  • 🔸xcom_pull()
  • Se usa para recuperar datos. Por defecto, busca registros con:

    • ⊡ Mismo dag_id (DAG actual).
    • ⊡ Mismo execution_date (solo datos de la ejecución actual).

    📄 Por defecto, se filtran los resultados por la ejecución actual.

    SELECT * FROM xcom WHERE dag_id = ? AND execution_date = ? AND key = ?;

    ∘ No mezcla datos entre diferentes ejecuciones del DAG.

    El execution_date es crucial porque Airflow, por defecto, aísla los XComs entre ejecuciones, requiriendo que se especifique manualmente un execution_date diferente para acceder a datos de otros DAGRuns.

No abuses de XCom, no está diseñado para datos grandes (usa sistemas externos como S3 o Redis para eso).

Hooks y Providers
  • 🔸Hooks: Son interfaces reutilizables para interactuar con sistemas externos: S3, MySQL, BigQuery, etc.
  • Los hooks, también conocidos como "conectores" o "enlaces", son componentes fundamentales en Airflow que actúan como interfaces, puentes o conectores entre Airflow y sistemas externos. Estos hooks permiten la interacción, conexión y comunicación con diversas plataformas, bases de datos y servicios.

    Características clave:

    • Interfaz unificada: Proporcionan una forma estandarizada de interactuar con sistemas externos.
    • Manejo de conexiones: Gestionan automáticamente las conexiones, sesiones y autenticaciones.
    • Reutilizables: Pueden ser usados múltiples veces en diferentes tareas y DAGs.

    Tipos Comunes de Hooks:

    1. Database Hooks:
      • PostgresHook: Para PostgreSQL (conexiones a PostgreSQL).
      • MySqlHook: Para MySQL (gestión de MySQL).
    2. Cloud Hooks:
      • S3Hook: Interactúa con Amazon S3 (almacenamiento en S3).
      • GCSHook: Para Google Cloud Storage (acceso a GCS).
    3. API Hooks:
      • HttpHook: Para llamadas HTTP (solicitudes HTTP).
      • SlackHook: Notificaciones en Slack (mensajes a Slack).

      Los hooks de Airflow simplifican la interacción con sistemas externos al abstraer la lógica de conexión, recuperando credenciales de las Connections de Airflow, estableciendo la conexión, ejecutando operaciones y cerrándola automáticamente al finalizar su uso.

    📄 Ejemplo de Uso:

    from airflow.providers.postgres.hooks.postgres import PostgresHook # Crear instancia del hook (inicialización del hook) hook = PostgresHook(postgres_conn_id='mi_postgres') # Ejecutar consulta (uso del hook para consultar) resultados = hook.get_records("SELECT * FROM tabla") # El hook maneja la conexión automáticamente (sin necesidad de abrir/cerrar manualmente)
  • 🔸Providers: Son paquetes que agrupan hooks, operators, sensors y configuraciones para integrar servicios como Google Cloud, AWS, Slack, Snowflake, etc.
Variables y Connections
  • Variables: Claves/valores que se almacenan en la base de datos y pueden usarse desde cualquier DAG.
  • Connections: Configuraciones predefinidas de conexión con servicios externos (host, puerto, usuario, contraseña, etc.).

📄 Estas se configuran desde la UI o vía CLI:

airflow variables set nombre valor airflow connections add my_db --conn-uri postgres://user:pass@host/db
Trigger Rules y Condicionales

Las Trigger Rules controlan cuándo se ejecuta una tarea según el estado de las tareas anteriores. Ejemplo: all_success, one_failed, all_done, etc.

📄 Puedes crear flujos condicionales dinámicos usando operadores como BranchPythonOperator:

from airflow.operators.python import BranchPythonOperator def elegir_ruta(): return "task_a" if condicion else "task_b" BranchPythonOperator( task_id='elige', python_callable=elegir_ruta, dag=dag )
Setup y Teardown

A partir de Airflow 2.6, se introdujo setup y teardown para definir tareas que deben ejecutarse al inicio o al final de un DAG (como preparar o limpiar recursos), independientemente del éxito o fallo de otras tareas.

@dag.setup() def inicializar(): ... @dag.teardown() def limpiar(): ...
Decorators

Los Decorators (@task, @dag) permiten definir tareas y DAGs de forma más limpia y funcional desde Python puro.

from airflow.decorators import dag, task @task def suma(a, b): return a + b @dag(schedule="@daily", start_date=datetime(2024, 1, 1)) def flujo(): suma(3, 5) flujo_dag = flujo()

Proyecto con Docker Compose en Linux

Creación de un entorno virtual:
Activar el entorno virtual:

Puedes consultar la documentación oficial de Apache Airflow para instalarlo utilizando Docker Compose.

Descarga del archivo docker-compose.yaml:
Creación de carpetas del entorno:

En tu máquina, puedes corregir los permisos (50000:1000) para que el contenedor corra como un usuario no-root.

Edita el archivo /etc/containers/registries.conf:

[registries.search] registries = ['docker.io']
Asignación de variable de entorno:

Revisa con cat .env que el UID tenga un valor numérico; si no, establécelo manualmente.

Levantar el servicio:
Levantar la interfaz web:
Interfaz web de Airflow:

La contraseña predeterminada para el usuario airflow en las instalaciones iniciales de Apache Airflow es también airflow.

⌭ Modificar el docker-compose.yaml para añadir una base de datos.

services: postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-db-volume:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] interval: 10s retries: 5 start_period: 5s restart: always ports: - "5432:5432" pgadmin: container_name: pgadmin4_container2 image: dpage/pgadmin4 restart: always environment: PGADMIN_DEFAULT_EMAIL: admin@admin.com PGADMIN_DEFAULT_PASSWORD: root ports: - "5050:80"
Detener el servicio:
Levantar Airflow:
Interfaz web de pgAdmin:

La contraseña predeterminada para el usuario admin@admin.com en el panel de login de la base de datos pgAdmin es root.

❏ Buscar el contenedor postgre.

ryuzak1@ubuntu: ~

ryuzak1@ubuntu:~podman ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 3398a6229828 docker.io/library/postgres:13 postgres 9 minutes ago Up 9 minutes (healthy) 0.0.0.0:5432->5432/tcp airflow_postgres_1 9c6544ddbcdd docker.io/dpage/pgadmin4:latest 9 minutes ago Up 9 minutes 0.0.0.0:5050->80/tcp pgadmin4_container2 514587874e67 docker.io/library/redis:7.2-bookworm redis-server 9 minutes ago Up 9 minutes (healthy) airflow_redis_1 a5aa5aa7161b docker.io/apache/airflow:3.0.1 -c if [[ -z "1000... 9 minutes ago Exited (0) 8 minutes ago airflow_airflow-init_1 f2d305d2d01d docker.io/apache/airflow:3.0.1 bash -c airflow 9 minutes ago Exited (2) 9 minutes ago airflow_airflow-cli_1 d32d734888a4 docker.io/apache/airflow:3.0.1 api-server 9 minutes ago Up 9 minutes (healthy) 0.0.0.0:8080->8080/tcp airflow_airflow-apiserver_1 6544fcc60245 docker.io/apache/airflow:3.0.1 scheduler 9 minutes ago Up 9 minutes (healthy) airflow_airflow-scheduler_1 e94074a424a0 docker.io/apache/airflow:3.0.1 dag-processor 9 minutes ago Up 9 minutes (healthy) airflow_airflow-dag-processor_1 8ec81ef33dc1 docker.io/apache/airflow:3.0.1 triggerer 9 minutes ago Up 9 minutes (healthy) airflow_airflow-triggerer_1 256a2bb757be docker.io/apache/airflow:3.0.1 celery flower 9 minutes ago Up 9 minutes (healthy) 0.0.0.0:5555->5555/tcp airflow_flower_1 445a415b36a9 docker.io/apache/airflow:3.0.1 celery worker 9 minutes ago Up 9 minutes (unhealthy) airflow_airflow-worker_1

❏ Buscar la IP del contenedor postgre.

ryuzak1@ubuntu: ~

ryuzak1@ubuntu:~podman inspect 3398a6229828 | jq '.[0].NetworkSettings.Networks'
{ "airflow_default": { "EndpointID": "", "Gateway": "10.89.0.1", "IPAddress": "10.89.0.92", "IPPrefixLen": 24, "IPv6Gateway": "", "GlobalIPv6Address": "", "GlobalIPv6PrefixLen": 0, "MacAddress": "92:42:1f:c3:21:99", "NetworkID": "airflow_default", "DriverOpts": null, "IPAMConfig": null, "Links": null, "Aliases": [ "postgres", "3398a6229828" ] } }

Se tiene que añadir un nuevo servidor con los siguientes valores: en Name, poner 'ps_db'. En la sección de Connections, colocar la IP en el campo Hostname, y 'airflow' en Username y Password. Por último, hacer clic en Save.

El proyecto requiere una base de datos para almacenar la información extraída de productos de la web de Amazon. Para crearla, haz clic derecho sobre "Databases" y establece el nombre 'amazon_books'.

En la sección de "Admin" y luego en "Connections", crea una nueva conexión con los siguientes valores:

Este proyecto de Airflow automatiza la extracción de información de libros de ingeniería de datos desde la página de resultados de búsqueda de Amazon. Utiliza Python para realizar el scraping web, parseando el HTML con BeautifulSoup para extraer títulos, autores, precios y calificaciones. Los datos extraídos se almacenan temporalmente utilizando XComs y luego se insertan en una base de datos PostgreSQL. El flujo de trabajo incluye la creación de la tabla en PostgreSQL (si no existe), la obtención de los datos de Amazon y la posterior inserción de estos datos en la tabla, todo orquestado por Airflow de forma diaria.

⌭ Crear un script con el nombre dag.py en la carpeta dags y modificar la fecha.

from datetime import datetime, timedelta from airflow import DAG import requests import pandas as pd from bs4 import BeautifulSoup from airflow.operators.python import PythonOperator from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.providers.postgres.hooks.postgres import PostgresHook # Headers que funcionaron en el script exitoso headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Accept-Language": "en-US,en;q=0.9" } def get_amazon_data_books(num_books, ti): print(f"🚀 Iniciando scraping para {num_books} libros") base_url = "https://www.amazon.com/s?k=data+engineering+books" books = [] page = 1 while len(books) < num_books: url = f"{base_url}&page={page}" print(f"📖 Procesando página {page}: {url}") response = requests.get(url, headers=headers, timeout=30) print(f"🔍 Status code: {response.status_code}") if response.status_code != 200: print(f"❌ Error en la petición: {response.status_code}") break soup = BeautifulSoup(response.content, "html.parser") book_containers = soup.find_all('div', {'data-component-type': 's-search-result'}) print(f"📚 Encontrados {len(book_containers)} contenedores de libros") for book in book_containers: try: title = book.find('h2').text.strip() if book.find('h2') else None author = book.find('a', {'class': 'a-size-base'}) price = book.find('span', {'class': 'a-price-whole'}) rating = book.find('span', {'class': 'a-icon-alt'}) if title and author and price and rating: books.append({ "Title": title[:100], # Limitar longitud para DB "Author": author.text.strip(), "Price": price.text.strip(), "Rating": rating.text.split()[0] # Solo el número }) print(f"✅ Añadido: {title[:30]}...") except Exception as e: print(f"⚠️ Error procesando libro: {str(e)}") continue page += 1 if page > 3: # Límite de páginas para pruebas break df = pd.DataFrame(books[:num_books]) print(f"📊 Total de libros obtenidos: {len(df)}") ti.xcom_push(key='book_data', value=df.to_dict('records')) def insert_book_data_into_postgres(ti): book_data = ti.xcom_pull(key='book_data', task_ids='fetch_book_data') print(f"📥 Datos recibidos para insertar: {len(book_data) if book_data else 0} registros") if not book_data: raise ValueError("No se encontraron datos de libros") hook = PostgresHook(postgres_conn_id='books_connection') conn = hook.get_conn() cursor = conn.cursor() try: for book in book_data: cursor.execute(""" INSERT INTO books (title, authors, price, rating) VALUES (%s, %s, %s, %s) """, (book['Title'], book['Author'], book['Price'], book['Rating'])) conn.commit() print(f"💾 Insertados {len(book_data)} registros en PostgreSQL") except Exception as e: conn.rollback() print(f"❌ Error en la inserción: {str(e)}") raise finally: cursor.close() conn.close() default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2025, 5, 14), 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( dag_id='fetch_and_store_amazon_books', default_args=default_args, description='A simple DAG to fetch book data from Amazon and store it in Postgres', schedule=timedelta(days=1), ) #operators : Python Operator and PostgresOperator #hooks - allows connection to postgres fetch_book_data_task = PythonOperator( task_id='fetch_book_data', python_callable=get_amazon_data_books, op_args=[50], # Number of books to fetch dag=dag, ) create_table_task = SQLExecuteQueryOperator( task_id='create_table', conn_id='books_connection', sql=""" CREATE TABLE IF NOT EXISTS books ( id SERIAL PRIMARY KEY, title TEXT NOT NULL, authors TEXT, price TEXT, rating TEXT ); """, dag=dag, ) insert_book_data_task = PythonOperator( task_id='insert_book_data', python_callable=insert_book_data_into_postgres, dag=dag, ) #dependencies fetch_book_data_task >> create_table_task >> insert_book_data_task

En la sección de DAGs, localiza y ejecuta el DAG denominado fetch_and_store_amazon_books haciendo clic en el botón de "Play".

Posteriormente, dirígete a pgAdmin para verificar los datos recopilados de Amazon. Para ello, haz clic derecho sobre la sección "Tables" y selecciona la opción "Query Tool".

Proyecto en la Nube

En el mundo del análisis de datos, la capacidad de extraer información valiosa de redes sociales como Twitter y almacenarla de manera eficiente es fundamental. Un proyecto ETL (Extract, Transform, Load) que utiliza Python para conectarse a la API de Twitter, procesar los datos y cargarlos en un bucket de Amazon S3 puede ser una solución escalable y automatizada para este propósito.

El proyecto consiste en un flujo automatizado que:

  1. Extrae tweets en tiempo real o históricos mediante la API de Twitter (usando bibliotecas como tweepy o python-twitter).
  2. Transforma los datos crudos, aplicando filtros, limpieza de texto, análisis de sentimientos o estructurado en un formato óptimo (como CSV, JSON o Parquet).
  3. Carga la información procesada en un bucket de Amazon S3, utilizando el SDK de AWS (boto3), donde podrá ser consultada por herramientas de analytics o machine learning.

Este proceso puede ejecutarse periódicamente mediante servicios como AWS Lambda o Airflow, garantizando una base de datos actualizada para su posterior análisis.

Generar un Access Token para Twitter

Para comenzar a usar la API de Twitter, dirígete a Twitter Developer Platform, inicia sesión con tu cuenta de Twitter, y luego solicita el acceso de desarrollador gratuito a través del nivel "Essential".

Una vez dentro del panel de desarrollador de Twitter, navega a la sección "Projects & Apps" y selecciona "Overview". Aquí, procederás a usar el proyecto default. Este proyecto te proporcionará las credenciales esenciales para interactuar con la API de Twitter: el Bearer Token , con las opciones de generar el Bearer Token.

Conexión con la API de Twitter
Creación de un entorno virtual:
Activar el entorno virtual:
Instalar Dependencias:

⌭ Crear un script en Python que se conecta a la API de Twitter usando tweepy y recupera los 10 últimos tweets de un usuario, mostrándolos por pantalla. Sirve para analizar o monitorear contenido público de una cuenta.

import tweepy import time # Bearer Token de tu proyecto (API v2) bearer_token = "" # Sustituye con tu Bearer Token # Crear cliente con API v2 client = tweepy.Client(bearer_token=bearer_token) # Nombre de usuario (sin el @) username = "elonmusk" # Obtener el ID del usuario a partir del nombre try: user = client.get_user(username=username) user_id = user.data.id #Obtener los últimos tweets del usuario response = client.get_users_tweets(id=user_id, max_results=10, tweet_fields=["created_at", "text"]) #Imprimir los tweets for tweet in response.data: print(tweet.text) except tweepy.TooManyRequests as e: print("Límite de peticiones alcanzado. Esperando 15 minutos...") time.sleep(15 * 60) # Esperar 15 minutos

⌭ El siguiente script se conecta con la API v2 de Twitter para extraer los últimos tweets del usuario especificado (en este caso, elonmusk), y luego guarda información relevante como el texto del tweet, número de likes, retweets y la fecha de creación en un archivo CSV llamado refined_tweets.csv.

import tweepy import pandas as pd import json from datetime import datetime from zoneinfo import ZoneInfo import s3fs # Opcional si vas a guardar en S3, puedes omitir si solo guardas localmente def run_twitter_etl(): # Token de autenticación (Bearer Token de API v2) bearer_token = "" # Sustituye con tu Bearer Token # Cliente de Twitter API v2 client = tweepy.Client(bearer_token=bearer_token) # Nombre de usuario sin el '@' username = "elonmusk" try: # Obtener información del usuario (necesario para obtener su ID) user_response = client.get_user(username=username) if user_response.data is None: print(f"Usuario '{username}' no encontrado.") return user = user_response.data user_id = user.id print(f"ID del usuario '{username}': {user_id}") # Obtener últimos tweets del usuario response = client.get_users_tweets( id=user_id, max_results=5, # Máximo permitido por request en API gratuita tweet_fields=["created_at", "text", "public_metrics"] ) # Verificar si se recibieron tweets tweets_data = response.data if tweets_data is None: print("No se encontraron tweets o el acceso está limitado por el nivel de la cuenta.") return # Procesar los tweets recibidos tweets = [] for tweet in tweets_data: metrics = tweet.public_metrics refined_tweet = { "user": user.username, "text": tweet.text, "created_at": tweet.created_at.astimezone(ZoneInfo("America/Mexico_City")).strftime("%Y-%m-%d %H:%M:%S"), "likes": metrics["like_count"], "retweets": metrics["retweet_count"] } tweets.append(refined_tweet) # Crear DataFrame y mostrar su contenido df = pd.DataFrame(tweets) print("\nContenido del DataFrame:") print(df) # Guardar los datos en un archivo CSV df.to_csv('refined_tweets.csv', index=False) print("\nArchivo 'refined_tweets.csv' creado con éxito.") except tweepy.TooManyRequests: print("Has excedido el límite de peticiones. Espera unos minutos antes de volver a intentar.") except tweepy.TweepyException as e: print("Error al conectar con la API de Twitter:", e) # Ejecutar la función run_twitter_etl()
Instalación de Dependencias:

Como variante se puede utilizar la API de YouTube Data v3 para extraer y procesar todos los comentarios de un video específico de YouTube. Primero se configura la conexión con la API, luego se realizan solicitudes paginadas para obtener los comentarios (hasta 100 por petición) y las procesa para extraer información clave como el autor, el texto del comentario y la fecha de publicación. La aplicación maneja automáticamente la paginación para recolectar todos los comentarios disponibles. Los datos son estructurados en una lista de diccionarios para su fácil análisis, mostrando finalmente un resumen con el total de comentarios recolectados. El código incluye manejo de errores para casos donde la estructura de los comentarios no sea la esperada, y está diseñado para ser fácilmente integrable en sistemas más grandes de análisis de datos o moderación de contenido.

Para obtener una Clave de API, ve a la Consola de APIs de Google, donde podrás crear un nuevo proyecto o seleccionar uno ya existente. Después, activa la API de YouTube Data API v3 y dirígete a la sección "Credenciales" para crear una nueva Clave de API, la cual deberás copiar una vez generada.

⌭ El script para obtener comentarios de YouTube quedaría de la siguiente forma:

import os import csv import googleapiclient.discovery def main(): """ Función principal que obtiene comentarios de un video de YouTube usando la API v3. """ # Configuración inicial (solo para desarrollo) # Deshabilita la verificación HTTPS de OAuthlib cuando se ejecuta localmente. # ¡NO mantener esta opción activa en producción! os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" # Configuración de la API de YouTube api_service_name = "youtube" api_version = "v3" DEVELOPER_KEY = "TU_CLAVE_DE_API" # Reemplaza con tu clave real # ID del video de YouTube que quieres analizar video_id = "q8q3OFFfY6c" # Ejemplo - reemplaza con tu video ID # Inicializa el cliente de la API de YouTube youtube = googleapiclient.discovery.build( api_service_name, api_version, developerKey=DEVELOPER_KEY ) # Lista para almacenar todos los comentarios comments_list = [] # Primera solicitud para obtener comentarios request = youtube.commentThreads().list( part="snippet,replies", videoId=video_id, maxResults=100 # Máximo permitido por la API ) response = request.execute() # Procesa los comentarios iniciales comments_list.extend(process_comments(response['items'])) # Paginación: sigue solicitando comentarios mientras haya más páginas while response.get('nextPageToken', None): request = youtube.commentThreads().list( part='snippet,replies', videoId=video_id, pageToken=response['nextPageToken'], maxResults=100 ) response = request.execute() comments_list.extend(process_comments(response['items'])) # Muestra resultados finales print(f"\nTotal de comentarios obtenidos: {len(comments_list)}") print("\nPrimeros 3 comentarios:") for comment in comments_list[:3]: print(f"Autor: {comment['author']}") print(f"Comentario: {comment['comment'][:50]}...") # Muestra solo los primeros 50 caracteres print(f"Fecha: {comment['published_at']}") print("-" * 50) # Guardar en un archivo CSV with open("comentarios_youtube.csv", mode="w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=["author", "comment", "published_at"]) writer.writeheader() writer.writerows(comments_list) print("\n✅ Comentarios guardados en 'comentarios_youtube.csv'") def process_comments(response_items): """ Procesa los comentarios crudos de la API y extrae información relevante. Args: response_items: Lista de comentarios sin procesar de la API Returns: Lista de diccionarios con información estructurada de cada comentario """ comments = [] for comment in response_items: try: top_comment = comment['snippet']['topLevelComment']['snippet'] author = top_comment['authorDisplayName'] comment_text = top_comment['textOriginal'] publish_time = top_comment['publishedAt'] comment_info = { 'author': author, 'comment': comment_text, 'published_at': publish_time } comments.append(comment_info) except KeyError as e: print(f"Error procesando comentario: {e}") print(f'Procesados {len(comments)} comentarios en este lote.') return comments if __name__ == "__main__": main()
Creación de Recusos en AWS

Para desplegar Apache Airflow en una instancia EC2 de AWS, primero debes iniciar sesión en la consola de AWS y dirigirte al servicio EC2 para lanzar una nueva instancia. Elige una Amazon Machine Image (AMI) basada en Ubuntu, como Ubuntu Server 24.04 LTS, ya que es compatible y ampliamente usada. Luego, selecciona un tipo de instancia adecuado; para pruebas, una t2.micro es suficiente si estás en el plan gratuito. Configura el almacenamiento (el valor por defecto de 8 GB suele ser suficiente) y elige o crea un par de claves SSH para acceder de forma segura a tu máquina. Asegúrate de configurar correctamente el grupo de seguridad: permitiendo el tráfico en el puerto 22 para el acceso SSH y el puerto 8080 para la interfaz web de Airflow.

Para iniciar una sesión SSH en tu instancia EC2, primero necesitas ubicarte en el directorio donde se encuentran las claves SSH que descargaste previamente, ya que estas son esenciales para la autenticación segura. Una vez en la carpeta correcta, dirígete a la consola de AWS, selecciona tu instancia de Airflow en la sección EC2, y haz clic en el botón "Conectar"; allí encontrarás los detalles y el comando SSH exacto que debes usar para acceder a tu máquina.

Cambiar los permisos para la clave ssh:
Conectarse a la instancia:
Actualizar las dependencias:
Instalar Python3:
Instalar Airflow:
Instalar Dependencias:
Buscar Airflow:
Agregar Airflow al PATH:
Crear la carpeta para almacenar los DAGs:

Posteriormente, deberás editar el archivo de configuración de Airflow, airflow.cfg. Dentro de este archivo, busca la variable dags_folder y modifica su valor para que apunte a la carpeta que creaste para tus DAGs, específicamente a la ruta /home/ubuntu/airflow/twitter_dag.

Usar el SequentialExecutor es la mejor opción para tu servidor con recursos limitados, ya que es más simple y ligero. Para configurarlo, deberás abrir tu archivo airflow.cfg y cambiar la configuración de executor de LocalExecutor a SequentialExecutor.

Adicionalmente, dentro del mismo archivo airflow.cfg, localiza la sección [core] y ajusta los siguientes parámetros: establece parallelism = 1, max_active_runs_per_dag = 1, y workers = 1.

Crear las carpetas para la gestión de Airflow:
Inicializar Airflow y la base de datos:
Crear el usuario admin::

Si te encuentras con problemas para añadir un nuevo usuario mediante los comandos habituales, una alternativa es utilizar airflow standalone. Sin embargo, ten en cuenta que este modo inicia muchos servicios simultáneamente, lo que podría requerir una instancia EC2 con más memoria de la que ofrece el plan gratuito, incurriendo en costos. Puedes optar por iniciar airflow standalone solo el tiempo suficiente para crear el usuario, asegurándote de guardar bien las credenciales de acceso. Es importante considerar que, debido a la posible saturación de la instancia en este modo, podría ser necesario un reinicio manual desde la consola de EC2. Si esto sucede, también necesitarás obtener una nueva conexión SSH, ya que la instancia podría cambiar su dirección, aunque tu clave SSH original seguirá siendo válida.

Levantar solo el servidor web:
Luego ejecuta el scheduler en otra terminal:

Para acceder a la interfaz web de Airflow en tu instancia EC2, primero dirígete a la sección de instancias en la consola de AWS y haz clic sobre tu instancia de Airflow para obtener su DNS público. Luego, pega este DNS en tu navegador web, asegurándote de apuntar al puerto 8080 (ej. http://tu-dns-publico:8080). Es crucial que la aplicación sea accesible externamente; para ello, navega hasta la sección de "Seguridad" dentro de los detalles de tu instancia, entra en el grupo de seguridad asociado y, a través del botón "Editar reglas de entrada" (Edit inbound rules), añade una nueva regla que permita todo el tráfico (All traffic) desde cualquier origen (Anywhere-IPv4).

Ten en cuenta que permitir todo el tráfico desde cualquier origen es un riesgo de seguridad. Sin embargo, dado que esta instancia es solo para pruebas y se eliminará después de terminar para evitar el consumo de recursos gratuitos, es aceptable en este contexto.

Ahora que Airflow está funcional, es necesario tener nuestro DAG listo. Este DAG, llamado twitter_dag.py, hará una llamada a la función run_twitter_etl que se encuentra dentro de nuestro script de Twitter, twitter_etl.py.

⌭ El siguiente DAG corre diariamente y ejecuta una función Python llamada run_twitter_etl, la cual conecta a la API de Twitter, extrae datos y los transforma o guarda en S3 de AWS.

# Importa la clase timedelta para manejar intervalos de tiempo. from datetime import timedelta, datetime # Importa la clase principal DAG de Airflow para definir el flujo de trabajo. from airflow import DAG # Importa el operador para ejecutar funciones de Python dentro del DAG. from airflow.providers.standard.operators.python import PythonOperator # Asegúrate de usar este import # Importa la función personalizada que realiza el proceso ETL desde un archivo externo. from twitter_etl import run_twitter_etl # Define argumentos por defecto que se aplican a todas las tareas del DAG. default_args = { 'owner': 'airflow', # Dueño del DAG (informativo) 'depends_on_past': False, # No depende de ejecuciones anteriores 'start_date': datetime(2020, 11, 8), # Fecha de inicio del DAG 'email': ['airflow@example.com'], # Correo de contacto (opcional) 'email_on_failure': False, # No enviar correo si falla 'email_on_retry': False, # No enviar correo al reintentar 'retries': 1, # Reintentar una vez en caso de error 'retry_delay': timedelta(minutes=1) # Esperar 1 minuto antes del reintento } # Define el DAG principal sin 'schedule_interval' aquí dag = DAG( 'twitter_dag', # Nombre del DAG default_args=default_args, # Usa los argumentos definidos antes description='Our first DAG with ETL process!', catchup=False # Para evitar que ejecute dag runs pasados ) # Asigna 'schedule_interval' después de la creación del DAG dag.schedule_interval = timedelta(days=1) # Se ejecuta cada 1 día # Define una tarea del DAG que ejecuta la función 'run_twitter_etl' desde el archivo importado. run_etl = PythonOperator( task_id='complete_twitter_etl', # Identificador de la tarea python_callable=run_twitter_etl, # Función que se ejecutará dag=dag, # DAG al que pertenece la tarea ) # Ejecuta la tarea (útil para visualizar en ciertas versiones, aunque no siempre necesario). run_etl

Para crear un bucket S3, inicia sesión en la Consola de AWS y navega hasta el servicio S3. Una vez allí, haz clic en "Crear bucket" y asígnale un nombre único que sea globalmente distintivo (por ejemplo, airflow-bucket-twitter-datos). Puedes dejar las configuraciones de bloqueo de acceso público y el resto de las opciones por defecto para empezar, a menos que tengas requisitos de seguridad específicos. Finalmente, haz clic en "Crear bucket" para completar el proceso.

En el script ETL es necesario modificar la línea donde se guarda el DataFrame para que apunte al bucket de Amazon S3. Esto se logra reemplazando la ruta local por df.to_csv('s3://airflow-bucket-twitter-datos/refined_tweets.csv', index=False), lo cual permite que los datos procesados se almacenen directamente en el bucket especificado.

En una terminal separada, necesitarás establecer una nueva conexión SSH a tu instancia EC2. Una vez que hayas accedido exitosamente a la instancia, el siguiente paso es navegar hasta el directorio de Airflow, que es donde se almacenan y gestionan los DAGs de tu proyecto.

Ahora, dentro del directorio twitter_dags, es fundamental crear los dos archivos de Python: twitter_etl.py y twitter_dag.py. Es crucial recordar que tu script twitter_etl.pydebe estar configurado para apuntar al bucket S3 que se creará, ya que allí se almacenarán los datos procesados, tal como se mostrará más adelante.

Después de realizar esos cambios, es necesario detener y reiniciar los servicios de Airflow para que detecte las nuevas configuraciones. Y luego, vuelve a ejecutar los comandos airflow api-server --port 8080 --workers 1 y airflow scheduler para reiniciar el servicio de Airflow. Esto asegurará que los cambios en la carpeta de DAGs y la configuración sean aplicados correctamente.

Encuentra el PID del proceso que está usando el puerto con:

Asegúrate de detener cualquier proceso que esté ocupando el puerto antes de iniciar el scheduler.

Matar los procesos que están ocupando el puerto:
Matar los procesos asociados a Airflow:
Matar los procesos asociados a Airflow:
Matar los procesos asociados a gunicorn:
Permisos extendidos:
Si necesitas resetear completamente la base de datos, puedes usar:

Cuidado: reset elimina todo lo que tengas en la base de datos de Airflow (DAGs ejecutados, conexiones, variables, etc.).

Aplicar todas las migraciones necesarias a la base de datos:
Levantar el servidor web:
Luego ejecuta el scheduler en otra terminal:
Verificar que el DAG esté disponible en la lista:
Ejecutar el DAG manualmente:

Para otorgar permisos de escritura a tu bucket S3, ve a la sección de Instancias en la consola de AWS. Arriba, en el menú desplegable de "Acciones" (Actions), selecciona "Seguridad" (Security) y luego "Modificar rol de IAM" (Modify IAM role). Aquí, crea un nuevo rol de IAM, asignándolo al servicio EC2 para permitirle interactuar con otros servicios de AWS. A continuación, busca y adjunta las políticas AmazonS3FullAccess y AmazonEC2FullAccess (esta última es opcional para el contexto de S3, pero útil para gestión de EC2). Finalmente, nombra el rol ec2_s3_airflow_role. Tras refrescar la página, el nuevo rol debería aparecer y estar listo para ser asociado a tu instancia.

Fue imposible hacer que los DAGs corrieran en la interfaz web usando la instancia gratuita de AWS EC2. Aunque se intentó especificar y reiniciar rutas alternativas en el archivo de configuración, Airflow simplemente no detectaba los DAGs al iniciar el scheduler y la interfaz web por separado. El modo standalone de Airflow, que podría haber sido una solución, tampoco funcionó debido a la limitación de recursos de la instancia gratuita. Sin embargo, al ejecutar todo el proceso de forma manual, el flujo completo se completó sin errores, lo que sugiere que el problema reside en la capacidad de la instancia para gestionar simultáneamente todos los servicios de Airflow necesarios para la detección automática de DAGs.

En una instancia con más recursos, el modo standalone de Airflow debería ejecutarse sin ningún problema.

Ejecutar el DAG manualmente con python: