您当前的位置:首页 >> 智慧城市
智慧城市

kubernetes pv-controller 解析

发布时间:2025-10-18

r corelisters.PersistentVolumeClaimLister) {

// 这里不访问 apiserver,大概本地堆栈交到的普通人,这些普通人不可以被直接数组修改

volumeList, err := volumeLister.List(labels.Everything())

if err != nil {

klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)

return

}

for _, volume := range volumeList {

// 我们不能改变 volume 普通人,所以这里我们copy一份最初普通人,对最初普通人进行时操作方法

volumeClone := volume.DeepCopy()

if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {

klog.Errorf("error updating volume cache: %v", err)

}

}

claimList, err := claimLister.List(labels.Everything())

if err != nil {

klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)

return

}

for _, claim := range claimList {

if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {

klog.Errorf("error updating claim cache: %v", err)

}

}

klog.V(4).Infof("controller initialized")

}

// persistentVolumeOrderedIndex wraps a cache.Indexer holding
// PersistentVolume objects. Per the surrounding article text it is
// indexed by AccessModes and kept ordered by capacity, so that
// best-match lookups for a claim can scan candidates in size order —
// TODO confirm against the index functions, which are not visible here.
type persistentVolumeOrderedIndex struct {

	// store is the underlying indexer holding *v1.PersistentVolume entries.
	store cache.Indexer

}

该结构将 cache 中的数据存储在 persistentVolumeOrderedIndex 中，它是按 AccessModes 索引并按容量排序的 PersistentVolume 集合。

1 resync

// resync re-enqueues every known PVC and PV so that the periodic sync
// loop revisits all of them. Errors are logged and abort the pass; the
// next resync will retry.
func (ctrl *PersistentVolumeController) resync() {
	klog.V(4).Infof("resyncing PV controller")

	claims, listErr := ctrl.claimLister.List(labels.NewSelector())
	if listErr != nil {
		klog.Warningf("cannot list claims: %s", listErr)
		return
	}
	for _, c := range claims {
		ctrl.enqueueWork(ctrl.claimQueue, c)
	}

	volumes, listErr2 := ctrl.volumeLister.List(labels.NewSelector())
	if listErr2 != nil {
		klog.Warningf("cannot list persistent volumes: %s", listErr2)
		return
	}
	for _, v := range volumes {
		ctrl.enqueueWork(ctrl.volumeQueue, v)
	}
}

这里将集群内所有的 pvc/pv 统一放入对应的 claimQueue 和 volumeQueue 中重新处理。这个 resyncPeriod 是一个 random time.Duration * config.time（在 kcm 启动时设置）。

2 volumeWorker

一个无限循环，不断地处理从 volumeQueue 中取到的 PersistentVolume。

// workFunc processes one key from the volumeQueue. It returns true only
// when the queue has been shut down, signalling the worker loop to exit;
// any per-item failure logs and returns false so the loop continues.
workFunc := func() bool {
	keyObj, quit := ctrl.volumeQueue.Get()
	if quit {
		return true
	}
	// Always mark the item done so the workqueue can accept it again.
	defer ctrl.volumeQueue.Done(keyObj)
	key := keyObj.(string)
	klog.V(5).Infof("volumeWorker[%s]", key)

	// PVs are cluster-scoped, so only the name component of the key is used.
	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
		return false
	}
	volume, err := ctrl.volumeLister.Get(name)
	if err == nil {
		// The volume still exists in informer cache, the event must have
		// been add/update/sync
		ctrl.updateVolume(volume)
		return false
	}
	if !errors.IsNotFound(err) {
		klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
		return false
	}

	// The volume is not in informer cache, the event must have been
	// "delete"
	volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
	if err != nil {
		klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
		return false
	}
	if !found {
		// The controller has already processed the delete event and
		// deleted the volume from its cache
		klog.V(2).Infof("deletion of volume %q was already processed", key)
		return false
	}
	volume, ok := volumeObj.(*v1.PersistentVolume)
	if !ok {
		// The local cache held something other than a PV; log and skip.
		klog.Errorf("expected volume, got %+v", volumeObj)
		return false
	}
	ctrl.deleteVolume(volume)
	return false
}

