Геокодиране на 1 милион адреса: от 8 часа до 12 минути

Реален case study: как свалихме 1M-row geocoding job от 8 часа до 12 минути. Concurrency, batching, caching и математиката зад всяко решение.

| April 30, 2026
Геокодиране на 1 милион адреса: от 8 часа до 12 минути

Logistics клиент пускаше нощно геокодиране на около един милион адреса за доставка. Job-ът тръгваше след 22:00 cutoff-а за маршрутизация за следващия ден и трябваше да приключи преди шофьорите да влязат в 06:00. Месеци наред обикновено приключваше навреме. После обемът започна да расте, job-ът започна да се прехвърля в сутринта и една лоша нощ приключи в 09:15 — момент, в който dispatch вече беше нарязал маршрутите ръчно. Осемчасовият pipeline се беше превърнал в бизнес проблем.

Пет промени по-късно същият нощен job работи за дванадесет минути. Нищо екзотично. Без нов vendor, без пренаписване. Само bounded concurrency, batch endpoint, cache слой, streaming pipeline и normalization pre-pass — приложени в реда, който даде на всяко едно от тях шанс да има значение. Тази статия е play-by-play, с реалните числа, които измерихме, и математиката зад това защо всяка стъпка свърши работа.

Стартовата точка: 8 часа

Оригиналният loop беше тип код, който изглежда разумно в code review и се държи като тухла на скейл. Един worker, sequential calls, нищо async:

# original.py — the eight-hour version
import csv, requests

with open('addresses.csv') as f:
    rows = list(csv.DictReader(f))

results = []
for row in rows:
    r = requests.post('https://api.csv2geo.com/v1/geocode', json=row, timeout=30)
    results.append(r.json())

with open('out.csv', 'w') as f:
    csv.DictWriter(f, fieldnames=results[0].keys()).writeheader()
    csv.DictWriter(f, fieldnames=results[0].keys()).writerows(results)

End-to-end времето за ред беше около 30 ms — TLS handshake reuse през connection pool-а, server-side geocode за 5-15 ms, мрежа обратно. Умножено по един милион реда, това прави 30 000 секунди, или осем часа и двадесет минути. Добави CSV parse и write и job-ът излизаше някъде към осем часа и петнадесет минути.

Нямаше никакъв резерв. 50 ms скок в p99 latency добавяше час. Retry loop върху няколко хиляди 429-ки можеше да удвои runtime-а. Още по-зле — loop-ът зареждаше всички редове в паметта преди първият request да тръгне, което означаваше 400 MB resident set на малка worker box и нула progress, видим за monitoring-а, докато job-ът или приключи, или умре.

Първото нещо, което трябва да усвоиш: single-threaded loop прави едно нещо през цялото време — *чака мрежата*. CPU-то е idle. TCP socket-ът е idle през по-голямата част от round trip-а. Осемте часа са почти изцяло сън. Точно това е workload-ът, за който съществува concurrency.

Стъпка 1: bounded concurrency (8 часа → 90 минути)

Въведохме semaphore-bounded worker pool с concurrency 16. Шестнадесет in-flight request-а във всеки момент, всеки със среден ход 30 ms, дадоха теоретична throughput от ~533 реда в секунда — около 1 875 секунди за милион реда. Измерено: 1 920 секунди, или 32 минути чисто compute. С CSV I/O и долната опашка run-ът излезе около 90 минути end-to-end.

# step1.py — bounded concurrency
import asyncio, aiohttp, csv

CONCURRENCY = 16

async def geocode_one(session, sem, row):
    async with sem:
        async with session.post(
            'https://api.csv2geo.com/v1/geocode', json=row, timeout=30,
        ) as r:
            return await r.json()

async def main():
    with open('addresses.csv') as f:
        rows = list(csv.DictReader(f))
    sem = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[
            geocode_one(session, sem, row) for row in rows
        ])
    return results

Защо 16, а не 64? Защото кривата се изравнява. Бенчмарквахме на 4, 8, 16, 32, 64 и 128 in-flight. Числата:

