Skip to content

Production Integration

Production Checklist

Before deploying in production, confirm:

  • Tested all the flows in the Sandbox
  • Implemented retry with exponential backoff
  • Implemented automatic renewal of token
  • Set up 4xx/5xx error monitoring
  • Set up rate limit alerts
  • Stored credentials in environment variables (not in code)
  • Implemented audit logs
  • Tested failure scenarios (token expired, rate limit, timeout)

Error Handling

Handling Hierarchy

Python
import requests
import time
import logging

logger = logging.getLogger(__name__)

def requisicao_robusta(url: str, headers: dict, method: str = "GET", 
                        payload: dict = None, max_retries: int = 3) -> dict:
    """Perform an HTTP request with complete error handling.

    Handles token expiry (401), rate limiting (429, honouring Retry-After),
    transient server errors (5xx, exponential backoff) and transient network
    failures (timeout / connection errors, also retried with backoff).

    Args:
        url: Full endpoint URL.
        headers: Request headers (including the Authorization token).
        method: HTTP verb; defaults to "GET".
        payload: Optional JSON body sent with the request.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The decoded JSON response body.

    Raises:
        TokenExpiredError: On 401, so the caller can renew the token.
        requests.HTTPError: For non-retryable error statuses.
        requests.Timeout / requests.ConnectionError: When the last attempt
            still fails with a network error.
        RuntimeError: When all attempts are exhausted without a response.
    """

    for tentativa in range(1, max_retries + 1):
        try:
            response = requests.request(
                method, url,
                json=payload,
                headers=headers,
                timeout=30  # 30s timeout so a hung server cannot stall the sync
            )

            if response.status_code == 401:
                logger.warning("Token expirado — renovando...")
                # NOTE(review): TokenExpiredError is assumed to be defined in the
                # integration's error hierarchy — the caller renews and retries.
                raise TokenExpiredError()

            if response.status_code == 429:
                # Honour the server-provided cool-down before retrying.
                retry_after = int(response.headers.get("Retry-After", 60))
                logger.warning(f"Rate limit atingido. Aguardando {retry_after}s")
                time.sleep(retry_after)
                continue

            # Transient server errors: exponential backoff (2s, 4s, 8s, ...).
            # On the last attempt we fall through to raise_for_status() instead.
            if response.status_code >= 500 and tentativa < max_retries:
                wait = 2 ** tentativa
                logger.warning(f"Erro {response.status_code}. Retry {tentativa}/{max_retries} em {wait}s")
                time.sleep(wait)
                continue

            response.raise_for_status()
            return response.json()

        except (requests.Timeout, requests.ConnectionError) as exc:
            # Connection resets are as transient as timeouts: retry both.
            logger.error(f"Erro de rede na tentativa {tentativa}: {exc}")
            if tentativa == max_retries:
                raise
            time.sleep(2 ** tentativa)

    # RuntimeError is a subclass of Exception, so existing callers still catch it.
    raise RuntimeError(f"Falha após {max_retries} tentativas")

Retry and Exponential Backoff

Python
import time
import random

def backoff_exponencial(tentativa: int, base: float = 1.0, 
                         max_wait: float = 60.0, jitter: bool = True) -> float:
    """Return the wait time (seconds) for retry attempt *tentativa*.

    The delay doubles on every attempt starting from *base*, is capped at
    *max_wait*, and — unless *jitter* is disabled — is scaled by a random
    factor in [0.5, 1.0) so concurrent clients do not retry in lockstep.
    """
    delay = base * 2 ** tentativa
    if delay > max_wait:
        delay = max_wait
    if jitter:
        delay = delay * (0.5 + random.random() * 0.5)
    return delay

# Usage example — assumes fazer_requisicao, APIError, url and headers are
# defined elsewhere in the integration.
for tentativa in range(5):
    try:
        resultado = fazer_requisicao(url, headers)
        break
    except APIError as e:
        # Retry only rate limiting (429) and transient server errors (5xx).
        if e.status_code in (429, 500, 502, 503, 504):
            wait = backoff_exponencial(tentativa)
            print(f"⏳ Aguardando {wait:.1f}s antes do retry...")
            time.sleep(wait)
        else:
            raise  # Do not retry other 4xx client errors (only 429 is retryable)