我们主要关心 ctrl.updateVolume(volume) 方法。

updateVolume

updateVolume 方法是针对集群内 events 的 handler 方法，它内部主要调用了 ctrl.syncVolume 方法来处理。

func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {

klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))

// [Unit test set 4]

if volume.Spec.ClaimRef == nil {

// Volume is unused

klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)

if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {

// Nothing was saved; we will fall back into the same

// condition in the next call to this method

return err

}

return nil

} else /* pv.Spec.ClaimRef != nil */ {

// Volume is bound to a claim.

if volume.Spec.ClaimRef.UID == "" {

// The PV is reserved for a PVC; that PVC has not yet been

// bound to this PV; the PVC sync will handle it.

klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {

// Nothing was saved; we will fall back into the same

// condition in the next call to this method

return err

}

return nil

}

klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

// Get the PVC by _name_

var claim *v1.PersistentVolumeClaim

claimName := claimrefToClaimKey(volume.Spec.ClaimRef)

obj, found, err := ctrl.claims.GetByKey(claimName)

if err != nil {

return err

}

if !found {

if volume.Status.Phase != v1.VolumeReleased PricePrice volume.Status.Phase != v1.VolumeFailed {

obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)

if err != nil PricePrice !apierrors.IsNotFound(err) {

return err

}

found = !apierrors.IsNotFound(err)

if !found {

obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})

if err != nil PricePrice !apierrors.IsNotFound(err) {

return err

}

found = !apierrors.IsNotFound(err)

}

}

}

if !found {

klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

// Fall through with claim = nil

} else {

var ok bool

claim, ok = obj.(*v1.PersistentVolumeClaim)

if !ok {

return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)

}

klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))

}

if claim != nil PricePrice claim.UID != volume.Spec.ClaimRef.UID {

klog.V(4).Infof("Maybe cached claim: %s is not the newest one, we should fetch it from apiserver", claimrefToClaimKey(volume.Spec.ClaimRef))

claim, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})

if err != nil PricePrice !apierrors.IsNotFound(err) {

return err

} else if claim != nil {

// Treat the volume as bound to a missing claim.

if claim.UID != volume.Spec.ClaimRef.UID {

klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a newer UID than pv.ClaimRef, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

claim = nil

} else {

klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a same UID with pv.ClaimRef", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

}

}

}

if claim == nil {

if volume.Status.Phase != v1.VolumeReleased PricePrice volume.Status.Phase != v1.VolumeFailed {

// Also, log this only once:

klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)

if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {

// Nothing was saved; we will fall back into the same condition

// in the next call to this method

return err

}

}

if err = ctrl.reclaimVolume(volume); err != nil {

// Release failed, we will fall back into the same condition

// in the next call to this method

return err

}

if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {

// volume is being retained, it references a claim that does not exist now.

klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID)

}

return nil

} else if claim.Spec.VolumeName == "" {

if pvutil.CheckVolumeModeMismatches(Priceclaim.Spec, Pricevolume.Spec) {

volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)

ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)

claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)

// Skipping syncClaim

return nil

}

if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {

// The binding is not completed; let PVC sync handle it

klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)

} else {

// Dangling PV; try to re-establish the link in the PVC sync

klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)

}

ctrl.claimQueue.Add(claimToClaimKey(claim))

return nil

} else if claim.Spec.VolumeName == volume.Name {

// Volume is bound to a claim properly, update status if necessary

klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)

if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {

// Nothing was saved; we will fall back into the same

// condition in the next call to this method

return err

}

return nil

} else {

// Volume is bound to a claim, but the claim is bound elsewhere

if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) PricePrice volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {

if volume.Status.Phase != v1.VolumeReleased PricePrice volume.Status.Phase != v1.VolumeFailed {

// Also, log this only once:

klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)

if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {

// Nothing was saved; we will fall back into the same condition

// in the next call to this method

return err

}

}

if err = ctrl.reclaimVolume(volume); err != nil {

return err

}

