Add a method to get Calls data.

Add a test for this.
communication
junos 2021-01-05 17:00:45 +01:00
parent 98f945add1
commit b2d93e0686
6 changed files with 40 additions and 20 deletions

View File

@ -9,6 +9,7 @@ dependencies:
- flake8 - flake8
- jupyterlab - jupyterlab
- mypy - mypy
- pandas
- psycopg2 - psycopg2
- python-dotenv - python-dotenv
- sqlalchemy - sqlalchemy

View File

@ -1,17 +1,8 @@
from datetime import datetime from datetime import datetime
from sqlalchemy import ( from sqlalchemy import (TIMESTAMP, BigInteger, Boolean, Column, Float,
TIMESTAMP, ForeignKey, Integer, SmallInteger, String,
BigInteger, UniqueConstraint)
Boolean,
Column,
Float,
ForeignKey,
Integer,
SmallInteger,
String,
UniqueConstraint,
)
from sqlalchemy.dialects.postgresql import ARRAY as PSQL_ARRAY from sqlalchemy.dialects.postgresql import ARRAY as PSQL_ARRAY
from sqlalchemy.dialects.postgresql import INTEGER as PSQL_INTEGER from sqlalchemy.dialects.postgresql import INTEGER as PSQL_INTEGER
from sqlalchemy.dialects.postgresql import JSONB as PSQL_JSONB from sqlalchemy.dialects.postgresql import JSONB as PSQL_JSONB

View File

View File

@ -0,0 +1,17 @@
from typing import List
import pandas as pd
from config.models import Call, Participant
from setup import db_engine, session
def get_call_data(usernames: List) -> pd.DataFrame:
query_calls = (
session.query(Call, Participant.username)
.filter(Participant.id == Call.participant_id)
.filter(Participant.username.in_(usernames))
)
with db_engine.connect() as connection:
df_calls = pd.read_sql(query_calls.statement, connection)
return df_calls

View File

@ -1,21 +1,29 @@
import os import os
import sqlalchemy.engine.url import sqlalchemy.engine.url
from dotenv import load_dotenv
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv
load_dotenv() load_dotenv()
testing: bool = False
db_password = os.getenv("DB_PASSWORD") db_password = os.getenv("DB_PASSWORD")
db_uri = sqlalchemy.engine.url.URL( db_uri = sqlalchemy.engine.url.URL(
drivername='postgresql+psycopg2', drivername="postgresql+psycopg2",
username="staw_db", username="staw_db",
password=db_password, password=db_password,
host="212.235.208.113", host="212.235.208.113",
port=5432, port=5432,
database="staw" database="staw",
) )
db_engine = create_engine('sqlite:///:memory:', echo=True) if testing:
db_engine = create_engine("sqlite:///:memory:", echo=True)
else:
db_engine = create_engine(db_uri)
Session = sessionmaker(bind=db_engine) Session = sessionmaker(bind=db_engine)
session = Session()

View File

@ -4,6 +4,7 @@ from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from config.models import LightSensor, Participant from config.models import LightSensor, Participant
from features.communication import get_call_data
from setup import db_uri from setup import db_uri
@ -23,11 +24,13 @@ class DatabaseConnection(unittest.TestCase):
connection.close() connection.close()
def test_get_participant(self): def test_get_participant(self):
self.participant_0 = self.session.query(Participant).first() participant_0 = self.session.query(Participant).first()
self.assertIsNotNone(self.participant_0) self.assertIsNotNone(participant_0)
print(self.participant_0)
def test_get_light_data(self): def test_get_light_data(self):
light_0 = self.session.query(Participant).join(LightSensor).first() light_0 = self.session.query(Participant).join(LightSensor).first()
self.assertIsNotNone(light_0) self.assertIsNotNone(light_0)
print(light_0)
def test_get_calls_data(self):
calls = get_call_data(["nokia_0000003"])
self.assertIsNotNone(calls)