Introduction:
This project combines real-time data streaming, machine learning for anomaly detection, and interactive visualization into a single dashboard. We’ll simulate a system that produces multiple sensor readings, train an anomaly detection model on that data, and visualize the stream with detected anomalies highlighted as they arrive. This project is ideal for those interested in machine learning, data science, and building sophisticated data applications.
Project Overview:
Simulate Multi-Sensor Data: Generate a dataset with multiple time-series signals and introduce artificial anomalies.
Train an Anomaly Detection Model: Use an isolation forest or similar model to detect anomalies.
Build a Real-Time Dashboard: Create a Dash dashboard that displays the sensor readings, anomaly scores, and detected anomalies.
Interactive Analysis: Implement interactive features to explore the data and anomalies.
Python Code:
The following code simulates the sensor data, trains the anomaly detection model, and builds the real-time dashboard; it requires the dash, plotly, pandas, numpy, and scikit-learn packages.
import dash
from dash import dcc, html
from dash.dependencies import Output, Input
import plotly.graph_objs as go
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from collections import deque
# Simulate data
def generate_data(n_samples=1000):
    np.random.seed(42)
    time_index = pd.date_range(start="2023-01-01", periods=n_samples, freq="S")
    sensor1 = np.sin(np.linspace(0, 10, n_samples)) + np.random.normal(0, 0.1, n_samples)
    sensor2 = np.cos(np.linspace(0, 10, n_samples)) + np.random.normal(0, 0.1, n_samples)
    sensor3 = np.random.normal(0, 0.5, n_samples)
    df = pd.DataFrame({"sensor1": sensor1, "sensor2": sensor2, "sensor3": sensor3}, index=time_index)

    # Introduce anomalies: shift 2% of the rows away from the normal signal
    anomaly_indices = np.random.choice(n_samples, size=int(n_samples * 0.02), replace=False)
    df.iloc[anomaly_indices, 0] += np.random.uniform(2, 5, len(anomaly_indices))
    df.iloc[anomaly_indices, 1] += np.random.uniform(-3, -1, len(anomaly_indices))
    df.iloc[anomaly_indices, 2] += np.random.uniform(-3, 3, len(anomaly_indices))
    return df

df = generate_data()
# Train the anomaly detection model on the full dataset.
# predict() labels each row 1 (normal) or -1 (anomaly);
# decision_function() returns a score where lower values mean more anomalous.
model = IsolationForest(contamination=0.02, random_state=42)
model.fit(df)
anomaly_scores = model.decision_function(df)
anomalies = model.predict(df)
# Initialize Dash app
app = dash.Dash(__name__)
# Define layout
app.layout = html.Div(children=[
    html.H1(children='Real-Time Anomaly Detection Dashboard'),
    dcc.Graph(id='sensor-graph'),
    dcc.Graph(id='anomaly-graph'),
    dcc.Interval(id='interval-component', interval=1 * 1000, n_intervals=0)
])
# Rolling buffers: only the most recent max_length points are kept and plotted
max_length = 100
time_data = deque(maxlen=max_length)
sensor1_data = deque(maxlen=max_length)
sensor2_data = deque(maxlen=max_length)
sensor3_data = deque(maxlen=max_length)
anomaly_score_data = deque(maxlen=max_length)
anomaly_data = deque(maxlen=max_length)
# Define callback function for updating graphs
@app.callback(Output('sensor-graph', 'figure'),
              Output('anomaly-graph', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph(n):
    # Step through the pre-generated dataset one row per interval tick
    index = n % len(df)
    time_data.append(df.index[index])
    sensor1_data.append(df['sensor1'].iloc[index])
    sensor2_data.append(df['sensor2'].iloc[index])
    sensor3_data.append(df['sensor3'].iloc[index])
    anomaly_score_data.append(anomaly_scores[index])
    anomaly_data.append(anomalies[index])

    sensor_trace1 = go.Scatter(x=list(time_data), y=list(sensor1_data), mode='lines', name='Sensor 1')
    sensor_trace2 = go.Scatter(x=list(time_data), y=list(sensor2_data), mode='lines', name='Sensor 2')
    sensor_trace3 = go.Scatter(x=list(time_data), y=list(sensor3_data), mode='lines', name='Sensor 3')
    anomaly_trace = go.Scatter(x=list(time_data), y=list(anomaly_score_data), mode='lines', name='Anomaly Score')

    # Mark the points the model labelled as anomalies (predict() returns -1 for outliers)
    anomaly_scatter = go.Scatter(
        x=[time_data[i] for i in range(len(time_data)) if anomaly_data[i] == -1],
        y=[anomaly_score_data[i] for i in range(len(time_data)) if anomaly_data[i] == -1],
        mode='markers', marker={'color': 'red'}, name='Anomalies')

    sensor_figure = {'data': [sensor_trace1, sensor_trace2, sensor_trace3],
                     'layout': go.Layout(title='Sensor Readings', xaxis=dict(title='Time'), yaxis=dict(title='Value'))}
    anomaly_figure = {'data': [anomaly_trace, anomaly_scatter],
                      'layout': go.Layout(title='Anomaly Scores', xaxis=dict(title='Time'), yaxis=dict(title='Score'))}
    return sensor_figure, anomaly_figure
if __name__ == '__main__':
    app.run(debug=True)  # on older Dash releases, use app.run_server(debug=True)
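Run the script and open the dashboard in a browser; by default Dash serves it at http://127.0.0.1:8050, and the Interval component drives a graph update once per second.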
Enhancements and Further Exploration:
Real-Time Data: Replace the simulated data with a real-time data stream (e.g., from a Kafka topic or an API); see the consumer sketch after this list.
Advanced Anomaly Detection: Experiment with more sophisticated anomaly detection models (e.g., autoencoders, LSTM-based models); see the autoencoder sketch below.
Interactive Controls: Add interactive controls to adjust the anomaly detection threshold, filter the data, and zoom in on specific time ranges; see the threshold-slider sketch below.
Alerting System: Implement an alerting system that sends notifications when anomalies are detected; see the email sketch below.
Database Integration: Store the data and anomaly results in a database for long-term analysis; see the SQLite sketch below.
Model Retraining: Periodically retrain the model on new data so it keeps up with drift in the sensor behaviour.
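For the real-time data idea, the sketch below shows what consuming live readings could look like using the kafka-python package. The topic name, broker address, and JSON message schema are placeholders, not part of the project above.

# Hypothetical sketch: consume JSON sensor readings from a Kafka topic instead of
# stepping through the simulated DataFrame. Topic, broker, and message schema are
# assumptions for illustration.
import json
import pandas as pd
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    'sensor-readings',                   # assumed topic name
    bootstrap_servers='localhost:9092',  # assumed broker address
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

def read_latest_reading():
    # Block until the next message arrives and return it as a one-row DataFrame
    for record in consumer:
        return pd.DataFrame([record.value])  # e.g. {"sensor1": 0.4, "sensor2": -0.1, "sensor3": 0.7}

With a setup like this, the Interval callback would append the result of read_latest_reading() to the rolling buffers instead of indexing into the pre-generated df.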
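For more advanced detection, a small dense autoencoder can stand in for the isolation forest: the network learns to reconstruct typical readings, and rows with a high reconstruction error are flagged. This is a rough sketch assuming TensorFlow/Keras is installed; the layer sizes, epoch count, and 98th-percentile threshold are illustrative choices rather than tuned values.

# Hypothetical sketch: autoencoder-based anomaly detection via reconstruction error.
from tensorflow import keras

X = df.values.astype('float32')

autoencoder = keras.Sequential([
    keras.layers.Input(shape=(X.shape[1],)),
    keras.layers.Dense(8, activation='relu'),
    keras.layers.Dense(2, activation='relu'),   # bottleneck
    keras.layers.Dense(8, activation='relu'),
    keras.layers.Dense(X.shape[1]),
])
autoencoder.compile(optimizer='adam', loss='mse')
autoencoder.fit(X, X, epochs=20, batch_size=32, verbose=0)

# Per-row reconstruction error: larger means more anomalous
reconstruction_error = np.mean((X - autoencoder.predict(X, verbose=0)) ** 2, axis=1)
threshold = np.percentile(reconstruction_error, 98)  # roughly mirrors the 2% contamination
anomalies = np.where(reconstruction_error > threshold, -1, 1)  # keep the -1/1 convention used above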
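For interactive controls, one option is a threshold slider wired into the callback as an extra Input, so points are re-flagged on the fly from the raw decision_function scores (lower means more anomalous). This is an illustrative sketch rather than a drop-in addition: the slider component would need to be added to the layout's children, the callback below would take over the anomaly-graph output from the original callback, and the slider range was chosen by eye.

# Hypothetical sketch: user-adjustable anomaly threshold.
threshold_controls = html.Div([
    html.Label('Anomaly score threshold'),
    dcc.Slider(id='threshold-slider', min=-0.3, max=0.3, step=0.01, value=0.0),
])  # add threshold_controls to the children list in app.layout

@app.callback(Output('anomaly-graph', 'figure'),
              Input('interval-component', 'n_intervals'),
              Input('threshold-slider', 'value'))
def update_anomaly_graph(n, threshold):
    # Flag every buffered point whose score falls below the user-chosen threshold
    flagged = [(t, s) for t, s in zip(time_data, anomaly_score_data) if s < threshold]
    score_trace = go.Scatter(x=list(time_data), y=list(anomaly_score_data),
                             mode='lines', name='Anomaly Score')
    flagged_trace = go.Scatter(x=[t for t, _ in flagged], y=[s for _, s in flagged],
                               mode='markers', marker={'color': 'red'}, name='Flagged')
    return {'data': [score_trace, flagged_trace],
            'layout': go.Layout(title='Anomaly Scores', xaxis=dict(title='Time'),
                                yaxis=dict(title='Score'))}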
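For alerting, a simple starting point is an email sent whenever a reading is labelled -1, using the standard-library smtplib. The SMTP host, port, and addresses below are placeholders, and a real deployment would rate-limit alerts so a burst of anomalies does not flood the inbox.

# Hypothetical sketch: email alert on detected anomalies. Host and addresses are placeholders.
import smtplib
from email.message import EmailMessage

def send_anomaly_alert(timestamp, score):
    msg = EmailMessage()
    msg['Subject'] = f'Anomaly detected at {timestamp} (score {score:.3f})'
    msg['From'] = 'dashboard@example.com'
    msg['To'] = 'oncall@example.com'
    msg.set_content(f'An anomalous sensor reading was detected at {timestamp}.')
    with smtplib.SMTP('smtp.example.com', 587) as server:
        server.starttls()
        server.send_message(msg)

# Inside update_graph, after appending the latest prediction:
# if anomalies[index] == -1:
#     send_anomaly_alert(df.index[index], anomaly_scores[index])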
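For database integration, the readings and anomaly results can be appended to a SQLite table with pandas' to_sql; the database file and table name below are illustrative.

# Hypothetical sketch: persist readings and anomaly labels for long-term analysis.
import sqlite3

results = df.copy()
results['anomaly_score'] = anomaly_scores
results['is_anomaly'] = anomalies == -1

with sqlite3.connect('sensor_history.db') as conn:
    results.to_sql('sensor_readings', conn, if_exists='append', index_label='timestamp')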
This project combines multiple advanced concepts and provides a framework for building a powerful real-time anomaly detection system.