diff --git a/condarecipe/pysus/meta.yaml b/condarecipe/pysus/meta.yaml index 1eeaaef0..41f45164 100644 --- a/condarecipe/pysus/meta.yaml +++ b/condarecipe/pysus/meta.yaml @@ -32,7 +32,6 @@ requirements: - pyarrow - python - requests - - elasticsearch test: imports: diff --git a/pyproject.toml b/pyproject.toml index cc22b17b..0e57d815 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,6 @@ Unidecode = "^1.3.6" dateparser = "^1.1.8" pandas = "^2.2.2" urwid = "^2.1.2" -elasticsearch = { version = "7.16.2", extras=["preprocessing"] } # FTP bigtree = "^0.12.2" aioftp = "^0.21.4" diff --git a/pysus/online_data/ESUS.py b/pysus/online_data/ESUS.py deleted file mode 100644 index ef2b990e..00000000 --- a/pysus/online_data/ESUS.py +++ /dev/null @@ -1,98 +0,0 @@ -import os -from datetime import date - -import pandas as pd -from elasticsearch import Elasticsearch, helpers -from loguru import logger -from pysus.ftp import CACHEPATH - - -def download(uf, cache=True, checkmemory=True): - """ - Download ESUS data by UF - :param uf: rj, mg, etc - :param cache: if results should be cached on disk - :return: DataFrame if data fits in memory, - other an iterator of chunks of size 1000. - """ - uf = uf.lower() - user = "user-public-notificacoes" - pwd = "Za4qNXdyQNSa9YaA" - today = date.today() - dt = today.strftime("_%d_%m_%Y") - base = f"desc-esus-notifica-estado-{uf}" # desc-notificacoes-esusve- - url = f"https://{user}:{pwd}@elasticsearch-saps.saude.gov.br" # noqa: E231 - out = f"ESUS_{uf}_{dt}.parquet" - - cachefile = os.path.join(CACHEPATH, out) - tempfile = os.path.join(CACHEPATH, f"ESUS_temp_{uf.upper()}.csv.gz") - if os.path.exists(cachefile): - logger.info(f"Local parquet file found at {cachefile}") - df = pd.read_parquet(cachefile) - elif os.path.exists(tempfile): - logger.info(f"Local csv file found at {tempfile}") - df = pd.read_csv(tempfile, chunksize=1000) - else: - fname = fetch(base, uf, url) - size = os.stat(fname).st_size - if size > 50e6 and checkmemory: - print(f"Downloaded data is to large: {size / 1e6} MB compressed.") - print( - "Only loading the first 1000 rows. If your computer has enough" - + " memory, set 'checkmemory' to False" - ) - print(f"The full data is in {fname}") - df = pd.read_csv(fname, chunksize=1000) - else: - df = pd.read_csv(fname, low_memory=False) - print(f"{df.shape[0]} records downloaded.") - os.unlink(fname) - if cache: - df.to_parquet(cachefile) - logger.info(f"Data stored as parquet at {cachefile}") - - return df - - -def fetch(base, uf, url): - UF = uf.upper() - print(f"Reading ESUS data for {UF}") - es = Elasticsearch([url], send_get_body_as="POST") - body = {"query": {"match_all": {}}} - results = helpers.scan(es, query=body, index=base) - # df = pd.DataFrame.from_dict( - # [document['_source'] for document in results] - # ) - - chunker = chunky_fetch(results, 3000) - h = 1 - tempfile = os.path.join(CACHEPATH, f"ESUS_temp_{UF}.csv.gz") - for ch in chunker: - df = pd.DataFrame.from_dict(ch) - df.sintomas = df["sintomas"].str.replace( - ";", - "", - ) # remove os ; - if h: - df.to_csv(tempfile) - h = 0 - else: - df.to_csv(tempfile, mode="a", header=False) - # df = pd.read_csv('temp.csv.gz') - - return tempfile - - -def chunky_fetch(results, chunk_size=3000): - """Fetches data in chunks to preserve memory""" - data = [] - i = 0 - for d in results: - data.append(d["_source"]) - i += 1 - if i == chunk_size: - yield data - data = [] - i = 0 - else: - yield data diff --git a/pysus/preprocessing/ESUS.py b/pysus/preprocessing/ESUS.py deleted file mode 100644 index 110215c6..00000000 --- a/pysus/preprocessing/ESUS.py +++ /dev/null @@ -1,69 +0,0 @@ -import numpy as np -import pandas as pd -from pysus.online_data.ESUS import download - - -def cases_by_age_and_sex(UF, start="2020-03-01", end="2020-08-31"): - """ - Fetches ESUS covid line list and aggregates by age and sex returning these - counts between start and end dates. - :param UF: State code - :param start: Start date - :param end: end date - :return: dataframe - """ - df = download(uf=UF) - - # Transformando as colunas em datetime type - for cname in df: - if cname.startswith("data"): - df[cname] = pd.to_datetime(df[cname], errors="coerce") - - # Eliminando os valores nulos nas colunas com datas importantes - old_size = len(df) - df.dropna( - subset=["dataNotificacao", "dataInicioSintomas", "dataTeste"], - inplace=True, - ) - print( - f"Removed {old_size - len(df)} rows with missing dates of symptoms, " - "notification or testing" - ) - - # Desconsiderando os resultados negativos ou inconclusivos - df = df.loc[ - ~df.resultadoTeste.isin(["Negativo", "Inconclusivo ou Indeterminado"]) - ] - - # Removendo sexo indeterminado - df = df.loc[df.sexo.isin(["Masculino", "Feminino"])] - - # determinando a data dos primeiros sintomas como a data do index - - df["datesint"] = df["dataInicioSintomas"] - df.set_index("datesint", inplace=True) - df.sort_index(inplace=True, ascending=True) - - # vamos limitar a data inicial e a data final considerando apenas a - # primeira onda - - df = df.loc[start:end] - - ini = np.arange(0, 81, 5) - fin = np.arange(5, 86, 5) - fin[-1] = 120 - faixa_etaria = { - f"[{i},{f})": (i, f) for i, f in zip(ini, fin) # noqa: E231 - } - - labels = list(faixa_etaria.keys()) - df["faixa_etaria"] = [ - labels[i - 1] for i in np.digitize(df.idade, bins=ini) - ] - - agreg = ( - df[["sexo", "faixa_etaria"]].groupby(["faixa_etaria", "sexo"]).size() - ) - agreg = agreg.reset_index() - agreg.columns = ["faixa_etaria", "sexo", "n"] - return agreg diff --git a/pysus/tests/test_esus.py b/pysus/tests/test_esus.py deleted file mode 100644 index 68f159bc..00000000 --- a/pysus/tests/test_esus.py +++ /dev/null @@ -1,16 +0,0 @@ -import unittest - -import pytest -from pysus.online_data.ESUS import download - - -class MyTestCase(unittest.TestCase): - @pytest.mark.skip(reason="This test takes too long") - @pytest.mark.timeout(5) - def test_download(self): - df = download(uf="se") - self.assertGreater(len(df), 0) - - -if __name__ == "__main__": - unittest.main() diff --git a/setup.cfg b/setup.cfg index 157d9dfb..f4ccdd89 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,7 +18,7 @@ max-line-length = 79 ignore = D202,D203,W503,E203 [isort] -known_third_party = dbfread,elasticsearch,geobr,geocoder,numpy,pandas,pyarrow,pyreaddbc,requests,tqdm,urllib3 +known_third_party = dbfread,geobr,geocoder,numpy,pandas,pyarrow,pyreaddbc,requests,tqdm,urllib3 ensure_newline_before_comments=true line_length = 79 multi_line_output = 3