diff --git a/rules/reports.smk b/rules/reports.smk index a2c24468..bd1b448c 100644 --- a/rules/reports.smk +++ b/rules/reports.smk @@ -14,7 +14,8 @@ rule heatmap_sensors_per_minute_per_time_segment: participant_file = "data/external/participant_files/{pid}.yaml", time_segments_labels = "data/interim/time_segments/{pid}_time_segments_labels.csv" params: - pid = "{pid}" + pid = "{pid}", + time_segments_type = config["TIME_SEGMENTS"]["TYPE"] output: "reports/interim/{pid}/heatmap_sensors_per_minute_per_time_segment.html" script: diff --git a/src/visualization/heatmap_feature_correlation_matrix.py b/src/visualization/heatmap_feature_correlation_matrix.py index 5deff5c5..1a42d447 100644 --- a/src/visualization/heatmap_feature_correlation_matrix.py +++ b/src/visualization/heatmap_feature_correlation_matrix.py @@ -26,7 +26,7 @@ features = pd.read_csv(snakemake.input["all_sensor_features"]) if time_segments_type == "FREQUENCY": - features["local_segment_label"] = features["local_segment_label"].str.split("\d+", expand=True, n=1)[0] + features["local_segment_label"] = features["local_segment_label"].str.replace(r"[0-9]{4}", "") if time_segments_type == "EVENT": features["local_segment_label"] = "event" diff --git a/src/visualization/heatmap_phone_data_yield_per_participant_per_time_segment.py b/src/visualization/heatmap_phone_data_yield_per_participant_per_time_segment.py index 34897d6e..1e6f79fc 100644 --- a/src/visualization/heatmap_phone_data_yield_per_participant_per_time_segment.py +++ b/src/visualization/heatmap_phone_data_yield_per_participant_per_time_segment.py @@ -1,4 +1,3 @@ -from plotly_color_utils import sample_colorscale import pandas as pd import plotly.express as px import yaml @@ -59,7 +58,7 @@ time_segments = pd.read_csv(snakemake.input["time_segments_file"])["label"].uniq phone_data_yield = pd.read_csv(snakemake.input["phone_data_yield"], parse_dates=["local_segment_start_datetime", "local_segment_end_datetime"]).sort_values(by=["pid", "local_segment_start_datetime"]) if time_segments_type == "FREQUENCY": - phone_data_yield["local_segment_label"] = phone_data_yield["local_segment_label"].str.split("\d+", expand=True, n=1)[0] + phone_data_yield["local_segment_label"] = phone_data_yield["local_segment_label"].str.replace(r"[0-9]{4}", "") html_file = open(snakemake.output[0], "w", encoding="utf-8") if phone_data_yield.empty: diff --git a/src/visualization/heatmap_sensors_per_minute_per_time_segment.py b/src/visualization/heatmap_sensors_per_minute_per_time_segment.py index dd524322..7c69a756 100644 --- a/src/visualization/heatmap_sensors_per_minute_per_time_segment.py +++ b/src/visualization/heatmap_sensors_per_minute_per_time_segment.py @@ -5,6 +5,11 @@ from importlib import util from pathlib import Path import yaml +# import filter_data_by_segment from src/features/utils/utils.py +spec = util.spec_from_file_location("util", str(Path(snakemake.scriptdir).parent / "features" / "utils" / "utils.py")) +mod = util.module_from_spec(spec) +spec.loader.exec_module(mod) +filter_data_by_segment = getattr(mod, "filter_data_by_segment") def colors2colorscale(colors): colorscale = [] @@ -16,85 +21,83 @@ def colors2colorscale(colors): colorscale.append([1, colors[i]]) return colorscale -def getSensorsPerMinPerSegmentHeatmap(phone_data_yield, pid, time_segment, html_file): - - x_axis_labels = [pd.Timedelta(minutes=x) for x in phone_data_yield.columns] +def getDataForPlot(phone_data_yield_per_segment): + # calculate the length (in minute) of per segment instance + phone_data_yield_per_segment["length"] = phone_data_yield_per_segment["timestamps_segment"].str.split(",").apply(lambda x: int((int(x[1])-int(x[0])) / (1000 * 60))) + # calculate the number of sensors logged at least one row of data per minute. + phone_data_yield_per_segment = phone_data_yield_per_segment.groupby(["local_segment", "length", "local_date", "local_hour", "local_minute"])[["sensor", "local_date_time"]].max().reset_index() + # extract local start datetime of the segment from "local_segment" column + phone_data_yield_per_segment["local_segment_start_datetimes"] = pd.to_datetime(phone_data_yield_per_segment["local_segment"].apply(lambda x: x.split("#")[1].split(",")[0])) + # calculate the number of minutes after local start datetime of the segment + phone_data_yield_per_segment["minutes_after_segment_start"] = ((phone_data_yield_per_segment["local_date_time"] - phone_data_yield_per_segment["local_segment_start_datetimes"]) / pd.Timedelta(minutes=1)).astype("int") - fig = go.Figure(data=go.Heatmap(z=phone_data_yield.values.tolist(), - x=x_axis_labels, - y=phone_data_yield.index, - zmin=0, zmax=16, - colorscale=colors2colorscale(colors), - colorbar=dict(thickness=25, tickvals=[1/2 + x for x in range(16)],ticktext=[x for x in range(16)]))) + # impute missing rows with 0 + columns_for_full_index = phone_data_yield_per_segment[["local_segment_start_datetimes", "length"]].drop_duplicates(keep="first") + columns_for_full_index = columns_for_full_index.apply(lambda row: [[row["local_segment_start_datetimes"], x] for x in range(row["length"] + 1)], axis=1) + full_index = [] + for columns in columns_for_full_index: + full_index = full_index + columns + full_index = pd.MultiIndex.from_tuples(full_index, names=("local_segment_start_datetimes", "minutes_after_segment_start")) + phone_data_yield_per_segment = phone_data_yield_per_segment.set_index(["local_segment_start_datetimes", "minutes_after_segment_start"]).reindex(full_index).reset_index().fillna(0) - fig.update_layout(title="Number of sensors with any data per minute for " + time_segment + " segments. Pid: "+pid+". Label: " + label + "
y-axis shows the start (date and time) of a time segment.
x-axis shows the time since the start of the time segment.
z-axis (color) shows how many sensors logged at least one row of data per minute.") - fig["layout"].update(margin=dict(t=160)) - - html_file.write(fig.to_html(full_html=False, include_plotlyjs="cdn")) - - - - - -# import filter_data_by_segment from src/features/utils/utils.py -spec = util.spec_from_file_location("util", str(Path(snakemake.scriptdir).parent / "features" / "utils" / "utils.py")) -mod = util.module_from_spec(spec) -spec.loader.exec_module(mod) -filter_data_by_segment = getattr(mod, "filter_data_by_segment") - - - - - - + # transpose the dataframe per local start datetime of the segment and discard the useless index layer + phone_data_yield_per_segment = phone_data_yield_per_segment.groupby("local_segment_start_datetimes")[["minutes_after_segment_start", "sensor"]].apply(lambda x: x.set_index("minutes_after_segment_start").transpose()) + phone_data_yield_per_segment.index = phone_data_yield_per_segment.index.get_level_values("local_segment_start_datetimes") + return phone_data_yield_per_segment +def getSensorsPerMinPerSegmentHeatmap(phone_data_yield, pid, label, time_segment, html_file): + if phone_data_yield.empty: + html_file.write("There is no sensor data of " + time_segment + " segments for " + pid + " (pid) and " + label + " (label).
") + else: + phone_data_yield.sort_index(inplace=True) + x_axis_labels = [pd.Timedelta(minutes=x) for x in phone_data_yield.columns] + + fig = go.Figure(data=go.Heatmap(z=phone_data_yield.values.tolist(), + x=x_axis_labels, + y=phone_data_yield.index, + zmin=0, zmax=16, + colorscale=colors2colorscale(colors), + colorbar=dict(thickness=25, tickvals=[1/2 + x for x in range(16)],ticktext=[x for x in range(16)]))) + + fig.update_layout(title="Number of sensors with any data per minute for " + time_segment + " segments. Pid: "+pid+". Label: " + label + "
y-axis shows the start (date and time) of a time segment.
x-axis shows the time since the start of the time segment.
z-axis (color) shows how many sensors logged at least one row of data per minute.") + fig["layout"].update(margin=dict(t=160)) + + html_file.write(fig.to_html(full_html=False, include_plotlyjs="cdn")) + return colors = ["red", "#3D0751", "#423176", "#414381", "#3F5688", "#42678B", "#42768C", "#45868B", "#4A968A", "#53A485", "#5FB57E", "#76C170", "#91CF63", "#B4DA55", "#D9E152", "#F8E755", "#DEE00F"] pid = snakemake.params["pid"] -time_segments_labels = pd.read_csv(snakemake.input["time_segments_labels"], header=0) +time_segments_type = snakemake.params["time_segments_type"] +time_segments_labels = pd.read_csv(snakemake.input["time_segments_labels"]) with open(snakemake.input["participant_file"], "r", encoding="utf-8") as f: participant_file = yaml.safe_load(f) label = participant_file["PHONE"]["LABEL"] phone_data_yield = pd.read_csv(snakemake.input["phone_data_yield"], parse_dates=["local_date_time"]) +if time_segments_type == "FREQUENCY": + phone_data_yield["assigned_segments"] = phone_data_yield["assigned_segments"].str.replace(r"[0-9]{4}#", "#") + time_segments_labels["label"] = time_segments_labels["label"].str.replace(r"[0-9]{4}", "") +if time_segments_type == "PERIODIC": + phone_data_yield["assigned_segments"] = phone_data_yield["assigned_segments"].str.replace(r"_RR\d+SS#", "#") + time_segments_labels["label"] = time_segments_labels["label"].str.replace(r"_RR\d+SS", "") html_file = open(snakemake.output[0], "a", encoding="utf-8") if phone_data_yield.empty: html_file.write("There is no sensor data for " + pid + " (pid) and " + label + " (label).") else: - for time_segment in time_segments_labels["label"]: + data_for_plot = pd.DataFrame() + for time_segment in set(time_segments_labels["label"]): phone_data_yield_per_segment = filter_data_by_segment(phone_data_yield, time_segment) - - if phone_data_yield_per_segment.empty: - html_file.write("There is no sensor data of " + time_segment + " segments for " + pid + " (pid) and " + label + " (label).
") - else: - # calculate the length (in minute) of per segment instance - phone_data_yield_per_segment["length"] = phone_data_yield_per_segment["timestamps_segment"].str.split(",").apply(lambda x: int((int(x[1])-int(x[0])) / (1000 * 60))) - # calculate the number of sensors logged at least one row of data per minute. - phone_data_yield_per_segment = phone_data_yield_per_segment.groupby(["local_segment", "length", "local_date", "local_hour", "local_minute"])[["sensor", "local_date_time"]].max().reset_index() - # extract local start datetime of the segment from "local_segment" column - phone_data_yield_per_segment["local_segment_start_datetimes"] = pd.to_datetime(phone_data_yield_per_segment["local_segment"].apply(lambda x: x.split("#")[1].split(",")[0])) - # calculate the number of minutes after local start datetime of the segment - phone_data_yield_per_segment["minutes_after_segment_start"] = ((phone_data_yield_per_segment["local_date_time"] - phone_data_yield_per_segment["local_segment_start_datetimes"]) / pd.Timedelta(minutes=1)).astype("int") - - # impute missing rows with 0 - columns_for_full_index = phone_data_yield_per_segment[["local_segment_start_datetimes", "length"]].drop_duplicates(keep="first") - columns_for_full_index = columns_for_full_index.apply(lambda row: [[row["local_segment_start_datetimes"], x] for x in range(row["length"] + 1)], axis=1) - full_index = [] - for columns in columns_for_full_index: - full_index = full_index + columns - full_index = pd.MultiIndex.from_tuples(full_index, names=("local_segment_start_datetimes", "minutes_after_segment_start")) - phone_data_yield_per_segment = phone_data_yield_per_segment.set_index(["local_segment_start_datetimes", "minutes_after_segment_start"]).reindex(full_index).reset_index().fillna(0) - - # transpose the dataframe per local start datetime of the segment and discard the useless index layer - phone_data_yield_per_segment = phone_data_yield_per_segment.groupby("local_segment_start_datetimes")[["minutes_after_segment_start", "sensor"]].apply(lambda x: x.set_index("minutes_after_segment_start").transpose()) - phone_data_yield_per_segment.index = phone_data_yield_per_segment.index.get_level_values("local_segment_start_datetimes") - - # get heatmap - getSensorsPerMinPerSegmentHeatmap(phone_data_yield_per_segment, pid, time_segment, html_file) - + if not phone_data_yield_per_segment.empty: + data_for_plot_per_segment = getDataForPlot(phone_data_yield_per_segment) + if time_segments_type == "EVENT": + data_for_plot = pd.concat([data_for_plot, data_for_plot_per_segment], axis=0) + else: + getSensorsPerMinPerSegmentHeatmap(data_for_plot_per_segment, pid, label, time_segment, html_file) + if time_segments_type == "EVENT": + getSensorsPerMinPerSegmentHeatmap(data_for_plot, pid, label, "event", html_file) html_file.close() diff --git a/src/visualization/histogram_phone_data_yield.py b/src/visualization/histogram_phone_data_yield.py index a8b90d77..998d4fd9 100644 --- a/src/visualization/histogram_phone_data_yield.py +++ b/src/visualization/histogram_phone_data_yield.py @@ -6,7 +6,7 @@ time_segments_type = snakemake.params["time_segments_type"] phone_data_yield = pd.read_csv(snakemake.input[0]) if time_segments_type == "FREQUENCY": - phone_data_yield["local_segment_label"] = phone_data_yield["local_segment_label"].str.split("\d+", expand=True, n=1)[0] + phone_data_yield["local_segment_label"] = phone_data_yield["local_segment_label"].str.replace(r"[0-9]{4}", "") if time_segments_type == "EVENT": phone_data_yield["local_segment_label"] = "event" diff --git a/src/visualization/plotly_color_utils.py b/src/visualization/plotly_color_utils.py deleted file mode 100644 index 12ffb279..00000000 --- a/src/visualization/plotly_color_utils.py +++ /dev/null @@ -1,835 +0,0 @@ -""" -Source: https://github.com/plotly/plotly.py/pull/3136 -===== -colors -===== -Functions that manipulate colors and arrays of colors. ------ -There are three basic types of color types: rgb, hex and tuple: -rgb - An rgb color is a string of the form 'rgb(a,b,c)' where a, b and c are -integers between 0 and 255 inclusive. -hex - A hex color is a string of the form '#xxxxxx' where each x is a -character that belongs to the set [0,1,2,3,4,5,6,7,8,9,a,b,c,d,e,f]. This is -just the set of characters used in the hexadecimal numeric system. -tuple - A tuple color is a 3-tuple of the form (a,b,c) where a, b and c are -floats between 0 and 1 inclusive. ------ -Colormaps and Colorscales: -A colormap or a colorscale is a correspondence between values - Pythonic -objects such as strings and floats - to colors. -There are typically two main types of colormaps that exist: numerical and -categorical colormaps. -Numerical: ----------- -Numerical colormaps are used when the coloring column being used takes a -spectrum of values or numbers. -A classic example from the Plotly library: -``` -rainbow_colorscale = [ - [0, 'rgb(150,0,90)'], [0.125, 'rgb(0,0,200)'], - [0.25, 'rgb(0,25,255)'], [0.375, 'rgb(0,152,255)'], - [0.5, 'rgb(44,255,150)'], [0.625, 'rgb(151,255,0)'], - [0.75, 'rgb(255,234,0)'], [0.875, 'rgb(255,111,0)'], - [1, 'rgb(255,0,0)'] -] -``` -Notice that this colorscale is a list of lists with each inner list containing -a number and a color. These left hand numbers in the nested lists go from 0 to -1, and they are like pointers tell you when a number is mapped to a specific -color. -If you have a column of numbers `col_num` that you want to plot, and you know -``` -min(col_num) = 0 -max(col_num) = 100 -``` -then if you pull out the number `12.5` in the list and want to figure out what -color the corresponding chart element (bar, scatter plot, etc) is going to be, -you'll figure out that proportionally 12.5 to 100 is the same as 0.125 to 1. -So, the point will be mapped to 'rgb(0,0,200)'. -All other colors between the pinned values in a colorscale are linearly -interpolated. -Categorical: ------------- -Alternatively, a categorical colormap is used to assign a specific value in a -color column to a specific color everytime it appears in the dataset. -A column of strings in a panadas.dataframe that is chosen to serve as the -color index would naturally use a categorical colormap. However, you can -choose to use a categorical colormap with a column of numbers. -Be careful! If you have a lot of unique numbers in your color column you will -end up with a colormap that is massive and may slow down graphing performance. -""" -from __future__ import absolute_import - -import decimal -from numbers import Number -import six - -from _plotly_utils import exceptions - - -# Built-in qualitative color sequences and sequential, -# diverging and cyclical color scales. - -DEFAULT_PLOTLY_COLORS = [ - "rgb(31, 119, 180)", - "rgb(255, 127, 14)", - "rgb(44, 160, 44)", - "rgb(214, 39, 40)", - "rgb(148, 103, 189)", - "rgb(140, 86, 75)", - "rgb(227, 119, 194)", - "rgb(127, 127, 127)", - "rgb(188, 189, 34)", - "rgb(23, 190, 207)", -] - -PLOTLY_SCALES = { - "Greys": [[0, "rgb(0,0,0)"], [1, "rgb(255,255,255)"]], - "YlGnBu": [ - [0, "rgb(8,29,88)"], - [0.125, "rgb(37,52,148)"], - [0.25, "rgb(34,94,168)"], - [0.375, "rgb(29,145,192)"], - [0.5, "rgb(65,182,196)"], - [0.625, "rgb(127,205,187)"], - [0.75, "rgb(199,233,180)"], - [0.875, "rgb(237,248,217)"], - [1, "rgb(255,255,217)"], - ], - "Greens": [ - [0, "rgb(0,68,27)"], - [0.125, "rgb(0,109,44)"], - [0.25, "rgb(35,139,69)"], - [0.375, "rgb(65,171,93)"], - [0.5, "rgb(116,196,118)"], - [0.625, "rgb(161,217,155)"], - [0.75, "rgb(199,233,192)"], - [0.875, "rgb(229,245,224)"], - [1, "rgb(247,252,245)"], - ], - "YlOrRd": [ - [0, "rgb(128,0,38)"], - [0.125, "rgb(189,0,38)"], - [0.25, "rgb(227,26,28)"], - [0.375, "rgb(252,78,42)"], - [0.5, "rgb(253,141,60)"], - [0.625, "rgb(254,178,76)"], - [0.75, "rgb(254,217,118)"], - [0.875, "rgb(255,237,160)"], - [1, "rgb(255,255,204)"], - ], - "Bluered": [[0, "rgb(0,0,255)"], [1, "rgb(255,0,0)"]], - # modified RdBu based on - # www.sandia.gov/~kmorel/documents/ColorMaps/ColorMapsExpanded.pdf - "RdBu": [ - [0, "rgb(5,10,172)"], - [0.35, "rgb(106,137,247)"], - [0.5, "rgb(190,190,190)"], - [0.6, "rgb(220,170,132)"], - [0.7, "rgb(230,145,90)"], - [1, "rgb(178,10,28)"], - ], - # Scale for non-negative numeric values - "Reds": [ - [0, "rgb(220,220,220)"], - [0.2, "rgb(245,195,157)"], - [0.4, "rgb(245,160,105)"], - [1, "rgb(178,10,28)"], - ], - # Scale for non-positive numeric values - "Blues": [ - [0, "rgb(5,10,172)"], - [0.35, "rgb(40,60,190)"], - [0.5, "rgb(70,100,245)"], - [0.6, "rgb(90,120,245)"], - [0.7, "rgb(106,137,247)"], - [1, "rgb(220,220,220)"], - ], - "Picnic": [ - [0, "rgb(0,0,255)"], - [0.1, "rgb(51,153,255)"], - [0.2, "rgb(102,204,255)"], - [0.3, "rgb(153,204,255)"], - [0.4, "rgb(204,204,255)"], - [0.5, "rgb(255,255,255)"], - [0.6, "rgb(255,204,255)"], - [0.7, "rgb(255,153,255)"], - [0.8, "rgb(255,102,204)"], - [0.9, "rgb(255,102,102)"], - [1, "rgb(255,0,0)"], - ], - "Rainbow": [ - [0, "rgb(150,0,90)"], - [0.125, "rgb(0,0,200)"], - [0.25, "rgb(0,25,255)"], - [0.375, "rgb(0,152,255)"], - [0.5, "rgb(44,255,150)"], - [0.625, "rgb(151,255,0)"], - [0.75, "rgb(255,234,0)"], - [0.875, "rgb(255,111,0)"], - [1, "rgb(255,0,0)"], - ], - "Portland": [ - [0, "rgb(12,51,131)"], - [0.25, "rgb(10,136,186)"], - [0.5, "rgb(242,211,56)"], - [0.75, "rgb(242,143,56)"], - [1, "rgb(217,30,30)"], - ], - "Jet": [ - [0, "rgb(0,0,131)"], - [0.125, "rgb(0,60,170)"], - [0.375, "rgb(5,255,255)"], - [0.625, "rgb(255,255,0)"], - [0.875, "rgb(250,0,0)"], - [1, "rgb(128,0,0)"], - ], - "Hot": [ - [0, "rgb(0,0,0)"], - [0.3, "rgb(230,0,0)"], - [0.6, "rgb(255,210,0)"], - [1, "rgb(255,255,255)"], - ], - "Blackbody": [ - [0, "rgb(0,0,0)"], - [0.2, "rgb(230,0,0)"], - [0.4, "rgb(230,210,0)"], - [0.7, "rgb(255,255,255)"], - [1, "rgb(160,200,255)"], - ], - "Earth": [ - [0, "rgb(0,0,130)"], - [0.1, "rgb(0,180,180)"], - [0.2, "rgb(40,210,40)"], - [0.4, "rgb(230,230,50)"], - [0.6, "rgb(120,70,20)"], - [1, "rgb(255,255,255)"], - ], - "Electric": [ - [0, "rgb(0,0,0)"], - [0.15, "rgb(30,0,100)"], - [0.4, "rgb(120,0,100)"], - [0.6, "rgb(160,90,0)"], - [0.8, "rgb(230,200,0)"], - [1, "rgb(255,250,220)"], - ], - "Viridis": [ - [0, "#440154"], - [0.06274509803921569, "#48186a"], - [0.12549019607843137, "#472d7b"], - [0.18823529411764706, "#424086"], - [0.25098039215686274, "#3b528b"], - [0.3137254901960784, "#33638d"], - [0.3764705882352941, "#2c728e"], - [0.4392156862745098, "#26828e"], - [0.5019607843137255, "#21918c"], - [0.5647058823529412, "#1fa088"], - [0.6274509803921569, "#28ae80"], - [0.6901960784313725, "#3fbc73"], - [0.7529411764705882, "#5ec962"], - [0.8156862745098039, "#84d44b"], - [0.8784313725490196, "#addc30"], - [0.9411764705882353, "#d8e219"], - [1, "#fde725"], - ], - "Cividis": [ - [0.000000, "rgb(0,32,76)"], - [0.058824, "rgb(0,42,102)"], - [0.117647, "rgb(0,52,110)"], - [0.176471, "rgb(39,63,108)"], - [0.235294, "rgb(60,74,107)"], - [0.294118, "rgb(76,85,107)"], - [0.352941, "rgb(91,95,109)"], - [0.411765, "rgb(104,106,112)"], - [0.470588, "rgb(117,117,117)"], - [0.529412, "rgb(131,129,120)"], - [0.588235, "rgb(146,140,120)"], - [0.647059, "rgb(161,152,118)"], - [0.705882, "rgb(176,165,114)"], - [0.764706, "rgb(192,177,109)"], - [0.823529, "rgb(209,191,102)"], - [0.882353, "rgb(225,204,92)"], - [0.941176, "rgb(243,219,79)"], - [1.000000, "rgb(255,233,69)"], - ], -} - - -def color_parser(colors, function): - """ - Takes color(s) and a function and applies the function on the color(s) - In particular, this function identifies whether the given color object - is an iterable or not and applies the given color-parsing function to - the color or iterable of colors. If given an iterable, it will only be - able to work with it if all items in the iterable are of the same type - - rgb string, hex string or tuple - """ - if isinstance(colors, str): - return function(colors) - - if isinstance(colors, tuple) and isinstance(colors[0], Number): - return function(colors) - - if hasattr(colors, "__iter__"): - if isinstance(colors, tuple): - new_color_tuple = tuple(function(item) for item in colors) - return new_color_tuple - - else: - new_color_list = [function(item) for item in colors] - return new_color_list - - -def validate_colors(colors, colortype="tuple"): - """ - Validates color(s) and returns a list of color(s) of a specified type - """ - from numbers import Number - - if colors is None: - colors = DEFAULT_PLOTLY_COLORS - - if isinstance(colors, str): - if colors in PLOTLY_SCALES: - colors_list = colorscale_to_colors(PLOTLY_SCALES[colors]) - # TODO: fix _gantt.py/_scatter.py so that they can accept the - # actual colorscale and not just a list of the first and last - # color in the plotly colorscale. In resolving this issue we - # will be removing the immediate line below - colors = [colors_list[0]] + [colors_list[-1]] - elif "rgb" in colors or "#" in colors: - colors = [colors] - else: - raise exceptions.PlotlyError( - "If your colors variable is a string, it must be a " - "Plotly scale, an rgb color or a hex color." - ) - - elif isinstance(colors, tuple): - if isinstance(colors[0], Number): - colors = [colors] - else: - colors = list(colors) - - # convert color elements in list to tuple color - for j, each_color in enumerate(colors): - if "rgb" in each_color: - each_color = color_parser(each_color, unlabel_rgb) - for value in each_color: - if value > 255.0: - raise exceptions.PlotlyError( - "Whoops! The elements in your rgb colors " - "tuples cannot exceed 255.0." - ) - each_color = color_parser(each_color, unconvert_from_RGB_255) - colors[j] = each_color - - if "#" in each_color: - each_color = color_parser(each_color, hex_to_rgb) - each_color = color_parser(each_color, unconvert_from_RGB_255) - - colors[j] = each_color - - if isinstance(each_color, tuple): - for value in each_color: - if value > 1.0: - raise exceptions.PlotlyError( - "Whoops! The elements in your colors tuples " - "cannot exceed 1.0." - ) - colors[j] = each_color - - if colortype == "rgb" and not isinstance(colors, six.string_types): - for j, each_color in enumerate(colors): - rgb_color = color_parser(each_color, convert_to_RGB_255) - colors[j] = color_parser(rgb_color, label_rgb) - - return colors - - -def validate_colors_dict(colors, colortype="tuple"): - """ - Validates dictionary of color(s) - """ - # validate each color element in the dictionary - for key in colors: - if "rgb" in colors[key]: - colors[key] = color_parser(colors[key], unlabel_rgb) - for value in colors[key]: - if value > 255.0: - raise exceptions.PlotlyError( - "Whoops! The elements in your rgb colors " - "tuples cannot exceed 255.0." - ) - colors[key] = color_parser(colors[key], unconvert_from_RGB_255) - - if "#" in colors[key]: - colors[key] = color_parser(colors[key], hex_to_rgb) - colors[key] = color_parser(colors[key], unconvert_from_RGB_255) - - if isinstance(colors[key], tuple): - for value in colors[key]: - if value > 1.0: - raise exceptions.PlotlyError( - "Whoops! The elements in your colors tuples " - "cannot exceed 1.0." - ) - - if colortype == "rgb": - for key in colors: - colors[key] = color_parser(colors[key], convert_to_RGB_255) - colors[key] = color_parser(colors[key], label_rgb) - - return colors - - -def convert_colors_to_same_type( - colors, - colortype="rgb", - scale=None, - return_default_colors=False, - num_of_defualt_colors=2, -): - """ - Converts color(s) to the specified color type - Takes a single color or an iterable of colors, as well as a list of scale - values, and outputs a 2-pair of the list of color(s) converted all to an - rgb or tuple color type, aswell as the scale as the second element. If - colors is a Plotly Scale name, then 'scale' will be forced to the scale - from the respective colorscale and the colors in that colorscale will also - be coverted to the selected colortype. If colors is None, then there is an - option to return portion of the DEFAULT_PLOTLY_COLORS - :param (str|tuple|list) colors: either a plotly scale name, an rgb or hex - color, a color tuple or a list/tuple of colors - :param (list) scale: see docs for validate_scale_values() - :rtype (tuple) (colors_list, scale) if scale is None in the function call, - then scale will remain None in the returned tuple - """ - colors_list = [] - - if colors is None and return_default_colors is True: - colors_list = DEFAULT_PLOTLY_COLORS[0:num_of_defualt_colors] - - if isinstance(colors, str): - if colors in PLOTLY_SCALES: - colors_list = colorscale_to_colors(PLOTLY_SCALES[colors]) - if scale is None: - scale = colorscale_to_scale(PLOTLY_SCALES[colors]) - - elif "rgb" in colors or "#" in colors: - colors_list = [colors] - - elif isinstance(colors, tuple): - if isinstance(colors[0], Number): - colors_list = [colors] - else: - colors_list = list(colors) - - elif isinstance(colors, list): - colors_list = colors - - # validate scale - if scale is not None: - validate_scale_values(scale) - - if len(colors_list) != len(scale): - raise exceptions.PlotlyError( - "Make sure that the length of your scale matches the length " - "of your list of colors which is {}.".format(len(colors_list)) - ) - - # convert all colors to rgb - for j, each_color in enumerate(colors_list): - if "#" in each_color: - each_color = color_parser(each_color, hex_to_rgb) - each_color = color_parser(each_color, label_rgb) - colors_list[j] = each_color - - elif isinstance(each_color, tuple): - each_color = color_parser(each_color, convert_to_RGB_255) - each_color = color_parser(each_color, label_rgb) - colors_list[j] = each_color - - if colortype == "rgb": - return (colors_list, scale) - elif colortype == "tuple": - for j, each_color in enumerate(colors_list): - each_color = color_parser(each_color, unlabel_rgb) - each_color = color_parser(each_color, unconvert_from_RGB_255) - colors_list[j] = each_color - return (colors_list, scale) - else: - raise exceptions.PlotlyError( - "You must select either rgb or tuple for your colortype variable." - ) - - -def convert_dict_colors_to_same_type(colors_dict, colortype="rgb"): - """ - Converts a colors in a dictionary of colors to the specified color type - :param (dict) colors_dict: a dictionary whose values are single colors - """ - for key in colors_dict: - if "#" in colors_dict[key]: - colors_dict[key] = color_parser(colors_dict[key], hex_to_rgb) - colors_dict[key] = color_parser(colors_dict[key], label_rgb) - - elif isinstance(colors_dict[key], tuple): - colors_dict[key] = color_parser(colors_dict[key], convert_to_RGB_255) - colors_dict[key] = color_parser(colors_dict[key], label_rgb) - - if colortype == "rgb": - return colors_dict - elif colortype == "tuple": - for key in colors_dict: - colors_dict[key] = color_parser(colors_dict[key], unlabel_rgb) - colors_dict[key] = color_parser(colors_dict[key], unconvert_from_RGB_255) - return colors_dict - else: - raise exceptions.PlotlyError( - "You must select either rgb or tuple for your colortype variable." - ) - - -def validate_scale_values(scale): - """ - Validates scale values from a colorscale - :param (list) scale: a strictly increasing list of floats that begins - with 0 and ends with 1. Its usage derives from a colorscale which is - a list of two-lists (a list with two elements) of the form - [value, color] which are used to determine how interpolation weighting - works between the colors in the colorscale. Therefore scale is just - the extraction of these values from the two-lists in order - """ - if len(scale) < 2: - raise exceptions.PlotlyError( - "You must input a list of scale values that has at least two values." - ) - - if (scale[0] != 0) or (scale[-1] != 1): - raise exceptions.PlotlyError( - "The first and last number in your scale must be 0.0 and 1.0 " - "respectively." - ) - - if not all(x < y for x, y in zip(scale, scale[1:])): - raise exceptions.PlotlyError( - "'scale' must be a list that contains a strictly increasing " - "sequence of numbers." - ) - - -def validate_colorscale(colorscale): - """Validate the structure, scale values and colors of colorscale.""" - if not isinstance(colorscale, list): - # TODO Write tests for these exceptions - raise exceptions.PlotlyError("A valid colorscale must be a list.") - if not all(isinstance(innerlist, list) for innerlist in colorscale): - raise exceptions.PlotlyError("A valid colorscale must be a list of lists.") - colorscale_colors = colorscale_to_colors(colorscale) - scale_values = colorscale_to_scale(colorscale) - - validate_scale_values(scale_values) - validate_colors(colorscale_colors) - - -def make_colorscale(colors, scale=None): - """ - Makes a colorscale from a list of colors and a scale - Takes a list of colors and scales and constructs a colorscale based - on the colors in sequential order. If 'scale' is left empty, a linear- - interpolated colorscale will be generated. If 'scale' is a specificed - list, it must be the same legnth as colors and must contain all floats - For documentation regarding to the form of the output, see - https://plot.ly/python/reference/#mesh3d-colorscale - :param (list) colors: a list of single colors - """ - colorscale = [] - - # validate minimum colors length of 2 - if len(colors) < 2: - raise exceptions.PlotlyError( - "You must input a list of colors that has at least two colors." - ) - - if scale is None: - scale_incr = 1.0 / (len(colors) - 1) - return [[i * scale_incr, color] for i, color in enumerate(colors)] - - else: - if len(colors) != len(scale): - raise exceptions.PlotlyError( - "The length of colors and scale must be the same." - ) - - validate_scale_values(scale) - - colorscale = [list(tup) for tup in zip(scale, colors)] - return colorscale - - -def find_intermediate_color(lowcolor, highcolor, intermed, colortype="tuple"): - """ - Returns the color at a given distance between two colors - This function takes two color tuples, where each element is between 0 - and 1, along with a value 0 < intermed < 1 and returns a color that is - intermed-percent from lowcolor to highcolor. If colortype is set to 'rgb', - the function will automatically convert the rgb type to a tuple, find the - intermediate color and return it as an rgb color. - """ - if colortype == "rgb": - # convert to tuple color, eg. (1, 0.45, 0.7) - lowcolor = unlabel_rgb(lowcolor) - highcolor = unlabel_rgb(highcolor) - - diff_0 = float(highcolor[0] - lowcolor[0]) - diff_1 = float(highcolor[1] - lowcolor[1]) - diff_2 = float(highcolor[2] - lowcolor[2]) - - inter_med_tuple = ( - lowcolor[0] + intermed * diff_0, - lowcolor[1] + intermed * diff_1, - lowcolor[2] + intermed * diff_2, - ) - - if colortype == "rgb": - # back to an rgb string, e.g. rgb(30, 20, 10) - inter_med_rgb = label_rgb(inter_med_tuple) - return inter_med_rgb - - return inter_med_tuple - - -def unconvert_from_RGB_255(colors): - """ - Return a tuple where each element gets divided by 255 - Takes a (list of) color tuple(s) where each element is between 0 and - 255. Returns the same tuples where each tuple element is normalized to - a value between 0 and 1 - """ - return (colors[0] / (255.0), colors[1] / (255.0), colors[2] / (255.0)) - - -def convert_to_RGB_255(colors): - """ - Multiplies each element of a triplet by 255 - Each coordinate of the color tuple is rounded to the nearest float and - then is turned into an integer. If a number is of the form x.5, then - if x is odd, the number rounds up to (x+1). Otherwise, it rounds down - to just x. This is the way rounding works in Python 3 and in current - statistical analysis to avoid rounding bias - :param (list) rgb_components: grabs the three R, G and B values to be - returned as computed in the function - """ - rgb_components = [] - - for component in colors: - rounded_num = decimal.Decimal(str(component * 255.0)).quantize( - decimal.Decimal("1"), rounding=decimal.ROUND_HALF_EVEN - ) - # convert rounded number to an integer from 'Decimal' form - rounded_num = int(rounded_num) - rgb_components.append(rounded_num) - - return (rgb_components[0], rgb_components[1], rgb_components[2]) - - -def n_colors(lowcolor, highcolor, n_colors, colortype="tuple"): - """ - Splits a low and high color into a list of n_colors colors in it - Accepts two color tuples and returns a list of n_colors colors - which form the intermediate colors between lowcolor and highcolor - from linearly interpolating through RGB space. If colortype is 'rgb' - the function will return a list of colors in the same form. - """ - if colortype == "rgb": - # convert to tuple - lowcolor = unlabel_rgb(lowcolor) - highcolor = unlabel_rgb(highcolor) - - diff_0 = float(highcolor[0] - lowcolor[0]) - incr_0 = diff_0 / (n_colors - 1) - diff_1 = float(highcolor[1] - lowcolor[1]) - incr_1 = diff_1 / (n_colors - 1) - diff_2 = float(highcolor[2] - lowcolor[2]) - incr_2 = diff_2 / (n_colors - 1) - list_of_colors = [] - - for index in range(n_colors): - new_tuple = ( - lowcolor[0] + (index * incr_0), - lowcolor[1] + (index * incr_1), - lowcolor[2] + (index * incr_2), - ) - list_of_colors.append(new_tuple) - - if colortype == "rgb": - # back to an rgb string - list_of_colors = color_parser(list_of_colors, label_rgb) - - return list_of_colors - - -def label_rgb(colors): - """ - Takes tuple (a, b, c) and returns an rgb color 'rgb(a, b, c)' - """ - return "rgb(%s, %s, %s)" % (colors[0], colors[1], colors[2]) - - -def unlabel_rgb(colors): - """ - Takes rgb color(s) 'rgb(a, b, c)' and returns tuple(s) (a, b, c) - This function takes either an 'rgb(a, b, c)' color or a list of - such colors and returns the color tuples in tuple(s) (a, b, c) - """ - str_vals = "" - for index in range(len(colors)): - try: - float(colors[index]) - str_vals = str_vals + colors[index] - except ValueError: - if colors[index] == "," or colors[index] == ".": - str_vals = str_vals + colors[index] - - str_vals = str_vals + "," - numbers = [] - str_num = "" - for char in str_vals: - if char != ",": - str_num = str_num + char - else: - numbers.append(float(str_num)) - str_num = "" - return (numbers[0], numbers[1], numbers[2]) - - -def hex_to_rgb(value): - """ - Calculates rgb values from a hex color code. - :param (string) value: Hex color string - :rtype (tuple) (r_value, g_value, b_value): tuple of rgb values - """ - value = value.lstrip("#") - hex_total_length = len(value) - rgb_section_length = hex_total_length // 3 - return tuple( - int(value[i : i + rgb_section_length], 16) - for i in range(0, hex_total_length, rgb_section_length) - ) - - -def colorscale_to_colors(colorscale): - """ - Extracts the colors from colorscale as a list - """ - color_list = [] - for item in colorscale: - color_list.append(item[1]) - return color_list - - -def colorscale_to_scale(colorscale): - """ - Extracts the interpolation scale values from colorscale as a list - """ - scale_list = [] - for item in colorscale: - scale_list.append(item[0]) - return scale_list - - -def convert_colorscale_to_rgb(colorscale): - """ - Converts the colors in a colorscale to rgb colors - A colorscale is an array of arrays, each with a numeric value as the - first item and a color as the second. This function specifically is - converting a colorscale with tuple colors (each coordinate between 0 - and 1) into a colorscale with the colors transformed into rgb colors - """ - for color in colorscale: - color[1] = convert_to_RGB_255(color[1]) - - for color in colorscale: - color[1] = label_rgb(color[1]) - return colorscale - - -def named_colorscales(): - """ - Returns lowercased names of built-in continuous colorscales. - """ - from _plotly_utils.basevalidators import ColorscaleValidator - - return [c for c in ColorscaleValidator("", "").named_colorscales] - - -def get_colorscale(name): - """ - Returns the colorscale for a given name. See `named_colorscales` for the - built-in colorscales. - """ - from _plotly_utils.basevalidators import ColorscaleValidator - - if not isinstance(name, str): - raise exceptions.PlotlyError("Name argument have to be a string.") - - name = name.lower() - if name[-2:] == "_r": - should_reverse = True - name = name[:-2] - else: - should_reverse = False - - if name in ColorscaleValidator("", "").named_colorscales: - colorscale = ColorscaleValidator("", "").named_colorscales[name] - else: - raise exceptions.PlotlyError(f"Colorscale {name} is not a built-in scale.") - - if should_reverse: - colorscale = colorscale[::-1] - return make_colorscale(colorscale) - - -def sample_colorscale(colorscale, samplepoints, low=0.0, high=1.0, colortype="rgb"): - """ - Samples a colorscale at specific points. - Interpolates between colors in a colorscale to find the specific colors - corresponding to the specified sample values. The colorscale can be specified - as a list of `[scale, color]` pairs, as a list of colors, or as a named - plotly colorscale. The samplepoints can be specefied as an iterable of specific - points in the range [0.0, 1.0], or as an integer number of points which will - be spaced equally between the low value (default 0.0) and the high value - (default 1.0). The output is a list of colors, formatted according to the - specified colortype. - """ - from bisect import bisect_left - - try: - validate_colorscale(colorscale) - except exceptions.PlotlyError: - if isinstance(colorscale, str): - colorscale = get_colorscale(colorscale) - else: - colorscale = make_colorscale(colorscale) - - scale = colorscale_to_scale(colorscale) - validate_scale_values(scale) - colors = colorscale_to_colors(colorscale) - colors = validate_colors(colors, colortype="tuple") - - if isinstance(samplepoints, int): - samplepoints = [ - low + idx / (samplepoints - 1) * (high - low) for idx in range(samplepoints) - ] - elif isinstance(samplepoints, float): - samplepoints = [samplepoints] - - sampled_colors = [] - for point in samplepoints: - high = bisect_left(scale, point) - low = high - 1 - interpolant = (point - scale[low]) / (scale[high] - scale[low]) - sampled_color = find_intermediate_color(colors[low], colors[high], interpolant) - sampled_colors.append(sampled_color) - return validate_colors(sampled_colors, colortype=colortype)