# -*- coding: utf-8 -*-
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for Commands.

Common methods for Commands such as RunCommand and CompileCommand.
"""
from __future__ import annotations

from typing import AbstractSet, Any, Callable, Sequence

from google.generativeai.notebook import ipython_env
from google.generativeai.notebook import model_registry
from google.generativeai.notebook import parsed_args_lib
from google.generativeai.notebook import post_process_utils
from google.generativeai.notebook.lib import llm_function
from google.generativeai.notebook.lib import llmfn_input_utils
from google.generativeai.notebook.lib import llmfn_output_row
from google.generativeai.notebook.lib import llmfn_outputs
from google.generativeai.notebook.lib import unique_fn


class _GroundTruthLLMFunction(llm_function.LLMFunction):
  """An LLMFunction backed by a fixed sequence of ground truth values.

  Calling this function pairs the i-th ground truth value with the i-th
  normalized input and emits one output entry per pair.
  """

  def __init__(self, data: Sequence[str]):
    """Stores the ground truth values; no display function is installed."""
    super().__init__(outputs_ipython_display_fn=None)
    self._data = data

  def get_placeholders(self) -> AbstractSet[str]:
    """Returns an empty set: ground truth data has nothing to substitute."""
    return frozenset()

  def _call_impl(
      self, inputs: llmfn_input_utils.LLMFunctionInputs | None
  ) -> Sequence[llmfn_outputs.LLMFnOutputEntry]:
    """Builds one output entry per (ground truth value, input) pair.

    Args:
      inputs: The inputs to line the ground truth values up against.

    Returns:
      One entry per input, each holding a single row whose text result is the
      corresponding ground truth value.

    Raises:
      RuntimeError: If the number of ground truth values differs from the
        number of normalized inputs.
    """
    normalized_inputs = llmfn_input_utils.to_normalized_inputs(inputs)
    num_truths = len(self._data)
    num_inputs = len(normalized_inputs)
    if num_truths != num_inputs:
      raise RuntimeError(
          "Ground truth should have same number of entries as inputs: {} vs {}"
          .format(num_truths, num_inputs)
      )

    # Every entry uses prompt number 0 and carries exactly one result row.
    return [
        llmfn_outputs.LLMFnOutputEntry(
            prompt_num=0,
            input_num=input_num,
            prompt_vars=prompt_vars,
            output_rows=[
                llmfn_output_row.LLMFnOutputRow(
                    data={
                        llmfn_outputs.ColumnNames.RESULT_NUM: 0,
                        llmfn_outputs.ColumnNames.TEXT_RESULT: truth,
                    },
                    result_type=str,
                )
            ],
        )
        for input_num, (truth, prompt_vars) in enumerate(
            zip(self._data, normalized_inputs)
        )
    ]


def _get_ipython_display_fn(
    env: ipython_env.IPythonEnv,
) -> Callable[[llmfn_outputs.LLMFnOutputs], None]:
  """Returns a callback that displays outputs via `env` as a DataFrame."""

  def _display(outputs):
    # Convert first, then hand the DataFrame to the environment's display.
    dataframe = outputs.as_pandas_dataframe()
    return env.display(dataframe)

  return _display


def create_llm_function(
    models: model_registry.ModelRegistry,
    env: ipython_env.IPythonEnv | None,
    parsed_args: parsed_args_lib.ParsedArgs,
    cell_content: str,
    post_processing_fns: Sequence[post_process_utils.ParsedPostProcessExpr],
) -> llm_function.LLMFunction:
  """Builds an LLMFunction whose only prompt is the cell contents.

  Args:
    models: Registry used to look up the model named by
      `parsed_args.model_type`.
    env: Environment used to display outputs; when falsy, no display function
      is installed.
    parsed_args: Parsed command arguments (model type, model args and the
      `unique` flag are read here).
    cell_content: The text used as the single prompt.
    post_processing_fns: Post-processing expressions, applied in order.

  Returns:
    The resulting function, with the "unique" reorder step (if requested) and
    every post-processing expression added.
  """
  if env:
    display_fn = _get_ipython_display_fn(env)
  else:
    display_fn = None

  model = models.get_model(parsed_args.model_type)
  result = llm_function.LLMFunctionImpl(
      model=model,
      model_args=parsed_args.model_args,
      prompts=[cell_content],
      outputs_ipython_display_fn=display_fn,
  )

  # De-duplication is added before any user-supplied post-processing.
  if parsed_args.unique:
    result = result.add_post_process_reorder_fn(
        name="unique", fn=unique_fn.unique_fn
    )

  for post_process_expr in post_processing_fns:
    result = post_process_expr.add_to_llm_function(result)
  return result


def _convert_simple_compare_fn(
    name_and_simple_fn: tuple[str, Callable[[str, str], Any]]
) -> tuple[str, llm_function.CompareFn]:
  """Adapts a named comparison over plain values into a named compare fn.

  Args:
    name_and_simple_fn: A (name, function) pair, where the function takes the
      two result values to compare.

  Returns:
    A (name, function) pair with the same name. The returned function takes
    two objects exposing `result_value()` and forwards those two values to the
    original function, returning whatever it returns.
  """
  name = name_and_simple_fn[0]
  simple_fn = name_and_simple_fn[1]

  # A named inner function (instead of a lambda bound to a variable) shows up
  # under a meaningful name in tracebacks and reprs.
  def compare_fn(lhs, rhs):
    return simple_fn(lhs.result_value(), rhs.result_value())

  return name, compare_fn


def create_llm_compare_function(
    env: ipython_env.IPythonEnv | None,
    parsed_args: parsed_args_lib.ParsedArgs,
    post_processing_fns: Sequence[post_process_utils.ParsedPostProcessExpr],
) -> llm_function.LLMFunction:
  """Builds an LLMCompareFunction from the parsed command arguments.

  Args:
    env: Environment used to display outputs; when falsy, no display function
      is installed.
    parsed_args: Parsed command arguments supplying the left-hand and
      right-hand named functions and the simple comparison functions.
    post_processing_fns: Post-processing expressions, applied in order to the
      comparison function.

  Returns:
    The comparison function with every post-processing expression added.
  """
  if env:
    display_fn = _get_ipython_display_fn(env)
  else:
    display_fn = None

  result = llm_function.LLMCompareFunction(
      lhs_name_and_fn=parsed_args.lhs_name_and_fn,
      rhs_name_and_fn=parsed_args.rhs_name_and_fn,
      # Simple comparisons take raw values, so each one is adapted first.
      compare_name_and_fns=list(
          map(_convert_simple_compare_fn, parsed_args.compare_fn)
      ),
      outputs_ipython_display_fn=display_fn,
  )

  for post_process_expr in post_processing_fns:
    result = post_process_expr.add_to_llm_function(result)
  return result


def create_llm_eval_function(
    models: model_registry.ModelRegistry,
    env: ipython_env.IPythonEnv | None,
    parsed_args: parsed_args_lib.ParsedArgs,
    cell_content: str,
    post_processing_fns: Sequence[post_process_utils.ParsedPostProcessExpr],
) -> llm_function.LLMFunction:
  """Builds an LLMCompareFunction that checks a prompt against ground truth.

  The left-hand side ("actual") is the LLMFunction built from the cell
  contents; the right-hand side ("ground_truth") replays
  `parsed_args.ground_truth`.

  Args:
    models: Registry used to look up the model for the "actual" function.
    env: Environment used to display outputs; when falsy, no display function
      is installed.
    parsed_args: Parsed command arguments, including the ground truth data and
      the simple comparison functions.
    cell_content: The text used as the prompt of the "actual" function.
    post_processing_fns: Post-processing expressions; these are added to the
      "actual" function, not to the comparison function.

  Returns:
    The comparison function pairing "actual" with "ground_truth".
  """
  if env:
    display_fn = _get_ipython_display_fn(env)
  else:
    display_fn = None

  # Left-hand side: an ordinary LLMFunction built from the cell contents.
  actual_fn = create_llm_function(
      models=models,
      env=env,
      parsed_args=parsed_args,
      cell_content=cell_content,
      post_processing_fns=post_processing_fns,
  )

  # Right-hand side: the pre-generated ground truth values.
  expected_fn = _GroundTruthLLMFunction(data=parsed_args.ground_truth)

  return llm_function.LLMCompareFunction(
      lhs_name_and_fn=("actual", actual_fn),
      rhs_name_and_fn=("ground_truth", expected_fn),
      compare_name_and_fns=list(
          map(_convert_simple_compare_fn, parsed_args.compare_fn)
      ),
      outputs_ipython_display_fn=display_fn,
  )