| Concurrency | Throughput (rows/sec) | p99 latency (ms) | Effective speedup | |---:|---:|---:|---:| | 1 | 33 | 35 | 1x | | 8 | 250 | 38 | 7.5x | | 16 | 533 | 42 | 16x | | 32 | 760 | 78 | 23x | | 64 | 870 | 210 | 26x | | 128 | 880 | 540 | 26x |

Throughput-ът се изравнява около 32-64, защото насищаме per-account fairness budget-а на upstream-а. Колоната p99 разказва истинската история: след 16 разменяме маргинален throughput за tail latency, който задейства нашата retry логика — а retry-итата са чиста загуба. Шестнадесет беше коляното на кривата. По-задълбочена версия на този анализ има в статията за намиране на concurrency sweet spot на твоя geocoder.

Междинният резултат беше полезен: осемчасовият job вече беше деветдесет минути и не бяхме пипали нищо по адресите, API-то или ценообразуването. Шестнадесет реда промяна преместиха bottleneck-а от "wall clock" на "цена на API call за ред".

Стъпка 2: batch endpoint (90 мин → 35 мин)

Single-row endpoint-ът има фиксиран per-request overhead — TLS, HTTP framing, JSON envelope, server-side request setup. При 30 ms на call някъде между 5 и 10 ms от тях са overhead, в който geocoder-ът не върши полезна работа. Умножено по милион call-а, този overhead е 50-100 минути чиста загуба.

Batch endpoint-ът поема до 1 000 реда на POST. Per-batch latency беше 320 ms — което означава, че amortized cost-ът за ред падна до 0.32 ms. С concurrency 16, in-flight batches дадоха теоретичен таван от 50 000 реда в секунда, доста над всичко, което който и да е друг компонент може да захрани.

# step2.py — batch posts
BATCH_SIZE = 1000

def chunked(iterable, n):
    buf = []
    for x in iterable:
        buf.append(x)
        if len(buf) == n:
            yield buf
            buf = []
    if buf:
        yield buf

async def geocode_batch(session, sem, batch):
    async with sem:
        async with session.post(
            'https://api.csv2geo.com/v1/geocode/batch',
            json={'addresses': batch}, timeout=60,
        ) as r:
            return (await r.json())['results']

Измереното wall time за geocode фазата падна от 32 минути на 8 минути. Добави CSV I/O и run-ът излезе на 35 минути end-to-end. Урокът: когато per-call overhead е значителна част от общия latency, batching не е 2x оптимизация, а 3-4x оптимизация, и колкото по-голям batch имаш, толкова по-близо стигаш до теоретичния таван.

Следствието, което никой не споменава: batch endpoint-ите променят error model-а ти. Failed batch не е "един ред гръмна" — а "1 000 реда са в неопределено състояние". Idempotent retries стават задължителни. Покрихме този pattern в проектиране на batch geocoding queue, и си заслужава да се прочете преди да минеш от singles на batches в production.

Стъпка 3: cache слой (35 мин → 18 мин)

Инструментирахме входната дистрибуция и открихме нещо очевидно в ретроспекция: "един милион адреса" на този клиент всъщност бяха около 380 000 различни адреса. Шофьорите посещават едни и същи сгради ротационно. Standing orders. Recurring deliveries. Hit rate-ът срещу празен cache на първа нощ беше 0%; до трета нощ се стабилизира на ~62%.

Сложихме Redis-backed cache пред batch endpoint-а, ключиран на нормализиран SHA-256 на country|postcode|city|street|house_number. Cache hits се сервираха за приблизително 0.4 ms (един Redis GET, без JSON parse извън кешираното blob). Cache misses падаха през на batch endpoint-а, както преди.

# step3.py — cache wrapper
import hashlib, json
from redis import Redis

r = Redis.from_url(os.environ['REDIS_URL'])

def key(addr):
    parts = [str(addr.get(k, '')).lower().strip()
             for k in ('country', 'postcode', 'city', 'street', 'house_number')]
    return 'geo:v1:' + hashlib.sha256('|'.join(parts).encode()).hexdigest()

