Campaign Strategies: Technical Development Tutorial

This guide shows developers how to write campaign strategies in Python. Strategies are the core business logic components that control how campaigns operate, manage contact flow, and determine communication sequences.

Prerequisites

  • Python 3.12 knowledge and experience
  • Understanding of async/await patterns
  • Familiarity with dataclasses and type hints
  • Basic understanding of campaign concepts
  • Access to Flametree campaign interface

Strategy Types and Execution Flow

Campaign strategies execute at specific points in the campaign lifecycle:

Contact Import → Data Load Strategy

Communication Start → Agent Conversation
↓ (after each agent message)
Intermediate Analysis Strategy (optional)

Communication End → Aggregation Strategy

Data Load Strategy

Execution Trigger: When new contacts are imported into the campaign

Purpose:

  • Initialize contact data and parameters
  • Plan initial communications
  • Handle duplicate contact merging
  • Set initial contact stages and statuses

Function Signature:

async def run(
    contact: Contact,
    existing_contact: Contact | None,
    logger,
    **kwargs
) -> Contact:

Aggregation Strategy

Execution Trigger: After each communication session completes

Purpose:

  • Process conversation results and session outcomes
  • Determine next actions based on conversation results
  • Schedule follow-up communications
  • Update contact status and stage

Function Signature:

async def run(
    contact: Contact,
    session_results: dict | None,
    logger,
    **kwargs
) -> Contact:

Contact Answer Intermediate Analysis Strategy

Execution Trigger: After each agent response during active conversations

Purpose:

  • Monitor ongoing conversations in real-time
  • Update contact status during long conversations
  • Extend communication timeouts based on engagement
  • Make intermediate decisions without waiting for session end

Function Signature:

async def on_agent_answer(
    contact: Contact,
    message: Message,
    **kwargs
) -> Contact | None:

Strategy File Structure

Self-Contained Implementation

Critical Requirement: Strategy files must be completely self-contained. Due to technical implementation constraints, you cannot:

  • Import custom modules or split code across multiple files
  • Use external dependencies beyond Python 3.12 built-in libraries
  • Reference external Python files or packages

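Allowed Imports: Only Python 3.12 standard library modules. The one exception used in this guide is the platform's own type import, from flametree.api.marketing.types: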

import datetime
import zoneinfo
import json
import asyncio
import logging
from dataclasses import dataclass
from typing import Literal, Optional, Dict, List
# ... other standard library modules

Multi-Strategy File Organization

For better code organization and reusability, you can implement multiple strategies in a single file:

"""
Campaign Strategy File - Complete Implementation
Contains all strategies for the campaign with shared utilities
"""

import datetime
import zoneinfo
from dataclasses import dataclass
from typing import Literal, Optional, Dict, Tuple

from flametree.api.marketing.types import (
    Contact, Communication, CommunicationStatus, CommunicationChannel,
    ContactStatus, StatusCommunication, Message, MasterRecord, Attachment
)

# ---- Shared Constants and Configuration ---- #
TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")
COMMUNICATION_HOURS = (9, 23)
MAX_RETRIES = 3

# ---- Shared Utility Classes (see modules below) ---- #
class OutcomeResolver:
    # Implementation here...
    pass

class CallScheduler:
    # Implementation here...
    pass

# ---- Strategy Functions ---- #

async def data_load_strategy(contact: Contact, existing_contact: Contact | None, logger, **kwargs) -> Contact:
    """Data Load Strategy Implementation"""
    # Implementation logic here
    return contact

async def aggregation_strategy(contact: Contact, session_results: dict | None, logger, **kwargs) -> Contact:
    """Aggregation Strategy Implementation"""
    # Implementation logic here
    return contact

async def intermediate_analysis_strategy(contact: Contact, message: Message, **kwargs) -> Contact | None:
    """Intermediate Analysis Strategy Implementation"""
    # Implementation logic here
    return contact

Strategy Assignment: When uploading the strategy file, specify the exact function name that corresponds to each strategy type in the campaign interface.

Core Data Structures

Contact Object

The Contact object is the primary data structure strategies work with:

@dataclass
class Contact:
    # System identification fields (can be None)
    id: str | None
    external_id: str | None # Person identifier in external system
    name: str | None
    email: str | None
    phone: str | None
    object_id: str | None # Campaign subject identifier (e.g., contract ID)

    # Campaign management fields
    current_stage: str | None
    status: ContactStatus # Success, Positive, Uncertain, Negative, Error
    communications: list[Communication]

    # Data storage
    external_data: dict[str, Any] | None # Imported parameters (read-only)
    internal_data: dict[str, Any] | None # Strategy-managed data
    campaign_results: dict[str, Any] | None # Campaign results data
    global_heap: dict[str, Any] | None # Campaign-wide shared data

    # Master record reference
    master_record: MasterRecord | None

Communication Object

Represents individual communication instances:

@dataclass
class Communication:
    id: str | None
    name: str # Human-readable communication name
    stage_name: str # Stage this communication belongs to
    start_date: datetime.datetime
    finish_date: datetime.datetime
    status: StatusCommunication # Current status with optional message
    parameters: dict[str, Any] | None # Channel-specific parameters
    session_id: str | None # Associated session identifier
    session_status: SessionStatus | None # Final session status
    result: dict[str, Any] | None # Communication outcome data
    # Listed last here: a dataclass field with a default must follow the fields without one
    flagged: bool = False # True for currently active communication

    def interrupt(self, message: str | None):
        """Cancel planned or stop active communication"""
        self.status = StatusCommunication(
            status=CommunicationStatus.INTERRUPTED,
            message=message
        )

Contact Status Enum

class ContactStatus(Enum):
    Success = "Success" # Campaign goal achieved
    Positive = "Positive" # Progress towards campaign goal
    Uncertain = "Uncertain" # Default, no clear outcome yet
    Negative = "Negative" # Campaign unsuccessful
    Error = "Error" # Technical error occurred

Communication Status Enum

class CommunicationStatus(Enum):
    PLANNED = "PLANNED" # Communication scheduled but not started
    STARTED = "STARTED" # Communication is currently active
    COMPLETED = "COMPLETED" # Communication finished successfully
    INTERRUPTED = "INTERRUPTED" # Communication was cancelled
    ERROR = "ERROR" # Communication failed due to error

Communication Channels

class CommunicationChannel(Enum):
    telegram = "telegram" # Telegram messaging
    whatsapp = "whatsapp" # WhatsApp messaging
    chatwoot = "chatwoot" # Chatwoot CRM integration
    intercom = "intercom" # Intercom platform
    twilio = "twilio" # Twilio voice calls
    email = "email" # Email communication
    facebook = "facebook" # Facebook Messenger
    human = "human" # Human agent handling
    email_agent = "email_agent" # AI email agent
    message_bird = "message_bird" # MessageBird WhatsApp
    sip = "sip" # SIP voice calls
    birdapiservice = "birdapiservice" # Bird API service
    none = "none" # No specific channel
    push = "push" # Push notifications

Message Object

Represents individual messages within conversations:

@dataclass
class Message:
    id: str # Unique message identifier
    number: int # Message sequence number in conversation
    text: str # Message content
    session_results: dict # Session results at time of message
    meta: Optional[dict] # Additional metadata
    attachments: list[Attachment] # Message attachments

👀 Note: session_results contains the conversation results collected up to that message point.

Attachment Object

Represents file attachments in messages:

@dataclass
class Attachment:
    id: str # Unique attachment identifier
    content_type: str # MIME type of the file
    file_name: str # Original filename
    file_path: str # Path to stored file

Master Record Object

Contains core contact information:

@dataclass
class MasterRecord:
    id: str | None # Master record identifier
    external_id: str | None # External system identifier
    name: str | None # Contact name
    email: str | None # Email address
    phone: str | None # Phone number

Status Communication Object

Wraps communication status with optional message:

@dataclass
class StatusCommunication:
    status: CommunicationStatus # Current status
    message: str | None = None # Optional status message

Working with Nullable Fields

Many Contact fields can be None, requiring careful handling in strategies:

def safe_contact_access(contact: Contact) -> dict:
    """Example of safely accessing potentially None fields"""

    # Always check external_data before accessing
    template_id = None
    if contact.external_data:
        template_id = contact.external_data.get("templateId")

    # Initialize internal_data if None
    if contact.internal_data is None:
        contact.internal_data = {}

    # Initialize global_heap if None
    if contact.global_heap is None:
        contact.global_heap = {}

    # Safe access to campaign_results
    campaign_results = contact.campaign_results or {}

    return {
        "template_id": template_id,
        "has_external_data": contact.external_data is not None,
        "internal_data_count": len(contact.internal_data),
        "global_heap_count": len(contact.global_heap),
        "campaign_results_count": len(campaign_results)
    }

Reusable Strategy Modules

Outcome Resolution Module

Manages contact status transitions and stage mapping:

NUMBER_ROTATION_LENGTH = 4
TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")

@dataclass(frozen=True, slots=True)
class OutcomeCase:
    """Maps outcomes to stages and statuses"""
    stage_name: str
    contact_status: ContactStatus
    apply_rotation: bool = False

class OutcomeResolver:
    """Centralized outcome mapping and stage management"""

    # Define outcome mappings
    NOT_INTERESTED = OutcomeCase("Not interested", ContactStatus.Negative)
    FOLLOW_UP = OutcomeCase("In process", ContactStatus.Positive)
    REMINDER = OutcomeCase("Reminder scheduled", ContactStatus.Positive, apply_rotation=True)
    TRANSFER_TO_AGENT = OutcomeCase("Transferred to human agent", ContactStatus.Uncertain)
    FINISHED = OutcomeCase("Finished", ContactStatus.Success)
    PROMISE_TO_FINISH = OutcomeCase("Promise to finish", ContactStatus.Success)
    NO_RESPONSE = OutcomeCase("No response", ContactStatus.Uncertain)

    # Internal stages
    _INITIAL_STAGE = OutcomeCase("Outbound initial", ContactStatus.Uncertain, apply_rotation=True)
    _FIRST_MSG_SENT = OutcomeCase("Awaiting for response", ContactStatus.Uncertain)

    @classmethod
    def get_entry(cls, key: str, default: OutcomeCase | None = None) -> OutcomeCase:
        """Get outcome case by key with fallback"""
        if default is None:
            default = cls.NO_RESPONSE
        entry = getattr(cls, key, None)
        # Accept only real outcome cases, so a key such as "get_entry" cannot return a method
        return entry if isinstance(entry, OutcomeCase) else default

    @classmethod
    def get_actual_status(cls, contact: Contact, session_results: dict,
                          force_outcome_status: str | None = None) -> Tuple[str, OutcomeCase]:
        """Determine contact status and stage from session results"""
        if force_outcome_status is None:
            outcome_status = session_results.get("OutcomeStatus", "NO_RESPONSE")
        else:
            outcome_status = force_outcome_status

        outcome_case = cls.get_entry(outcome_status)

        # Apply number rotation if needed
        stage_name = outcome_case.stage_name
        if outcome_case.apply_rotation:
            stage_name = cls._apply_number_rotation(stage_name, contact)
            modified_outcome_case = OutcomeCase(
                stage_name=stage_name,
                contact_status=outcome_case.contact_status,
                apply_rotation=False
            )
            return outcome_status, modified_outcome_case

        return outcome_status, outcome_case

    @classmethod
    def _apply_number_rotation(cls, stage_name: str, contact: Contact) -> str:
        """Apply number rotation to stage name (internal_data must already be a dict)"""
        rotation_id: int | None = contact.internal_data.get("rotation_number_id")

        if rotation_id is None:
            rotation_id = cls._produce_new_number_rotation(contact)
            contact.internal_data["rotation_number_id"] = rotation_id

        return f"{stage_name} - {rotation_id}"

    @staticmethod
    def _produce_new_number_rotation(contact: Contact, method: Literal["counter", "random", "by_day"] = "counter") -> int:
        """Generate new rotation number using specified method"""

        match method:
            case "random":
                # Derived from the current timestamp rather than a random generator
                rotation_id = (
                    int(datetime.datetime.now(tz=TIMEZONE).timestamp())
                    % NUMBER_ROTATION_LENGTH
                )
            case "counter":
                rotation_id = contact.global_heap.get("latest_registered_rotation_number_id", -1)
                rotation_id = (rotation_id + 1) % NUMBER_ROTATION_LENGTH
                contact.global_heap["latest_registered_rotation_number_id"] = rotation_id
            case "by_day":
                rotation_id = (
                    datetime.datetime.now(tz=TIMEZONE).date().toordinal()
                    % NUMBER_ROTATION_LENGTH
                )
            case _:
                raise ValueError(f"Unknown rotation method: {method}")

        return rotation_id
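
The "counter" rotation method can be tried in isolation. The following is a minimal sketch, assuming a plain dict stands in for contact.global_heap; next_rotation_id is a hypothetical helper name used only for illustration and is not part of the platform.

```python
NUMBER_ROTATION_LENGTH = 4


def next_rotation_id(global_heap: dict) -> int:
    """Advance the shared counter and wrap it at NUMBER_ROTATION_LENGTH."""
    # -1 means no rotation number has been handed out yet, so the first result is 0.
    latest = global_heap.get("latest_registered_rotation_number_id", -1)
    rotation_id = (latest + 1) % NUMBER_ROTATION_LENGTH
    global_heap["latest_registered_rotation_number_id"] = rotation_id
    return rotation_id


heap: dict = {}  # stand-in for contact.global_heap (assumption)
assigned = [next_rotation_id(heap) for _ in range(6)]
stage_names = [f"Outbound initial - {n}" for n in assigned]
print(assigned)  # [0, 1, 2, 3, 0, 1]
```

Because the counter lives in the campaign-wide heap, consecutive contacts receive consecutive rotation numbers, and the sequence wraps after NUMBER_ROTATION_LENGTH contacts.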

Communication Scheduling Module

Manages time-based communication scheduling with rate limiting:

# Configuration constants
COMMUNICATION_HOURS_INTERVAL = (9, 23) # Allowed hours for communication
COMMUNICATION_COUNT_PER_INTERVAL = 7 # Max communications per time bucket
COMMUNICATION_INTERVAL_MINUTES = 59 # Bucket duration in minutes
TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")

class CallScheduler:
    """Advanced communication scheduler with rate limiting and time windows"""

    @classmethod
    async def register_call(cls, storage: Dict) -> Tuple[Optional[datetime.datetime], Dict]:
        """
        Reserve the next available communication slot
        Returns (scheduled_time, updated_storage) or (None, storage) if no slots available
        """

        # Initialize scheduler data
        schedule_data = storage.setdefault("schedule_data", {})
        bucket_counts = schedule_data.setdefault("bucket_counts", {})

        now = datetime.datetime.now(tz=TIMEZONE)
        today = now.date()

        # Clean old bucket data
        cls._clean_old_buckets(bucket_counts, today, TIMEZONE)

        # Try to find slot today (not earlier than current time)
        slot_time = cls._find_slot_for_date(bucket_counts, today, from_time=now, timezone=TIMEZONE)

        # If not found today, try tomorrow
        if slot_time is None:
            tomorrow = today + datetime.timedelta(days=1)
            slot_time = cls._find_slot_for_date(bucket_counts, tomorrow, timezone=TIMEZONE)

        if slot_time is None:
            return None, storage

        # Reserve the slot
        bucket_start = cls._get_bucket_start(slot_time, TIMEZONE)
        bucket_key = str(int(bucket_start.timestamp()))
        bucket_counts[bucket_key] = bucket_counts.get(bucket_key, 0) + 1

        # Update last used bucket
        schedule_data["last_bucket"] = int(bucket_start.timestamp())

        return slot_time, storage

    @classmethod
    def _clean_old_buckets(cls, bucket_counts: Dict[str, int], current_date: datetime.date, timezone):
        """Remove expired bucket data"""
        day_start = datetime.datetime.combine(current_date, datetime.time.min, tzinfo=timezone)
        cutoff_timestamp = int(day_start.timestamp())

        keys_to_remove = [key for key in bucket_counts.keys() if int(key) < cutoff_timestamp]
        for key in keys_to_remove:
            del bucket_counts[key]

    @classmethod
    def _find_slot_for_date(cls, bucket_counts: Dict[str, int], date: datetime.date,
                            from_time: Optional[datetime.datetime] = None, timezone=None) -> Optional[datetime.datetime]:
        """Find first available slot for specified date"""
        start_hour, end_hour = COMMUNICATION_HOURS_INTERVAL

        day_start = datetime.datetime(date.year, date.month, date.day, start_hour, 0, tzinfo=timezone)
        day_end = datetime.datetime(date.year, date.month, date.day, end_hour, 0, tzinfo=timezone)

        search_start = max(day_start, from_time) if from_time else day_start
        bucket_start = cls._align_to_bucket_boundary(search_start, day_start)
        bucket_duration = datetime.timedelta(minutes=COMMUNICATION_INTERVAL_MINUTES)

        while bucket_start + bucket_duration <= day_end:
            bucket_key = str(int(bucket_start.timestamp()))
            current_count = bucket_counts.get(bucket_key, 0)

            if current_count < COMMUNICATION_COUNT_PER_INTERVAL:
                # Spread slots evenly across the bucket
                slot_offset = (bucket_duration / COMMUNICATION_COUNT_PER_INTERVAL) * current_count
                slot_time = bucket_start + slot_offset

                if from_time is None or slot_time >= from_time:
                    return slot_time

            bucket_start += bucket_duration

        return None

    @classmethod
    def _align_to_bucket_boundary(cls, target_time: datetime.datetime, day_start: datetime.datetime) -> datetime.datetime:
        """Round time up to the next bucket boundary (unchanged if already on one)"""
        minutes_from_start = (target_time - day_start).total_seconds() / 60
        bucket_index = int(minutes_from_start // COMMUNICATION_INTERVAL_MINUTES)

        if minutes_from_start % COMMUNICATION_INTERVAL_MINUTES > 0:
            bucket_index += 1

        return day_start + datetime.timedelta(minutes=bucket_index * COMMUNICATION_INTERVAL_MINUTES)

    @classmethod
    def _get_bucket_start(cls, slot_time: datetime.datetime, timezone) -> datetime.datetime:
        """Get bucket start time for given slot"""
        day_start = datetime.datetime.combine(
            slot_time.date(),
            datetime.time(COMMUNICATION_HOURS_INTERVAL[0]),
            tzinfo=timezone
        )
        minutes_from_start = (slot_time - day_start).total_seconds() / 60
        bucket_index = int(minutes_from_start // COMMUNICATION_INTERVAL_MINUTES)

        return day_start + datetime.timedelta(minutes=bucket_index * COMMUNICATION_INTERVAL_MINUTES)

    @classmethod
    def adjust_time_to_business_hours(cls, planned_time: datetime.datetime) -> datetime.datetime:
        """Adjust planned time to fall within business hours"""
        start_hour, end_hour = COMMUNICATION_HOURS_INTERVAL
        hour = planned_time.hour

        # If within allowed interval
        if start_hour <= hour < end_hour:
            return planned_time

        # If before allowed interval on same day
        if hour < start_hour:
            return planned_time.replace(hour=start_hour, minute=0, second=0, microsecond=0)

        # If after end of allowed interval, shift to next day
        next_day = planned_time + datetime.timedelta(days=1)
        return next_day.replace(hour=start_hour, minute=0, second=0, microsecond=0)
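
To make the bucket arithmetic concrete, the sketch below recomputes the slot times of a single bucket and the resulting daily capacity from the configuration constants. It is an illustration only: the date is arbitrary, and a fixed UTC-6 offset stands in for the America/Mexico_City zone so the snippet runs without time zone data.

```python
import datetime

COMMUNICATION_HOURS_INTERVAL = (9, 23)
COMMUNICATION_COUNT_PER_INTERVAL = 7
COMMUNICATION_INTERVAL_MINUTES = 59
# Assumption: fixed offset used instead of zoneinfo.ZoneInfo("America/Mexico_City")
TZ = datetime.timezone(datetime.timedelta(hours=-6))

bucket_start = datetime.datetime(2025, 1, 15, 9, 0, tzinfo=TZ)  # arbitrary example day
bucket_duration = datetime.timedelta(minutes=COMMUNICATION_INTERVAL_MINUTES)

# Same formula as slot_offset in _find_slot_for_date: evenly spaced slots per bucket
step = bucket_duration / COMMUNICATION_COUNT_PER_INTERVAL
slots = [bucket_start + step * n for n in range(COMMUNICATION_COUNT_PER_INTERVAL)]

# Only whole buckets that end by the closing hour are used
start_hour, end_hour = COMMUNICATION_HOURS_INTERVAL
window_minutes = (end_hour - start_hour) * 60
buckets_per_day = window_minutes // COMMUNICATION_INTERVAL_MINUTES
daily_capacity = buckets_per_day * COMMUNICATION_COUNT_PER_INTERVAL

for slot in slots:
    print(slot.strftime("%H:%M:%S"))
print(buckets_per_day, daily_capacity)
```

With these constants the slots are roughly 8.4 minutes apart, and a full day offers at most 14 buckets, or 98 communications. A day that is already in progress offers fewer, because the search starts at the next bucket boundary after the current time.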

Communication Creation Helper

Standardizes communication creation with proper parameters:

EMPTY_DATA_PLACEHOLDER = "unknown"

async def create_whatsapp_communication(
    contact: Contact,
    stage_name: str,
    name: str,
    start_at: datetime.datetime,
    duration_min: int = 1440, # 24 hours default
    extra_params: Dict | None = None,
    language: str = "en"
) -> Communication:
    """Create a WhatsApp communication with standardized parameters"""

    # Build environment information
    env_info = contact.external_data.copy() if contact.external_data else {}
    env_info.update({
        "Client Name": contact.name or EMPTY_DATA_PLACEHOLDER,
        "Client Email": contact.email or EMPTY_DATA_PLACEHOLDER,
        "Client Phone": contact.phone or EMPTY_DATA_PLACEHOLDER,
    })

    if extra_params:
        env_info.update(extra_params)

    parameters = {
        "env_info": env_info,
        "session_info": env_info,
        "language": language,
    }

    communication = Communication(
        id=None,
        name=name,
        stage_name=stage_name,
        start_date=start_at,
        finish_date=start_at + datetime.timedelta(minutes=duration_min),
        status=StatusCommunication(status=CommunicationStatus.PLANNED, message=None),
        parameters=parameters,
        session_id=None,
        session_status=None,
        result=None,
        flagged=False
    )

    return communication

Retry Logic Module

Manages communication retry attempts with configurable limits:

class RetryManager:
    """Manages retry attempts for failed communications"""

    def __init__(self, max_retries: int = 3, retry_delays: Dict[str, int] | None = None):
        self.max_retries = max_retries
        self.retry_delays = retry_delays or {
            "NO_RESPONSE": 1440, # 24 hours
            "PROMISE_TO_FINISH": 1440, # 24 hours
            "DEFAULT": 720 # 12 hours
        }

    def should_retry(self, contact: Contact, outcome_status: str) -> bool:
        """Check if contact should be retried based on attempt count"""
        attempts = contact.internal_data.get("_communication_attempts", 0)
        return attempts < self.max_retries

    def get_retry_delay(self, outcome_status: str) -> int:
        """Get retry delay in minutes for given outcome"""
        return self.retry_delays.get(outcome_status, self.retry_delays["DEFAULT"])

    def increment_attempts(self, contact: Contact) -> int:
        """Increment and return attempt count"""
        attempts = contact.internal_data.get("_communication_attempts", 0) + 1
        contact.internal_data["_communication_attempts"] = attempts
        return attempts

    def reset_attempts(self, contact: Contact):
        """Reset attempt counter"""
        contact.internal_data["_communication_attempts"] = 0

Complete Strategy Implementation Examples

Data Load Strategy Example

TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")
TARGET_TEMPLATE_IDS = ("DocumentVerificationIsWaiting", "AppLastStep")
INITIAL_STAGE_NAME = "Outbound initial"
FINISH_STAGE_NAME = "Finished"
SKIP_STAGES = {"Finished", "Reminder scheduled", "Outbound initial"}

async def data_load_strategy(contact: Contact, existing_contact: Contact | None, logger, **kwargs) -> Contact:
    """
    Initialize contacts and schedule first communication
    Handles duplicate contacts and validates target criteria
    """

    now = datetime.datetime.now(tz=TIMEZONE)

    # Initialize data structures if None
    if contact.internal_data is None:
        contact.internal_data = {}
    if contact.global_heap is None:
        contact.global_heap = {}

    # Validate contact has required template ID
    template_id = contact.external_data.get("templateId") if contact.external_data else None
    if template_id is None:
        logger.error(f"Contact {contact.id} missing templateId")
        contact.current_stage = FINISH_STAGE_NAME
        return contact

    # Check if contact meets target criteria
    if template_id not in TARGET_TEMPLATE_IDS:
        logger.info(f"Contact {contact.id} with template {template_id} not in target")
        contact.current_stage = FINISH_STAGE_NAME
        contact.status = ContactStatus.Success
        return contact

    # Skip already processed contacts; startswith also matches rotated
    # stage names such as "Outbound initial - 2"
    if existing_contact and (existing_contact.current_stage or "").startswith(tuple(SKIP_STAGES)):
        logger.info(f"Contact {contact.id} already processed")
        return contact

    # Cancel any existing planned communications
    if existing_contact:
        for comm in contact.communications:
            if comm.status.status == CommunicationStatus.PLANNED:
                comm.interrupt("Interrupted by new load")

    # Schedule initial communication using CallScheduler
    start_time, contact.global_heap = await CallScheduler.register_call(contact.global_heap)

    if start_time is None:
        logger.warning(f"No available slots for contact {contact.id}")
        return contact

    # Create first communication
    _, outcome_case = OutcomeResolver.get_actual_status(
        contact, {}, force_outcome_status="_INITIAL_STAGE"
    )

    first_comm = await create_whatsapp_communication(
        contact=contact,
        stage_name=outcome_case.stage_name,
        name="Initial WhatsApp Message",
        start_at=start_time,
        language="es"
    )

    contact.communications.append(first_comm)
    contact.current_stage = outcome_case.stage_name

    return contact

Aggregation Strategy Example

TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")

async def aggregation_strategy(contact: Contact, session_results: dict | None, logger, **kwargs) -> Contact:
    """
    Process completed sessions and determine next actions
    Handles various conversation outcomes and schedules follow-ups
    """
    retry_manager = RetryManager(max_retries=3)

    now = datetime.datetime.now(tz=TIMEZONE)
    actual_session_results = session_results or {}

    # Initialize data structures if None
    if contact.internal_data is None:
        contact.internal_data = {}

    # Mark current communication as completed
    for comm in contact.communications:
        if comm.flagged:
            comm.status.status = CommunicationStatus.COMPLETED
            logger.info(f"Communication completed with status: {comm.session_status}")

    # Determine outcome and update contact
    outcome_status, outcome_case = OutcomeResolver.get_actual_status(
        contact, actual_session_results
    )
    contact.status = outcome_case.contact_status
    contact.current_stage = outcome_case.stage_name

    # Handle specific outcomes
    match outcome_status:
        case "REMINDER":
            # Schedule reminder for specific date
            reminder_date_str = actual_session_results.get("FollowUpDate")
            try:
                reminder_dt = datetime.datetime.fromisoformat(reminder_date_str)
                if reminder_dt.tzinfo is None:
                    reminder_dt = reminder_dt.replace(tzinfo=TIMEZONE)
            except (TypeError, ValueError) as e:
                # TypeError covers a missing date (None), ValueError a malformed one
                logger.error(f"Invalid reminder date: {reminder_date_str} - {e}")
                reminder_dt = now + datetime.timedelta(days=1)

            reminder_comm = await create_whatsapp_communication(
                contact=contact,
                stage_name="Reminder scheduled",
                name="Scheduled Reminder",
                start_at=reminder_dt,
                language="es"
            )
            contact.communications.append(reminder_comm)

        case "NO_RESPONSE" | "PROMISE_TO_FINISH":
            # Handle retry logic
            if retry_manager.should_retry(contact, outcome_status):
                attempts = retry_manager.increment_attempts(contact)
                delay_minutes = retry_manager.get_retry_delay(outcome_status)

                retry_time = CallScheduler.adjust_time_to_business_hours(
                    now + datetime.timedelta(minutes=delay_minutes)
                )

                retry_comm = await create_whatsapp_communication(
                    contact=contact,
                    stage_name="Outbound initial",
                    name=f"Follow-up #{attempts}",
                    start_at=retry_time,
                    language="es"
                )
                contact.communications.append(retry_comm)
            else:
                logger.info(f"Max retries exceeded for contact {contact.id}")

        case "TRANSFER_TO_AGENT":
            # Log transfer, no additional action needed
            logger.info(f"Contact {contact.id} transferred to human agent")

        case "FINISHED":
            # Mark as successfully completed
            retry_manager.reset_attempts(contact)
            logger.info(f"Contact {contact.id} successfully completed campaign")

    return contact

Intermediate Analysis Strategy Example

TIMEZONE = zoneinfo.ZoneInfo("America/Mexico_City")
CHAT_DURATION_MIN = 1440 # 24 hours
SHORT_CHAT_DURATION_MIN = 180 # 3 hours for quick cases

async def intermediate_analysis_strategy(contact: Contact, message: Message, **kwargs) -> Contact | None:
    """
    Monitor conversations in real-time and update contact status
    Extends conversation timeouts and tracks engagement
    """

    now = datetime.datetime.now(tz=TIMEZONE)
    session_results = message.session_results or {}

    # Initialize internal_data if None
    if contact.internal_data is None:
        contact.internal_data = {}

    # Update timestamp for latest activity
    contact.internal_data["_last_message_timestamp_int"] = str(int(now.timestamp()))

    # Handle first message sent
    if message.number == 1:
        _, outcome_case = OutcomeResolver.get_actual_status(
            contact, session_results, force_outcome_status="_FIRST_MSG_SENT"
        )
        contact.status = outcome_case.contact_status
        contact.current_stage = outcome_case.stage_name

    # Handle subsequent messages (user engagement)
    elif message.number >= 2:
        outcome_status, outcome_case = OutcomeResolver.get_actual_status(
            contact, session_results
        )
        contact.status = outcome_case.contact_status
        contact.current_stage = outcome_case.stage_name

        # Extend communication timeout when user is engaged
        for communication in contact.communications:
            if communication.flagged:
                # Use shorter duration for certain outcomes. Compare the outcome key:
                # the stage name is display text such as "Reminder scheduled - 2"
                if outcome_status == "REMINDER":
                    new_finish = now + datetime.timedelta(minutes=SHORT_CHAT_DURATION_MIN)
                else:
                    new_finish = now + datetime.timedelta(minutes=CHAT_DURATION_MIN)

                communication.finish_date = new_finish

    return contact

Working with Campaign and Communication Results

Strategies can access and update both campaign results and individual communication results:

def process_campaign_results(contact: Contact, session_results: dict) -> Contact:
    """Example of working with campaign and communication results"""

    # Initialize campaign_results if None
    if contact.campaign_results is None:
        contact.campaign_results = {}

    # Extract data from session results
    customer_name = session_results.get("CustomerName")
    payment_promise = session_results.get("PromiseToPayDate")
    debt_verified = session_results.get("DebtVerified", False)

    # Update campaign results (visible in Contact interface)
    if customer_name:
        contact.campaign_results["verified_name"] = customer_name
    if payment_promise:
        contact.campaign_results["promised_payment_date"] = payment_promise
    if debt_verified:
        contact.campaign_results["debt_acknowledged"] = True

    # Store results in active communication
    for comm in contact.communications:
        if comm.flagged:
            if comm.result is None:
                comm.result = {}
            comm.result.update({
                "outcome_status": session_results.get("OutcomeStatus", "UNKNOWN"),
                "conversation_length": session_results.get("MessageCount", 0),
                "resolution_achieved": session_results.get("ResolutionAchieved", False)
            })

    return contact

Advanced Strategy Patterns

Dynamic Campaign Parameters

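def get_dynamic_parameters(contact: Contact, base_params: dict) -> dict:
    """Generate dynamic parameters based on contact data"""

    # base_params.copy() is shallow, so env_info is copied separately;
    # otherwise the update below would also change the caller's dict
    params = base_params.copy()
    env_info = dict(base_params.get("env_info") or {})

    # Safe access to external data
    external_data = contact.external_data or {}
    debt_amount = external_data.get("amount", "0")
    due_date = external_data.get("due_date", "")

    # calculate_days_overdue and determine_customer_segment are campaign-specific
    # helpers that must be defined in the same strategy file
    env_info.update({
        "Personalized_Amount": f"${debt_amount}",
        "Days_Overdue": calculate_days_overdue(due_date),
        "Customer_Segment": determine_customer_segment(contact),
    })
    params["env_info"] = env_info

    return params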

Testing and Debugging

Logging Best Practices

async def example_strategy_with_logging(contact: Contact, session_results: dict | None, logger, **kwargs) -> Contact:
    """Example showing comprehensive logging practices"""

    # Log strategy entry
    logger.info(f"STRATEGY: Processing contact {contact.id} in stage {contact.current_stage}")

    try:
        # Log important data points
        logger.debug(f"Session results: {session_results}")
        logger.debug(f"Contact external data: {contact.external_data}")

        # Log business logic decisions
        outcome_status = session_results.get("OutcomeStatus", "NO_RESPONSE") if session_results else "NO_RESPONSE"
        logger.info(f"STRATEGY: Outcome status determined as: {outcome_status}")

        # Process outcome...

        # Log final state
        logger.info(f"STRATEGY: Contact {contact.id} moved to stage {contact.current_stage} with status {contact.status}")

        return contact

    except Exception as e:
        logger.error(f"STRATEGY: Error processing contact {contact.id}: {e}")
        # Return contact unchanged on error
        return contact
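
Before uploading, the log output of a strategy can be checked locally. The sketch below assumes the platform's logger behaves like a standard logging.Logger (info, debug, error methods); ListHandler and demo_strategy are hypothetical names, and SimpleNamespace stands in for a real Contact.

```python
import asyncio
import logging
from types import SimpleNamespace


class ListHandler(logging.Handler):
    """Collects log lines in memory so a test can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(f"{record.levelname}: {record.getMessage()}")


async def demo_strategy(contact, session_results, logger, **kwargs):
    """Tiny stand-in strategy that follows the logging pattern above."""
    outcome = (session_results or {}).get("OutcomeStatus", "NO_RESPONSE")
    logger.info(f"STRATEGY: Outcome status determined as: {outcome}")
    return contact


logger = logging.getLogger("strategy_test")
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep test output out of the root logger
handler = ListHandler()
logger.addHandler(handler)

contact = SimpleNamespace(id="c-1", current_stage=None, internal_data=None)
result = asyncio.run(demo_strategy(contact, None, logger))
print(handler.lines)
```

Running the strategy with session_results=None exercises the fallback path, and the captured lines show which decision was logged.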

Error Handling Patterns

from typing import Any

def safe_data_extraction(contact: Contact, key: str, default_value: Any = None) -> Any:
    """Safely extract data from contact with fallback"""
    try:
        if contact.external_data is None:
            return default_value
        value = contact.external_data.get(key, default_value)
        if value is None or value == "":
            return default_value
        return value
    except Exception:
        return default_value

def safe_datetime_parsing(date_string: str, timezone, fallback_days: int = 1) -> datetime.datetime:
    """Safely parse datetime with fallback"""
    try:
        dt = datetime.datetime.fromisoformat(date_string)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone)
        return dt
    except (TypeError, ValueError):
        # TypeError: date_string is not a string; ValueError: not a valid ISO date
        return datetime.datetime.now(tz=timezone) + datetime.timedelta(days=fallback_days)

Data Initialization Best Practices

Critical: Always initialize None fields before use to prevent errors:

def initialize_contact_data(contact: Contact):
    """Initialize contact data structures to prevent None errors"""

    # Initialize internal_data for strategy use
    if contact.internal_data is None:
        contact.internal_data = {}

    # Initialize global_heap for campaign-wide data
    if contact.global_heap is None:
        contact.global_heap = {}

    # Initialize campaign_results for result storage
    if contact.campaign_results is None:
        contact.campaign_results = {}

async def safe_strategy_wrapper(contact: Contact, *args, **kwargs) -> Contact:
    """Wrapper to ensure data initialization in all strategies"""

    # Always initialize before processing
    initialize_contact_data(contact)

    # Your strategy logic here...
    return contact

Performance Optimization

Efficient Data Access

def optimize_contact_processing(contact: Contact) -> Contact:
    """Optimize contact data access patterns"""

    # Initialize internal_data if None
    if contact.internal_data is None:
        contact.internal_data = {}

    # Cache frequently accessed values with None checks
    contact_data = contact.external_data or {}

    # Batch updates to internal_data in a single update() call
    updates = {
        "last_processed": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
        "processing_version": "2.0",
    }
    contact.internal_data.update(updates)

    return contact

Integration Examples

External API Integration

import asyncio
import datetime
import json
from urllib.request import Request, urlopen

API_URL = "https://api.example-crm.com/contacts/update"

def _post_json(payload: dict) -> dict:
    """Blocking HTTP POST; call it through asyncio.to_thread, not directly"""
    request = Request(
        API_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer YOUR_API_TOKEN"
        },
        method="POST"
    )
    with urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))

async def update_external_crm(contact: Contact, outcome: str, logger=None) -> bool:
    """Example external CRM update (using only standard library)"""

    # Prepare API request
    payload = {
        "external_id": contact.external_id,
        "campaign_outcome": outcome,
        "last_contact_date": datetime.datetime.now().isoformat(),
        "current_stage": contact.current_stage
    }

    try:
        # urlopen blocks, so run it in a worker thread to keep the event loop free
        result = await asyncio.to_thread(_post_json, payload)
        return result.get("status") == "success"
    except (OSError, ValueError) as e:
        # URLError and timeouts are OSError subclasses; malformed JSON raises ValueError.
        # Log error but don't fail the strategy
        if logger is not None:
            logger.error(f"CRM update failed: {e}")
        return False

Deployment Checklist

Pre-Upload Validation

  • All required imports are Python 3.12 standard library only
  • No external module dependencies or custom imports
  • Strategy functions have correct signatures
  • All constants and configuration are defined within the file
  • Error handling is implemented for external data access
  • Logging statements are included for debugging
  • Test scenarios have been validated

Function Naming Convention

When uploading strategies to the platform:

  • Data Load Strategy: Function name (e.g., data_load_strategy)
  • Aggregation Strategy: Function name (e.g., aggregation_strategy)
  • Intermediate Analysis: Function name (e.g., intermediate_analysis_strategy)

Ensure the exact function names are specified in the campaign strategy configuration interface.
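
A quick local check can confirm that the function names you plan to enter exist in the file and match the signatures shown in this guide. This is a sketch only: check_strategy_function and EXPECTED_PARAMS are hypothetical, and it assumes the platform expects the parameter names exactly as they appear in the documented signatures.

```python
import ast

# Positional parameter names taken from the signatures in this guide
EXPECTED_PARAMS = {
    "data_load": ["contact", "existing_contact", "logger"],
    "aggregation": ["contact", "session_results", "logger"],
    "intermediate_analysis": ["contact", "message"],
}


def check_strategy_function(source: str, function_name: str, strategy_type: str) -> list[str]:
    """Return a list of problems for one strategy function (empty list means OK)."""
    definitions = [
        node for node in ast.parse(source).body
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == function_name
    ]
    if not definitions:
        return [f"{function_name}: not defined at module level"]

    func = definitions[-1]  # a later definition replaces earlier ones at import time
    problems = []
    if not isinstance(func, ast.AsyncFunctionDef):
        problems.append(f"{function_name}: must be declared with 'async def'")
    params = [arg.arg for arg in func.args.posonlyargs + func.args.args]
    if params != EXPECTED_PARAMS[strategy_type]:
        problems.append(f"{function_name}: parameters {params} != {EXPECTED_PARAMS[strategy_type]}")
    if func.args.kwarg is None:
        problems.append(f"{function_name}: missing **kwargs")
    return problems


sample = '''
async def aggregation_strategy(contact, session_results, logger, **kwargs):
    return contact

def data_load_strategy(contact, logger):
    return contact
'''
ok = check_strategy_function(sample, "aggregation_strategy", "aggregation")
bad = check_strategy_function(sample, "data_load_strategy", "data_load")
missing = check_strategy_function(sample, "intermediate_analysis_strategy", "intermediate_analysis")
print(ok, bad, missing, sep="\n")
```

Because the check works on the parsed source, it does not import the file, so it runs even where the flametree package is not installed.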

FAQ

Can I use external libraries in my strategy?

No, strategy files must be completely self-contained and can only use Python 3.12 standard library modules.

How do I share code between multiple strategies?

Implement multiple strategies in a single file with shared utility classes and functions at the top of the file.

What happens if my strategy throws an exception?

The contact will be returned unchanged. Always implement proper error handling and logging to prevent strategy failures.

Can I modify the Contact object in intermediate analysis?

Yes, but changes should be minimal and focused on real-time monitoring. Major logic should remain in the aggregation strategy.