return nil

} else {

if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {

klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)

if err = ctrl.unbindVolume(volume); err != nil {

return err

}

return nil

} else {

// The PV must have been created with this ptr; leave it alone.

klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)

if err = ctrl.unbindVolume(volume); err != nil {

return err

}

return nil

}

}

}

}

}

1、当 pv 的 Spec.ClaimRef 的值为空的时候，说明当前 pv 没有被使用，调用 ctrl.updateVolumePhase 使 pv 进入 Available 状态

2、当 pv 的 Spec.ClaimRef 的值不为空的时候，说明当前 pv 已绑定一个 pvc

当 Spec.ClaimRef.UID 为空的时候，说明 pvc 还没有绑定该 pv，调用 ctrl.updateVolumePhase 使 pv 进入 Available 状态，方法返回，等待 pvc 的 syncClaim 方法处理。接着使用 Spec.ClaimRef 中的 pvc 信息去 pv_controller 缓存中查找 pvc。如果 pvc 没有找到，有可能是集群负载过大导致缓存没有更新，则再从 informer cache 中查找；如果 informer cache 中还是没有的话，则从 apiserver 中去查找。这里如果发现非 Released 且非 Failed 的 pv 经过上述步骤仍然找不到对应 pvc 的话，说明 pvc 已被删除（较新的 kubernetes 版本中还会检查 reclaimPolicy，对 pv 的状态进行相应处理）。找到 pvc 之后：

1）如果 pvc 的 uid 和 Spec.ClaimRef.UID 不一致，这一般是 pv 对应的 pvc 被删除、随后又重新创建了一个同名的 pvc，而缓存还没有更新，这时需要 double check 一下；若 double check 之后依旧不存在，则判定 pv 绑定了一个不存在的 pvc，将 claim 置为空，走上述"pvc 未找到"的逻辑

2）如果 pvc 的 volumeName 为空

检查 pvc 的 volumeMode 和 pv 的 volumeMode 是否一致，不一致则发出 event。如果发现这个 pv 带有 AnnBoundByController = "pv.kubernetes.io/bound-by-controller" 这个 annotation，说明 pvc/pv 正处于绑定流程中。随后将 pvc 放入 claimQueue 中，让 claimWorker 进行处理

3）如果 pvc.Spec.volumeName == pv.volumeName 的时候，直接将 pv 设置为 Bound 状态

4）如果 pvc.Spec.volumeName != pv.volumeName 的时候

如果 pv 是动态创建的，并且 pv 的 ReclaimPolicy 是 delete，说明 pvc 已经绑定了其他 pv，将该 pv 设置为 Released 状态，等待 deleter 回收；如果 pv 不是动态创建的，将 pv 的 ClaimRef 字段置空，将其 unbound 掉

3 claimWorker

一个无限循环，不断地处理从 claimQueue 中取到的 PersistentVolumeClaim。

// workFunc processes one key from the claimQueue. It returns true only
// when the queue has been shut down, signalling the worker loop to exit;
// per-item failures log and return false so the loop keeps running.
//
// NOTE(fix): the log message for a bad key had been corrupted by the
// scrape ("namespace Price name"); restored to "namespace & name" as in
// the upstream Kubernetes source.
workFunc := func() bool {
	keyObj, quit := ctrl.claimQueue.Get()
	if quit {
		return true
	}
	// Always mark the item done so the workqueue can accept it again.
	defer ctrl.claimQueue.Done(keyObj)
	key := keyObj.(string)
	klog.V(5).Infof("claimWorker[%s]", key)

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
		return false
	}
	claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
	if err == nil {
		// The claim still exists in informer cache, the event must have
		// been add/update/sync
		ctrl.updateClaim(claim)
		return false
	}
	if !errors.IsNotFound(err) {
		klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
		return false
	}

	// The claim is not in informer cache, the event must have been "delete"
	claimObj, found, err := ctrl.claims.GetByKey(key)
	if err != nil {
		klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
		return false
	}
	if !found {
		// The controller has already processed the delete event and
		// deleted the claim from its cache
		klog.V(2).Infof("deletion of claim %q was already processed", key)
		return false
	}
	claim, ok := claimObj.(*v1.PersistentVolumeClaim)
	if !ok {
		// The local cache held something other than a PVC; log and skip.
		klog.Errorf("expected claim, got %+v", claimObj)
		return false
	}
	ctrl.deleteClaim(claim)
	return false
}