Rate Limit Control

Headers Monitoring

Python
def monitorar_rate_limit(response: requests.Response):
    """Inspect rate-limit headers on a response and log usage alerts.

    Logs a warning when fewer than 10% of the requests in the current window
    remain, and an info message below 20%. Does nothing when the API does not
    send the X-RateLimit-* headers (or sends non-numeric values).
    """
    try:
        limit = int(response.headers.get("X-RateLimit-Limit", 0))
        remaining = int(response.headers.get("X-RateLimit-Remaining", 0))
    except ValueError:
        # Malformed header values — nothing meaningful to report.
        return

    if limit > 0:
        percentual = (remaining / limit) * 100

        if percentual < 10:
            logger.warning(f"⚠️ Rate limit crítico: {remaining}/{limit} restantes ({percentual:.0f}%)")
        elif percentual < 20:
            logger.info(f"Rate limit baixo: {remaining}/{limit} restantes")

Preventive Rate Limiter

Python
from threading import Semaphore
import time

class RateLimiter:
    """Client-side throttle that keeps requests under the API rate limit.

    Uses a sliding 60-second window of request timestamps; acquire() blocks
    (sleeps) when the window is already full.

    NOTE(review): not thread-safe as written — the Semaphore is created but
    never acquired, and the timestamp list is mutated without a lock. Safe
    for single-threaded use; confirm before sharing across threads.
    """

    def __init__(self, max_per_minute: int = 40):
        self.max_per_minute = max_per_minute
        # Kept for interface compatibility; currently unused (see class note).
        self.semaphore = Semaphore(max_per_minute)
        self.requests_times = []

    def acquire(self):
        """Block until a request slot is free, then record the request."""
        now = time.time()
        # Drop timestamps that fell out of the 60-second window.
        self.requests_times = [t for t in self.requests_times if now - t < 60]

        if len(self.requests_times) >= self.max_per_minute:
            # Sleep until the oldest request in the window expires.
            wait = 60 - (now - self.requests_times[0])
            if wait > 0:  # guard against clock adjustments / rounding
                time.sleep(wait)

        self.requests_times.append(time.time())

limiter = RateLimiter(max_per_minute=40)  # 40 req/min ≈ 80% of the production limit

Logs and Audits

Python
import json
import logging
from datetime import datetime, timezone

# Configure a structured logger: timestamp, level, logger name, message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s'
)

def log_requisicao(method: str, url: str, status: int, 
                    duration_ms: float, payload: dict = None):
    """Emit one structured (JSON) audit-log entry for an API request.

    Only the payload *keys* are recorded — never the values — so credentials
    and personal data stay out of the audit trail.

    Args:
        method: HTTP verb used.
        url: Endpoint that was called.
        status: HTTP status code of the response.
        duration_ms: Request duration in milliseconds.
        payload: Optional request body; only its keys are logged.
    """
    logger.info(json.dumps({
        # Timezone-aware UTC: datetime.utcnow() is naive and deprecated.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "method": method,
        "url": url,
        "status": status,
        "duration_ms": round(duration_ms, 2),
        "payload_keys": list(payload.keys()) if payload else []
    }))

Monitoring and Observability

Metric | Alert on
Error rate (4xx/5xx) | > 1% of requests
Rate limit used | > 80%
Response time (p95) | > 3 seconds
Failed syncs | > 0 in 30 min

Integration Healthcheck

Python
def healthcheck_api(auth) -> bool:
    """Report whether the PontoTel API is reachable and accepting our token.

    Performs a minimal request (one employer, page_size=1) and treats any
    failure — auth error, network problem, timeout, non-200 status — as an
    unhealthy integration instead of raising.
    """
    healthy = False
    try:
        resp = requests.get(
            "https://apis.pontotel.com.br/pontotel/api/v4/empregadores/",
            headers=auth.get_headers(),
            params={"page_size": 1},
            timeout=10
        )
        healthy = resp.status_code == 200
    except Exception:
        healthy = False
    return healthy

Next Steps