async def cached_batch(session, sem, batch):
    keys = [key(a) for a in batch]
    cached = r.mget(keys)
    misses = [(i, batch[i]) for i, c in enumerate(cached) if c is None]
    fresh = await geocode_batch(session, sem, [a for _, a in misses]) if misses else []
    pipe = r.pipeline()
    for (idx, _), result in zip(misses, fresh):
        pipe.setex(keys[idx], 30 * 86400, json.dumps(result))
    pipe.execute()
    out = [json.loads(c) if c else None for c in cached]
    for (idx, _), result in zip(misses, fresh):
        out[idx] = result
    return out

При 60% steady-state hit rate само 400 000 от милиона реда реално викаха upstream geocoder-а. Това сви geocode фазата от 8 минути на около 3.5 минути. Добави Redis lookups (все още pipeline-нати, ~0.4 ms × 1 000 batches = 400 ms общо) и CSV I/O и end-to-end run-ът беше 18 минути.

Покрихме пълната TTL стратегия, нормализацията и version-prefix invalidation pattern-а в как да кешираш geocoding резултати. Кратката версия: кеширай завинаги за forward geocoding, нормализирай преди keying, hash-вай за PII safety и абсолютно кеширай no_match резултатите — typo-тата на твоя клиент ще те преследват вечно иначе.

Стъпка 4: streaming pipeline (18 мин → 14 мин)

Pipeline-ът все още зареждаше входния CSV в list преди първият request да тръгне, после натрупваше всички резултати в паметта преди да пише. На милион реда това бяха 400 MB resident, с peak около 700 MB по време на result-merge стъпката. Job-ът работеше на 1 GB worker. Дотук бяхме оцелявали с късмет.

Заменихме load-all/process-all/write-all формата със streaming generator pipeline: четеш ред, batch-ваш го с до 999 други, подаваш batch-а на async worker, пишеш резултата на всеки ред в момента, в който дойде. Bounded buffers между стъпките прилагаха backpressure, когато downstream блокираше.

# step4.py — streaming pipeline
import asyncio, csv

async def producer(path, queue, batch_size=1000):
    with open(path) as f:
        reader = csv.DictReader(f)
        batch = []
        for row in reader:
            batch.append(row)
            if len(batch) == batch_size:
                await queue.put(batch); batch = []
        if batch: await queue.put(batch)
    await queue.put(None)  # sentinel

async def worker(in_q, out_q, session, sem):
    while True:
        batch = await in_q.get()
        if batch is None:
            await out_q.put(None); break
        results = await cached_batch(session, sem, batch)
        await out_q.put((batch, results))

async def writer(out_q, path):
    with open(path, 'w', newline='') as f:
        w = None
        while True:
            item = await out_q.get()
            if item is None: break
            batch, results = item
            for inp, res in zip(batch, results):
                row = {**inp, **(res or {'status': 'no_match'})}
                if w is None:
                    w = csv.DictWriter(f, fieldnames=row.keys()); w.writeheader()
                w.writerow(row)

Wall-clock подобрението беше скромно: от 18 минути на 14 минути. Compute фазата не се промени много — вече беше бърза. Това, което се промени, беше че resident memory падна от 700 MB на 35 MB, worker-ът вече не се нуждаеше от 1 GB кутия и можехме да пуснем няколко job-а паралелно на същата машина. По-важното, pipeline-ът започна да пише първите резултати в първите 30 секунди от run-а, така че monitoring-ът виждаше progress и on-call вече не изпадаше в паника на четвърта минута, когато изглеждаше, че job-ът е замръзнал.

Pattern-ът има по-голямо значение от спестеното време. Pipeline с bounded buffers между стъпките е такъв, който скейлва линейно с размера на входа; load-all/process-all pipeline не. Пълната дискусия за backpressure, queue sizing и OOM избягване е в streaming geocoding на скейл.

Стъпка 5: pre-cleaning на low-confidence редове (14 мин → 12 мин)

Инструментацията показа, че около 8% от редовете се връщат като no_match на първия pass и се retry-ват от outer loop с по-широк geocoder fallback. Гледайки входовете, повечето no-matches не бяха лоши адреси — бяха мръсни входове, които geocoder-ът не можеше да парсне. Неща като "123 Main St., Apt 4B, NYC, NY 10001", където unit number-ът се излива в street полето, или "123 main street ny ny" без separators.