我们主要关心 ctrl.updateClaim(claim) 方法，与上面类似，它内部主要调用了 ctrl.syncClaim 方法来处理；在 syncClaim 中根据 pvc 的状态分别调用 ctrl.syncUnboundClaim 和 ctrl.syncBoundClaim 方法来处理。

syncUnboundClaim

func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {

if claim.Spec.VolumeName == "" {

// User did not care which PV they get.

delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)

if err != nil {

return err

}

// [Unit test set 1]

volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)

if err != nil {

klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)

return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err)

}

if volume == nil {

klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))

switch {

case delayBinding PricePrice !pvutil.IsDelayBindingProvisioning(claim):

if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {

return err

}

case storagehelpers.GetPersistentVolumeClaimClass(claim) != "":

if err = ctrl.provisionClaim(ctx, claim); err != nil {

return err

}

return nil

default:

ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")

}

// Mark the claim as Pending and try to find a match in the next

// periodic syncClaim

if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {

return err

}

return nil

} else /* pv != nil */ {

// Found a PV for this claim

// OBSERVATION: pvc is "Pending", pv is "Available"

claimKey := claimToClaimKey(claim)

klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))

if err = ctrl.bind(volume, claim); err != nil {

metrics.RecordMetric(claimKey, Pricectrl.operationTimestamps, err)

return err

}

metrics.RecordMetric(claimKey, Pricectrl.operationTimestamps, nil)

return nil

}

} else /* pvc.Spec.VolumeName != nil */ {

klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)

obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)

if err != nil {

return err

}

if !found {

klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)

if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {

return err

}

return nil

} else {

volume, ok := obj.(*v1.PersistentVolume)

if !ok {

return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)

}

klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))

if volume.Spec.ClaimRef == nil {

klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))

if err = checkVolumeSatisfyClaim(volume, claim); err != nil {

klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)

// send an event

msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)

// volume does not satisfy the requirements of the claim

if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {

return err

}

} else if err = ctrl.bind(volume, claim); err != nil {

// On any error saving the volume or the claim, subsequent

// syncClaim will finish the binding.

return err

}

// OBSERVATION: pvc is "Bound", pv is "Bound"

return nil

} else if pvutil.IsVolumeBoundToClaim(volume, claim) {

// User asked for a PV that is claimed by this PVC

// OBSERVATION: pvc is "Pending", pv is "Bound"

klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))

// Finish the volume binding by adding claim UID.

if err = ctrl.bind(volume, claim); err != nil {

return err

}

// OBSERVATION: pvc is "Bound", pv is "Bound"

return nil

} else {

// User asked for a PV that is claimed by someone else

// OBSERVATION: pvc is "Pending", pv is "Bound"

if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {

klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))

claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)

// User asked for a specific PV, retry later

if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {

return err

}

return nil

} else {

klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))

claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)

return fmt.Errorf("invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))

}

}

}

}

}

梳理一下整体流程

如果当前 pvc 的 volumeName 为空：先判断当前 pvc 是否是延迟绑定的；调用 volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) 找出匹配的 pv。如果找到 volume 的话，调用 ctrl.bind(volume, claim) 方法进行绑定；如果没有找到 volume 的话：若是延迟绑定且还未触发（pod 没有引用），则 emit event 到 pvc 上；若 pvc 指定了 sc，则调用 ctrl.provisionClaim(ctx, claim) 方法，根据 pvc 找到 provisioner driver，并启动一个 goroutine 调用 ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass) 进行创建工作。

provisionClaimOperation

func (ctrl *PersistentVolumeController) provisionClaimOperation(

ctx context.Context,

claim *v1.PersistentVolumeClaim,

plugin vol.ProvisionableVolumePlugin,

storageClass *storage.StorageClass) (string, error) {

claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)

klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

pluginName := plugin.GetPluginName()

if pluginName != "kubernetes.io/csi" PricePrice claim.Spec.DataSource != nil {

strerr := fmt.Sprintf("plugin %q is not a CSI plugin. Only CSI plugin can provision a claim with a datasource", pluginName)

return pluginName, fmt.Errorf(strerr)

}

provisionerName := storageClass.Provisioner

// Add provisioner annotation to be consistent with external provisioner workflow

newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName)

if err != nil {

// Save failed, the controller will retry in the next sync

klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)

return pluginName, err

}

claim = newClaim

pvName := ctrl.getProvisionedVolumeNameForClaim(claim)

volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})

if err != nil PricePrice !apierrors.IsNotFound(err) {

klog.V(3).Infof("error reading persistent volume %q: %v", pvName, err)

return pluginName, err

}

if err == nil PricePrice volume != nil {

// Volume has been already provisioned, nothing to do.

klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))

return pluginName, err

}

// Prepare a claimRef to the claim early (to fail before a volume is

// provisioned)

claimRef, err := ref.GetReference(scheme.Scheme, claim)

if err != nil {

klog.V(3).Infof("unexpected error getting claim reference: %v", err)

return pluginName, err

}

// Gather provisioning options

tags := make(map[string]string)

tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace

tags[CloudVolumeCreatedForClaimNameTag] = claim.Name

tags[CloudVolumeCreatedForVolumeNameTag] = pvName

options := vol.VolumeOptions{

PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,

MountOptions: storageClass.MountOptions,

CloudTags: Pricetags,

ClusterName: ctrl.clusterName,

PVName: pvName,

PVC: claim,

Parameters: storageClass.Parameters,

}

// Refuse to provision if the plugin doesn't support mount options, creation

// of PV would be rejected by validation anyway

if !plugin.SupportsMountOption() PricePrice len(options.MountOptions)> 0 {

strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)

klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())

}

// Provision the volume

provisioner, err := plugin.NewProvisioner(options)

if err != nil {

strerr := fmt.Sprintf("Failed to create provisioner: %v", err)

klog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

return pluginName, err

}

var selectedNode *v1.Node = nil

if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {

selectedNode, err = ctrl.NodeLister.Get(nodeName)

if err != nil {

strerr := fmt.Sprintf("Failed to get target node: %v", err)

klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

return pluginName, err

}

}

allowedTopologies := storageClass.AllowedTopologies

opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")

volume, err = provisioner.Provision(selectedNode, allowedTopologies)

opComplete(volumetypes.CompleteFuncParam{Err: Priceerr})

if err != nil {

ctrl.rescheduleProvisioning(claim)

strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)

klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

return pluginName, err

}

klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

// Create Kubernetes PV object for the volume.

if volume.Name == "" {

volume.Name = pvName

}

// Bind it to the claim

volume.Spec.ClaimRef = claimRef

volume.Status.Phase = v1.VolumeBound

volume.Spec.StorageClassName = claimClass

// Add AnnBoundByController (used in deleting the volume)

metav1.SetMetaDataAnnotation(Pricevolume.ObjectMeta, pvutil.AnnBoundByController, "yes")

metav1.SetMetaDataAnnotation(Pricevolume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())

// Try to create the PV object several times

for i := 0; i

klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)

var newVol *v1.PersistentVolume

if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), volume, metav1.CreateOptions{}); err == nil || apierrors.IsAlreadyExists(err) {

// Save succeeded.

if err != nil {

klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))

err = nil

} else {

klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))

_, updateErr := ctrl.storeVolumeUpdate(newVol)

if updateErr != nil {

// We will get an "volume added" event soon, this is not a big error

klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)

}

}

break

}

// Save failed, try again after a while.

klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)

time.Sleep(ctrl.createProvisionedPVInterval)

}

if err != nil {

strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)

klog.V(3).Info(strerr)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

var deleteErr error

var deleted bool

for i := 0; i

_, deleted, deleteErr = ctrl.doDeleteVolume(volume)

