...
In dieser Untersuchung wurde geprüftgetestet, ob ein Large Language Model (LLM) die zur Bewertung der Neutralität von Bildungsinhalten ähnlich wie menschliche Experten bewerten kanngenutzt werden kann, indem es diese vergleichbar wie ein Mensch bewertet.
Dabei wurden 2000 Datensätze, bestehend aus Beschreibungstexten und Volltexten, mithilfe eines angepassten Prompts durch ein KI-Modell bewertet und die Ergebnisse mit bereits vorhandenen redaktionellen Bewertungen verglichen. Die Analyse zeigte, dass die KI-Bewertung, insbesondere bei kürzeren Beschreibungstexten, eine hohe Übereinstimmung mit den menschlichen Bewertungen aufwies (MAE = 0,62). Bei den längeren und komplexeren Volltexten hingegen fiel die Abweichung größer aus (MAE = 0,92). Mögliche Ursachen für das schlechtere Abschneiden der Volltexte können Bewertungen sein, die von den Fachredaktionen in die Texte eingeflossen sind oder eine höhere Textqualität gegenüber Volltexte (trotz geringerer Zeichenlänge). Dennoch zeigt die Untersuchung weiteren Forschungsbedarf auf, insbesondere in Bezug auf die Erfassung von komplexen Inhalten in Volltexten und die Entwicklung eines besseren Messinstruments.
...
Entfernt wurden Datensätze, bei denen folgende Felder leer waren:
properties.cclom:general_description (Beschreibungstexte)
additional_data.full_text (Volltexte)
properties.ccm:oeh_quality_neutralness (Neutralitäts-Score der Redaktionen)
Zusätzlich wurden Datensätze ausgeschlossen, deren Textfelder (Beschreibungstexte und Volltexte) weniger als 60 Zeichen umfassen, um ausreichend Material für eine sinnvolle Bewertung zu haben.
properties.cclom:general_description (Beschreibungstexte)
additional_data.full_text (Volltexte)
Schließlich wurde die Anzahl der Datensätze auf 2000 reduziert und normalisiert. Dabei wurden folgende Felder zur gleichmäßigen Verteilung einbezogen:
properties.ccm:oeh_quality_neutralness (Neutralitäts-Score der Redaktionen)
properties.ccm:taxonid (Disziplinen)(
Disziplinen
...
wurden berücksichtigt, um eine ausgewogene Stichprobe der Bildungsinhalte aus unterschiedlichen Fachbereichen zu gewährleisten.
Ein Round-Robin-Verfahren (zyklisches Rundlaufverfahren) wurde angewandt, um
...
die
...
Daten gleichmäßig
...
zu verteilen.
Eine Gleichverteilung ist jedoch auf Grund der vorübergehend gut bewerteten Inhalte nur bedingt möglich.
...
Die Bildungsinhalte sind vorwiegend dem Bereich Schulbildung zuzuordnen und auf der Neutralitäts-Skala hoch bewertet (4 bis 5). Dies lässt sich durch das redaktionelle einpflegen der Inhalte erklären.
...
.
Verteilung der Daten
Ein Großteil der Datensätze ist den Disziplinen: Informatik, Chemie, Physik, Mathematik und Darstellendes Spiel zuzuordnen.
...
Fast alle Inhalte wurden auf der Skala mit 4 oder 5 bewertet, was jedoch im Rahmen der Erwartungen liegt, da von Redaktionen gepflegte Inhalte von eher besserer Qualität sind.
...
Analyse der Textqualität
Da die Beschreibungs- und Volltexte die Grundlage der Bewertung bildeten, wurden diese hinsichtlich ihrer Qualität bewertet.
...
Allerdings zeigte sich, das die Beschreibungstexte im Vergleich zu den Volltexten weniger emotional gestaltet sind (Sentiment-Analyse) und mit geringerer formaler Bildung zu verstanden werden können (SMOG-Index).
Textanalyse der Beschreibungstexte
...
Textanalyse Volltexte
...
Verteilung der Daten
...
...
Fast alle Inhalte wurden auf der Skala mit 4 oder 5 bewertet, was jedoch im Rahmen der Erwartungen liegt, da von Redaktionen gepflegte Inhalte von eher besserer Qualität sind.
...
Testdurchführung
Für die Testdurchführung wurde ein Python-Script entwickelt, das die Beschreibungs- und Volltexte der Bildungsdatensätze von http://WirLernenOnline.de nutzt, um eine KI-basierte Bewertung der Neutralität durchzuführen.
...
Es zeigt sich auch, dass die LLM-Bewertung auf bestimmte Aspekte der Neutralität stärker fokussiert, wie etwa die Vielfalt der Perspektiven und das Risiko für Fehlinterpretationen. Diese Fokusverschiebung könnte zu wertvollen Ergänzungen im Prozess der Neutralitätsbewertung beitragen, sollte jedoch immer im Kontext der redaktionellen Standards interpretiert werden.
Anlage
Quellen für die Promptentwicklung
siehe hier
Tool für die Volltextgenerierung
Code Block |
---|
# URL-basierte Volltext-Anreicherung mit Goose3 # Anforderungen: pip install beautifulsoup4 streamlit requests goose3 # Script als fulltext_enricher.py speichern # Start: steamlit run fulltext_enricher.py import streamlit as st import json import os import datetime import time from goose3 import Goose # List of file extensions to skip (e.g. audio and video formats) SKIPPED_EXTENSIONS = ['.mp4', '.mp3', '.avi', '.mpeg', '.mov', '.wmv', '.flv'] # Function to scrape a webpage using Goose3 and return the extracted information def scrape_page(url, follow_redirect=True): goose = Goose() # Initialize Goose3 with default settings try: if isinstance(url, list): # Check if URL is in list format and convert to string url = url[0] # Skip URLs with certain extensions if any(url.lower().endswith(ext) for ext in SKIPPED_EXTENSIONS): return "skipped", None article = goose.extract(url=url) # Extract the relevant information full_text = article.cleaned_text # Cleaned full text title = article.title # Extracted title summary = article.meta_description[:500] if article.meta_description else full_text[:500] # Meta description as summary or first 500 chars keywords = article.meta_keywords # Extracted meta keywords top_image = article.top_image.src if article.top_image else None # URL of the top image # Fallback: If full text is missing, use meta description if not full_text: full_text = article.meta_description or "No full text available for this page." return { 'title': title, 'full_text': full_text, 'summary': summary, 'keywords': keywords, 'top_image': top_image, 'url': url } except Exception as e: st.error(f"Error scraping {url}: {e}") return None # Function to process the JSON file and enrich it with scraped data def process_json(data, url_field, timeout, save_folder, follow_redirect=True, skip_media_files=True, crawl_all=True, num_records=10): try: total_records = len(data) records_to_process = total_records if crawl_all else min(num_records, total_records) st.write(f"Processing {records_to_process} out of {total_records} records...") enriched_data = [] for i, record in enumerate(data[:records_to_process], 1): # Display the progress message for processing records st.info(f"Processing record {i}/{records_to_process}", icon="ℹ️") # Extract URL using the selected field try: url = eval(f"record{url_field}") # If URL is in list format, convert it to a string (take the first URL) if isinstance(url, list): url = url[0] except Exception as e: st.warning(f"No valid URL found for record {i}/{records_to_process}. Skipping... (Error: {e})", icon="⚠️") continue if url: # Skip media files if the option is selected if skip_media_files and any(url.lower().endswith(ext) for ext in SKIPPED_EXTENSIONS): st.warning(f"Skipping media file URL #{i}: {url}", icon="⚠️") continue # Display the message for scraping the current URL st.success(f"Scraping URL #{i}: {url}", icon="🟢") scraped_data = scrape_page(url, follow_redirect=follow_redirect) if scraped_data != "skipped" and scraped_data: # Only display the summary preview message without additional status info if scraped_data['summary']: st.success(scraped_data['summary'][:250] + "..." if len(scraped_data['summary']) > 250 else scraped_data['summary']) # Add scraped data to the record record['additional_data'] = { 'title': scraped_data['title'], 'full_text': scraped_data['full_text'], 'summary': scraped_data['summary'], 'keywords': scraped_data['keywords'], 'top_image': scraped_data['top_image'], 'final_url': scraped_data['url'] } enriched_data.append(record) # Introduce delay for the given timeout time.sleep(timeout) # Save the enriched JSON file timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') enriched_file_name = os.path.join(save_folder, f"data_enriched_{timestamp}.json") with open(enriched_file_name, 'w', encoding='utf-8') as f: json.dump(enriched_data, f, ensure_ascii=False, indent=4) st.success(f"Enriched data saved as {enriched_file_name}", icon="🟢") except Exception as e: st.error(f"Error processing JSON file: {e}") # Function to extract available fields from JSON structure def extract_fields(data): field_set = set() # Recursive function to explore the JSON structure def recurse_json(obj, parent_key=''): if isinstance(obj, dict): for key, value in obj.items(): new_key = f"{parent_key}['{key}']" if parent_key else f"['{key}']" field_set.add(new_key) recurse_json(value, new_key) elif isinstance(obj, list): for item in obj: recurse_json(item, parent_key) recurse_json(data) return sorted(list(field_set)) # Streamlit UI st.title('JSON Web Scraper and Enricher using Goose3') # Upload JSON file uploaded_file = st.file_uploader("Choose a JSON file", type="json") if uploaded_file: try: # Load the JSON data data = json.load(uploaded_file) # Extract all field paths from the JSON structure available_fields = extract_fields(data) # Allow the user to choose a URL field url_field = st.selectbox("Select the URL field", available_fields, index=available_fields.index("['properties']['ccm:wwwurl']")) # Other options for processing timeout = st.number_input("Enter delay between requests (seconds)", min_value=0, value=0) save_folder = st.text_input("Folder to save enriched JSON", value=".") follow_redirects = st.checkbox("Follow redirects", value=True) # Option to skip media files (audio/video) skip_media_files = st.checkbox("Skip media files (e.g., .mp4, .mp3, .avi)", value=True) # Option to process all records or only a limited number crawl_all = st.checkbox("Crawl all records", value=True) num_records = st.number_input("Number of records to process", min_value=1, value=10, disabled=crawl_all) if st.button("Start Processing"): process_json(data, url_field, timeout, save_folder, follow_redirects, skip_media_files, crawl_all, num_records) except Exception as e: st.error(f"Error loading JSON file: {e}") |
Tool für die Filterung, Analyse und Bestimmung der Textqualität
Code Block |
---|
# Tool für die Betrachtung, Analyse und Filterung von JSON-Files
# Anforderungen: pip install streamlit pandas orjson matplotlib seaborn plotly textstat nltk langdetect textblob
# Script als datamanager_json.py speichern
# Start: streamlit run datamanager_json.py
import streamlit as st
import pandas as pd
import orjson
import os
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import textstat
import re
import nltk
from nltk import ngrams
from collections import Counter, defaultdict
from langdetect import detect, LangDetectException
from textblob import TextBlob
import time
from itertools import cycle
# Download NLTK data
nltk.download('punkt')
# Funktion zum rekursiven Flachlegen von JSON
def flatten_json(y):
out = {}
def flatten(x, name=''):
if isinstance(x, dict):
for a in x:
flatten(x[a], f'{name}{a}_')
elif isinstance(x, list):
# Join list items with a comma
out[name[:-1]] = ', '.join(map(str, x))
else:
out[name[:-1]] = x
flatten(y)
return out
@st.cache_data(show_spinner=False)
def process_uploaded_file(file_path):
try:
records = []
with open(file_path, 'rb') as f:
content = f.read()
data = orjson.loads(content)
if isinstance(data, list):
for record in data:
flat_record = flatten_json(record)
records.append(flat_record)
elif isinstance(data, dict):
flat_record = flatten_json(data)
records.append(flat_record)
else:
st.error("Nicht unterstützte JSON-Struktur.")
return pd.DataFrame()
df = pd.DataFrame(records)
st.success(f"JSON-Daten erfolgreich in DataFrame konvertiert. Spalten: {len(df.columns)}")
st.write(f"Anzahl der Datensätze: {len(df)}")
return df
except Exception as e:
st.error(f"Fehler beim Verarbeiten der Datei: {e}")
return pd.DataFrame()
def merge_similar_fields(df):
pattern = re.compile(r'^(.*?)(?:_\d+)?$')
base_columns = {}
for col in df.columns:
match = pattern.match(col)
if match:
base_name = match.group(1)
if base_name not in base_columns:
base_columns[base_name] = []
base_columns[base_name].append(col)
for base, cols in base_columns.items():
if len(cols) > 1:
df[base] = df[cols].apply(lambda row: ', '.join(row.dropna().astype(str)), axis=1)
df.drop(columns=cols, inplace=True)
return df
def calculate_fill_status(df):
fill_status = df.notnull().mean() * 100
fill_status = fill_status.sort_values(ascending=False)
return fill_status
def get_all_fields(data, parent_key='', fields=None):
if fields is None:
fields = set()
if isinstance(data, dict):
for key, value in data.items():
full_key = f'{parent_key}.{key}' if parent_key else key
fields.add(full_key)
if isinstance(value, dict):
get_all_fields(value, full_key, fields)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
get_all_fields(item, full_key, fields)
elif isinstance(data, list):
for item in data:
if isinstance(item, dict):
get_all_fields(item, parent_key, fields)
return fields
def load_json(file_path):
with open(file_path, "rb") as f:
return orjson.loads(f.read())
def save_json(data, file_path):
with open(file_path, "wb") as f:
f.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
def list_json_files(directory):
return [file for file in os.listdir(directory) if file.endswith(".json")]
def preview_data(data, index=0):
if 0 <= index < len(data):
return data[index]
return {}
def get_nested_value(data, path):
keys = path.split(".")
for key in keys:
if isinstance(data, list):
# Extrahiere Werte für den Schlüssel aus jedem Dict in der Liste
next_data = []
for item in data:
if isinstance(item, dict) and key in item:
next_data.append(item[key])
data = next_data if next_data else None
elif isinstance(data, dict):
data = data.get(key)
else:
return None
if data is None:
return None
# Flatten the list if it's a list of lists
if isinstance(data, list):
flat_data = []
for item in data:
if isinstance(item, list):
flat_data.extend(item)
else:
flat_data.append(item)
return flat_data
return data
def is_field_empty(value):
"""Prüft, ob ein Feld als 'leer' betrachtet wird (z.B. None, leere Strings, Listen, Dicts)."""
if value is None:
return True
if isinstance(value, str) and value.strip() == "":
return True
if isinstance(value, list) and len(value) == 0:
return True
if isinstance(value, dict) and len(value) == 0:
return True
return False
def remove_fields(data, fields_to_remove):
for item in data:
for field in fields_to_remove:
keys = field.split(".")
current_dict = item
for key in keys[:-1]:
if key in current_dict and isinstance(current_dict[key], dict):
current_dict = current_dict[key]
else:
current_dict = {}
break
if keys[-1] in current_dict:
del current_dict[keys[-1]]
return data
def current_timestamp():
return time.strftime("%Y%m%d_%H%M%S")
def text_analysis(df, text_field, min_chars=0):
try:
texts = df[text_field].dropna().astype(str)
except KeyError:
st.error(f"Feld '{text_field}' existiert nicht.")
return
if min_chars > 0:
texts = texts[texts.str.len() >= min_chars]
if 'text_index' not in st.session_state:
st.session_state.text_index = 1
st.subheader("Texte durchsuchen")
if not texts.empty:
# Verwenden Sie Streamlit's native Funktionen für die Navigation
st.markdown("### Vorschau des Textes")
max_index = len(texts)
st.session_state.text_index = st.number_input(
"Datensatznummer",
min_value=1,
max_value=max_index,
value=st.session_state.text_index if st.session_state.text_index <= max_index else max_index,
step=1,
key='text_navigation'
)
current_text = texts.iloc[st.session_state.text_index - 1]
st.text_area("Text", value=current_text, height=200, key='text_display')
st.write(f"**Datensatz Nummer:** {st.session_state.text_index}")
st.subheader("Textstatistik")
num_chars = texts.str.len().sum()
num_words = texts.apply(lambda x: len(x.split())).sum()
avg_chars = texts.str.len().mean()
avg_words = texts.apply(lambda x: len(x.split())).mean()
st.write(f"**Gesamtanzahl der Zeichen:** {num_chars}")
st.write(f"**Gesamtanzahl der Wörter:** {num_words}")
st.write(f"**Durchschnittliche Zeichen pro Text:** {avg_chars:.2f}")
st.write(f"**Durchschnittliche Wörter pro Text:** {avg_words:.2f}")
# **Lesbarkeitsmetriken**
st.subheader("Lesbarkeitsmetriken")
st.markdown("""
**Flesch Reading Ease:** Bewertet, wie leicht ein Text zu lesen ist. Höhere Werte bedeuten leichter lesbare Texte.
**Flesch-Kincaid Grade:** Gibt das Schulniveau an, das für das Verständnis des Textes erforderlich ist.
**Gunning Fog Index:** Misst die Anzahl der Jahre formaler Bildung, die ein Leser benötigt, um den Text zu verstehen.
**SMOG Index:** Schätzt die erforderlichen Jahre formaler Bildung basierend auf der Anzahl der polysyllabischen Wörter.
""")
readability_df = pd.DataFrame({
'Flesch Reading Ease': texts.apply(textstat.flesch_reading_ease),
'Flesch-Kincaid Grade': texts.apply(textstat.flesch_kincaid_grade),
'Gunning Fog Index': texts.apply(textstat.gunning_fog),
'SMOG Index': texts.apply(textstat.smog_index)
})
readability_summary = readability_df.mean().round(2)
st.write(readability_summary.to_frame(name='Durchschnitt').T)
st.markdown("""
**Interpretation der Lesbarkeitsmetriken:**
- **Flesch Reading Ease:** Werte zwischen 60-70 sind für die meisten Erwachsenen gut verständlich.
- **Flesch-Kincaid Grade:** Ein Wert von 8 bedeutet, dass ein Schüler der 8. Klasse den Text verstehen sollte.
- **Gunning Fog Index:** Ein Wert von 12 entspricht dem Niveau eines Highschool-Abschlusses.
- **SMOG Index:** Gibt die geschätzten Jahre formaler Bildung an, die für das Verständnis des Textes erforderlich sind.
""")
# **Sentiment-Analyse**
st.subheader("Sentiment-Analyse")
st.markdown("""
**Sentiment-Analyse:** Bestimmt die emotionale Tonalität der Texte. Die Kategorien sind:
- **Positiv:** Der Text drückt positive Gefühle aus.
- **Negativ:** Der Text drückt negative Gefühle aus.
- **Neutral:** Der Text drückt weder positive noch negative Gefühle aus.
""")
sentiments = texts.apply(lambda x: TextBlob(x).sentiment.polarity)
sentiment_counts = sentiments.apply(lambda x: 'Positiv' if x > 0 else ('Negativ' if x < 0 else 'Neutral')).value_counts()
sentiment_counts_df = sentiment_counts.reset_index()
sentiment_counts_df.columns = ['Sentiment', 'Anzahl der Texte']
fig2 = px.bar(
sentiment_counts_df,
x='Sentiment',
y='Anzahl der Texte',
labels={'Anzahl der Texte': 'Anzahl der Texte'},
title="Verteilung der Sentiment-Kategorien",
hover_data={'Anzahl der Texte': True}
)
fig2.update_traces(marker_color='blue')
fig2.update_layout(xaxis_title='Sentiment', yaxis_title='Anzahl der Texte')
st.plotly_chart(fig2, use_container_width=True, key='sentiment_plot')
st.markdown("""
**Interpretation der Sentiment-Analyse:**
- **Positiv:** Ein hoher Anteil positiver Texte kann auf eine optimistische Stimmung in den Daten hinweisen.
- **Negativ:** Ein hoher Anteil negativer Texte kann auf Herausforderungen oder Kritik in den Daten hinweisen.
- **Neutral:** Ein hoher Anteil neutraler Texte deutet auf sachliche oder informative Inhalte hin.
""")
# **Spracherkennung**
st.subheader("Spracherkennung")
st.markdown("""
**Spracherkennung:** Identifiziert die Sprache der Texte, um sicherzustellen, dass alle Texte in der erwarteten Sprache verfasst sind.
""")
def detect_language(text):
try:
return detect(text)
except LangDetectException:
return "Unbekannt"
languages = texts.apply(detect_language)
language_counts = languages.value_counts()
language_counts_df = language_counts.reset_index()
language_counts_df.columns = ['Sprache', 'Anzahl der Texte']
fig3 = px.bar(
language_counts_df,
x='Sprache',
y='Anzahl der Texte',
labels={'Anzahl der Texte': 'Anzahl der Texte'},
title="Verteilung der erkannten Sprachen",
hover_data={'Anzahl der Texte': True}
)
fig3.update_traces(marker_color='orange')
fig3.update_layout(xaxis_title='Sprache', yaxis_title='Anzahl der Texte')
st.plotly_chart(fig3, use_container_width=True, key='language_plot')
st.markdown("""
**Interpretation der Spracherkennung:**
- **Sprache:** Die erkannten Sprachen geben Aufschluss darüber, welche Sprachen in den Textdaten vorherrschen.
- **Unbekannt:** Ein hoher Anteil an "Unbekannt" kann auf unklare oder gemischte Sprachinhalte hinweisen.
""")
# **Lexikalische Vielfalt**
st.subheader("Lexikalische Vielfalt")
st.markdown("""
**Lexikalische Vielfalt (Typ-Token-Verhältnis, TTR):** Misst die Vielfalt des verwendeten Wortschatzes. Ein höheres TTR weist auf eine größere Wortvielfalt hin, was auf eine reichhaltigere und vielfältigere Sprache im Text hindeutet.
""")
def type_token_ratio(text):
tokens = text.split()
types = set(tokens)
return len(types) / len(tokens) if len(tokens) > 0 else 0
df['TTR'] = texts.apply(type_token_ratio)
ttr_summary = df['TTR'].describe()
st.write("**Statistik des Typ-Token-Verhältnisses (TTR):**")
st.write(ttr_summary)
fig5 = px.histogram(
df,
x='TTR',
nbins=20,
title="Verteilung des Typ-Token-Verhältnisses (TTR)",
labels={'TTR': 'TTR', 'count': 'Anzahl der Texte'},
opacity=0.75
# histnorm='count' entfernt, da 'count' ungültig ist und 'count' das Standardverhalten ist
)
fig5.update_traces(marker_color='green')
st.plotly_chart(fig5, use_container_width=True, key='ttr_plot')
st.markdown("""
**Interpretation der lexikalischen Vielfalt:**
- **Höheres TTR:** Größere Vielfalt im Wortschatz, was auf abwechslungsreichere und reichhaltigere Texte hinweist.
- **Niedrigeres TTR:** Weniger Vielfalt im Wortschatz, was auf wiederholende oder monotone Sprache hinweisen kann.
""")
# **Duplikat-Erkennung**
st.subheader("Duplikat-Erkennung")
st.markdown("""
**Duplikat-Erkennung:** Identifiziert doppelte oder nahezu doppelte Texte, um Redundanzen in den Daten zu vermeiden.
""")
duplicate_counts = df[text_field].duplicated().sum()
st.write(f"**Anzahl der doppelten Texte:** {duplicate_counts}")
if duplicate_counts > 0:
duplicates = df[df[text_field].duplicated(keep=False)]
st.write("**Doppelte Texte:**")
st.write(duplicates[[text_field]])
# **N-Gramm-Analyse**
st.subheader("N-Gramm-Analyse")
st.markdown("""
**N-Gramm-Analyse:** Analysiert häufig vorkommende Phrasen (Bigrams), um gängige Ausdrucksweisen oder Themen zu identifizieren.
""")
def get_ngrams(text, n=2):
tokens = nltk.word_tokenize(text)
return list(ngrams(tokens, n))
bigrams = texts.apply(lambda x: get_ngrams(x, 2)).explode()
bigram_counts = Counter(bigrams).most_common(20)
bigram_df = pd.DataFrame(bigram_counts, columns=['Bigram', 'Anzahl'])
bigram_df['Bigram'] = bigram_df['Bigram'].apply(lambda x: ' '.join(x))
fig6 = px.bar(
bigram_df,
x='Anzahl',
y='Bigram',
orientation='h',
labels={'Anzahl': 'Anzahl der Vorkommen'},
title="Top 20 Bigrams",
hover_data={'Anzahl': True}
)
fig6.update_traces(marker_color='magenta')
st.plotly_chart(fig6, use_container_width=True, key='bigram_plot')
st.markdown("""
**Interpretation der N-Gramm-Analyse:**
- **Häufige Bigrams:** Die am häufigsten vorkommenden Bigrams können auf gängige Phrasen oder Themen in den Texten hinweisen.
""")
def json_filter_tab(df):
st.header("🔄 Datenfilter")
# Da die JSON-Datei bereits zentral geladen ist, keine weitere Auswahl nötig
st.info("Verwenden Sie die bereits geladene JSON-Datei für die Filterung.")
data = df.to_dict(orient='records')
all_fields = set(df.columns.tolist())
# Filter 1: Leere Felder (standardmäßig deaktiviert)
empty_field_filter_active = st.checkbox("🚫 Leere Felder filtern", value=False)
if empty_field_filter_active:
selected_empty_fields = st.multiselect(
"Felder zur Prüfung auf leere Werte auswählen",
options=list(all_fields),
default=[]
)
st.warning("Dieser Filter entfernt Datensätze, bei denen ausgewählte Felder keine Werte enthalten.")
# Filter 2: Feld-Wert Kombination mit Operatoren und Auto-Vervollständigung
field_value_filter_active = st.checkbox("🔍 Feld-Wert Kombinationen filtern")
if field_value_filter_active:
st.warning("Dieser Filter entfernt Datensätze, bei denen die angegebenen Feld-Wert-Kombinationen nicht zutreffen.")
field_value_filters = []
field_value_count = st.number_input("Anzahl der Feld-Wert-Kombinationen", min_value=1, value=1, step=1)
operators = ["=", "!=", ">", "<", ">=", "<="]
operator_map = {
"=": lambda a, b: a == b,
"!=": lambda a, b: a != b,
">": lambda a, b: a > b,
"<": lambda a, b: a < b,
">=": lambda a, b: a >= b,
"<=": lambda a, b: a <= b
}
for i in range(int(field_value_count)):
col1, col2, col3 = st.columns(3)
with col1:
field = st.selectbox(f"Feld {i+1}", options=list(all_fields), key=f"filter_field_{i}")
with col2:
operator = st.selectbox(f"Operator {i+1}", options=operators, key=f"filter_operator_{i}")
with col3:
value = st.text_input(f"Wert {i+1}", key=f"filter_value_{i}")
field_value_filters.append((field, operator, value))
# Filter 3: Mindestzeichenlänge
length_filter_active = st.checkbox("✂️ Filter nach Mindestzeichenlänge")
if length_filter_active:
selected_length_fields = st.multiselect("Felder zur Zeichenlänge auswählen", options=list(all_fields))
min_length = st.number_input("Mindestzeichenlänge", min_value=1, value=30)
st.warning("Dieser Filter entfernt Datensätze, bei denen die Länge der ausgewählten Felder kürzer als die angegebene Mindestlänge ist.")
# Filter 4: Gleichverteilung
balancing_filter_active = st.checkbox("⚖️ Filter zur Gleichverteilung")
if balancing_filter_active:
selected_balancing_fields = st.multiselect("Felder für Gleichverteilung auswählen", options=list(all_fields))
total_count = st.number_input("Gesamtanzahl der Datensätze nach Gleichverteilung", min_value=1, value=100)
st.warning("Dieser Filter reduziert die Daten auf eine Gesamtanzahl und verteilt sie möglichst gleichmäßig auf die angegebenen Felder.")
# Filter 5: Felder aus JSON entfernen
remove_fields_filter_active = st.checkbox("🗑️ Felder aus JSON entfernen")
if remove_fields_filter_active:
fields_to_remove = st.multiselect("Wähle die Felder, die entfernt werden sollen", options=list(all_fields), default=[])
st.warning("Dieser Filter entfernt ausgewählte Felder aus den Datensätzen.")
# Filter 6: Duplizierte Datensätze entfernen
duplicate_filter_active = st.checkbox("🔁 Duplikate entfernen")
if duplicate_filter_active:
duplicate_fields = st.multiselect("Wähle die Felder, auf denen die Duplikate basieren sollen", options=list(all_fields), default=[])
st.warning("Dieser Filter entfernt doppelte Datensätze basierend auf den ausgewählten Feldern.")
# Statusmeldungen und Debugging Informationen
if st.button("✅ Daten filtern und speichern"):
st.info("Starte Filterprozess...")
filtered_data = data.copy() # Kopiere Originaldaten
# Leere Felder filtern
if empty_field_filter_active and selected_empty_fields:
st.info("🚫 Leere Felder filtern...")
filtered_data = [
item for item in filtered_data
if all(
(field_value := get_nested_value(item, field)) is not None
and not is_field_empty(field_value)
for field in selected_empty_fields
)
]
st.write(f"Anzahl der Datensätze nach Filterung leere Felder: {len(filtered_data)}")
# Feld-Wert Kombinationen filtern
if field_value_filter_active and field_value_filters:
st.info("🔍 Feld-Wert Kombinationen filtern...")
for field, operator, value in field_value_filters:
op_func = operator_map[operator]
try:
# Versuche den Wert in einen numerischen Typ zu konvertieren, wenn möglich
try:
value_converted = float(value)
except ValueError:
value_converted = value
filtered_data = [
item for item in filtered_data
if (field_value := get_nested_value(item, field)) is not None and (
(isinstance(field_value, list) and any(
isinstance(v, (int, float, str)) and op_func(v, value_converted) for v in field_value
)) or (isinstance(field_value, (int, float, str)) and op_func(field_value, value_converted))
)
]
except TypeError:
st.error(f"Der Wert im Feld '{field}' kann nicht mit dem Operator '{operator}' verglichen werden.")
st.write(f"Anzahl der Datensätze nach Feld-Wert Filter: {len(filtered_data)}")
# Zeichenlänge filtern
if length_filter_active and selected_length_fields:
st.info("✂️ Filterung nach Zeichenlänge...")
filtered_data = [
item for item in filtered_data
if all(
(field_value := get_nested_value(item, field)) is not None
and (
(isinstance(field_value, str) and len(field_value) >= min_length)
or (isinstance(field_value, list) and any(isinstance(v, str) and len(v) >= min_length for v in field_value))
)
for field in selected_length_fields
)
]
st.write(f"Anzahl der Datensätze nach Filterung der Mindestzeichenlänge: {len(filtered_data)}")
# Gleichverteilung filtern
if balancing_filter_active and selected_balancing_fields:
st.info("⚖️ Starte Gleichverteilung...")
field_groups = defaultdict(list)
for item in filtered_data:
# Erstelle einen hashbaren Schlüssel, indem Listen in Tupel umgewandelt werden
key = tuple(
tuple(get_nested_value(item, field)) if isinstance(get_nested_value(item, field), list) else get_nested_value(item, field)
for field in selected_balancing_fields
)
field_groups[key].append(item)
balanced_data = []
groups = list(field_groups.values())
if groups:
group_cycle = cycle(groups)
while len(balanced_data) < total_count and groups:
try:
group = next(group_cycle)
if group:
balanced_data.append(group.pop(0))
if not group:
groups.remove(group)
group_cycle = cycle(groups)
except StopIteration:
break
filtered_data = balanced_data[:total_count]
st.write(f"Anzahl der Datensätze nach Gleichverteilung: {len(filtered_data)}")
# Duplikate entfernen
if duplicate_filter_active and duplicate_fields:
st.info("🔁 Duplikate entfernen...")
initial_count = len(filtered_data)
if duplicate_fields:
filtered_data = pd.DataFrame(filtered_data).drop_duplicates(subset=duplicate_fields, keep='first').to_dict(orient='records')
filtered_count = len(filtered_data)
st.write(f"Duplikate entfernt: {initial_count - filtered_count}")
st.write(f"Anzahl der verbleibenden Datensätze: {filtered_count}")
else:
st.warning("Bitte wählen Sie mindestens ein Feld für die Duplikatenfilterung aus.")
# Felder entfernen
if remove_fields_filter_active and fields_to_remove:
st.info("🗑️ Felder entfernen...")
filtered_data = remove_fields(filtered_data, fields_to_remove)
st.write(f"Anzahl der Datensätze nach Entfernen der Felder: {len(filtered_data)} (Anzahl bleibt gleich)")
# Speichern der gefilterten Daten mit Namenszusätzen
timestamp = current_timestamp()
filters_applied = []
if empty_field_filter_active and selected_empty_fields:
filters_applied.append("emptyfields")
if field_value_filter_active and field_value_filters:
filters_applied.append("fieldvalue")
if length_filter_active and selected_length_fields:
filters_applied.append(f"minlength{min_length}")
if balancing_filter_active and selected_balancing_fields:
filters_applied.append("balancing")
if duplicate_filter_active and duplicate_fields:
filters_applied.append("duplicates")
if remove_fields_filter_active and fields_to_remove:
filters_applied.append("removefields")
output_filename = f"{st.session_state.get('selected_file', 'output').split('.')[0]}_{'_'.join(filters_applied)}_{timestamp}.json"
output_path = os.path.join(os.getcwd(), output_filename)
save_json(filtered_data, output_path)
st.write(f"Anzahl der Datensätze nach dem Speichern: {len(filtered_data)}")
st.success(f"Gefilterte Daten wurden gespeichert als: {output_filename}")
def data_viewer_tab(df):
st.header("📁 Datenbetrachter")
with st.expander("🔍 Vorschau des Datensatzes"):
if 'viewer_index' not in st.session_state:
st.session_state.viewer_index = 0
current_record = preview_data(df.to_dict(orient='records'), st.session_state.viewer_index)
st.json(current_record)
# Anzeige des aktuellen Datensatzes nach dem Button-Klick
st.write(f"**Datensatz Nummer:** {st.session_state.viewer_index + 1}")
# Buttons zum Blättern (jetzt unter den Elementen)
col_prev, col_next = st.columns([1,1])
with col_prev:
if st.button("⬅️ Vorheriger Datensatz", key='prev_viewer'):
if st.session_state.viewer_index > 0:
st.session_state.viewer_index -= 1
with col_next:
if st.button("➡️ Nächster Datensatz", key='next_viewer'):
if st.session_state.viewer_index < len(df) - 1:
st.session_state.viewer_index += 1
st.subheader("📊 Datensätze anzeigen")
st.markdown("**Hinweis:** Große Datensätze können sehr lange dauern, um angezeigt zu werden.")
col_start, col_end = st.columns(2)
with col_start:
start_num = st.number_input("Start Datensatz Nummer", min_value=1, value=1, step=1, key='start_num')
with col_end:
end_num = st.number_input("Ende Datensatz Nummer", min_value=1, value=min(len(df), 10), step=1, key='end_num')
st.write("") # Leerzeile
if st.button("🔄 Daten anzeigen"):
if end_num < start_num:
st.error("Ende Nummer muss größer oder gleich Start Nummer sein.")
elif end_num > len(df):
st.error(f"Ende Nummer darf nicht größer als die Anzahl der Datensätze ({len(df)}) sein.")
else:
st.write(df.iloc[start_num-1:end_num])
def werteverteilung_tab(df):
st.header("📈 Werteverteilung")
metadata_fields = df.select_dtypes(include=['object', 'category']).columns.tolist()
if not metadata_fields:
st.write("Keine Metadatenfelder gefunden.")
else:
search_field = st.text_input("Feld suchen", "", key='metadata_search')
if search_field:
filtered_fields = [field for field in metadata_fields if search_field.lower() in field.lower()]
else:
filtered_fields = metadata_fields
if filtered_fields:
selected_fields = st.multiselect("Wähle Metadatenfelder zur Visualisierung", filtered_fields, key='metadata_select_multi')
if selected_fields:
for field in selected_fields:
# Handle multiple values by splitting
value_series = df[field].dropna().astype(str).str.split(', ').explode()
value_counts = value_series.value_counts().head(20)
st.write(value_counts.to_frame().reset_index().rename(columns={'index': field, field: 'Anzahl'}))
# Truncate labels for readability
df_counts = value_counts.to_frame().reset_index()
df_counts.columns = [field, 'Anzahl']
df_counts[field] = df_counts[field].apply(lambda x: x[:20] + '...' if len(x) > 20 else x)
fig = px.bar(
df_counts,
x='Anzahl',
y=field,
orientation='h',
labels={'Anzahl': 'Anzahl der Vorkommen', field: 'Feld'},
title="", # Entfernt den Titel über der Grafik
hover_data={'Anzahl': True}
)
fig.update_traces(marker_color='blue')
st.plotly_chart(fig, use_container_width=True, key=f'werteverteilung_{field}')
else:
st.write("Keine Felder gefunden, die dem Suchbegriff entsprechen.")
def fuellstandsanalyse_tab(df):
st.header("📊 Füllstandsanalyse")
st.write("Filtere Daten basierend auf Feldern und Werten und analysiere den Füllstand der Metadaten.")
# Auswahl der Felder und Werte zum Filtern
st.subheader("🔍 Filter auswählen")
selected_fill_fields = st.multiselect(
"Felder zur Filterung auswählen",
options=df.columns.tolist(),
default=[]
)
fill_field_values = {}
for field in selected_fill_fields:
unique_values = df[field].dropna().unique().tolist()
# Optimierung: Verwenden Sie Caching für einzigartige Werte
unique_values = st.cache_data(lambda x: x)(unique_values)
selected_values = st.multiselect(f"Wähle Werte für {field}", unique_values, default=[], key=f"fill_{field}")
fill_field_values[field] = selected_values
# Option, ob Filter eine gemeinsame Datenmenge bilden oder getrennt betrachtet werden
join_option = st.radio(
"Wie sollen die Filter angewendet werden?",
options=["Getrennt betrachten", "Gemeinsame Datenmenge bilden"],
index=0
)
# Auswahl der Felder zur Anzeige des Füllstands
st.subheader("📈 Füllstand anzeigen für:")
display_fill_fields = st.multiselect(
"Wähle Metadatenfelder zur Anzeige des Füllstands",
options=df.columns.tolist(),
default=[]
)
if st.button("🔄 Füllstand analysieren"):
st.info("Starte Füllstandsanalyse...")
if selected_fill_fields and any(fill_field_values[field] for field in selected_fill_fields):
if join_option == "Gemeinsame Datenmenge bilden":
# Filtern der Daten, die alle ausgewählten Feld-Wert-Kombinationen erfüllen
filtered_df = df.copy()
for field, values in fill_field_values.items():
if values:
filtered_df = filtered_df[filtered_df[field].isin(values)]
subsets = {"Gemeinsame Datenmenge": filtered_df}
else:
# Jede Feld-Wert-Kombination als separate Teilmenge
subsets = {}
for field, values in fill_field_values.items():
if values:
for value in values:
subset_name = f"{field} = {value}"
subsets[subset_name] = df[df[field].isin([value])]
else:
# Keine Filter angewendet, eine einzige Teilmenge
subsets = {"Alle Daten": df}
if display_fill_fields:
# Begrenzen der Anzahl der Grafiken, um die Übersichtlichkeit zu wahren
max_columns = 2 # Ändert die Anzahl der Spalten von 3 auf 2
num_subsets = len(subsets)
num_cols = min(max_columns, num_subsets)
cols = st.columns(num_cols)
for idx, (subset_name, subset_df) in enumerate(subsets.items()):
col = cols[idx % num_cols]
with col:
# Füge einen Hinweistext über der Grafik hinzu
st.markdown(f"**Filter:** {subset_name}")
# Entfernt die Überschrift über den Grafiken
fill_status = subset_df[display_fill_fields].notnull().mean() * 100
fill_status = fill_status.sort_values(ascending=False)
# Dynamische Anpassung der Grafikgröße
num_bars = len(fill_status)
if num_bars == 1:
fig_height = 400
else:
fig_height = max(400, num_bars * 50)
# Daten vorbereiten für zweifarbige Balken
fill_percentage = fill_status
empty_percentage = 100 - fill_status
fill_data = pd.DataFrame({
'Metadatenfeld': fill_status.index,
'Gefüllt (%)': fill_percentage.values,
'Leer (%)': empty_percentage.values
})
# Melt the DataFrame für gestapelte Balkendiagramme
fill_data_melted = fill_data.melt(id_vars='Metadatenfeld', value_vars=['Gefüllt (%)', 'Leer (%)'], var_name='Status', value_name='Prozent')
fig = px.bar(
fill_data_melted,
x='Prozent',
y='Metadatenfeld',
color='Status',
orientation='h',
title="", # Entfernt den Titel über der Grafik
labels={'Prozent': 'Prozent (%)', 'Metadatenfeld': 'Metadatenfeld'},
hover_data={'Prozent': True, 'Status': True}
)
fig.update_layout(barmode='stack', height=fig_height, showlegend=True)
st.plotly_chart(fig, use_container_width=True, key=f'fuellstand_plot_{idx}')
else:
st.warning("Bitte wählen Sie mindestens ein Feld zur Anzeige des Füllstands aus.")
def text_analysis_tab(df):
st.header("📝 Textanalyse")
text_fields = df.select_dtypes(include=['object', 'category']).columns.tolist()
if not text_fields:
st.write("Keine Textfelder gefunden.")
else:
search_text_field = st.text_input("Feld suchen", "", key='text_search')
if search_text_field:
filtered_text_fields = [field for field in text_fields if search_text_field.lower() in field.lower()]
else:
filtered_text_fields = text_fields
if filtered_text_fields:
selected_text_field = st.selectbox("Wähle ein Metadatenfeld mit Text", filtered_text_fields, key='text_select')
if selected_text_field:
min_chars = st.number_input("Minimale Anzahl der Zeichen zum Filtern", min_value=0, value=0, step=1, key='text_min_chars')
text_analysis(df, selected_text_field, min_chars)
else:
st.write("Keine Textfelder gefunden, die dem Suchbegriff entsprechen.")
def main():
st.set_page_config(page_title="Universelles Datentool für JSON Dateien", layout="wide")
# Seitenleiste für Titel und JSON-Dateiauswahl
with st.sidebar:
st.title("Universelles Datentool für JSON Dateien")
st.write("Dieses Tool ermöglicht die Analyse und Bearbeitung von JSON-Daten mit beliebiger Struktur.")
data_dir = os.getcwd()
json_files = list_json_files(data_dir)
if not json_files:
st.warning(f"Keine JSON-Dateien im Verzeichnis '{data_dir}' gefunden. Bitte laden Sie eine Datei in das Verzeichnis hoch.")
selected_file = None
else:
selected_file = st.selectbox("📂 Wähle eine JSON-Datei aus dem aktuellen Verzeichnis", json_files, key='json_select')
if st.button("🔍 Datei laden") and selected_file:
with st.spinner("Verarbeite Datei..."):
df = process_uploaded_file(os.path.join(data_dir, selected_file))
if not df.empty:
df = merge_similar_fields(df)
st.session_state['df'] = df
st.session_state['selected_file'] = selected_file # Speichern des ausgewählten Dateinamens
# Vermerk am unteren Ende der Sidebar
st.markdown("---")
st.markdown("**by Jan Schachtschabel**")
# Überprüfen, ob Daten geladen sind
if 'df' in st.session_state and not st.session_state['df'].empty:
df = st.session_state['df']
selected_file = st.session_state.get('selected_file', 'output')
# Erstellen der Tabs im Hauptbereich
tabs = st.tabs(["Datenbetrachter", "Werteverteilung", "Füllstandsanalyse", "Textanalyse", "Datenfilter"])
with tabs[0]:
data_viewer_tab(df)
with tabs[1]:
werteverteilung_tab(df)
with tabs[2]:
fuellstandsanalyse_tab(df)
with tabs[3]:
text_analysis_tab(df)
with tabs[4]:
json_filter_tab(df)
if __name__ == "__main__":
main()
|
Tool für die Bewertung der Neutralität
Python-Script zur Durchführung des Tests und der Bewertung mittels LLM
...