Добавихме libpostal-style normalization pass преди cache lookup-а. Изваждаме unit numbers в собствено поле. Разширяваме USPS abbreviations. Местим bleed-through state codes извън city имена. Pre-clean отнемаше около 0.05 ms на ред — пренебрежимо — и свали no-match rate-а от 8% на около 0.6%.

# step5.py — normalization pre-pass
import re

UNIT_PATTERNS = re.compile(r',?\s+(apt|unit|ste|suite|#)\s*[\w-]+', re.I)
USPS_ABBR = {' street': ' st', ' avenue': ' ave', ' boulevard': ' blvd',
             ' road': ' rd', ' drive': ' dr', ' lane': ' ln'}

def normalize_row(row):
    street = row.get('street', '')
    m = UNIT_PATTERNS.search(street)
    if m:
        row['unit'] = m.group(0).strip(', ')
        street = UNIT_PATTERNS.sub('', street)
    s = ' ' + street.lower()
    for k, v in USPS_ABBR.items():
        s = s.replace(k, v)
    row['street'] = s.strip()
    return row

Елиминирането на retry loop-а спести около две минути wall time на типичен run. По-важно — редовете, които преди падаха в по-широкия fallback (по-бавен, по-скъп), сега удряха primary geocoder-а от първия опит — изваждайки ги от 200-300 ms latency tier и пренасяйки ги в 3-15 ms tier. Пълната таксономия на това какво да нормализираш и какво да не пипаш е в address parsing преди geocoding.

Кумулативният резултат, дванадесет минути за един милион реда, е приблизително 1 389 реда в секунда устойчиво. Четиридесет пъти по-бързо, отколкото където стартирахме. Същият хардуер, същото upstream API, същият dataset.

Какво не помогна

За честност, три неща, които пробвахме и не помръднаха стрелката:

HTTP/3. Сменихме клиента с QUIC-capable HTTP библиотека, очаквайки по-малко round-trip stalls на cold connections. Измерената разлика на нашето concurrency ниво: около 4 ms за целия run. Connection pool-ът вече беше топъл — HTTP/2 multiplexing вършеше работата, която HTTP/3 би свършил. Ако пускаш short-lived workers на cold pools, твоят mileage може да е различен.

gRPC. Geocoding API-то предлагаше gRPC endpoint с protobuf payloads. Теоретично по-бързо — по-малък wire format, без JSON parse. Измерено: 2-3 ms на batch по-бързо, но допълнителна седмица integration работа, по-лош error model на partial batch failures и по-лоша debuggability. Cost-benefit-ът не оправдаваше за batch geocoding. (За low-latency single-row interactive use cases gRPC е по-убедително.)

Regional pinning. Worker-ите на клиента работеха в us-east-1, API-то има multi-region front door и предположението беше, че forcing на request-ите към най-близкия backend ще обръсне RTT. Измерихме 1-2 ms подобрение, без статистически значима throughput промяна на нашето concurrency. Variance-ът от concurrent load на multi-tenant pool-а беше по-голям от gain-а. Заслужава си за single-digit-ms latency budgets, но не за batch.

Pattern-ът и в трите случая: оптимизации, които изглеждат привлекателно при benchmark microreadings, често изчезват в production traffic при concurrency-то, на което системата реално работи. Първо измерване.

Цена за милион

Осемчасовият pipeline струваше около $500 на милион реда: един милион billed lookups при $0.50 на хиляда. След всичките пет оптимизации:

| Фаза | Billed lookups | API spend | |---|---:|---:| | 1M raw редове | 1,000,000 | $500 | | След cache (60% hit) | 400,000 | $200 | | След pre-clean (no_match drop, по-малко fallbacks) | ~370,000 | $185 |

Сметката падна от $500 на $185 на милион — 63% намаление в API spend, върху 40x speedup. На година, при 30M-row месечен обем на клиента, това са спестени $113,400 ($180,000 → $66,600) без нито една предоговорка на договор. Cache TTL стратегията и математиката при по-високи hit rates са детайлно в статията за caching.

Скромна инженерна инвестиция — три седмици за двама инженери — се изплаща за шест седмици при тарифата на клиента и продължава да се натрупва.

Често задавани въпроси

Защо не започнахте с cache-а?