if deleteErr == nil PricePrice deleted {

// Delete succeeded

klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)

break

}

if !deleted {

klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())

break

}

// Delete failed, try again after a while.

klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)

time.Sleep(ctrl.createProvisionedPVInterval)

}

if deleteErr != nil {

strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)

klog.V(2).Info(strerr)

ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)

}

} else {

klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))

msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())

ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)

}

return pluginName, nil

}

provisionClaimOperation 的大体流程如下

检查 driver，只有 csi 类型的 driver 才允许使用 dataSource 字段。为 pvc 加上 claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner 这个 annotation。根据规范拼出 pv Name = "pvc-" + pvc.UID；如果找到了同名 pv，则说明 pv 已经存在，跳过 provision。收集 pvc/pv 的基本信息封装到 options 中。对 plugin 进行校验，如果 plugin 不支持 mount 选项，则直接拒绝 provision 请求。调用 plugin.NewProvisioner(options) 创建 provisioner，provisioner 实现了 Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) 方法；注意，该方法为同步方法。Provision 方法返回 PersistentVolume 对象。为创建出来的 pv 关联 pvc 信息（ClaimRef），并尝试创建 pv 对象（重试多次）。如果创建 pv 失败，则尝试调用 Delete 方法删除已创建的 volume 资源。

syncBoundClaim

// syncBoundClaim reconciles a PVC that has already completed binding at
// some point (its AnnBindCompleted was set). It repairs a half-finished
// binding, confirms a correct one, or marks the claim Lost when its
// volume is missing or claimed by someone else.
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
	// A bound claim with no volume name has lost its PV reference entirely.
	if claim.Spec.VolumeName == "" {
		// Claim was bound before but not any more.
		_, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!")
		return err
	}

	cached, exists, lookupErr := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
	if lookupErr != nil {
		return lookupErr
	}
	if !exists {
		// Claim is bound to a non-existing volume.
		_, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!")
		return err
	}

	volume, isPV := cached.(*v1.PersistentVolume)
	if !isPV {
		return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, cached)
	}

	klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))

	switch {
	case volume.Spec.ClaimRef == nil:
		// Claim is bound but volume has come unbound.
		// Or, a claim was bound and the controller has not received updated
		// volume yet. We can't distinguish these cases.
		// Bind the volume again and set all states to Bound.
		klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
		// Objects not saved on error; next syncPV or syncClaim will try again.
		return ctrl.bind(volume, claim)

	case volume.Spec.ClaimRef.UID == claim.UID:
		// All is well.
		// NOTE: syncPV can handle this so it can be left out.
		// NOTE: bind() call here will do nothing in most cases as
		// everything should be already set.
		klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
		return ctrl.bind(volume, claim)

	default:
		// Claim is bound but volume has a different claimant.
		// Set the claim phase to 'Lost', which is a terminal phase.
		_, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly")
		return err
	}
}

1）如果 pvc.Spec.VolumeName 为空，说明这个 pvc 之前被 bound 过，但是现在已经失去了对应 pv 的引用，发出 event 并返回

2）从 cache 中查找 pvc 绑定的 pv

如果没找到，说明 pvc 绑定了一个不存在的 pv，发出 event 并返回。如果找到了 pv，则检查 pv.Spec.ClaimRef 字段：如果为空，说明 pv 还没有绑定 pvc，调用 ctrl.bind(volume, claim) 方法进行绑定；如果 pv.ClaimRef.UID == pvc.UID，也调用 bind 方法，但是大多数情况下相当于直接返回（因为所有的操作都已经做完了）；其他情况说明 volume 绑定了其他的 pvc，更新 pvc 的状态为 Lost 并发出 event。 四 总结

最后用一张 pvc/pv 的状态流转图来总结一下

本文为迭云原创内容,未曾经允许不得刊发。

莱芜治疗皮肤病医院
肺纤维化治疗费用
上海皮肤病治疗费用多少钱

上一篇: 把脉趋势预见未来 十一大关键词仿射变换致远互联的2021

下一篇: 看了《都挺好》才知道越是厉害的甜蜜,活得都很简单

友情链接