Геокодиране на 1 милион адреса: от 8 часа до 12 минути
Реален case study: как свалихме 1M-row geocoding job от 8 часа до 12 минути. Concurrency, batching, caching и математиката зад всяко решение.
Logistics клиент пускаше нощно геокодиране на около един милион адреса за доставка. Job-ът тръгваше след 22:00 cutoff-а за маршрутизация за следващия ден и трябваше да приключи преди шофьорите да влязат в 06:00. Месеци наред обикновено приключваше навреме. После обемът започна да расте, job-ът започна да се прехвърля в сутринта и една лоша нощ приключи в 09:15 — момент, в който dispatch вече беше нарязал маршрутите ръчно. Осемчасовият pipeline се беше превърнал в бизнес проблем.
Пет промени по-късно същият нощен job работи за дванадесет минути. Нищо екзотично. Без нов vendor, без пренаписване. Само bounded concurrency, batch endpoint, cache слой, streaming pipeline и normalization pre-pass — приложени в реда, който даде на всяко едно от тях шанс да има значение. Тази статия е play-by-play, с реалните числа, които измерихме, и математиката зад това защо всяка стъпка свърши работа.
Стартовата точка: 8 часа
Оригиналният loop беше тип код, който изглежда разумно в code review и се държи като тухла на скейл. Един worker, sequential calls, нищо async:
# original.py — the eight-hour version
import csv, requests
with open('addresses.csv') as f:
rows = list(csv.DictReader(f))
results = []
for row in rows:
r = requests.post('https://api.csv2geo.com/v1/geocode', json=row, timeout=30)
results.append(r.json())
with open('out.csv', 'w') as f:
csv.DictWriter(f, fieldnames=results[0].keys()).writeheader()
csv.DictWriter(f, fieldnames=results[0].keys()).writerows(results)End-to-end времето за ред беше около 30 ms — TLS handshake reuse през connection pool-а, server-side geocode за 5-15 ms, мрежа обратно. Умножено по един милион реда, това прави 30 000 секунди, или осем часа и двадесет минути. Добави CSV parse и write и job-ът излизаше някъде към осем часа и петнадесет минути.
Нямаше никакъв резерв. 50 ms скок в p99 latency добавяше час. Retry loop върху няколко хиляди 429-ки можеше да удвои runtime-а. Още по-зле — loop-ът зареждаше всички редове в паметта преди първият request да тръгне, което означаваше 400 MB resident set на малка worker box и нула progress, видим за monitoring-а, докато job-ът или приключи, или умре.
Първото нещо, което трябва да усвоиш: single-threaded loop прави едно нещо през цялото време — *чака мрежата*. CPU-то е idle. TCP socket-ът е idle през по-голямата част от round trip-а. Осемте часа са почти изцяло сън. Точно това е workload-ът, за който съществува concurrency.
Стъпка 1: bounded concurrency (8 часа → 90 минути)
Въведохме semaphore-bounded worker pool с concurrency 16. Шестнадесет in-flight request-а във всеки момент, всеки със среден ход 30 ms, дадоха теоретична throughput от ~533 реда в секунда — около 1 875 секунди за милион реда. Измерено: 1 920 секунди, или 32 минути чисто compute. С CSV I/O и долната опашка run-ът излезе около 90 минути end-to-end.
# step1.py — bounded concurrency
import asyncio, aiohttp, csv
CONCURRENCY = 16
async def geocode_one(session, sem, row):
async with sem:
async with session.post(
'https://api.csv2geo.com/v1/geocode', json=row, timeout=30,
) as r:
return await r.json()
async def main():
with open('addresses.csv') as f:
rows = list(csv.DictReader(f))
sem = asyncio.Semaphore(CONCURRENCY)
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(*[
geocode_one(session, sem, row) for row in rows
])
return resultsЗащо 16, а не 64? Защото кривата се изравнява. Бенчмарквахме на 4, 8, 16, 32, 64 и 128 in-flight. Числата:
| Concurrency | Throughput (rows/sec) | p99 latency (ms) | Effective speedup | |---:|---:|---:|---:| | 1 | 33 | 35 | 1x | | 8 | 250 | 38 | 7.5x | | 16 | 533 | 42 | 16x | | 32 | 760 | 78 | 23x | | 64 | 870 | 210 | 26x | | 128 | 880 | 540 | 26x |
Throughput-ът се изравнява около 32-64, защото насищаме per-account fairness budget-а на upstream-а. Колоната p99 разказва истинската история: след 16 разменяме маргинален throughput за tail latency, който задейства нашата retry логика — а retry-итата са чиста загуба. Шестнадесет беше коляното на кривата. По-задълбочена версия на този анализ има в статията за намиране на concurrency sweet spot на твоя geocoder.
Междинният резултат беше полезен: осемчасовият job вече беше деветдесет минути и не бяхме пипали нищо по адресите, API-то или ценообразуването. Шестнадесет реда промяна преместиха bottleneck-а от "wall clock" на "цена на API call за ред".
Стъпка 2: batch endpoint (90 мин → 35 мин)
Single-row endpoint-ът има фиксиран per-request overhead — TLS, HTTP framing, JSON envelope, server-side request setup. При 30 ms на call някъде между 5 и 10 ms от тях са overhead, в който geocoder-ът не върши полезна работа. Умножено по милион call-а, този overhead е 50-100 минути чиста загуба.
Batch endpoint-ът поема до 1 000 реда на POST. Per-batch latency беше 320 ms — което означава, че amortized cost-ът за ред падна до 0.32 ms. С concurrency 16, in-flight batches дадоха теоретичен таван от 50 000 реда в секунда, доста над всичко, което който и да е друг компонент може да захрани.
# step2.py — batch posts
BATCH_SIZE = 1000
def chunked(iterable, n):
buf = []
for x in iterable:
buf.append(x)
if len(buf) == n:
yield buf
buf = []
if buf:
yield buf
async def geocode_batch(session, sem, batch):
async with sem:
async with session.post(
'https://api.csv2geo.com/v1/geocode/batch',
json={'addresses': batch}, timeout=60,
) as r:
return (await r.json())['results']Измереното wall time за geocode фазата падна от 32 минути на 8 минути. Добави CSV I/O и run-ът излезе на 35 минути end-to-end. Урокът: когато per-call overhead е значителна част от общия latency, batching не е 2x оптимизация, а 3-4x оптимизация, и колкото по-голям batch имаш, толкова по-близо стигаш до теоретичния таван.
Следствието, което никой не споменава: batch endpoint-ите променят error model-а ти. Failed batch не е "един ред гръмна" — а "1 000 реда са в неопределено състояние". Idempotent retries стават задължителни. Покрихме този pattern в проектиране на batch geocoding queue, и си заслужава да се прочете преди да минеш от singles на batches в production.
Стъпка 3: cache слой (35 мин → 18 мин)
Инструментирахме входната дистрибуция и открихме нещо очевидно в ретроспекция: "един милион адреса" на този клиент всъщност бяха около 380 000 различни адреса. Шофьорите посещават едни и същи сгради ротационно. Standing orders. Recurring deliveries. Hit rate-ът срещу празен cache на първа нощ беше 0%; до трета нощ се стабилизира на ~62%.
Сложихме Redis-backed cache пред batch endpoint-а, ключиран на нормализиран SHA-256 на country|postcode|city|street|house_number. Cache hits се сервираха за приблизително 0.4 ms (един Redis GET, без JSON parse извън кешираното blob). Cache misses падаха през на batch endpoint-а, както преди.
# step3.py — cache wrapper
import hashlib, json
from redis import Redis
r = Redis.from_url(os.environ['REDIS_URL'])
def key(addr):
parts = [str(addr.get(k, '')).lower().strip()
for k in ('country', 'postcode', 'city', 'street', 'house_number')]
return 'geo:v1:' + hashlib.sha256('|'.join(parts).encode()).hexdigest()
async def cached_batch(session, sem, batch):
keys = [key(a) for a in batch]
cached = r.mget(keys)
misses = [(i, batch[i]) for i, c in enumerate(cached) if c is None]
fresh = await geocode_batch(session, sem, [a for _, a in misses]) if misses else []
pipe = r.pipeline()
for (idx, _), result in zip(misses, fresh):
pipe.setex(keys[idx], 30 * 86400, json.dumps(result))
pipe.execute()
out = [json.loads(c) if c else None for c in cached]
for (idx, _), result in zip(misses, fresh):
out[idx] = result
return outПри 60% steady-state hit rate само 400 000 от милиона реда реално викаха upstream geocoder-а. Това сви geocode фазата от 8 минути на около 3.5 минути. Добави Redis lookups (все още pipeline-нати, ~0.4 ms × 1 000 batches = 400 ms общо) и CSV I/O и end-to-end run-ът беше 18 минути.
Покрихме пълната TTL стратегия, нормализацията и version-prefix invalidation pattern-а в как да кешираш geocoding резултати. Кратката версия: кеширай завинаги за forward geocoding, нормализирай преди keying, hash-вай за PII safety и абсолютно кеширай no_match резултатите — typo-тата на твоя клиент ще те преследват вечно иначе.
Стъпка 4: streaming pipeline (18 мин → 14 мин)
Pipeline-ът все още зареждаше входния CSV в list преди първият request да тръгне, после натрупваше всички резултати в паметта преди да пише. На милион реда това бяха 400 MB resident, с peak около 700 MB по време на result-merge стъпката. Job-ът работеше на 1 GB worker. Дотук бяхме оцелявали с късмет.
Заменихме load-all/process-all/write-all формата със streaming generator pipeline: четеш ред, batch-ваш го с до 999 други, подаваш batch-а на async worker, пишеш резултата на всеки ред в момента, в който дойде. Bounded buffers между стъпките прилагаха backpressure, когато downstream блокираше.
# step4.py — streaming pipeline
import asyncio, csv
async def producer(path, queue, batch_size=1000):
with open(path) as f:
reader = csv.DictReader(f)
batch = []
for row in reader:
batch.append(row)
if len(batch) == batch_size:
await queue.put(batch); batch = []
if batch: await queue.put(batch)
await queue.put(None) # sentinel
async def worker(in_q, out_q, session, sem):
while True:
batch = await in_q.get()
if batch is None:
await out_q.put(None); break
results = await cached_batch(session, sem, batch)
await out_q.put((batch, results))
async def writer(out_q, path):
with open(path, 'w', newline='') as f:
w = None
while True:
item = await out_q.get()
if item is None: break
batch, results = item
for inp, res in zip(batch, results):
row = {**inp, **(res or {'status': 'no_match'})}
if w is None:
w = csv.DictWriter(f, fieldnames=row.keys()); w.writeheader()
w.writerow(row)Wall-clock подобрението беше скромно: от 18 минути на 14 минути. Compute фазата не се промени много — вече беше бърза. Това, което се промени, беше че resident memory падна от 700 MB на 35 MB, worker-ът вече не се нуждаеше от 1 GB кутия и можехме да пуснем няколко job-а паралелно на същата машина. По-важното, pipeline-ът започна да пише първите резултати в първите 30 секунди от run-а, така че monitoring-ът виждаше progress и on-call вече не изпадаше в паника на четвърта минута, когато изглеждаше, че job-ът е замръзнал.
Pattern-ът има по-голямо значение от спестеното време. Pipeline с bounded buffers между стъпките е такъв, който скейлва линейно с размера на входа; load-all/process-all pipeline не. Пълната дискусия за backpressure, queue sizing и OOM избягване е в streaming geocoding на скейл.
Стъпка 5: pre-cleaning на low-confidence редове (14 мин → 12 мин)
Инструментацията показа, че около 8% от редовете се връщат като no_match на първия pass и се retry-ват от outer loop с по-широк geocoder fallback. Гледайки входовете, повечето no-matches не бяха лоши адреси — бяха мръсни входове, които geocoder-ът не можеше да парсне. Неща като "123 Main St., Apt 4B, NYC, NY 10001", където unit number-ът се излива в street полето, или "123 main street ny ny" без separators.
Добавихме libpostal-style normalization pass преди cache lookup-а. Изваждаме unit numbers в собствено поле. Разширяваме USPS abbreviations. Местим bleed-through state codes извън city имена. Pre-clean отнемаше около 0.05 ms на ред — пренебрежимо — и свали no-match rate-а от 8% на около 0.6%.
# step5.py — normalization pre-pass
import re
UNIT_PATTERNS = re.compile(r',?\s+(apt|unit|ste|suite|#)\s*[\w-]+', re.I)
USPS_ABBR = {' street': ' st', ' avenue': ' ave', ' boulevard': ' blvd',
' road': ' rd', ' drive': ' dr', ' lane': ' ln'}
def normalize_row(row):
street = row.get('street', '')
m = UNIT_PATTERNS.search(street)
if m:
row['unit'] = m.group(0).strip(', ')
street = UNIT_PATTERNS.sub('', street)
s = ' ' + street.lower()
for k, v in USPS_ABBR.items():
s = s.replace(k, v)
row['street'] = s.strip()
return rowЕлиминирането на retry loop-а спести около две минути wall time на типичен run. По-важно — редовете, които преди падаха в по-широкия fallback (по-бавен, по-скъп), сега удряха primary geocoder-а от първия опит — изваждайки ги от 200-300 ms latency tier и пренасяйки ги в 3-15 ms tier. Пълната таксономия на това какво да нормализираш и какво да не пипаш е в address parsing преди geocoding.
Кумулативният резултат, дванадесет минути за един милион реда, е приблизително 1 389 реда в секунда устойчиво. Четиридесет пъти по-бързо, отколкото където стартирахме. Същият хардуер, същото upstream API, същият dataset.
Какво не помогна
За честност, три неща, които пробвахме и не помръднаха стрелката:
HTTP/3. Сменихме клиента с QUIC-capable HTTP библиотека, очаквайки по-малко round-trip stalls на cold connections. Измерената разлика на нашето concurrency ниво: около 4 ms за целия run. Connection pool-ът вече беше топъл — HTTP/2 multiplexing вършеше работата, която HTTP/3 би свършил. Ако пускаш short-lived workers на cold pools, твоят mileage може да е различен.
gRPC. Geocoding API-то предлагаше gRPC endpoint с protobuf payloads. Теоретично по-бързо — по-малък wire format, без JSON parse. Измерено: 2-3 ms на batch по-бързо, но допълнителна седмица integration работа, по-лош error model на partial batch failures и по-лоша debuggability. Cost-benefit-ът не оправдаваше за batch geocoding. (За low-latency single-row interactive use cases gRPC е по-убедително.)
Regional pinning. Worker-ите на клиента работеха в us-east-1, API-то има multi-region front door и предположението беше, че forcing на request-ите към най-близкия backend ще обръсне RTT. Измерихме 1-2 ms подобрение, без статистически значима throughput промяна на нашето concurrency. Variance-ът от concurrent load на multi-tenant pool-а беше по-голям от gain-а. Заслужава си за single-digit-ms latency budgets, но не за batch.
Pattern-ът и в трите случая: оптимизации, които изглеждат привлекателно при benchmark microreadings, често изчезват в production traffic при concurrency-то, на което системата реално работи. Първо измерване.
Цена за милион
Осемчасовият pipeline струваше около $500 на милион реда: един милион billed lookups при $0.50 на хиляда. След всичките пет оптимизации:
| Фаза | Billed lookups | API spend | |---|---:|---:| | 1M raw редове | 1,000,000 | $500 | | След cache (60% hit) | 400,000 | $200 | | След pre-clean (no_match drop, по-малко fallbacks) | ~370,000 | $185 |
Сметката падна от $500 на $185 на милион — 63% намаление в API spend, върху 40x speedup. На година, при 30M-row месечен обем на клиента, това са спестени $113,400 ($180,000 → $66,600) без нито една предоговорка на договор. Cache TTL стратегията и математиката при по-високи hit rates са детайлно в статията за caching.
Скромна инженерна инвестиция — три седмици за двама инженери — се изплаща за шест седмици при тарифата на клиента и продължава да се натрупва.
Често задавани въпроси
Защо не започнахте с cache-а?
Защото cache hit rate е емпиричен. Със single-threaded 30 ms loop не можеш да генерираш достатъчно request volume, за да измериш hit rate-а в полезен timeframe. Първо ни трябваше concurrency, за да наблюдаваме traffic patterns, после batching, за да направим cache lookup amortized cost-а пренебрежим спрямо upstream call-а. Редът има значение: всяка стъпка отвори бюджет за следващата.
С какъв concurrency да започна?
Шестнадесет е разумен default. Бенчмаркай нагоре оттам по powers of two, гледайки p99 latency. Спри, когато p99 започне да се изкачва по-бързо, отколкото throughput-ът се подобрява — това е точката, в която създаваш queue depth на upstream-а и retry-итата изяждат печалбите ти. Пълният метод е в статията за concurrency tuning.
60% cache hit rate нормално ли е?
За repeat-visit workloads (логистика, field service, recurring B2B), да, често по-високо. За one-shot list-cleaning workloads (маркетинг списък, който геокодираш веднъж), hit rate-ът срещу собствения ти cache ще е близо до нула — но per-job hit rate в рамките на един batch може все пак да е 5-15% при дубликати, и пак си заслужава кеширането за следващия run. Hit rate-ът ти е свойство на твоя traffic, не на cache-а.
Какво ако моят geocoder няма batch endpoint?
Първите три оптимизации (concurrency, cache, streaming) те водят по-голямата част от пътя без batching. Измерихме съпоставимия pipeline само със single-row calls: 22 минути вместо 12. Все още 22x по-бързо от началната точка. Ако имаш каквото и да е влияние върху upstream-а, поискай batch endpoint — това е feature-ът с най-голям leverage, който биха могли да пуснат за high-volume customers.
Параллелизирахте ли CSV четенията?
Не. CSV parse е бърз — на милион реда беше около 8 секунди със стандартната библиотека. Параллелизирането е footgun: двама readers на един файл произвеждат дубликати или out-of-order редове, освен ако първо не split-неш файла, което добавя сложност за малка печалба. Правилният ход е да се увериш, че reader-ът streams (не зарежда всички редове в паметта) и захранва worker pool-а стабилно. Bottleneck-ите винаги бяха downstream от CSV.
Как се справяте с partial batch failures?
Retry-ваш failed редовете индивидуално с idempotency keys. Batch endpoint-ът връща per-row status, така че 3-row failure в 1 000-row batch е идентифицируем. Прекарай тези три през single-row retry path с exponential backoff. Ако нямаш idempotency keys, рискуваш double-billing на retries — реална грижа, която понякога заличава cache savings, ако я игнорираш.
Как изглеждаше monitoring-ът преди и след?
Преди: един counter за "job done". On-call научаваше за failures от липсващ CSV в S3 на следващата сутрин. След: per-stage throughput counters (rows in, rows cached, rows geocoded, rows written), p50/p95/p99 latency на upstream call-а, cache hit/miss rates, no_match rates и heartbeat, който fires-ва на всеки 5 секунди, докато pipeline-ът е жив. Streaming pipeline-ът направи всичко това възможно — вече нямаше "замразен четири минути после изхвърля всичко наведнъж" моменти, които да объркат dashboard-ите.
Заключение
Четиридесет пъти по-бързо, шестдесет и три процента по-евтино, същото upstream API, същият dataset, същият хардуер. Пет оптимизации, приложени в реда, в който всяка си заслужи мястото. Без silver bullet. Без нов vendor. Без пренаписване. Bounded concurrency, batch endpoint, cache layer, streaming pipeline, normalization pre-pass — това е целият номер.
По-дълбокият мета-урок, за всеки, който върти подобен pipeline: 30-ms-per-row loop прави едно нещо през цялото време и това нещо е чакане. Осемте часа бяха сън. Веднъж щом спреш програмата си да спи, всичко останало е амортизация на фиксирани разходи и избягване на повторна работа. Измери своя traffic, намери най-дългия сън, оправи него. После погледни пак.
I.A. / CSV2GEO Creator
Свързани статии
- Concurrency Tuning за Geocoding: намиране на твоя Sweet Spot
- Проектиране на Batch Geocoding Queue: SQS, BullMQ или Custom
- Как да кешираш Geocoding резултати: TTL, ключове и 90% намаление на разходите
- Streaming Geocoding на скейл: Backpressure, Memory и Throughput
- Go Geocoding Tutorial: Goroutines, Bounded Concurrency и Backoff
Use our batch geocoding tool to convert thousands of addresses to coordinates in minutes. Start with 100 free addresses.
Try Batch Geocoding Free →