Защото cache hit rate е емпиричен. Със single-threaded 30 ms loop не можеш да генерираш достатъчно request volume, за да измериш hit rate-а в полезен timeframe. Първо ни трябваше concurrency, за да наблюдаваме traffic patterns, после batching, за да направим cache lookup amortized cost-а пренебрежим спрямо upstream call-а. Редът има значение: всяка стъпка отвори бюджет за следващата.

С какъв concurrency да започна?

Шестнадесет е разумен default. Бенчмаркай нагоре оттам по powers of two, гледайки p99 latency. Спри, когато p99 започне да се изкачва по-бързо, отколкото throughput-ът се подобрява — това е точката, в която създаваш queue depth на upstream-а и retry-итата изяждат печалбите ти. Пълният метод е в статията за concurrency tuning.

60% cache hit rate нормално ли е?

За repeat-visit workloads (логистика, field service, recurring B2B), да, често по-високо. За one-shot list-cleaning workloads (маркетинг списък, който геокодираш веднъж), hit rate-ът срещу собствения ти cache ще е близо до нула — но per-job hit rate в рамките на един batch може все пак да е 5-15% при дубликати, и пак си заслужава кеширането за следващия run. Hit rate-ът ти е свойство на твоя traffic, не на cache-а.

Какво ако моят geocoder няма batch endpoint?

Първите три оптимизации (concurrency, cache, streaming) те водят по-голямата част от пътя без batching. Измерихме съпоставимия pipeline само със single-row calls: 22 минути вместо 12. Все още 22x по-бързо от началната точка. Ако имаш каквото и да е влияние върху upstream-а, поискай batch endpoint — това е feature-ът с най-голям leverage, който биха могли да пуснат за high-volume customers.

Параллелизирахте ли CSV четенията?

Не. CSV parse е бърз — на милион реда беше около 8 секунди със стандартната библиотека. Параллелизирането е footgun: двама readers на един файл произвеждат дубликати или out-of-order редове, освен ако първо не split-неш файла, което добавя сложност за малка печалба. Правилният ход е да се увериш, че reader-ът streams (не зарежда всички редове в паметта) и захранва worker pool-а стабилно. Bottleneck-ите винаги бяха downstream от CSV.

Как се справяте с partial batch failures?

Retry-ваш failed редовете индивидуално с idempotency keys. Batch endpoint-ът връща per-row status, така че 3-row failure в 1 000-row batch е идентифицируем. Прекарай тези три през single-row retry path с exponential backoff. Ако нямаш idempotency keys, рискуваш double-billing на retries — реална грижа, която понякога заличава cache savings, ако я игнорираш.

Как изглеждаше monitoring-ът преди и след?

Преди: един counter за "job done". On-call научаваше за failures от липсващ CSV в S3 на следващата сутрин. След: per-stage throughput counters (rows in, rows cached, rows geocoded, rows written), p50/p95/p99 latency на upstream call-а, cache hit/miss rates, no_match rates и heartbeat, който fires-ва на всеки 5 секунди, докато pipeline-ът е жив. Streaming pipeline-ът направи всичко това възможно — вече нямаше "замразен четири минути после изхвърля всичко наведнъж" моменти, които да объркат dashboard-ите.

Заключение

Четиридесет пъти по-бързо, шестдесет и три процента по-евтино, същото upstream API, същият dataset, същият хардуер. Пет оптимизации, приложени в реда, в който всяка си заслужи мястото. Без silver bullet. Без нов vendor. Без пренаписване. Bounded concurrency, batch endpoint, cache layer, streaming pipeline, normalization pre-pass — това е целият номер.

По-дълбокият мета-урок, за всеки, който върти подобен pipeline: 30-ms-per-row loop прави едно нещо през цялото време и това нещо е чакане. Осемте часа бяха сън. Веднъж щом спреш програмата си да спи, всичко останало е амортизация на фиксирани разходи и избягване на повторна работа. Измери своя traffic, намери най-дългия сън, оправи него. После погледни пак.

I.A. / CSV2GEO Creator

Свързани статии

Ready to geocode your addresses?

Use our batch geocoding tool to convert thousands of addresses to coordinates in minutes. Start with 100 free addresses.

Try Batch Geocoding Free →